Commit fcbef2f039c18c51006d259445e9023cb04bbd5d

Authored by Georg Hopp
1 parent c2767f62

first try for a threaded event dispatcher, but this is not correctly working right now.

@@ -2,4 +2,5 @@ nobase_include_HEADERS = trevent.h \ @@ -2,4 +2,5 @@ nobase_include_HEADERS = trevent.h \
2 tr/event.h \ 2 tr/event.h \
3 tr/event_handler.h \ 3 tr/event_handler.h \
4 tr/event_subject.h \ 4 tr/event_subject.h \
5 - tr/event_dispatcher.h 5 + tr/event_dispatcher.h \
  6 + tr/event_thread.h
@@ -25,6 +25,7 @@ @@ -25,6 +25,7 @@
25 25
26 #include <time.h> 26 #include <time.h>
27 #include <stdint.h> 27 #include <stdint.h>
  28 +#include <pthread.h>
28 29
29 #include "trbase.h" 30 #include "trbase.h"
30 #include "trdata.h" 31 #include "trdata.h"
@@ -49,6 +50,10 @@ TR_CLASS(TR_EventDispatcher) { @@ -49,6 +50,10 @@ TR_CLASS(TR_EventDispatcher) {
49 TR_EXTENDS(TR_EventSubject); 50 TR_EXTENDS(TR_EventSubject);
50 51
51 TR_Queue events; 52 TR_Queue events;
  53 + pthread_mutex_t events_lock;
  54 + pthread_cond_t events_cond;
  55 + pthread_t events_wait;
  56 +
52 TR_Hash handler; 57 TR_Hash handler;
53 TR_EventHandler default_handler; 58 TR_EventHandler default_handler;
54 int running; 59 int running;
@@ -78,8 +83,12 @@ TR_eventDispatcherGetDataWaitTime(TR_EventDispatcher); @@ -78,8 +83,12 @@ TR_eventDispatcherGetDataWaitTime(TR_EventDispatcher);
78 void TR_eventDispatcherStart(TR_EventDispatcher); 83 void TR_eventDispatcherStart(TR_EventDispatcher);
79 void TR_eventDispatcherShutdown(TR_EventDispatcher); 84 void TR_eventDispatcherShutdown(TR_EventDispatcher);
80 85
81 -#define TR_eventDispatcherEnqueueEvent(disp,ev) \  
82 - (TR_queuePut((disp)->events, (ev))) 86 +#define TR_eventDispatcherEnqueueEvent(disp,ev) \
  87 + pthread_mutex_lock(&((disp)->events_lock)); \
  88 + TR_queuePut((disp)->events, (ev)); \
  89 + pthread_cond_broadcast(&((disp)->events_cond)); \
  90 + pthread_mutex_unlock(&((disp)->events_lock))
  91 +
83 #define TR_eventDispatcherStop(disp) \ 92 #define TR_eventDispatcherStop(disp) \
84 (((TR_EventDispatcher)disp)->running = 0) 93 (((TR_EventDispatcher)disp)->running = 0)
85 #define TR_eventDispatcherSetHeartbeat(disp, beat) \ 94 #define TR_eventDispatcherSetHeartbeat(disp, beat) \
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#ifndef __TR_EVENT_THREAD_H__
  24 +#define __TR_EVENT_THREAD_H__
  25 +
  26 +#include <pthread.h>
  27 +
  28 +#include "trbase.h"
  29 +#include "event_dispatcher.h"
  30 +
  31 +TR_CLASS(TR_EventThread) {
  32 + TR_EventDispatcher dispatcher;
  33 + pthread_t handle;
  34 +};
  35 +TR_INSTANCE_INIT(TR_EventThread);
  36 +TR_CLASSVARS_DECL(TR_EventThread) {};
  37 +
  38 +void TR_eventThreadStart(TR_EventThread);
  39 +void TR_eventThreadJoin(TR_EventThread);
  40 +
  41 +#endif // __TR_EVENT_THREAD_H__
  42 +
  43 +// vim: set ts=4 sw=4:
  44 +
@@ -5,6 +5,7 @@ @@ -5,6 +5,7 @@
5 #include "tr/event_handler.h" 5 #include "tr/event_handler.h"
6 #include "tr/event_subject.h" 6 #include "tr/event_subject.h"
7 #include "tr/event_dispatcher.h" 7 #include "tr/event_dispatcher.h"
  8 +#include "tr/event_thread.h"
8 9
9 #endif // __TR_EVENT_H__ 10 #endif // __TR_EVENT_H__
10 11
@@ -17,10 +17,13 @@ TREVENT = event.c \ @@ -17,10 +17,13 @@ TREVENT = event.c \
17 event_handler_class_cleanup.c \ 17 event_handler_class_cleanup.c \
18 event_subject.c \ 18 event_subject.c \
19 event_subject_emit.c \ 19 event_subject_emit.c \
20 - event_subject_id.c 20 + event_subject_id.c \
  21 + event_thread.c \
  22 + event_thread_start.c \
  23 + event_thread_join.c
21 24
22 lib_LTLIBRARIES = libtrevent.la 25 lib_LTLIBRARIES = libtrevent.la
23 26
24 libtrevent_la_SOURCES = $(TREVENT) 27 libtrevent_la_SOURCES = $(TREVENT)
25 -libtrevent_la_CFLAGS = $(AM_CFLAGS) 28 +libtrevent_la_CFLAGS = $(AM_CFLAGS) -pthread
26 libtrevent_la_LIBADD = 29 libtrevent_la_LIBADD =
@@ -24,6 +24,7 @@ @@ -24,6 +24,7 @@
24 #include <signal.h> 24 #include <signal.h>
25 #include <stdio.h> 25 #include <stdio.h>
26 #include <time.h> 26 #include <time.h>
  27 +#include <pthread.h>
27 28
28 #include "trbase.h" 29 #include "trbase.h"
29 #include "trdata.h" 30 #include "trdata.h"
@@ -65,7 +66,10 @@ int @@ -65,7 +66,10 @@ int
65 eventDispatcherCtor(void * _this, va_list * params) { 66 eventDispatcherCtor(void * _this, va_list * params) {
66 TR_EventDispatcher this = _this; 67 TR_EventDispatcher this = _this;
67 68
68 - this->events = TR_new(TR_Queue); 69 + this->events = TR_new(TR_Queue);
  70 + pthread_mutex_init(&(this->events_lock), NULL);
  71 + pthread_cond_init(&(this->events_cond), NULL);
  72 +
69 this->handler = TR_new(TR_Hash); 73 this->handler = TR_new(TR_Hash);
70 this->heartbeat = TR_new(TR_Timer, TR_TBASE_MIL, 1000); 74 this->heartbeat = TR_new(TR_Timer, TR_TBASE_MIL, 1000);
71 this->mode = va_arg(*params, TR_EventDispatcherMode); 75 this->mode = va_arg(*params, TR_EventDispatcherMode);
@@ -98,6 +102,9 @@ eventDispatcherDtor(void * _this) { @@ -98,6 +102,9 @@ eventDispatcherDtor(void * _this) {
98 TR_delete(this->heartbeat); 102 TR_delete(this->heartbeat);
99 TR_delete(this->handler); 103 TR_delete(this->handler);
100 TR_delete(this->events); 104 TR_delete(this->events);
  105 +
  106 + pthread_mutex_destroy(&(this->events_lock));
  107 + pthread_cond_destroy(&(this->events_cond));
101 } 108 }
102 109
103 static 110 static
@@ -40,6 +40,8 @@ TR_eventDispatcherStart(TR_EventDispatcher this) @@ -40,6 +40,8 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
40 TR_Queue handler_queue; 40 TR_Queue handler_queue;
41 TR_HashValue handler_queue_hv; 41 TR_HashValue handler_queue_hv;
42 42
  43 + pthread_mutex_lock(&(this->events_lock));
  44 +
43 TR_eventDispatcherGetBeatTime(this); 45 TR_eventDispatcherGetBeatTime(this);
44 46
45 if (this->n_beats) { 47 if (this->n_beats) {
@@ -48,15 +50,27 @@ TR_eventDispatcherStart(TR_EventDispatcher this) @@ -48,15 +50,27 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
48 TR_DISPATCHER_EVENT_HEARTBEAT, 50 TR_DISPATCHER_EVENT_HEARTBEAT,
49 NULL); 51 NULL);
50 } else if (TR_queueEmpty(this->events)) { 52 } else if (TR_queueEmpty(this->events)) {
51 - int evtid = TR_EVD_CLIENT == this->mode  
52 - ? TR_DISPATCHER_EVENT_USER_WAIT  
53 - : TR_DISPATCHER_EVENT_DATA_WAIT; 53 + if (! this->events_wait) {
  54 + int evtid = TR_EVD_CLIENT == this->mode
  55 + ? TR_DISPATCHER_EVENT_USER_WAIT
  56 + : TR_DISPATCHER_EVENT_DATA_WAIT;
54 57
55 - event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, NULL); 58 + this->events_wait = pthread_self();
  59 + event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, NULL);
  60 + } else {
  61 + pthread_cond_wait(&(this->events_cond), &(this->events_lock));
  62 + event = NULL;
  63 + }
56 } else { 64 } else {
57 event = TR_queueGet(this->events); 65 event = TR_queueGet(this->events);
58 } 66 }
59 67
  68 + pthread_mutex_unlock(&(this->events_lock));
  69 +
  70 + if (! event) {
  71 + continue;
  72 + }
  73 +
60 handler_queue_hv = TR_hashGetByVal( 74 handler_queue_hv = TR_hashGetByVal(
61 this->handler, 75 this->handler,
62 TR_sdbm( 76 TR_sdbm(
@@ -89,6 +103,10 @@ TR_eventDispatcherStart(TR_EventDispatcher this) @@ -89,6 +103,10 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
89 } else { 103 } else {
90 TR_delete(event); 104 TR_delete(event);
91 } 105 }
  106 +
  107 + if (pthread_equal(this->events_wait, pthread_self())) {
  108 + this->events_wait = FALSE;
  109 + }
92 } 110 }
93 } 111 }
94 112
@@ -32,26 +32,29 @@ @@ -32,26 +32,29 @@
32 TR_EventDone 32 TR_EventDone
33 TR_eventHandlerHandleEvent(TR_EventHandler this, TR_Event event) 33 TR_eventHandlerHandleEvent(TR_EventHandler this, TR_Event event)
34 { 34 {
  35 + TR_EventDone retval;
35 TR_EventMethod_fptr event_func = NULL; 36 TR_EventMethod_fptr event_func = NULL;
36 TR_HashValue handle_func_hv = TR_hashGetByVal( 37 TR_HashValue handle_func_hv = TR_hashGetByVal(
37 TR_CLASSVARS(TR_EventHandler, TR_GET_CLASS(this))->event_methods, 38 TR_CLASSVARS(TR_EventHandler, TR_GET_CLASS(this))->event_methods,
38 TR_sdbm((unsigned char *)&event->id, sizeof(event->id))); 39 TR_sdbm((unsigned char *)&event->id, sizeof(event->id)));
39 40
40 - TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,  
41 - "%zd - HANDLE(%zd): %s event on %p with no. %d",  
42 - this->dispatcher[0]->events->nmsg,  
43 - event->subject->emitted,  
44 - TR_getEventString(event),  
45 - event->subject,  
46 - event->serial);  
47 -  
48 if (! handle_func_hv) { 41 if (! handle_func_hv) {
49 return 0; 42 return 0;
50 } 43 }
51 44
52 event_func = *(TR_EventMethod_fptr *)handle_func_hv->value; 45 event_func = *(TR_EventMethod_fptr *)handle_func_hv->value;
53 46
54 - return event_func(this, event); 47 + retval = event_func(this, event);
  48 +
  49 + TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,
  50 + "[%ld] - HANDLE(%zd): %s event on %p with no. %d",
  51 + pthread_self(),
  52 + event->subject->emitted,
  53 + TR_getEventString(event),
  54 + event->subject,
  55 + event->serial);
  56 +
  57 + return retval;
55 } 58 }
56 59
57 // vim: set ts=4 sw=4: 60 // vim: set ts=4 sw=4:
@@ -21,6 +21,7 @@ @@ -21,6 +21,7 @@
21 */ 21 */
22 22
23 #include <stdio.h> 23 #include <stdio.h>
  24 +#include <pthread.h>
24 25
25 #include "trbase.h" 26 #include "trbase.h"
26 27
@@ -34,15 +35,15 @@ TR_eventHandlerIssueEvent(TR_EventHandler this, TR_Event event) @@ -34,15 +35,15 @@ TR_eventHandlerIssueEvent(TR_EventHandler this, TR_Event event)
34 int i; 35 int i;
35 36
36 for (i=0; i<this->ndispatcher; i++) { 37 for (i=0; i<this->ndispatcher; i++) {
37 - TR_eventDispatcherEnqueueEvent(this->dispatcher[i], event);  
38 -  
39 TR_loggerLog(TR_logger, TR_LOGGER_DEBUG, 38 TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,
40 - "%zd - ISSUE(%zd): %s event on %p with no. %d",  
41 - this->dispatcher[i]->events->nmsg, 39 + "[%ld] - ISSUE(%zd): %s event on %p with no. %d",
  40 + pthread_self(),
42 event->subject->emitted, 41 event->subject->emitted,
43 TR_getEventString(event), 42 TR_getEventString(event),
44 event->subject, 43 event->subject,
45 event->serial); 44 event->serial);
  45 +
  46 + TR_eventDispatcherEnqueueEvent(this->dispatcher[i], event);
46 } 47 }
47 48
48 return TRUE; 49 return TRUE;
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#include <pthread.h>
  24 +
  25 +#include "trbase.h"
  26 +
  27 +#include "tr/event_dispatcher.h"
  28 +#include "tr/event_thread.h"
  29 +
  30 +static
  31 +int
  32 +eventThreadCtor(void * _this, va_list * params)
  33 +{
  34 + TR_EventThread this = _this;
  35 +
  36 + this->dispatcher = va_arg(*params, TR_EventDispatcher);
  37 +
  38 + return 0;
  39 +}
  40 +
  41 +static
  42 +void
  43 +eventThreadDtor(void * _this)
  44 +{
  45 + TR_EventThread this = _this;
  46 +
  47 + if (this->handle) {
  48 + pthread_cancel(this->handle);
  49 + pthread_join(this->handle, NULL);
  50 + }
  51 +}
  52 +
  53 +TR_INIT_IFACE(TR_Class, eventThreadCtor, eventThreadDtor, NULL);
  54 +TR_CREATE_CLASS(TR_EventThread, NULL, NULL, TR_IF(TR_Class));
  55 +
  56 +// vim: set ts=4 sw=4:
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#include <pthread.h>
  24 +#include <errno.h>
  25 +
  26 +#include "trbase.h"
  27 +
  28 +#include "tr/event_thread.h"
  29 +
  30 +void
  31 +TR_eventThreadJoin(TR_EventThread this)
  32 +{
  33 + int error = pthread_join(this->handle, NULL);
  34 +
  35 + /*
  36 + * AFAIC there is no error condition from this function that
  37 + * should lead to a retry. Additionally pthread_join is
  38 + * continued after a signal, so all I do is log error and
  39 + * continue with the next.
  40 + */
  41 + switch (error) {
  42 + case EDEADLK:
  43 + TR_loggerLog(
  44 + TR_logger,
  45 + TR_LOGGER_WARNING,
  46 + "Thread deadlock detected");
  47 + break;
  48 +
  49 + case EINVAL:
  50 + TR_loggerLog(
  51 + TR_logger,
  52 + TR_LOGGER_WARNING,
  53 + "Tried to join a non joinable thread");
  54 + break;
  55 +
  56 + case ESRCH:
  57 + TR_loggerLog(
  58 + TR_logger,
  59 + TR_LOGGER_WARNING,
  60 + "Tried to join non existent thread");
  61 + break;
  62 + }
  63 +}
  64 +
  65 +// vim: set ts=4 sw=4:
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#include <pthread.h>
  24 +
  25 +#include "trbase.h"
  26 +
  27 +#include "tr/event_dispatcher.h"
  28 +#include "tr/event_thread.h"
  29 +
  30 +static
  31 +void *
  32 +TR_eventStreadRun(void * message)
  33 +{
  34 + TR_EventThread this = message;
  35 + TR_eventDispatcherStart(this->dispatcher);
  36 + return NULL;
  37 +}
  38 +
  39 +void
  40 +TR_eventThreadStart(TR_EventThread this)
  41 +{
  42 + int error = pthread_create(
  43 + &this->handle,
  44 + NULL,
  45 + TR_eventStreadRun,
  46 + (void *)this);
  47 +
  48 + if (error) {
  49 + TR_loggerLog(
  50 + TR_logger,
  51 + TR_LOGGER_ERR,
  52 + "Thread creation failed with error code: %d",
  53 + error);
  54 + }
  55 +}
  56 +
  57 +// vim: set ts=4 sw=4:
Please register or login to post a comment