comm_manager_epoll.c 5.96 KB
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>

#include "trbase.h"
#include "trdata.h"
#include "trevent.h"

#include "tr/comm_manager.h"
#include "tr/comm_manager_epoll.h"
#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/connect_entry_point.h"
#include "tr/_comm_manager.h"

// Maximum number of ready events fetched by a single epoll_wait call.
#define MAXEVENTS 1024

// Result buffer passed to epoll_wait in TR_commManagerEpollSelect.
// NOTE(review): this buffer is file-scope and shared by every call, so two
// managers selecting concurrently would overwrite each other's results. It
// also has external linkage although only this file is seen using it --
// consider `static` once no other translation unit is confirmed to need it.
struct epoll_event events[MAXEVENTS];

/**
 * Constructor: run the parent constructor, create the epoll instance and
 * allocate the table of per-descriptor interest masks.
 *
 * \param _this   object under construction
 * \param params  constructor arguments, forwarded to the parent constructor
 *
 * \return 0 on success, -1 if either the epoll instance or the mask table
 *         could not be created
 */
static
int
commManagerEpollCtor(void * _this, va_list * params)
{
	TR_CommManagerEpoll this = _this;
	TR_CommManager      cmgr = _this;

	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);

	// Keep both members in a defined state on every exit path so that the
	// destructor can run on a partially constructed object.
	this->events = NULL;
	this->handle = epoll_create(cmgr->n_endpoints);
	if (-1 == this->handle) {
		return -1;
	}

	// n_endpoints mask slots; the table is indexed by file descriptor.
	this->events = TR_calloc(cmgr->n_endpoints, sizeof(uint32_t));
	if (NULL == this->events) {
		close(this->handle);
		this->handle = -1;
		return -1;
	}

	return 0;
}

/**
 * Destructor: free the interest mask table, close the epoll descriptor and
 * then hand over to the parent destructor.
 *
 * \param _this  object being destroyed
 */
static
void
commManagerEpollDtor(void * _this)
{
	TR_CommManagerEpoll mgr = _this;

	// Resources owned by this class are released before the parent runs.
	TR_MEM_FREE(mgr->events);
	close(mgr->handle);

	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
}

/**
 * Register an endpoint's descriptor with the epoll instance.
 *
 * The endpoint starts with an empty interest mask (level-triggered, neither
 * EPOLLIN nor EPOLLOUT); the enable handlers switch the bits on later. The
 * endpoint pointer is stored as the epoll user data so the select handler
 * can get back to it.
 *
 * \param _this     the epoll communication manager
 * \param endpoint  endpoint whose transport handle is registered
 */
static
void
TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManagerEpoll mgr = _this;
	const int           fd  = endpoint->transport->handle;
	struct epoll_event  registration;

	// Reset whatever mask a previous user of this descriptor slot left.
	mgr->events[fd] = 0;

	registration.events   = mgr->events[fd];
	registration.data.ptr = endpoint;

	epoll_ctl(mgr->handle, EPOLL_CTL_ADD, fd, &registration);
}

/**
 * Wait for I/O readiness and sort the ready endpoints into the manager's
 * accept, read and write sets.
 *
 * For each ready descriptor:
 *  - EPOLLIN: a TR_TcpSocket transport with its listen flag set is added to
 *    cmgr->accept, any other endpoint to cmgr->read.
 *  - EPOLLOUT: the endpoint is added to cmgr->write.
 *  - The bits handled above are removed from the endpoint's interest mask,
 *    so it is not reported again until the matching enable handler runs.
 *  - EPOLLHUP: a TR_CEP_EVENT_SHUT_WRITE event is issued for the endpoint.
 *
 * At most MAXEVENTS descriptors are processed per call. If epoll_wait
 * fails, it returns a negative count and nothing is processed.
 *
 * \param _this    the epoll communication manager
 * \param event    the event that triggered the select
 * \param timeout  timeout passed unchanged to epoll_wait
 */
static
void
TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
{
	TR_CommManagerEpoll this = _this;
	TR_CommManager      cmgr = _this;
	int                 i, nevents;

	nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);

	for (i=0; i<nevents; i++) {
		TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
		int             handle   = endpoint->transport->handle;
		uint32_t        fired    = events[i].events;
		uint32_t        handled  = 0;

		if (fired & EPOLLIN) {
			if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
					&& ((TR_TcpSocket)endpoint->transport)->listen) {
				TR_hashAdd(cmgr->accept, endpoint);
			} else {
				// NOTE(review): this tests the fin flag of the subject of
				// the triggering event, not of `endpoint` -- confirm that
				// this is intended.
				if (! event->subject->fin) {
					TR_hashAdd(cmgr->read, endpoint);
				}
			}
			handled |= EPOLLIN;
		}

		if (fired & EPOLLOUT) {
			// NOTE(review): same fin check on the event subject as above.
			if (! event->subject->fin) {
				TR_hashAdd(cmgr->write, endpoint);
			}
			handled |= EPOLLOUT;
		}

		if (handled) {
			// A single EPOLL_CTL_MOD drops every bit handled above, even
			// when the descriptor was readable and writable at once.
			struct epoll_event update;

			this->events[handle] &= ~handled;
			update.data.ptr = endpoint;
			update.events   = this->events[handle];
			epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &update);
		}

		if (fired & EPOLLHUP) {
			TR_eventHandlerIssueEvent(
					(TR_EventHandler)_this,
					TR_eventSubjectEmit(
						(TR_EventSubject)endpoint,
						TR_CEP_EVENT_SHUT_WRITE,
						NULL));
		}
	}
}

/**
 * Remove the given bits from the interest mask of the endpoint that is the
 * subject of the event and push the new mask to the epoll instance.
 *
 * \param _this  the epoll communication manager
 * \param mask   epoll event bits to switch off
 * \param event  event whose subject is the affected endpoint
 */
static
inline
void
TR_commManagerEpollDisable(void * _this, uint32_t mask, TR_Event event)
{
	TR_CommManagerEpoll mgr      = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	const int           fd       = endpoint->transport->handle;
	const uint32_t      interest = mgr->events[fd] & ~mask;
	struct epoll_event  update;

	mgr->events[fd] = interest;

	update.events   = interest;
	update.data.ptr = endpoint;

	epoll_ctl(mgr->handle, EPOLL_CTL_MOD, fd, &update);
}

/**
 * Add the given bits to the interest mask of the endpoint that is the
 * subject of the event and push the new mask to the epoll instance.
 *
 * \param _this  the epoll communication manager
 * \param mask   epoll event bits to switch on
 * \param event  event whose subject is the affected endpoint
 */
static
inline
void
TR_commManagerEpollEnable(void * _this, uint32_t mask, TR_Event event)
{
	TR_CommManagerEpoll mgr      = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	const int           fd       = endpoint->transport->handle;
	const uint32_t      interest = mgr->events[fd] | mask;
	struct epoll_event  update;

	mgr->events[fd] = interest;

	update.events   = interest;
	update.data.ptr = endpoint;

	epoll_ctl(mgr->handle, EPOLL_CTL_MOD, fd, &update);
}

/**
 * Start watching the event's endpoint for EPOLLOUT, unless TR_socketFinWr
 * reports true for its transport (presumably: the write side is already
 * finished -- verify against the socket implementation).
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
{
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	if (TR_socketFinWr(endpoint->transport)) {
		return;
	}

	TR_commManagerEpollEnable(_this, EPOLLOUT, event);
}

/**
 * Start watching the event's endpoint for EPOLLIN, unless TR_socketFinRd
 * reports true for its transport (presumably: the read side is already
 * finished -- verify against the socket implementation).
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollEnableRead(void * _this, TR_Event event)
{
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	if (TR_socketFinRd(endpoint->transport)) {
		return;
	}

	TR_commManagerEpollEnable(_this, EPOLLIN, event);
}

/**
 * Stop watching the event's endpoint for EPOLLOUT.
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
{
	const uint32_t write_ready = EPOLLOUT;

	TR_commManagerEpollDisable(_this, write_ready, event);
}

/**
 * Stop watching the event's endpoint for EPOLLIN.
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the affected endpoint
 */
static
void
TR_commManagerEpollDisableRead(void * _this, TR_Event event)
{
	const uint32_t read_ready = EPOLLIN;

	TR_commManagerEpollDisable(_this, read_ready, event);
}

/**
 * Remove the descriptor of the event's endpoint from the epoll instance.
 * The stored interest mask is left as is; it is reset when a descriptor is
 * added again.
 *
 * \param _this  the epoll communication manager
 * \param event  event whose subject is the endpoint being closed
 */
static
void
TR_commManagerEpollClose(void * _this, TR_Event event)
{
	TR_CommManagerEpoll mgr      = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	const int           fd       = endpoint->transport->handle;

	epoll_ctl(mgr->handle, EPOLL_CTL_DEL, fd, NULL);
}

/**
 * Class variable initializer: invokes TR_INHERIT_CLASSVARS for
 * TR_CommManagerEpoll with TR_CommManager as the source class.
 *
 * \param cls  class pointer supplied by the class framework
 */
static
void
TR_commManagerEpollCvInit(TR_class_ptr cls)
{
	TR_INHERIT_CLASSVARS(TR_CommManagerEpoll, TR_CommManager);
}

// TR_Class interface: constructor and destructor of this class; the third
// slot is left NULL.
TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
// TR_CommManager interface: the epoll based handlers defined in this file.
// The trailing comments name the events the original author associated
// with each handler.
TR_INIT_IFACE(
		TR_CommManager,
		TR_commManagerEpollAddEndpoint,
		TR_commManagerEpollSelect,      // TR_DISPATCHER_EVENT_DATA_WAIT
		TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
		TR_commManagerEpollEnableRead,  // TR_CEP_EVENT_READ_BLOCK
		TR_commManagerEpollDisableWrite,
		TR_commManagerEpollDisableRead,
		TR_commManagerEpollClose);      // TR_CEP_EVENT_CLOSE
// Class definition: TR_CommManagerEpoll, created with TR_CommManager as its
// second argument (presumably the parent class), the class variable
// initializer above and the two interfaces initialized above.
TR_CREATE_CLASS(
		TR_CommManagerEpoll,
		TR_CommManager,
		TR_commManagerEpollCvInit,
		TR_IF(TR_Class),
		TR_IF(TR_CommManager));

// vim: set ts=4 sw=4: