i_comm_manager.c 4.75 KB
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <errno.h>
#include <poll.h>

#include "trbase.h"
#include "trevent.h"

#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/comm_manager.h"
#include "tr/_comm_manager.h"

/*
 * Instantiate the TR_CommManager interface that every TR_CALL in this
 * file dispatches through.
 *
 * NOTE(review): the second argument is presumably the number of methods
 * of the interface. This file dispatches seven distinct methods
 * (addEndpoint, select, pollWrite, pollRead, disableRead, disableWrite,
 * close) -- confirm against TR_CREATE_INTERFACE that 5 is still correct.
 */
TR_CREATE_INTERFACE(TR_CommManager, 5);

/**
 * Register an endpoint with the communication manager.
 *
 * The endpoint is stored in the handle indexed endpoint table and
 * max_handle is raised when the new handle exceeds it. Depending on
 * TR_socketFdGetter() for the endpoint's transport it is then put on
 * the accept set or on the read set and the matching I/O event is
 * issued. Finally the addEndpoint method of the concrete implementation
 * is called.
 *
 * \param _this     the communication manager.
 * \param endpoint  the endpoint to register.
 */
void
TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManager this   = _this;
	const int      handle = endpoint->transport->handle;

	// An occupied slot should never happen. If it does we assume a
	// leftover that still has to be deleted before the slot is reused.
	if (NULL != this->endpoints[handle]) {
		TR_delete(this->endpoints[handle]);
	}

	if (handle > this->max_handle) {
		this->max_handle = handle;
	}

	this->endpoints[handle] = endpoint;

	if (! TR_socketFdGetter(endpoint->transport)) {
		TR_setAdd(this->read, endpoint);
		TR_ISSUE_IO_READ_EVENT(this, endpoint);
	} else {
		TR_setAdd(this->accept, endpoint);
		TR_ISSUE_IO_ACC_EVENT(this, endpoint);
	}

	TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
}

/**
 * Issue the pending I/O events and run the implementation's select.
 *
 * For every endpoint currently held in the write, accept and read sets
 * the corresponding I/O event is issued. The timeout handed to the
 * implementation is then chosen as follows:
 *  - 0 if at least one of the three sets is not empty,
 *  - the value of TR_timerGet() if the event carries a timer in its data,
 *  - the data wait time of the event dispatcher otherwise.
 *
 * \param _this  the communication manager.
 * \param event  the triggering event; its subject is used as the event
 *               dispatcher, its data as an optional timer.
 *
 * \return always TR_EVENT_DONE.
 */
TR_EventDone
TR_commManagerSelect(void * _this, TR_Event event)
{
	TR_CommManager this  = _this;
	TR_Timer       timer = (TR_Timer)event->data;
	unsigned long  timeout;  // milliseconds

	TR_iterableForeach(this->write) {
		TR_ISSUE_IO_WRITE_EVENT(this, TR_iterableCurrent(this->write));
	}

	TR_iterableForeach(this->accept) {
		TR_ISSUE_IO_ACC_EVENT(this, TR_iterableCurrent(this->accept));
	}

	TR_iterableForeach(this->read) {
		TR_ISSUE_IO_READ_EVENT(this, TR_iterableCurrent(this->read));
	}

	if (TR_setSize(this->write)
			|| TR_setSize(this->accept)
			|| TR_setSize(this->read)) {
		// there is I/O already waiting to be handled, so do not block.
		timeout = 0;
	} else if (NULL != timer) {
		timeout = TR_timerGet(timer, NULL);
	} else {
		timeout = TR_eventDispatcherGetDataWaitTime(
				(TR_EventDispatcher)event->subject);
	}

	TR_CALL(_this, TR_CommManager, select, event, timeout);

	return TR_EVENT_DONE;
}

/**
 * Take an endpoint off the write set and let the implementation poll it
 * for writability.
 *
 * The implementation's pollWrite is skipped when TR_socketFinWr()
 * reports true for the endpoint's transport.
 *
 * \param _this  the communication manager.
 * \param event  the event whose subject is the affected endpoint.
 *
 * \return always TR_EVENT_DONE.
 */
TR_EventDone
TR_commManagerPollWrite(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	TR_setDelete(this->write, event->subject);

	if (TR_socketFinWr(endpoint->transport)) {
		return TR_EVENT_DONE;
	}

	TR_CALL(_this, TR_CommManager, pollWrite, event);

	return TR_EVENT_DONE;
}

/**
 * Take an endpoint off the accept or read set and let the implementation
 * poll it for readability.
 *
 * Which set is used depends on TR_socketFdGetter() for the endpoint's
 * transport: true selects the accept set, false the read set. The
 * implementation's pollRead is skipped when TR_socketFinRd() reports
 * true for the transport.
 *
 * \param _this  the communication manager.
 * \param event  the event whose subject is the affected endpoint.
 *
 * \return always TR_EVENT_DONE.
 */
TR_EventDone
TR_commManagerPollRead(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	if (! TR_socketFdGetter(endpoint->transport)) {
		TR_setDelete(this->read, event->subject);
	} else {
		TR_setDelete(this->accept, event->subject);
	}

	if (TR_socketFinRd(endpoint->transport)) {
		return TR_EVENT_DONE;
	}

	TR_CALL(_this, TR_CommManager, pollRead, event);

	return TR_EVENT_DONE;
}

/**
 * Stop reading from an endpoint.
 *
 * The event subject is removed from the read set, then the
 * implementation's disableRead is called.
 *
 * \param _this  the communication manager.
 * \param event  the event whose subject is the affected endpoint.
 *
 * \return always TR_EVENT_DONE.
 */
TR_EventDone
TR_commManagerDisableRead(void * _this, TR_Event event)
{
	TR_setDelete(((TR_CommManager)_this)->read, event->subject);

	TR_CALL(_this, TR_CommManager, disableRead, event);

	return TR_EVENT_DONE;
}

/**
 * Stop writing to an endpoint.
 *
 * The event subject is removed from the write set. Unless its fin flag
 * is set it is put on the read set. After that the implementation's
 * disableWrite is called.
 *
 * \param _this  the communication manager.
 * \param event  the event whose subject is the affected endpoint.
 *
 * \return always TR_EVENT_DONE.
 */
TR_EventDone
TR_commManagerDisableWrite(void * _this, TR_Event event)
{
	TR_setDelete(((TR_CommManager)_this)->write, event->subject);

	if (! event->subject->fin) {
		TR_setAdd(((TR_CommManager)_this)->read, event->subject);
	}

	TR_CALL(_this, TR_CommManager, disableWrite, event);

	return TR_EVENT_DONE;
}

/**
 * Close an endpoint and remove it from the communication manager.
 *
 * Steps, in order:
 *  - shut the transport down unless TR_socketFinRdWr() already reports
 *    true for it,
 *  - if the endpoint owned the highest handle, lower max_handle to the
 *    next occupied slot below it (or to 0 if there is none),
 *  - if the endpoint table still has an entry for the handle: finalize
 *    it as event subject, call the implementation's disableWrite and
 *    disableRead and take the endpoint off the write, read and accept
 *    sets,
 *  - call the implementation's close,
 *  - clear the slot in the endpoint table.
 *
 * \param _this  the communication manager.
 * \param event  the event whose subject is the endpoint to close.
 *
 * \return always TR_EVENT_DONE.
 */
TR_EventDone
TR_commManagerClose(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
	int             handle   = endpoint->transport->handle;

	if (! TR_socketFinRdWr(endpoint->transport)) {
		TR_socketShutdown(endpoint->transport);
	}

	if (handle == this->max_handle) {
		// The bound has to be tested BEFORE the decrement. With the test
		// after it a max_handle of 0 was decremented first and the
		// endpoint table was read one element in front of its start.
		while (this->max_handle > 0 &&
				! this->endpoints[--this->max_handle]) {
			// nothing to do, the condition does all the work.
		}
	}

	if (this->endpoints[handle]) {
		TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]);
		TR_CALL(_this, TR_CommManager, disableWrite, event);
		TR_CALL(_this, TR_CommManager, disableRead, event);
		TR_setDelete(this->write, endpoint);
		TR_setDelete(this->read, endpoint);
		// TR_commManagerAddEndpoint puts fd getter endpoints on the accept
		// set, so a closed endpoint must not be left behind there either,
		// else TR_commManagerSelect keeps issuing events for it.
		TR_setDelete(this->accept, endpoint);
	}

	TR_CALL(_this, TR_CommManager, close, event);

	this->endpoints[handle] = NULL;

	return TR_EVENT_DONE;
}

// vim: set ts=4 sw=4: