/* comm_manager_epoll.c 5.93 KB */
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>

#include "trbase.h"
#include "trdata.h"
#include "trevent.h"

#include "tr/comm_manager.h"
#include "tr/comm_manager_epoll.h"
#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/connect_entry_point.h"

/* Upper bound on the number of events fetched by a single epoll_wait call. */
#define MAXEVENTS 256

/*
 * Result buffer handed to epoll_wait in TR_commManagerEpollSelect.
 *
 * NOTE(review): this object has external linkage and is shared by every
 * TR_CommManagerEpoll instance in the process. If nothing outside this
 * file refers to it, `static` would be appropriate -- confirm.
 */
struct epoll_event events[MAXEVENTS];

/**
 * Construct an epoll based communication manager.
 *
 * Runs the parent constructor first, then creates the epoll instance and
 * the table of event masks. The table holds one mask per possible
 * endpoint, as given by the parent's n_endpoints, and is indexed by the
 * transport handle in the other methods of this class.
 *
 * \param _this   the object under construction
 * \param params  constructor arguments, handed on to the parent
 *
 * \return 0 on success, -1 if either the epoll instance or the mask
 *         table could not be created.
 *
 * NOTE(review): assumes the class framework treats a -1 return as a
 * failed construction and discards the object -- confirm against trbase.
 */
static
int
commManagerEpollCtor(void * _this, va_list * params)
{
	TR_CommManagerEpoll this = _this;
	TR_CommManager      cmgr = _this;

	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);

	/*
	 * Keep the members in a state that is harmless to release in case
	 * the destructor runs after a failed construction.
	 */
	this->events = NULL;
	this->handle = epoll_create(cmgr->n_endpoints);
	if (this->handle < 0) {
		return -1;
	}

	this->events = TR_calloc(cmgr->n_endpoints, sizeof(*this->events));
	if (NULL == this->events) {
		/* do not leak the epoll descriptor when the table is missing */
		close(this->handle);
		this->handle = -1;
		return -1;
	}

	return 0;
}

/**
 * Destroy an epoll based communication manager.
 *
 * Closes the epoll descriptor, releases the event mask table and then
 * hands over to the parent destructor.
 *
 * \param _this  the object being destroyed
 */
static
void
commManagerEpollDtor(void * _this)
{
	TR_CommManagerEpoll self = _this;

	/* release what the constructor of this class acquired ... */
	close(self->handle);
	TR_MEM_FREE(self->events);

	/* ... before the parent tears down its own part */
	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
}

/**
 * Register a new endpoint with the epoll instance.
 *
 * The endpoint starts out being watched for EPOLLIN only. The endpoint
 * pointer itself is stored as the epoll user data so that
 * TR_commManagerEpollSelect can get back to it.
 *
 * NOTE(review): the mask table is indexed by the transport handle without
 * a range check; this presumes the handle is always smaller than the
 * n_endpoints value the table was sized with -- confirm.
 *
 * \param _this     the communication manager
 * \param endpoint  the endpoint to watch
 */
static
void
TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManagerEpoll self = _this;
	const int           fd   = endpoint->transport->handle;
	struct epoll_event  registration;

	self->events[fd] = EPOLLIN;

	registration.events   = self->events[fd];
	registration.data.ptr = endpoint;

	epoll_ctl(self->handle, EPOLL_CTL_ADD, fd, &registration);
}

/**
 * Wait for activity and turn each reported condition into an event.
 *
 * Calls epoll_wait with the given timeout and, for every reported entry,
 * issues through the event handler:
 *  - on EPOLLIN: TR_CET_EVENT_ACC_READY if the transport is a TR_TcpSocket
 *    with its listen member set, TR_CEP_EVENT_READ_READY otherwise;
 *  - on EPOLLOUT: TR_CEP_EVENT_WRITE_READY.
 * Both may be issued for the same entry. At most MAXEVENTS entries are
 * handled per call.
 *
 * \param _this    the communication manager
 * \param event    not used by this implementation
 * \param timeout  passed unchanged to epoll_wait
 */
static
void
TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
{
	TR_CommManagerEpoll this = _this;
	int                 ready;
	int                 idx;

	ready = epoll_wait(this->handle, events, MAXEVENTS, timeout);

	/* a failed wait yields -1, in which case the loop body never runs */
	for (idx = 0; idx < ready; idx++) {
		struct epoll_event * fired    = &events[idx];
		TR_CommEndPoint      endpoint = (TR_CommEndPoint)fired->data.ptr;

		if (fired->events & EPOLLIN) {
			if (! (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport))
					|| ! ((TR_TcpSocket)endpoint->transport)->listen) {
				/* anything but a listening tcp socket has data to read */
				TR_eventHandlerIssueEvent((TR_EventHandler)this,
						TR_eventSubjectEmit(
							(TR_EventSubject)endpoint,
							TR_CEP_EVENT_READ_READY,
							NULL));
			} else {
				/* readable listening socket: a connection can be accepted */
				TR_eventHandlerIssueEvent((TR_EventHandler)this,
						TR_eventSubjectEmit(
							(TR_EventSubject)endpoint,
							TR_CET_EVENT_ACC_READY,
							NULL));
			}
		}

		if (fired->events & EPOLLOUT) {
			TR_eventHandlerIssueEvent((TR_EventHandler)this,
					TR_eventSubjectEmit(
						(TR_EventSubject)endpoint,
						TR_CEP_EVENT_WRITE_READY,
						NULL));
		}
	}
}

/**
 * Start watching the event's subject for EPOLLOUT.
 *
 * Nothing happens when TR_socketFinWr reports true for the endpoint's
 * transport. Otherwise EPOLLOUT is added to the stored mask of the
 * handle and the epoll registration is modified accordingly.
 *
 * \param _this  the communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
{
	TR_CommManagerEpoll self     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	struct epoll_event  change;
	int                 fd;

	if (TR_socketFinWr(endpoint->transport)) {
		return;
	}

	fd = endpoint->transport->handle;
	self->events[fd] = self->events[fd] | EPOLLOUT;

	change.events   = self->events[fd];
	change.data.ptr = endpoint;

	epoll_ctl(self->handle, EPOLL_CTL_MOD, fd, &change);
}

/**
 * Stop watching the event's subject for EPOLLOUT.
 *
 * Removes EPOLLOUT from the stored mask of the handle and modifies the
 * epoll registration accordingly. Unlike the enable counterpart this is
 * done unconditionally.
 *
 * \param _this  the communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
{
	TR_CommManagerEpoll self     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	const int           fd       = endpoint->transport->handle;
	struct epoll_event  change;

	self->events[fd] = self->events[fd] & ~EPOLLOUT;

	change.events   = self->events[fd];
	change.data.ptr = endpoint;

	epoll_ctl(self->handle, EPOLL_CTL_MOD, fd, &change);
}

/**
 * Start watching the event's subject for EPOLLIN.
 *
 * Nothing happens when TR_socketFinRd reports true for the endpoint's
 * transport. Otherwise EPOLLIN is added to the stored mask of the handle
 * and the epoll registration is modified accordingly.
 *
 * \param _this  the communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollEnableRead(void * _this, TR_Event event)
{
	TR_CommManagerEpoll self     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	struct epoll_event  change;
	int                 fd;

	if (TR_socketFinRd(endpoint->transport)) {
		return;
	}

	fd = endpoint->transport->handle;
	self->events[fd] = self->events[fd] | EPOLLIN;

	change.events   = self->events[fd];
	change.data.ptr = endpoint;

	epoll_ctl(self->handle, EPOLL_CTL_MOD, fd, &change);
}

/**
 * Stop watching the event's subject for EPOLLIN.
 *
 * Removes EPOLLIN from the stored mask of the handle and modifies the
 * epoll registration accordingly.
 *
 * NOTE(review): no code in this file refers to this function, the
 * TR_CommManager interface initialisation included -- confirm whether
 * that is intended.
 *
 * \param _this  the communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollDisableRead(void * _this, TR_Event event)
{
	TR_CommManagerEpoll self     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	const int           fd       = endpoint->transport->handle;
	struct epoll_event  change;

	self->events[fd] = self->events[fd] & ~EPOLLIN;

	change.events   = self->events[fd];
	change.data.ptr = endpoint;

	epoll_ctl(self->handle, EPOLL_CTL_MOD, fd, &change);
}

/**
 * Remove the event's subject from the epoll instance.
 *
 * \param _this  the communication manager
 * \param event  event whose subject is the endpoint being closed
 */
static
void
TR_commManagerEpollClose(void * _this, TR_Event event)
{
	TR_CommManagerEpoll this     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	/*
	 * EPOLL_CTL_DEL ignores the event argument, but kernels before
	 * 2.6.9 reject a NULL pointer here, so hand in a dummy.
	 */
	struct epoll_event  ignored  = { 0 };

	epoll_ctl(
			this->handle,
			EPOLL_CTL_DEL,
			endpoint->transport->handle,
			&ignored);
}

/**
 * Class variable initialiser handed to TR_CREATE_CLASS.
 *
 * Consists solely of the TR_INHERIT_CLASSVARS invocation for
 * TR_CommManagerEpoll with TR_CommManager as the parent.
 *
 * \param cls  the class being initialised
 */
static
void
TR_commManagerEpollCvInit(TR_class_ptr cls)
{
	TR_INHERIT_CLASSVARS(TR_CommManagerEpoll, TR_CommManager);
}

/*
 * Interface wiring and class definition.
 *
 * The trailing comments in the TR_CommManager initialiser name the event
 * each interface slot is meant to serve.
 *
 * NOTE(review): the last two slots look suspicious -- SHUT_READ is wired
 * to DisableWrite and SHUT_WRITE to EnableRead, while
 * TR_commManagerEpollDisableRead is defined in this file but never
 * referenced. Presumably SHUT_READ should stop polling for input; confirm
 * against tr/interface/comm_manager.h before changing anything, as the
 * argument order here is the behaviour.
 */
TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
TR_INIT_IFACE(
		TR_CommManager,
		TR_commManagerEpollAddEndpoint,  // TR_CON_EVENT_NEW_CON
		TR_commManagerEpollSelect,       // TR_DISPATCHER_EVENT_DATA_WAIT
		TR_commManagerEpollEnableWrite,  // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
		TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_END_DATA
		TR_commManagerEpollEnableRead,   // TR_CEP_EVENT_READ_BLOCK
		TR_commManagerEpollClose,        // TR_CEP_EVENT_CLOSE
		TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_SHUT_READ
		TR_commManagerEpollEnableRead);  // TR_CEP_EVENT_SHUT_WRITE
TR_CREATE_CLASS(
		TR_CommManagerEpoll,
		TR_CommManager,
		TR_commManagerEpollCvInit,
		TR_IF(TR_Class),
		TR_IF(TR_CommManager));

// vim: set ts=4 sw=4: