comm_manager_dispatcher.c 3.09 KB
/**
 * \file
 *
 * \author	Georg Hopp
 *
 * \copyright
 * Copyright © 2014 Georg Hopp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include <sys/types.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <errno.h>

#include "trbase.h"
#include "trdata.h"
#include "trevent.h"

#include "tr/comm_manager.h"
#include "tr/comm_manager_dispatcher.h"
#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/connect_entry_point.h"
#include "tr/_comm_manager.h" // ?? may we don't need this one


static
int
commManagerDispatcherCtor(void * _this, va_list * params)
{
	TR_CommManagerDispatcher this = _this;
	TR_PARENTCALL(TR_CommManagerDispatcher, _this, TR_Class, ctor, params);
	this->workers = va_arg(*params, TR_Set);

	return 0;
}

static
void
commManagerDispatcherDtor(void * _this)
{
	TR_PARENTCALL(TR_CommManagerDispatcher, _this, TR_Class, dtor);
}

static
TR_EventDone
TR_commManagerDispatcherSendEndpoint(void * _this, TR_CommEndPoint endpoint)
{
	TR_CommManagerDispatcher this   = _this;
	TR_CommWorker            worker = NULL;
	int                      pid_stat;

	/* code to dispatch stuff.... */
	while (! TR_setEmpty(this->workers) && ! worker) {
		if (! TR_iterableValid(this->workers)) {
			TR_iterableRewind(this->workers);
		}
		worker = (TR_CommWorker)TR_iterableCurrent(this->workers);
		if (0 != waitpid(worker->pid, &pid_stat, WNOHANG)) {
			TR_setDelete(this->workers, worker);
			TR_delete(worker);
			continue;
		}
	}

	if (! worker) {
		TR_delete(endpoint); // no ones there to handle you, sorry...
		return TR_EVENT_DONE;
	}

	TR_iterableNext(this->workers);

	if (-1 == sem_trywait(&(worker->shm->semaphore))) {
		switch(errno) {
			case EAGAIN:
			case EINTR:
				break;

			default:
				TR_setDelete(this->workers, worker);
				TR_delete(worker);
				break;
		}
	}

	memcpy(
			&(worker->shm->socket),
			endpoint->transport,
			sizeof(struct c_TR_Socket));
	TR_socketSendFd(worker->socket, TR_socketHandle(endpoint->transport));
	TR_delete(endpoint);

	return TR_EVENT_DONE;
}


static
void
TR_commManagerDispatcherCvInit(TR_class_ptr cls)
{
	TR_INHERIT_CLASSVARS(TR_CommManagerDispatcher, TR_CommManager);
}

TR_INIT_IFACE(TR_Class, commManagerDispatcherCtor,
		commManagerDispatcherDtor, NULL);
TR_INIT_IFACE(TR_CommManager, TR_commManagerDispatcherSendEndpoint,
		NULL, NULL, NULL, NULL, NULL, NULL);
TR_CREATE_CLASS(
		TR_CommManagerDispatcher,
		TR_CommManager,
		TR_commManagerDispatcherCvInit,
		TR_IF(TR_Class),
		TR_IF(TR_CommManager));

// vim: set ts=4 sw=4: