comm_manager_epoll.c 6.34 KB
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>

#include "trbase.h"
#include "trdata.h"
#include "trevent.h"

#include "tr/comm_manager.h"
#include "tr/comm_manager_epoll.h"
#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/connect_entry_point.h"

#define MAXEVENTS 256

struct epoll_event events[MAXEVENTS];

static
int
commManagerEpollCtor(void * _this, va_list * params)
{
	TR_CommManagerEpoll this = _this;
	TR_CommManager      cmgr = _this;

	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);
	this->handle      = epoll_create(cmgr->n_endpoints);
	this->read_ready  = TR_new(TR_Queue);
	this->write_ready = TR_new(TR_Queue);

	this->read_ready->free_msgs  = 0;
	this->write_ready->free_msgs = 0;

	return 0;
}

static
void
commManagerEpollDtor(void * _this)
{
	TR_CommManagerEpoll this = _this;

	TR_delete(this->read_ready);
	TR_delete(this->write_ready);

	close(this->handle);
	TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
}

static
void
TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManagerEpoll this   = _this;
	int                 handle = endpoint->transport->handle;
	struct epoll_event  event;

	event.data.ptr = endpoint;
	event.events   = EPOLLIN | EPOLLOUT | EPOLLET;

	epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &event);
}

static
void
TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
{
	TR_CommManagerEpoll this = _this;
	int                 i, nevents;
	TR_Queue            node;

	if (0 != (this->read_ready->nmsg & this->write_ready->nmsg)) {
		timeout = 0;
	}

	nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);

	for (i=0; i<nevents; i++) {
		TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;

		if ((events[i].events & EPOLLIN) == EPOLLIN) {
			if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
					&& ((TR_TcpSocket)endpoint->transport)->listen) {
				TR_eventHandlerIssueEvent((TR_EventHandler)this,
						TR_eventSubjectEmit(
							(TR_EventSubject)endpoint,
							TR_CET_EVENT_ACC_READY,
							NULL));
			} else {
				if (! ((TR_EventSubject)endpoint)->fin) {
					TR_queuePut(this->read_ready, endpoint);
				}
			}
		}

		if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
			if (TR_cepHasPendingData(endpoint) && 
					! ((TR_EventSubject)endpoint)->fin) {
				TR_queuePut(this->write_ready, endpoint);
			}
		}
	}

	/* now issue reads and write events */
	for (node=this->read_ready->first; node; node=node->next) {
		TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg;

		if (! TR_socketFinRd(endpoint->transport)) {
			TR_eventHandlerIssueEvent(
					(TR_EventHandler)this,
					TR_eventSubjectEmit(
						(TR_EventSubject)endpoint,
						TR_CEP_EVENT_READ_READY,
						NULL));
		}
	}

	for (node=this->write_ready->first; node; node=node->next) {
		TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg;

		if (! TR_socketFinWr(endpoint->transport)) {
			TR_eventHandlerIssueEvent(
					(TR_EventHandler)this,
					TR_eventSubjectEmit(
						(TR_EventSubject)endpoint,
						TR_CEP_EVENT_WRITE_READY,
						NULL));
		}
	}
}

static
void
TR_commManagerEpollRemoveWrite(void * _this, TR_Event event)
{
	TR_CommManagerEpoll this     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;

	TR_queueDelete(this->write_ready, endpoint);
}

static
void
TR_commManagerEpollRemoveRead(void * _this, TR_Event event)
{
	TR_CommManagerEpoll this     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;

	TR_queueDelete(this->read_ready, endpoint);
}

static
void
TR_commManagerEpollClose(void * _this, TR_Event event)
{
	TR_CommManagerEpoll this     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;

	TR_queueDelete(this->read_ready, endpoint);
	TR_queueDelete(this->write_ready, endpoint);

	epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL);
}

static
void
TR_commManagerEpollShutRead(void * _this, TR_Event event)
{
	TR_CommManagerEpoll this     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	struct epoll_event  _event;

	TR_queueDelete(this->read_ready, endpoint);

	_event.data.ptr = endpoint;
	_event.events   = EPOLLOUT | EPOLLET;

	epoll_ctl(
			this->handle,
			EPOLL_CTL_MOD,
			endpoint->transport->handle,
			&_event);
}

static
void
TR_commManagerEpollShutWrite(void * _this, TR_Event event)
{
	TR_CommManagerEpoll this     = _this;
	TR_CommEndPoint     endpoint = (TR_CommEndPoint)event->subject;
	struct epoll_event  _event;

	TR_queueDelete(this->write_ready, endpoint);

	_event.data.ptr = endpoint;
	_event.events   = EPOLLIN | EPOLLET;

	epoll_ctl(
			this->handle,
			EPOLL_CTL_MOD,
			endpoint->transport->handle,
			&_event);
}


static void TR_commManagerEpollNoop(void * _this, TR_Event event) {}

static
void
TR_commManagerEpollCvInit(TR_class_ptr cls) {
	TR_INHERIT_CLASSVARS(TR_CommManagerEpoll, TR_CommManager);
}

TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
TR_INIT_IFACE(
		TR_CommManager,
		TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON
		TR_commManagerEpollSelect,      // TR_DISPATCHER_EVENT_DATA_WAIT
		TR_commManagerEpollRemoveWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
		TR_commManagerEpollNoop,        // TR_CEP_EVENT_END_DATA
		TR_commManagerEpollRemoveRead,  // TR_CEP_EVENT_READ_BLOCK
		TR_commManagerEpollClose,       // TR_CEP_EVENT_CLOSE
		TR_commManagerEpollShutWrite,   // TR_CEP_EVENT_SHUT_READ
		TR_commManagerEpollShutRead);   // TR_CEP_EVENT_SHUT_WRITE
TR_CREATE_CLASS(
		TR_CommManagerEpoll,
		TR_CommManager,
		TR_commManagerEpollCvInit,
		TR_IF(TR_Class),
		TR_IF(TR_CommManager));

// vim: set ts=4 sw=4: