i_comm_manager.c 6.16 KB
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#define _GNU_SOURCE

#include <errno.h>
#include <poll.h>
#include <pthread.h>

#include "trbase.h"
#include "trevent.h"

#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/comm_manager.h"
#include "tr/_comm_manager.h"

// Create the interface descriptor for TR_CommManager.
// NOTE(review): the second argument is presumably the number of methods of
// the interface. This file dispatches seven distinct methods through
// TR_CALL (addEndpoint, select, pollWrite, pollRead, disableRead,
// disableWrite, close) -- confirm against the TR_CREATE_INTERFACE definition
// that 5 is the intended value.
TR_CREATE_INTERFACE(TR_CommManager, 5);

/**
 * Register an endpoint with the communication manager.
 *
 * The endpoint is stored in the endpoints table under the handle of its
 * transport and max_handle is raised when that handle is larger. A listening
 * TR_TcpSocket is added to the accept hash and gets an accept event issued,
 * every other transport is added to the read hash and gets a read event
 * issued. io_triggered is incremented once and finally the addEndpoint
 * method of the interface implementation is called.
 *
 * \param _this    the communication manager
 * \param endpoint the endpoint to register
 */
void
TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManager this        = _this;
	const int      handle      = endpoint->transport->handle;
	const int      is_listener =
		TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
		&& ((TR_TcpSocket)endpoint->transport)->listen;

	// An occupied slot should never happen. If it does, the old entry is
	// assumed to be a leftover that still has to be deleted.
	if (this->endpoints[handle]) {
		TR_delete(this->endpoints[handle]);
	}

	if (handle > this->max_handle) {
		this->max_handle = handle;
	}

	this->endpoints[handle] = endpoint;

	if (is_listener) {
		TR_hashAdd(this->accept, endpoint);
		TR_ISSUE_IO_ACC_EVENT(this, endpoint);
	} else {
		TR_hashAdd(this->read, endpoint);
		TR_ISSUE_IO_READ_EVENT(this, endpoint);
	}

	this->io_triggered++;

	TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
}

/**
 * Callback for TR_hashEach over the accept hash.
 *
 * Invokes TR_ISSUE_IO_ACC_EVENT for the given endpoint on the given
 * communication manager.
 *
 * \param endpoint     the hash element currently visited
 * \param comm_manager the communication manager handed to TR_hashEach
 */
static
void
commManagerIssueAcceptEvents(const void * endpoint, const void * comm_manager)
{
	TR_ISSUE_IO_ACC_EVENT(comm_manager, endpoint);
}

/**
 * Callback for TR_hashEach over the write hash.
 *
 * Invokes TR_ISSUE_IO_WRITE_EVENT for the given endpoint on the given
 * communication manager.
 *
 * \param endpoint     the hash element currently visited
 * \param comm_manager the communication manager handed to TR_hashEach
 */
static
void
commManagerIssueWriteEvents(const void * endpoint, const void * comm_manager)
{
	TR_ISSUE_IO_WRITE_EVENT(comm_manager, endpoint);
}

/**
 * Callback for TR_hashEach over the read hash.
 *
 * Invokes TR_ISSUE_IO_READ_EVENT for the given endpoint on the given
 * communication manager.
 *
 * \param endpoint     the hash element currently visited
 * \param comm_manager the communication manager handed to TR_hashEach
 */
static
void
commManagerIssueReadEvents(const void * endpoint, const void * comm_manager)
{
	TR_ISSUE_IO_READ_EVENT(comm_manager, endpoint);
}

/**
 * Handle a select event.
 *
 * When io_triggered is zero, io_triggered_lock is taken and the write,
 * accept and read events are re-issued for all endpoints in the respective
 * hashes; the sum of the TR_hashEach results becomes the new io_triggered.
 * If io_triggered is still zero afterwards, the select method of the
 * interface implementation is called with a timeout taken either from the
 * timer in event->data or, when there is no timer, from the dispatcher in
 * event->subject.
 *
 * The lock is released only if this call acquired it. Unlocking a mutex
 * that is not held by the calling thread is undefined for default mutexes.
 *
 * \param _this the communication manager
 * \param event the select event; data is an optional TR_Timer, subject the
 *              TR_EventDispatcher
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerSelect(void * _this, TR_Event event)
{
	TR_CommManager     this       = _this;
	TR_Timer           timer      = (TR_Timer)event->data;
	TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject;
	unsigned long      timeout;  // milliseconds
	char               buffer[17] = "";  // stays empty if the name lookup fails
	int                locked     = 0;   // set once io_triggered_lock is held
	int                nothing_pending;

	pthread_getname_np(pthread_self(), buffer, sizeof(buffer));

	if (! this->io_triggered) {
		printf("[DEBUG] [%s] io triggerd was empty\n", buffer);
		fflush(stdout);

		pthread_mutex_lock(&this->io_triggered_lock);
		locked = 1;

		this->io_triggered  = TR_hashEach(this->write, this, commManagerIssueWriteEvents);
		this->io_triggered += TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
		this->io_triggered += TR_hashEach(this->read, this, commManagerIssueReadEvents);
	}

	printf("[DEBUG] [%s] io triggerd: %lu\n", buffer, this->io_triggered);
	fflush(stdout);

	// Decide while the lock (if taken) is still held, then release it
	// before the potentially blocking select call.
	nothing_pending = ! this->io_triggered;

	if (locked) {
		pthread_mutex_unlock(&this->io_triggered_lock);
	}

	if (nothing_pending) {
		if (NULL == timer) {
			timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
		} else {
			timeout = TR_timerGet(timer, NULL);
		}

		printf("[DEBUG] [%s] select timeout: %lu\n", buffer, timeout);
		fflush(stdout);

		TR_CALL(_this, TR_CommManager, select, event, timeout);
	}

	return TR_EVENT_DONE;
}

/**
 * Handle a poll write event.
 *
 * Nothing is done when TR_socketFinWr reports true for the transport of the
 * event subject. Otherwise, while holding io_triggered_lock, the subject is
 * added to the write hash and the pollWrite method of the interface
 * implementation is called.
 *
 * \param _this the communication manager
 * \param event the event; its subject is the affected TR_CommEndPoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerPollWrite(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	if (TR_socketFinWr(endpoint->transport)) {
		return TR_EVENT_DONE;
	}

	pthread_mutex_lock(&this->io_triggered_lock);
	TR_hashAdd(this->write, event->subject);
	TR_CALL(_this, TR_CommManager, pollWrite, event);
	pthread_mutex_unlock(&this->io_triggered_lock);

	printf("[!DEBUG!] socket added to write hash\n");
	fflush(stdout);

	return TR_EVENT_DONE;
}

/**
 * Handle a poll read event.
 *
 * Unless TR_socketFinRd reports true for the transport, the pollRead method
 * of the interface implementation is called. In either case the subject is
 * then removed from the accept hash when its transport is a listening
 * TR_TcpSocket, and from the read hash otherwise.
 *
 * \param _this the communication manager
 * \param event the event; its subject is the affected TR_CommEndPoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerPollRead(void * _this, TR_Event event)
{
	TR_CommManager  this        = _this;
	TR_CommEndPoint endpoint    = (TR_CommEndPoint)event->subject;
	int             is_listener;

	if (! TR_socketFinRd(endpoint->transport)) {
		TR_CALL(_this, TR_CommManager, pollRead, event);
	}

	is_listener = TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
		&& ((TR_TcpSocket)endpoint->transport)->listen;

	if (! is_listener) {
		TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
	} else {
		TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject));
	}

	return TR_EVENT_DONE;
}

/**
 * Handle a disable read event.
 *
 * Removes the event subject from the read hash and then calls the
 * disableRead method of the interface implementation.
 *
 * \param _this the communication manager
 * \param event the event; its subject is the affected endpoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerDisableRead(void * _this, TR_Event event)
{
	TR_hashDeleteByVal(
			((TR_CommManager)_this)->read,
			TR_hashableGetHash(event->subject));

	TR_CALL(_this, TR_CommManager, disableRead, event);

	return TR_EVENT_DONE;
}

/**
 * Handle a disable write event.
 *
 * Only acts when the write buffer of the endpoint holds no messages
 * (nmsg is zero): the subject is removed from the write hash and the
 * disableWrite method of the interface implementation is called.
 *
 * \param _this the communication manager
 * \param event the event; its subject is the affected TR_CommEndPoint
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerDisableWrite(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;

	// there are still messages to write, keep the endpoint writable.
	if (endpoint->write_buffer->nmsg) {
		return TR_EVENT_DONE;
	}

	// TODO think about a better way... (an earlier idea was to add the
	// subject to the read hash again here unless it is finished.)
	TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
	TR_CALL(_this, TR_CommManager, disableWrite, event);

	return TR_EVENT_DONE;
}

/**
 * Handle a close event.
 *
 * The transport is shut down unless TR_socketFinRdWr already reports true.
 * When the closed handle was max_handle, max_handle is lowered to the next
 * lower occupied slot of the endpoints table, or to 0 if there is none.
 * If the slot of the handle is occupied, the stored endpoint is finalized
 * as event subject, the disableWrite and disableRead methods of the
 * interface implementation are called and the endpoint is removed from the
 * write, read and accept hashes. Finally the close method of the interface
 * implementation is called and the slot is cleared.
 *
 * \param _this the communication manager
 * \param event the event; its subject is the TR_CommEndPoint to close
 * \return always TR_EVENT_DONE
 */
TR_EventDone
TR_commManagerClose(void * _this, TR_Event event)
{
	TR_CommManager  this     = _this;
	TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
	int             handle   = endpoint->transport->handle;

	if (! TR_socketFinRdWr(endpoint->transport)) {
		TR_socketShutdown(endpoint->transport);
	}

	if (handle == this->max_handle) {
		// Check the bound before decrementing so that max_handle never
		// drops below 0 and endpoints[-1] is never read.
		while (this->max_handle > 0) {
			this->max_handle--;
			if (this->endpoints[this->max_handle]) {
				break;
			}
		}
	}

	if (this->endpoints[handle]) {
		TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]);
		TR_CALL(_this, TR_CommManager, disableWrite, event);
		TR_CALL(_this, TR_CommManager, disableRead, event);
		// The implementation methods are called directly above, so the
		// hashes maintained in this file have to be cleaned up here.
		TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));
		TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));
		// Listening endpoints are registered in the accept hash by
		// TR_commManagerAddEndpoint; remove them as well so a closed
		// listener is not visited again when accept events are re-issued.
		TR_hashDeleteByVal(this->accept, TR_hashableGetHash(endpoint));
	}

	TR_CALL(_this, TR_CommManager, close, event);

	this->endpoints[handle] = NULL;

	return TR_EVENT_DONE;
}

// vim: set ts=4 sw=4: