Commit db9b87b26b3d3b0f777938ebc730fbc5bbbc3102

Authored by Georg Hopp
1 parent 01463d7b

add an extra queue for pending events. That way I achive a better ballance betwe…

…en reading and processing and i don
't need the additional timeout for polls
@@ -48,11 +48,10 @@ TR_CLASS(TR_EventDispatcher) { @@ -48,11 +48,10 @@ TR_CLASS(TR_EventDispatcher) {
48 TR_EXTENDS(TR_EventSubject); 48 TR_EXTENDS(TR_EventSubject);
49 49
50 TR_Queue events; 50 TR_Queue events;
  51 + TR_Queue pending;
51 TR_Hash handler; 52 TR_Hash handler;
52 TR_EventHandler default_handler; 53 TR_EventHandler default_handler;
53 int running; 54 int running;
54 - int pollinterval; // milliseconds  
55 - int nextpoll; // milliseconds  
56 int heartbeat; // milliseconds 55 int heartbeat; // milliseconds
57 int nextbeat; // milliseconds 56 int nextbeat; // milliseconds
58 TR_EventDispatcherMode mode; 57 TR_EventDispatcherMode mode;
@@ -72,12 +71,13 @@ void TR_eventDispatcherRegisterHandler(TR_EventDispatcher, TR_EventHandler); @@ -72,12 +71,13 @@ void TR_eventDispatcherRegisterHandler(TR_EventDispatcher, TR_EventHandler);
72 void TR_eventDispatcherSetHeartbeat(TR_EventDispatcher, int); 71 void TR_eventDispatcherSetHeartbeat(TR_EventDispatcher, int);
73 int TR_eventDispatcherGetBeatTime(TR_EventDispatcher); 72 int TR_eventDispatcherGetBeatTime(TR_EventDispatcher);
74 int TR_eventDispatcherGetDataWaitTime(TR_EventDispatcher); 73 int TR_eventDispatcherGetDataWaitTime(TR_EventDispatcher);
75 -void TR_eventDispatcherUpdateNextPoll(TR_EventDispatcher);  
76 void TR_eventDispatcherStart(TR_EventDispatcher); 74 void TR_eventDispatcherStart(TR_EventDispatcher);
77 void TR_eventDispatcherShutdown(TR_EventDispatcher); 75 void TR_eventDispatcherShutdown(TR_EventDispatcher);
78 76
79 #define TR_eventDispatcherEnqueueEvent(disp,ev) \ 77 #define TR_eventDispatcherEnqueueEvent(disp,ev) \
80 (TR_queuePut((disp)->events, (ev))) 78 (TR_queuePut((disp)->events, (ev)))
  79 +#define TR_eventDispatcherEnqueuePending(disp,ev) \
  80 + (TR_queuePut((disp)->pending, (ev)))
81 #define TR_eventDispatcherStop(disp) \ 81 #define TR_eventDispatcherStop(disp) \
82 (((TR_EventDispatcher)disp)->running = 0) 82 (((TR_EventDispatcher)disp)->running = 0)
83 83
@@ -9,7 +9,6 @@ TREVENT = event.c \ @@ -9,7 +9,6 @@ TREVENT = event.c \
9 event_dispatcher_set_hearbeat.c \ 9 event_dispatcher_set_hearbeat.c \
10 event_dispatcher_get_beat_time.c \ 10 event_dispatcher_get_beat_time.c \
11 event_dispatcher_get_data_wait_time.c \ 11 event_dispatcher_get_data_wait_time.c \
12 - event_dispatcher_update_next_poll.c \  
13 event_dispatcher_start.c \ 12 event_dispatcher_start.c \
14 event_dispatcher_shutdown.c \ 13 event_dispatcher_shutdown.c \
15 event_handler.c \ 14 event_handler.c \
@@ -64,22 +64,15 @@ static @@ -64,22 +64,15 @@ static
64 int 64 int
65 eventDispatcherCtor(void * _this, va_list * params) { 65 eventDispatcherCtor(void * _this, va_list * params) {
66 TR_EventDispatcher this = _this; 66 TR_EventDispatcher this = _this;
67 - struct timespec tp;  
68 - int now; // milliseconds  
69 67
70 this->events = TR_new(TR_Queue); 68 this->events = TR_new(TR_Queue);
  69 + this->pending = TR_new(TR_Queue);
71 this->handler = TR_new(TR_Hash); 70 this->handler = TR_new(TR_Hash);
72 this->mode = va_arg(*params, TR_EventDispatcherMode); 71 this->mode = va_arg(*params, TR_EventDispatcherMode);
73 this->default_handler = va_arg(*params, TR_EventHandler); 72 this->default_handler = va_arg(*params, TR_EventHandler);
74 this->running = 0; 73 this->running = 0;
75 this->heartbeat = 0; 74 this->heartbeat = 0;
76 this->nextbeat = 0; 75 this->nextbeat = 0;
77 - this->pollinterval = va_arg(*params, int);  
78 -  
79 - clock_gettime(CLOCK_REALTIME, &tp);  
80 - now = tp.tv_sec * 1000 + tp.tv_nsec / 1000000;  
81 -  
82 - this->nextpoll = now + this->pollinterval;  
83 76
84 if (! _TR_controlDispatcher) { 77 if (! _TR_controlDispatcher) {
85 _TR_controlDispatcher = this; 78 _TR_controlDispatcher = this;
@@ -105,6 +98,7 @@ eventDispatcherDtor(void * _this) { @@ -105,6 +98,7 @@ eventDispatcherDtor(void * _this) {
105 98
106 TR_hashCleanup(this->handler); 99 TR_hashCleanup(this->handler);
107 TR_delete(this->handler); 100 TR_delete(this->handler);
  101 + TR_delete(this->pending);
108 TR_delete(this->events); 102 TR_delete(this->events);
109 } 103 }
110 104
@@ -53,11 +53,18 @@ TR_eventDispatcherStart(TR_EventDispatcher this) @@ -53,11 +53,18 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
53 (TR_EventSubject)this, 53 (TR_EventSubject)this,
54 TR_DISPATCHER_EVENT_HEARTBEAT, 54 TR_DISPATCHER_EVENT_HEARTBEAT,
55 NULL); 55 NULL);
56 - } else if (TR_queueEmpty(this->events) || this->nextpoll <= now) { 56 + } else if (TR_queueEmpty(this->events)) {
57 int evtid = TR_EVD_CLIENT == this->mode 57 int evtid = TR_EVD_CLIENT == this->mode
58 ? TR_DISPATCHER_EVENT_USER_WAIT 58 ? TR_DISPATCHER_EVENT_USER_WAIT
59 : TR_DISPATCHER_EVENT_DATA_WAIT; 59 : TR_DISPATCHER_EVENT_DATA_WAIT;
60 - int * toutptr = TR_queueEmpty(this->events) ? NULL : &ZERO; 60 + int * toutptr = NULL;
  61 +
  62 + if (! TR_queueEmpty(this->pending)) {
  63 + toutptr = &ZERO;
  64 + TR_delete(this->events);
  65 + this->events = this->pending;
  66 + this->pending = TR_new(TR_Queue);
  67 + }
61 68
62 event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, toutptr); 69 event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, toutptr);
63 } else { 70 } else {
@@ -91,7 +98,7 @@ TR_eventDispatcherStart(TR_EventDispatcher this) @@ -91,7 +98,7 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
91 if (TR_EVENT_DONE == done) { 98 if (TR_EVENT_DONE == done) {
92 TR_delete(event); 99 TR_delete(event);
93 } else { 100 } else {
94 - TR_eventDispatcherEnqueueEvent(this, event); 101 + TR_eventDispatcherEnqueuePending(this, event);
95 } 102 }
96 } 103 }
97 } 104 }
1 -/**  
2 - * \file  
3 - *  
4 - * \author Georg Hopp  
5 - *  
6 - * \copyright  
7 - * Copyright © 2014 Georg Hopp  
8 - *  
9 - * This program is free software: you can redistribute it and/or modify  
10 - * it under the terms of the GNU General Public License as published by  
11 - * the Free Software Foundation, either version 3 of the License, or  
12 - * (at your option) any later version.  
13 - *  
14 - * This program is distributed in the hope that it will be useful,  
15 - * but WITHOUT ANY WARRANTY; without even the implied warranty of  
16 - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the  
17 - * GNU General Public License for more details.  
18 - *  
19 - * You should have received a copy of the GNU General Public License  
20 - * along with this program. If not, see <http://www.gnu.org/licenses/>.  
21 - */  
22 -  
23 -#include <time.h>  
24 -  
25 -#include "trbase.h"  
26 -  
27 -#include "tr/event_dispatcher.h"  
28 -  
29 -void  
30 -TR_eventDispatcherUpdateNextPoll(TR_EventDispatcher this)  
31 -{  
32 - struct timespec tp;  
33 - int now; // milliseconds  
34 -  
35 - clock_gettime(CLOCK_REALTIME, &tp);  
36 - now = tp.tv_sec * 1000 + tp.tv_nsec / 1000000;  
37 -  
38 - while(this->nextpoll <= now) this->nextpoll += this->pollinterval;  
39 -}  
40 -  
41 -// vim: set ts=4 sw=4:  
Please register or login to post a comment