comm_manager_epoll.c 5.92 KB
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>

#include "trbase.h"
#include "trevent.h"

#include "tr/comm_manager.h"
#include "tr/comm_manager_epoll.h"
#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/connect_entry_point.h"

/* Upper bound on the number of events fetched by a single epoll_wait call. */
#define MAXEVENTS 256

/*
 * Result buffer handed to epoll_wait in TR_commManagerEpollSelect.
 * It is a file-scope object with external linkage, so every manager
 * instance in the process uses the same buffer.
 */
struct epoll_event events[MAXEVENTS];

/*
 * Counter defined in another translation unit. It is incremented in
 * TR_commManagerEpollSelect each time a write-ready event is issued.
 */
extern int count_write_ready;

/**
 * Constructor of the epoll based communication manager.
 *
 * Runs the parent constructor, creates the epoll instance and allocates
 * a table of cmgr->n_endpoints epoll_event slots. Every slot starts
 * without an attached endpoint and with only the edge-triggered and
 * one-shot flags set.
 *
 * \param _this   the object under construction
 * \param params  constructor arguments, forwarded to the parent constructor
 *
 * \return 0 on success, -1 if either the epoll instance or the slot
 *         table could not be created. In the failure case the handle
 *         is -1 and the table pointer is NULL.
 */
static
int
commManagerEpollCtor(void * _this, va_list * params)
{
	TR_CommManagerEpoll this = _this;
	TR_CommManager      cmgr = _this;
	nfds_t              i;

	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);

	/* Keep the object in a well defined state on every exit path. */
	this->events = NULL;

	this->handle = epoll_create(cmgr->n_endpoints);
	if (-1 == this->handle) {
		return -1;
	}

	this->events = TR_malloc(sizeof(struct epoll_event) * cmgr->n_endpoints);
	if (NULL == this->events) {
		/* Do not leak the epoll descriptor when the table is missing. */
		close(this->handle);
		this->handle = -1;

		return -1;
	}

	for (i = 0; i < cmgr->n_endpoints; i++) {
		this->events[i].data.ptr = NULL;
		this->events[i].events   = EPOLLET | EPOLLONESHOT;
	}

	return 0;
}

/**
 * Destructor of the epoll based communication manager.
 *
 * Releases the slot table and the epoll descriptor owned by this
 * object, then hands over to the parent destructor.
 *
 * \param _this  the object being destroyed
 */
static
void
commManagerEpollDtor(void * _this)
{
	TR_CommManagerEpoll mgr = _this;

	/* Both resources are independent of each other. */
	TR_MEM_FREE(mgr->events);
	close(mgr->handle);

	/* The parent part is torn down last. */
	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
}

/**
 * Register an endpoint with the epoll instance.
 *
 * The slot that belongs to the descriptor of the endpoint's transport
 * gets the endpoint attached and read interest added, then the
 * descriptor is added to the epoll set with that slot.
 *
 * \param _this     the epoll communication manager
 * \param endpoint  the endpoint to register
 */
static
void
TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManagerEpoll  mgr  = _this;
	int                  fd   = endpoint->transport->handle;
	struct epoll_event * slot = &mgr->events[fd];

	slot->events   |= EPOLLIN;
	slot->data.ptr  = endpoint;

	epoll_ctl(mgr->handle, EPOLL_CTL_ADD, fd, slot);
}

/**
 * Wait for readiness on the registered descriptors and issue the
 * matching events.
 *
 * For each entry returned by epoll_wait:
 *  - EPOLLIN emits TR_CET_EVENT_ACC_READY when the transport is a
 *    TR_TcpSocket with its listen flag set, TR_CEP_EVENT_READ_READY
 *    otherwise.
 *  - EPOLLOUT emits TR_CEP_EVENT_WRITE_READY and increments
 *    count_write_ready.
 * Every emitted event is issued with this manager as event handler and
 * the delivered bit is removed from the stored interest mask of the
 * descriptor.
 *
 * \param _this    the epoll communication manager
 * \param event    not used by this implementation
 * \param timeout  passed unchanged to epoll_wait
 */
static
void
TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
{
	TR_CommManagerEpoll this = _this;
	int                 i, nevents;

	/* A negative result (error) simply skips the loop below. */
	nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);

	for (i=0; i<nevents; i++) {
		TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
		/*
		 * this->events is indexed by descriptor everywhere else in
		 * this file, not by the position in the result buffer, so the
		 * descriptor is looked up once before any event is issued.
		 */
		int             handle   = endpoint->transport->handle;

		if ((events[i].events & EPOLLIN) == EPOLLIN) {
			TR_Event read_event;

			if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
					&& ((TR_TcpSocket)endpoint->transport)->listen) {
				read_event = TR_eventSubjectEmit(
						(TR_EventSubject)endpoint,
						TR_CET_EVENT_ACC_READY,
						NULL);
			} else {
				read_event = TR_eventSubjectEmit(
						(TR_EventSubject)endpoint,
						TR_CEP_EVENT_READ_READY,
						NULL);
			}

			TR_eventHandlerIssueEvent((TR_EventHandler)this, read_event);
			this->events[handle].events &= ~EPOLLIN;
		}

		if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
			TR_Event write_event = TR_eventSubjectEmit(
					(TR_EventSubject)endpoint,
					TR_CEP_EVENT_WRITE_READY,
					NULL);

			TR_eventHandlerIssueEvent((TR_EventHandler)this, write_event);
			this->events[handle].events &= ~EPOLLOUT;
			count_write_ready++;
		}
	}
}

/**
 * Add write interest for the endpoint that is the subject of an event.
 *
 * Nothing happens when TR_socketFinWr reports true for the transport
 * (presumably its write side is already finished -- confirm against
 * the socket implementation).
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the endpoint to modify
 */
static
void
TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
{
	TR_CommManagerEpoll  mgr = _this;
	TR_CommEndPoint      ep  = (TR_CommEndPoint)event->subject;
	struct epoll_event * slot;
	int                  fd;

	if (TR_socketFinWr(ep->transport)) {
		return;
	}

	fd   = ep->transport->handle;
	slot = &mgr->events[fd];

	slot->data.ptr  = ep;
	slot->events   |= EPOLLOUT;

	epoll_ctl(mgr->handle, EPOLL_CTL_MOD, fd, slot);
}

/**
 * Remove write interest for the endpoint that is the subject of an
 * event and push the updated slot to the epoll instance.
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the endpoint to modify
 */
static
void
TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
{
	TR_CommManagerEpoll  mgr  = _this;
	TR_CommEndPoint      ep   = (TR_CommEndPoint)event->subject;
	int                  fd   = ep->transport->handle;
	struct epoll_event * slot = &mgr->events[fd];

	slot->events   &= ~EPOLLOUT;
	slot->data.ptr  = ep;

	epoll_ctl(mgr->handle, EPOLL_CTL_MOD, fd, slot);
}

/**
 * Add read interest for the endpoint that is the subject of an event.
 *
 * Nothing happens when TR_socketFinRd reports true for the transport
 * (presumably its read side is already finished -- confirm against
 * the socket implementation).
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the endpoint to modify
 */
static
void
TR_commManagerEpollEnableRead(void * _this, TR_Event event)
{
	TR_CommManagerEpoll  mgr = _this;
	TR_CommEndPoint      ep  = (TR_CommEndPoint)event->subject;
	struct epoll_event * slot;
	int                  fd;

	if (TR_socketFinRd(ep->transport)) {
		return;
	}

	fd   = ep->transport->handle;
	slot = &mgr->events[fd];

	slot->data.ptr  = ep;
	slot->events   |= EPOLLIN;

	epoll_ctl(mgr->handle, EPOLL_CTL_MOD, fd, slot);
}

/**
 * Remove read interest for the endpoint that is the subject of an
 * event and push the updated slot to the epoll instance.
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the endpoint to modify
 */
static
void
TR_commManagerEpollDisableRead(void * _this, TR_Event event)
{
	TR_CommManagerEpoll  mgr  = _this;
	TR_CommEndPoint      ep   = (TR_CommEndPoint)event->subject;
	int                  fd   = ep->transport->handle;
	struct epoll_event * slot = &mgr->events[fd];

	slot->events   &= ~EPOLLIN;
	slot->data.ptr  = ep;

	epoll_ctl(mgr->handle, EPOLL_CTL_MOD, fd, slot);
}

/**
 * Forget the endpoint that is the subject of an event.
 *
 * The slot of its descriptor is put back into the state the
 * constructor gives it (no endpoint, only edge-triggered and one-shot
 * flags) and the descriptor is removed from the epoll set.
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the endpoint to remove
 */
static
void
TR_commManagerEpollClose(void * _this, TR_Event event)
{
	TR_CommManagerEpoll  mgr  = _this;
	TR_CommEndPoint      ep   = (TR_CommEndPoint)event->subject;
	int                  fd   = ep->transport->handle;
	struct epoll_event * slot = &mgr->events[fd];

	slot->events   = EPOLLET | EPOLLONESHOT;
	slot->data.ptr = NULL;

	epoll_ctl(mgr->handle, EPOLL_CTL_DEL, fd, NULL);
}

/**
 * Class variable initializer, handed to TR_CREATE_CLASS below.
 *
 * Its whole body is the TR_INHERIT_CLASSVARS invocation naming
 * TR_CommManagerEpoll and TR_CommManager.
 *
 * \param cls  not referenced directly in this body.
 *             NOTE(review): the macro may use it implicitly -- confirm.
 */
static
void
TR_commManagerEpollCvInit(TR_class_ptr cls) {
	TR_INHERIT_CLASSVARS(TR_CommManagerEpoll, TR_CommManager);
}

/* TR_Class interface: constructor, destructor, and NULL for the third slot. */
TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
/*
 * TR_CommManager interface implementation.
 * NOTE(review): the argument order presumably has to match the slot
 * order declared in tr/interface/comm_manager.h -- confirm before
 * reordering. TR_commManagerEpollDisableWrite is listed twice (4th and
 * last argument) and TR_commManagerEpollClose precedes
 * TR_commManagerEpollDisableRead; verify that this is intended.
 */
TR_INIT_IFACE(
		TR_CommManager,
		TR_commManagerEpollAddEndpoint,
		TR_commManagerEpollSelect,
		TR_commManagerEpollEnableWrite,
		TR_commManagerEpollDisableWrite,
		TR_commManagerEpollEnableRead,
		TR_commManagerEpollClose,
		TR_commManagerEpollDisableRead,
		TR_commManagerEpollDisableWrite);
/*
 * Class definition: TR_CommManagerEpoll with TR_CommManager as parent,
 * the class variable initializer above and both interfaces.
 */
TR_CREATE_CLASS(
		TR_CommManagerEpoll,
		TR_CommManager,
		TR_commManagerEpollCvInit,
		TR_IF(TR_Class),
		TR_IF(TR_CommManager));

// vim: set ts=4 sw=4: