/* i_comm_manager.c */
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <errno.h>
#include <poll.h>

#include "trbase.h"
#include "trevent.h"

#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/comm_manager.h"
#include "tr/_comm_manager.h"

/*
 * Instantiate the TR_CommManager interface through TR_CREATE_INTERFACE.
 *
 * NOTE(review): this file dispatches seven different selectors through
 * TR_CALL (addEndpoint, select, pollWrite, pollRead, disableRead,
 * disableWrite, close). If the second argument is meant to be the number
 * of interface methods, 5 looks too small -- confirm against the
 * definition of TR_CREATE_INTERFACE.
 */
TR_CREATE_INTERFACE(TR_CommManager, 5);

/**
 * Register an endpoint with a communication manager.
 *
 * The endpoint is stored in the manager's endpoint table, indexed by the
 * handle of its transport. An entry already present in that slot is
 * deleted first. The highest handle seen so far is tracked in max_handle.
 *
 * A listening TCP socket is added to the accept set and an accept event
 * is issued for it; every other endpoint is added to the read set and a
 * read event is issued. Finally the implementation specific addEndpoint
 * is invoked.
 *
 * \param _this    the communication manager instance
 * \param endpoint the endpoint to register
 */
void
TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManager this        = _this;
	int            handle      = endpoint->transport->handle;
	int            is_listener =
		TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
			&& ((TR_TcpSocket)endpoint->transport)->listen;

	// An occupied slot should never be seen here. If it is, treat the
	// old entry as a leftover that still needs to be deleted.
	if (this->endpoints[handle]) {
		TR_delete(this->endpoints[handle]);
	}

	if (handle > this->max_handle) {
		this->max_handle = handle;
	}

	this->endpoints[handle] = endpoint;

	if (is_listener) {
		TR_hashAdd(this->accept, endpoint);
		TR_ISSUE_IO_ACC_EVENT(this, endpoint);
	} else {
		TR_hashAdd(this->read, endpoint);
		TR_ISSUE_IO_READ_EVENT(this, endpoint);
	}

	TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
}

/**
 * Per-element callback handed to TR_hashEach for the accept set (see
 * TR_commManagerSelect): issues an accept event for one endpoint.
 *
 * \param endpoint     the endpoint taken from the accept set
 * \param comm_manager the communication manager given to TR_hashEach
 */
static
void
commManagerIssueAcceptEvents(const void * endpoint, const void * comm_manager)
{
	TR_ISSUE_IO_ACC_EVENT(comm_manager, endpoint);
}

/**
 * Per-element callback handed to TR_hashEach for the write set (see
 * TR_commManagerSelect): issues a write event for one endpoint.
 *
 * \param endpoint     the endpoint taken from the write set
 * \param comm_manager the communication manager given to TR_hashEach
 */
static
void
commManagerIssueWriteEvents(const void * endpoint, const void * comm_manager)
{
	TR_ISSUE_IO_WRITE_EVENT(comm_manager, endpoint);
}

/**
 * Per-element callback handed to TR_hashEach for the read set (see
 * TR_commManagerSelect): issues a read event for one endpoint.
 *
 * \param endpoint     the endpoint taken from the read set
 * \param comm_manager the communication manager given to TR_hashEach
 */
static
void
commManagerIssueReadEvents(const void * endpoint, const void * comm_manager)
{
	TR_ISSUE_IO_READ_EVENT(comm_manager, endpoint);
}

/**
 * Run the implementation specific select and afterwards issue events for
 * every endpoint that is held in the write, accept and read sets.
 *
 * The timeout (in milliseconds) passed to the implementation is chosen
 * as follows:
 *  - 0 if at least one of the read, write or accept sets is not empty,
 *  - otherwise the value pointed to by event->data, if that is not NULL,
 *  - otherwise the data wait time reported by the event dispatcher that
 *    is the subject of the event.
 *
 * \param _this the communication manager instance
 * \param event the triggering event; its subject is used as the event
 *              dispatcher, its data as an optional pointer to a timeout
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerSelect(void * _this, TR_Event event)
{
	TR_CommManager       this       = _this;
	TR_EventDispatcher   dispatcher = (TR_EventDispatcher)event->subject;
	int                * requested  = event->data;
	int                  timeout;  // milliseconds

	if (TR_hashEmpty(this->read)
			&& TR_hashEmpty(this->write)
			&& TR_hashEmpty(this->accept)) {
		// Nothing is pending, so a real wait time may be used.
		if (NULL != requested) {
			timeout = *requested;
		} else {
			timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
		}
	} else {
		// At least one endpoint is still queued in one of the sets.
		timeout = 0;
	}

	TR_CALL(_this, TR_CommManager, select, event, timeout);

	TR_hashEach(this->write, this, commManagerIssueWriteEvents);
	TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
	TR_hashEach(this->read, this, commManagerIssueReadEvents);

	return TR_EVENT_DONE;
}

/**
 * Handle a request to poll an endpoint for writability.
 *
 * The endpoint (the subject of the event) is removed from the write set.
 * Unless TR_socketFinWr reports true for its transport, the
 * implementation specific pollWrite is invoked.
 *
 * \param _this the communication manager instance
 * \param event the triggering event, its subject is the endpoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerPollWrite(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));

	if (! TR_socketFinWr(endpoint->transport)) {
		TR_CALL(_this, TR_CommManager, pollWrite, event);
	}

	return TR_EVENT_DONE;
}

/**
 * Handle a request to poll an endpoint for readability.
 *
 * The endpoint (the subject of the event) is removed from the accept set
 * if its transport is a listening TCP socket, otherwise from the read
 * set. Unless TR_socketFinRd reports true for its transport, the
 * implementation specific pollRead is invoked.
 *
 * \param _this the communication manager instance
 * \param event the triggering event, its subject is the endpoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerPollRead(void * _this, TR_Event event)
{
	TR_CommManager  this        = _this;
	TR_CommEndPoint endpoint    = (TR_CommEndPoint)event->subject;
	int             is_listener =
		TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
			&& ((TR_TcpSocket)endpoint->transport)->listen;

	if (is_listener) {
		TR_hashDeleteByVal(this->accept, TR_hashableGetHash(endpoint));
	} else {
		TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));
	}

	if (! TR_socketFinRd(endpoint->transport)) {
		TR_CALL(_this, TR_CommManager, pollRead, event);
	}

	return TR_EVENT_DONE;
}

/**
 * Stop reading from an endpoint.
 *
 * The endpoint (the subject of the event) is removed from the read set
 * and the implementation specific disableRead is invoked.
 *
 * \param _this the communication manager instance
 * \param event the triggering event, its subject is the endpoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerDisableRead(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));

	TR_CALL(_this, TR_CommManager, disableRead, event);

	return TR_EVENT_DONE;
}

/**
 * Stop writing to an endpoint.
 *
 * The endpoint (the subject of the event) is removed from the write set.
 * If the subject's fin flag is not set it is put into the read set.
 * Afterwards the implementation specific disableWrite is invoked.
 *
 * \param _this the communication manager instance
 * \param event the triggering event, its subject is the endpoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerDisableWrite(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));

	if (! event->subject->fin) {
		TR_hashAdd(this->read, endpoint);
	}

	TR_CALL(_this, TR_CommManager, disableWrite, event);

	return TR_EVENT_DONE;
}

/**
 * Close an endpoint and remove it from the communication manager.
 *
 * Steps performed:
 *  - shut down the transport unless TR_socketFinRdWr already reports
 *    true for it,
 *  - if the endpoint owned the highest handle, lower max_handle to the
 *    next occupied slot (or to 0 if there is none),
 *  - if the endpoint table holds an entry for the handle, finalize it,
 *    call the implementation specific disableWrite / disableRead and
 *    remove the endpoint from the write, read and accept sets,
 *  - call the implementation specific close and clear the table slot.
 *
 * \param _this the communication manager instance
 * \param event the triggering event, its subject is the endpoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerClose(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
	int             handle   = endpoint->transport->handle;

	if (! TR_socketFinRdWr(endpoint->transport)) {
		TR_socketShutdown(endpoint->transport);
	}

	if (handle == this->max_handle) {
		// Check the bound BEFORE decrementing. Otherwise a max_handle of
		// 0 would be decremented to -1 and endpoints[-1] would be read.
		while (this->max_handle > 0) {
			this->max_handle--;
			if (this->endpoints[this->max_handle]) {
				break;
			}
		}
	}

	if (this->endpoints[handle]) {
		TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]);
		TR_CALL(_this, TR_CommManager, disableWrite, event);
		TR_CALL(_this, TR_CommManager, disableRead, event);
		TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));
		TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));
		// Listening endpoints are kept in the accept set (see
		// TR_commManagerAddEndpoint). Drop them there as well so that
		// no accept events are issued for a closed endpoint.
		TR_hashDeleteByVal(this->accept, TR_hashableGetHash(endpoint));
	}

	TR_CALL(_this, TR_CommManager, close, event);

	this->endpoints[handle] = NULL;

	return TR_EVENT_DONE;
}

// vim: set ts=4 sw=4: