Commit db4561556ebe1a45d21c0f1b4732b6a04819f81c

Authored by Georg Hopp
1 parent 7360a155

Add first classes for a multi worker server.

@@ -3,8 +3,11 @@ nobase_include_HEADERS = trcomm.h \ @@ -3,8 +3,11 @@ nobase_include_HEADERS = trcomm.h \
3 tr/comm_manager.h \ 3 tr/comm_manager.h \
4 tr/comm_manager_poll.h \ 4 tr/comm_manager_poll.h \
5 tr/comm_manager_epoll.h \ 5 tr/comm_manager_epoll.h \
  6 + tr/comm_manager_dispatcher.h \
  7 + tr/comm_worker.h \
6 tr/connect_entry_point.h \ 8 tr/connect_entry_point.h \
7 tr/connection.h \ 9 tr/connection.h \
  10 + tr/connection_getter.h \
8 tr/connector.h \ 11 tr/connector.h \
9 tr/datagram_service.h \ 12 tr/datagram_service.h \
10 tr/datagram_entry_point.h \ 13 tr/datagram_entry_point.h \
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#ifndef __TR_COMM_MANAGER_DISPATCHER_H__
  24 +#define __TR_COMM_MANAGER_DISPATCHER_H__
  25 +
  26 +#include <sys/types.h>
  27 +#include <sys/epoll.h>
  28 +
  29 +#include "trbase.h"
  30 +#include "trdata.h"
  31 +#include "trevent.h"
  32 +
  33 +#include "tr/comm_worker.h"
  34 +
  35 +TR_CLASS(TR_CommManagerDispatcher) {
  36 + TR_EXTENDS(TR_CommManager);
  37 +
  38 + TR_Set workers;
  39 +};
  40 +
  41 +#define TR_commManDispAddWorker(this, worker) \
  42 + (TR_setAdd((this)->workers, (worker))
  43 +
  44 +TR_INSTANCE_INIT(TR_CommManagerDispatcher);
  45 +TR_CLASSVARS_DECL(TR_CommManagerDispatcher) {
  46 + TR_CV_EXTENDS(TR_EventHandler);
  47 +};
  48 +
  49 +#endif // __TR_COMM_MANAGER_DISPATCHER_H__
  50 +
  51 +// vim: set ts=4 sw=4:
  52 +
  1 +/**
  2 + * \file
  3 + * This exists only as a base type for all protocols.
  4 + *
  5 + * \author Georg Hopp
  6 + *
  7 + * \copyright
  8 + * Copyright © 2014 Georg Hopp
  9 + *
  10 + * This program is free software: you can redistribute it and/or modify
  11 + * it under the terms of the GNU General Public License as published by
  12 + * the Free Software Foundation, either version 3 of the License, or
  13 + * (at your option) any later version.
  14 + *
  15 + * This program is distributed in the hope that it will be useful,
  16 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18 + * GNU General Public License for more details.
  19 + *
  20 + * You should have received a copy of the GNU General Public License
  21 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  22 + */
  23 +
  24 +#ifndef __TR_COMM_WORKER_H__
  25 +#define __TR_COMM_WORKER_H__
  26 +
  27 +#include <sys/types.h>
  28 +#include <semaphore.h>
  29 +
  30 +#include "trbase.h"
  31 +#include "trio.h"
  32 +
  33 +typedef struct s_TR_WorkerShm {
  34 + struct c_TR_Socket socket;
  35 + sem_t semaphore;
  36 +} s_TR_WorkerShm;
  37 +typedef s_TR_WorkerShm * sptr_TR_WorkerShm;
  38 +
  39 +TR_CLASS(TR_CommWorker) {
  40 + const char * const name;
  41 + pid_t pid;
  42 + TR_Socket socket;
  43 + sptr_TR_WorkerShm shm;
  44 +};
  45 +
  46 +TR_INSTANCE_INIT(TR_CommWorker);
  47 +TR_CLASSVARS_DECL(TR_CommWorker) {};
  48 +
  49 +#endif // __TR_COMM_WORKER_H__
  50 +
  51 +// vim: set ts=4 sw=4:
  52 +
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#ifndef __TR_CONNECTION_GETTER_H__
  24 +#define __TR_CONNECTION_GETTER_H__
  25 +
  26 +#include <sys/types.h>
  27 +
  28 +#include "trbase.h"
  29 +#include "trevent.h"
  30 +
  31 +#include "tr/comm_worker.h"
  32 +
  33 +TR_CLASS(TR_ConnectionGetter) {
  34 + TR_EXTENDS(TR_EventHandler);
  35 +
  36 + TR_CommWorker worker;
  37 +};
  38 +TR_INSTANCE_INIT(TR_ConnectionGetter);
  39 +TR_CLASSVARS_DECL(TR_ConnectionGetter) {
  40 + TR_CV_EXTENDS(TR_EventHandler);
  41 +};
  42 +
  43 +#endif // __TR_CONNECTION_GETTER_H__
  44 +
  45 +// vim: set ts=4 sw=4:
  46 +
@@ -9,15 +9,18 @@ TRCOMM = cep_write_buffered.c \ @@ -9,15 +9,18 @@ TRCOMM = cep_write_buffered.c \
9 comm_end_point.c \ 9 comm_end_point.c \
10 conn_entry_point.c \ 10 conn_entry_point.c \
11 connection.c \ 11 connection.c \
  12 + connection_getter.c \
12 connector.c \ 13 connector.c \
13 datagram_service.c \ 14 datagram_service.c \
14 datagram_entry_point.c \ 15 datagram_entry_point.c \
15 comm_manager.c \ 16 comm_manager.c \
16 comm_manager_poll.c \ 17 comm_manager_poll.c \
17 comm_manager_epoll.c \ 18 comm_manager_epoll.c \
  19 + comm_manager_dispatcher.c \
18 comm_manager_shutdown.c \ 20 comm_manager_shutdown.c \
19 comm_manager_shutdown_read.c \ 21 comm_manager_shutdown_read.c \
20 comm_manager_shutdown_write.c \ 22 comm_manager_shutdown_write.c \
  23 + comm_worker.c \
21 io_handler.c \ 24 io_handler.c \
22 proto_message.c \ 25 proto_message.c \
23 protocol.c \ 26 protocol.c \
@@ -38,5 +41,5 @@ lib_LTLIBRARIES = libtrcomm.la @@ -38,5 +41,5 @@ lib_LTLIBRARIES = libtrcomm.la
38 41
39 libtrcomm_la_SOURCES = $(TRCOMM) 42 libtrcomm_la_SOURCES = $(TRCOMM)
40 libtrcomm_la_CFLAGS = $(AM_CFLAGS) 43 libtrcomm_la_CFLAGS = $(AM_CFLAGS)
41 -libtrcomm_la_LIBADD = 44 +libtrcomm_la_LIBADD = -lrt -lpthread
42 libtrcomm_la_LDFLAGS = -version-info 0:0:0 $(AM_LDFLAGS) 45 libtrcomm_la_LDFLAGS = -version-info 0:0:0 $(AM_LDFLAGS)
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +#include <sys/types.h>
  23 +#include <sys/wait.h>
  24 +#include <semaphore.h>
  25 +#include <errno.h>
  26 +
  27 +#include "trbase.h"
  28 +#include "trdata.h"
  29 +#include "trevent.h"
  30 +
  31 +#include "tr/comm_manager.h"
  32 +#include "tr/comm_manager_dispatcher.h"
  33 +#include "tr/interface/comm_manager.h"
  34 +#include "tr/comm_end_point.h"
  35 +#include "tr/connection.h"
  36 +#include "tr/connect_entry_point.h"
  37 +#include "tr/_comm_manager.h" // ?? may we don't need this one
  38 +
  39 +
  40 +static
  41 +int
  42 +commManagerDispatcherCtor(void * _this, va_list * params)
  43 +{
  44 + TR_CommManagerDispatcher this = _this;
  45 + TR_PARENTCALL(TR_CommManagerDispatcher, _this, TR_Class, ctor, params);
  46 + this->workers = va_arg(*params, TR_Set);
  47 +
  48 + return 0;
  49 +}
  50 +
  51 +static
  52 +void
  53 +commManagerDispatcherDtor(void * _this)
  54 +{
  55 + TR_PARENTCALL(TR_CommManagerDispatcher, _this, TR_Class, dtor);
  56 +}
  57 +
  58 +static
  59 +TR_EventDone
  60 +TR_commManagerDispatcherSendEndpoint(void * _this, TR_CommEndPoint endpoint)
  61 +{
  62 + TR_CommManagerDispatcher this = _this;
  63 + TR_CommWorker worker = NULL;
  64 + int pid_stat;
  65 +
  66 + /* code to dispatch stuff.... */
  67 + while (! TR_setEmpty(this->workers) && ! worker) {
  68 + if (! TR_iterableValid(this->workers)) {
  69 + TR_iterableRewind(this->workers);
  70 + }
  71 + worker = (TR_CommWorker)TR_iterableCurrent(this->workers);
  72 + if (0 != waitpid(worker->pid, &pid_stat, WNOHANG)) {
  73 + TR_setDelete(this->workers, worker);
  74 + TR_delete(worker);
  75 + continue;
  76 + }
  77 + }
  78 +
  79 + if (! worker) {
  80 + TR_delete(endpoint); // no ones there to handle you, sorry...
  81 + return TR_EVENT_DONE;
  82 + }
  83 +
  84 + TR_iterableNext(this->workers);
  85 +
  86 + if (-1 == sem_trywait(&(worker->shm->semaphore))) {
  87 + switch(errno) {
  88 + case EAGAIN:
  89 + case EINTR:
  90 + break;
  91 +
  92 + default:
  93 + TR_setDelete(this->workers, worker);
  94 + TR_delete(worker);
  95 + break;
  96 + }
  97 + }
  98 +
  99 + memcpy(
  100 + &(worker->shm->socket),
  101 + endpoint->transport,
  102 + sizeof(struct c_TR_Socket));
  103 + TR_socketSendFd(worker->socket, TR_socketHandle(endpoint->transport));
  104 + TR_delete(endpoint);
  105 +
  106 + return TR_EVENT_DONE;
  107 +}
  108 +
  109 +
  110 +static
  111 +void
  112 +TR_commManagerDispatcherCvInit(TR_class_ptr cls)
  113 +{
  114 + TR_INHERIT_CLASSVARS(TR_CommManagerDispatcher, TR_CommManager);
  115 +}
  116 +
  117 +TR_INIT_IFACE(TR_Class, commManagerDispatcherCtor,
  118 + commManagerDispatcherDtor, NULL);
  119 +TR_INIT_IFACE(TR_CommManager, TR_commManagerDispatcherSendEndpoint,
  120 + NULL, NULL, NULL, NULL, NULL, NULL);
  121 +TR_CREATE_CLASS(
  122 + TR_CommManagerDispatcher,
  123 + TR_CommManager,
  124 + TR_commManagerDispatcherCvInit,
  125 + TR_IF(TR_Class),
  126 + TR_IF(TR_CommManager));
  127 +
  128 +// vim: set ts=4 sw=4:
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#define _GNU_SOURCE
  24 +
  25 +#include <sys/types.h>
  26 +#include <sys/stat.h>
  27 +#include <sys/mman.h>
  28 +#include <sys/wait.h>
  29 +#include <stdarg.h>
  30 +#include <semaphore.h>
  31 +#include <unistd.h>
  32 +#include <fcntl.h>
  33 +
  34 +#include "trbase.h"
  35 +#include "trio.h"
  36 +
  37 +#include "tr/comm_worker.h"
  38 +
  39 +static
  40 +int
  41 +commWorkerCtor(void * _this, va_list * params)
  42 +{
  43 + TR_CommWorker this = _this;
  44 + TR_Socket socket[2];
  45 +
  46 + this->shm = mmap(NULL, sizeof(s_TR_WorkerShm),
  47 + PROT_READ|PROT_WRITE,
  48 + MAP_ANONYMOUS|MAP_SHARED,
  49 + 0, 0);
  50 +
  51 + sem_init(&(this->shm->semaphore), 1, 1);
  52 + TR_socketPair(socket, SOCK_DGRAM);
  53 +
  54 + /* TODO error handling... */
  55 + switch (this->pid = fork()) {
  56 + case -1:
  57 + TR_delete(socket[0]);
  58 + TR_delete(socket[1]);
  59 + break;
  60 +
  61 + case 0:
  62 + TR_delete(socket[0]);
  63 + this->socket = socket[1];
  64 + break;
  65 +
  66 + default:
  67 + TR_delete(socket[1]);
  68 + this->socket = socket[0];
  69 + break;
  70 + }
  71 +
  72 + return this->pid;
  73 +}
  74 +
  75 +static void commWorkerDtor(void * _this) {
  76 + TR_CommWorker this = _this;
  77 +
  78 + TR_delete(this->socket);
  79 + sem_close(&(this->shm->semaphore));
  80 + munmap(this->shm, sizeof(s_TR_WorkerShm));
  81 + if (this->pid != 0) {
  82 + int state;
  83 + waitpid(this->pid, &state, 0);
  84 + }
  85 +}
  86 +
  87 +TR_INIT_IFACE(TR_Class, commWorkerCtor, commWorkerDtor, NULL);
  88 +TR_CREATE_CLASS(TR_CommWorker, NULL, NULL, TR_IF(TR_Class));
  89 +
  90 +// vim: set ts=4 sw=4:
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#include <stdarg.h>
  24 +#include <unistd.h>
  25 +
  26 +#include <sys/types.h>
  27 +#include <sys/stat.h>
  28 +#include <sys/mman.h>
  29 +#include <fcntl.h>
  30 +#include <semaphore.h>
  31 +
  32 +#include "trbase.h"
  33 +#include "trio.h"
  34 +#include "trevent.h"
  35 +
  36 +#include "tr/connection_getter.h"
  37 +#include "tr/connection.h"
  38 +#include "tr/protocol.h"
  39 +#include "tr/connect_entry_point.h"
  40 +
  41 +static
  42 +int
  43 +connectionGetterCtor(void * _this, va_list * params)
  44 +{
  45 + TR_ConnectionGetter this = _this;
  46 +
  47 + TR_PARENTCALL(TR_ConnectionGetter, _this, TR_Class, ctor, params);
  48 + this->worker = va_arg(*params, TR_CommWorker);
  49 +
  50 + return 0;
  51 +}
  52 +
  53 +static
  54 +void
  55 +connectionGetterDtor(void * _this)
  56 +{
  57 + TR_PARENTCALL(TR_ConnectionGetter, _this, TR_Class, dtor);
  58 +}
  59 +
  60 +static
  61 +TR_EventDone
  62 +connectionGetterAccept(void * _this, TR_Event event)
  63 +{
  64 + int count = 0;
  65 + TR_ConnectionGetter this = _this;
  66 + TR_CommEndPoint connection = (TR_CommEndPoint)event->subject;
  67 +
  68 + int handle = TR_socketGetFd((TR_Socket)connection->transport);
  69 +
  70 + while (handle != -1) {
  71 + TR_Socket socket = TR_new(TR_Socket);
  72 +
  73 + memcpy(
  74 + socket,
  75 + &(this->worker->shm->socket),
  76 + sizeof(struct c_TR_Socket));
  77 + sem_post(&(this->worker->shm->semaphore));
  78 + TR_socketHandle(socket) = handle;
  79 +
  80 + TR_socketNonblock(socket);
  81 + TR_Connection new_con = TR_new(
  82 + TR_Connection,
  83 + socket,
  84 + connection->protocol,
  85 + CEP_DEFAULT_READ_SIZE);
  86 + TR_eventHandlerIssueEvent(
  87 + (TR_EventHandler)this,
  88 + TR_eventSubjectEmit(
  89 + (TR_EventSubject)new_con,
  90 + TR_CON_EVENT_NEW_CON,
  91 + NULL));
  92 + if (++count > 100) break;
  93 + handle = TR_socketGetFd((TR_Socket)connection->transport);
  94 + }
  95 +
  96 + if (! handle) {
  97 + TR_eventHandlerIssueEvent(
  98 + (TR_EventHandler)this,
  99 + TR_eventSubjectEmit(
  100 + (TR_EventSubject)connection,
  101 + TR_CEP_EVENT_READ_BLOCK,
  102 + NULL));
  103 + }
  104 +
  105 + return TR_EVENT_DONE;
  106 +}
  107 +
  108 +static
  109 +void
  110 +connectorCvInit(TR_class_ptr cls)
  111 +{
  112 + TR_EVENT_HANDLER_SET_METHOD(
  113 + cls,
  114 + TR_ConnEntryPoint,
  115 + TR_CET_EVENT_ACC_READY,
  116 + connectionGetterAccept);
  117 +}
  118 +
  119 +TR_INIT_HANDLER(TR_ConnectionGetter);
  120 +TR_INIT_IFACE(TR_Class, connectionGetterCtor, connectionGetterDtor, NULL);
  121 +TR_CREATE_CLASS(
  122 + TR_ConnectionGetter,
  123 + TR_EventHandler,
  124 + connectorCvInit,
  125 + TR_IF(TR_Class)) = {
  126 + { TR_HANDLER_CVARS(TR_ConnectionGetter) }
  127 +};
  128 +
  129 +// vim: set ts=4 sw=4:
Please register or login to post a comment