Commit f71cac22a3dc7ce41c180776124ec52bac46caa4

Authored by Georg Hopp
1 parent c0b33ec7

fixes and additions for threaded code

... ... @@ -64,7 +64,8 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) {
64 64 #define TR_CEP_EVENT_SHUT_READ 10 // CommManager
65 65 #define TR_CEP_EVENT_SHUT_WRITE 11 // CommManager
66 66 #define TR_CEP_EVENT_CLOSE 12 // CommManager
67   -#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE)
  67 +#define TR_CEP_EVENT_IO_DONE 13 // CommManager
  68 +#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_IO_DONE)
68 69
69 70 #define TR_cepSetClose(ep) ((ep)->do_close = 1)
70 71 #define TR_cepHasProto(ep, proto) (TR_INSTANCE_OF(proto, TR_cepGetProto(ep)))
... ...
... ... @@ -24,6 +24,7 @@
24 24 #define __TR_COMM_MANAGER_H__
25 25
26 26 #include <sys/types.h>
  27 +#include <pthread.h>
27 28
28 29 #include "trbase.h"
29 30 #include "trdata.h"
... ... @@ -40,6 +41,8 @@ TR_CLASS(TR_CommManager) {
40 41 TR_Hash read;
41 42 size_t n_endpoints;
42 43 size_t max_handle;
  44 + unsigned long io_triggered;
  45 + pthread_mutex_t io_triggered_lock;
43 46 };
44 47 TR_INSTANCE_INIT(TR_CommManager);
45 48 TR_CLASSVARS_DECL(TR_CommManager) {
... ...
... ... @@ -30,13 +30,13 @@
30 30
31 31 #include "tr/comm_end_point.h"
32 32
33   -typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint);
34   -typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, unsigned long);
35   -typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event);
36   -typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event);
37   -typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
38   -typedef TR_EventDone (* fptr_TR_commManagerDisableRead)(void *, TR_Event);
39   -typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event);
  33 +typedef void (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint);
  34 +typedef size_t (* fptr_TR_commManagerSelect)(void *, TR_Event, unsigned long);
  35 +typedef void (* fptr_TR_commManagerPollWrite)(void *, TR_Event);
  36 +typedef void (* fptr_TR_commManagerPollRead)(void *, TR_Event);
  37 +typedef void (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
  38 +typedef void (* fptr_TR_commManagerDisableRead)(void *, TR_Event);
  39 +typedef void (* fptr_TR_commManagerClose)(void *, TR_Event);
40 40
41 41 TR_INTERFACE(TR_CommManager) {
42 42 TR_IFID;
... ...
... ... @@ -29,7 +29,7 @@ int
29 29 TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size)
30 30 {
31 31 TR_RemoteData data;
32   - int send;
  32 + size_t send;
33 33
34 34 *size = 0;
35 35
... ... @@ -55,6 +55,9 @@ TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size)
55 55 {
56 56 TR_RemoteData new_data = NULL;
57 57
  58 + printf("[~DEBUG~] wrote %zd bytes\n", send);
  59 + fflush(stdout);
  60 +
58 61 if (send != ((TR_SizedData)data)->size) {
59 62 new_data = TR_new(
60 63 TR_RemoteData,
... ...
... ... @@ -41,7 +41,7 @@ commEndPointCtor(void * _this, va_list * params)
41 41 this->transport = va_arg(*params, TR_Socket);
42 42 this->protocol = va_arg(*params, TR_Protocol);
43 43 this->read_chunk_size = va_arg(*params, int);
44   - this->do_close = 0;
  44 + this->do_close = FALSE;
45 45 this->write_buffer = TR_new(TR_Queue);
46 46
47 47 return 0;
... ... @@ -101,6 +101,7 @@ commEndPointCvInit(TR_class_ptr cls)
101 101 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_READ);
102 102 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_WRITE);
103 103 TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE);
  104 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_IO_DONE);
104 105 }
105 106
106 107 const char * TR_cepEventStrings[] = {
... ... @@ -117,6 +118,7 @@ const char * TR_cepEventStrings[] = {
117 118 "TR_CEP_EVENT_SHUT_READ",
118 119 "TR_CEP_EVENT_SHUT_WRITE",
119 120 "TR_CEP_EVENT_CLOSE",
  121 + "TR_CEP_EVENT_IO_DONE",
120 122 };
121 123
122 124 intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1];
... ...
... ... @@ -30,6 +30,9 @@ TR_commEndPointRead(TR_CommEndPoint this, TR_RemoteData * data_ptr)
30 30 {
31 31 *data_ptr = TR_socketRecv(this->transport, this->read_chunk_size);
32 32
  33 + printf("[~DEBUG~] read %zd bytes\n", ((TR_SizedData)*data_ptr)->size);
  34 + fflush(stdout);
  35 +
33 36 if (! *data_ptr) return -1; // ment to trigger a close
34 37 if (*data_ptr == (void*)-1) return -2; // remote close... shutdown
35 38 if (*data_ptr == TR_emptyRemoteData) return FALSE; // read blocked
... ...
... ... @@ -22,6 +22,7 @@
22 22
23 23 #include <unistd.h>
24 24 #include <poll.h>
  25 +#include <pthread.h>
25 26
26 27 #include "trbase.h"
27 28 #include "trdata.h"
... ... @@ -51,6 +52,8 @@ commManagerCtor(void * _this, va_list * params)
51 52 this->n_endpoints = sysconf(_SC_OPEN_MAX);
52 53 this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints);
53 54
  55 + pthread_mutex_init(&this->io_triggered_lock, NULL);
  56 +
54 57 return 0;
55 58 }
56 59
... ... @@ -61,6 +64,8 @@ commManagerDtor(void * _this)
61 64 TR_CommManager this = _this;
62 65 nfds_t i;
63 66
  67 + pthread_mutex_destroy(&this->io_triggered_lock);
  68 +
64 69 for (i=0; i<this->n_endpoints; i++) {
65 70 TR_delete(this->endpoints[i]);
66 71 }
... ... @@ -72,11 +77,22 @@ commManagerDtor(void * _this)
72 77
73 78 static
74 79 TR_EventDone
75   -TR_commManagerEnableWrite(void * _this, TR_Event event)
  80 +TR_commManagerWriteIsBlocked(void * _this, TR_Event event)
76 81 {
77 82 TR_CommManager this = _this;
78 83
79   - TR_hashAdd(this->write, event->subject);
  84 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  85 +
  86 + return TR_EVENT_DONE;
  87 +}
  88 +
  89 +static
  90 +TR_EventDone
  91 +TR_commManagerDecrementIoTriggerd(TR_CommManager this, TR_Event event)
  92 +{
  93 + pthread_mutex_lock(&this->io_triggered_lock);
  94 + this->io_triggered--;
  95 + pthread_mutex_unlock(&this->io_triggered_lock);
80 96
81 97 return TR_EVENT_DONE;
82 98 }
... ... @@ -103,8 +119,6 @@ static
103 119 void
104 120 commManagerCvInit(TR_class_ptr cls)
105 121 {
106   - TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
107   -
108 122 TR_EVENT_HANDLER_SET_METHOD(
109 123 cls, TR_EventDispatcher,
110 124 TR_DISPATCHER_EVENT_DATA_WAIT,
... ... @@ -120,7 +134,7 @@ commManagerCvInit(TR_class_ptr cls)
120 134 TR_EVENT_HANDLER_SET_METHOD(
121 135 cls, TR_CommEndPoint,
122 136 TR_CEP_EVENT_WRITE_BLOCK,
123   - TR_commManagerPollWrite);
  137 + TR_commManagerWriteIsBlocked);
124 138 TR_EVENT_HANDLER_SET_METHOD(
125 139 cls, TR_CommEndPoint,
126 140 TR_CEP_EVENT_READ_BLOCK,
... ... @@ -144,14 +158,18 @@ commManagerCvInit(TR_class_ptr cls)
144 158 TR_EVENT_HANDLER_SET_METHOD(
145 159 cls, TR_CommEndPoint,
146 160 TR_CEP_EVENT_DATA_READY,
147   - TR_commManagerEnableWrite);
  161 + TR_commManagerPollWrite);
148 162 TR_EVENT_HANDLER_SET_METHOD(
149 163 cls, TR_CommEndPoint,
150 164 TR_CEP_EVENT_DATA_END,
151 165 TR_commManagerDisableWrite);
  166 + TR_EVENT_HANDLER_SET_METHOD(
  167 + cls, TR_CommEndPoint,
  168 + TR_CEP_EVENT_IO_DONE,
  169 + TR_commManagerDecrementIoTriggerd);
152 170 }
153 171
154   -TR_INSTANCE(TR_Hash, commManagerEventMethods);
  172 +TR_INIT_HANDLER(TR_CommManager);
155 173 TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
156 174 TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
157 175 TR_CREATE_CLASS(
... ... @@ -160,7 +178,7 @@ TR_CREATE_CLASS(
160 178 commManagerCvInit,
161 179 TR_IF(TR_Class),
162 180 TR_IF(TR_CommManager)) = {
163   - { &(_commManagerEventMethods.data) }
  181 + { TR_HANDLER_CVARS(TR_CommManager) }
164 182 };
165 183
166 184 // vim: set ts=4 sw=4:
... ...
... ... @@ -73,8 +73,8 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
73 73 int handle = endpoint->transport->handle;
74 74 struct epoll_event event;
75 75
76   - //this->events[handle] = EPOLLET;
77   - this->events[handle] = 0;
  76 + //this->events[handle] = EPOLLIN | EPOLLET;
  77 + this->events[handle] = EPOLLIN;
78 78 event.data.ptr = endpoint;
79 79 event.events = this->events[handle];
80 80
... ... @@ -82,19 +82,19 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
82 82 }
83 83
84 84 static
85   -void
  85 +size_t
86 86 TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
87 87 {
88 88 TR_CommManagerEpoll this = _this;
89 89 TR_CommManager cmgr = _this;
90 90 int i, nevents;
91   - struct epoll_event _event;
  91 + //struct epoll_event _event;
92 92
93 93 nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);
94 94
95 95 for (i=0; i<nevents; i++) {
96 96 TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
97   - int handle = endpoint->transport->handle;
  97 + //int handle = endpoint->transport->handle;
98 98
99 99 if ((events[i].events & EPOLLIN) == EPOLLIN) {
100 100 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
... ... @@ -106,20 +106,20 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
106 106 }
107 107 }
108 108
109   - this->events[handle] &= ~EPOLLIN;
110   - _event.data.ptr = endpoint;
111   - _event.events = this->events[handle];
112   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  109 + //this->events[handle] &= ~EPOLLIN;
  110 + //_event.data.ptr = endpoint;
  111 + //_event.events = this->events[handle];
  112 + //epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
113 113 }
114 114
115 115 if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
116 116 if (! event->subject->fin) {
117 117 TR_hashAdd(cmgr->write, endpoint);
118 118 }
119   - this->events[handle] &= ~EPOLLOUT;
120   - _event.data.ptr = endpoint;
121   - _event.events = this->events[handle];
122   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  119 + //this->events[handle] &= ~EPOLLOUT;
  120 + //_event.data.ptr = endpoint;
  121 + //_event.events = this->events[handle];
  122 + //epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
123 123 }
124 124
125 125 if ((events[i].events & EPOLLHUP) == EPOLLHUP) {
... ... @@ -131,6 +131,15 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
131 131 NULL));
132 132 }
133 133 }
  134 +
  135 + if (nevents >= 0) {
  136 + return nevents;
  137 + } else {
  138 + perror("epoll");
  139 + fflush(stderr);
  140 + fflush(stdout);
  141 + return 0;
  142 + }
134 143 }
135 144
136 145 static
... ... @@ -180,9 +189,9 @@ static
180 189 void
181 190 TR_commManagerEpollEnableRead(void * _this, TR_Event event)
182 191 {
183   - if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
184   - TR_commManagerEpollEnable(_this, EPOLLIN, event);
185   - }
  192 +// if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
  193 +// TR_commManagerEpollEnable(_this, EPOLLIN, event);
  194 +// }
186 195 }
187 196
188 197 static
... ... @@ -196,7 +205,7 @@ static
196 205 void
197 206 TR_commManagerEpollDisableRead(void * _this, TR_Event event)
198 207 {
199   - TR_commManagerEpollDisable(_this, EPOLLIN, event);
  208 +// TR_commManagerEpollDisable(_this, EPOLLIN, event);
200 209 }
201 210
202 211 static
... ...
... ... @@ -71,45 +71,57 @@ TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
71 71 TR_CommManagerPoll this = _this;
72 72
73 73 this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
74   - this->fds[endpoint->transport->handle].events = 0;
  74 + this->fds[endpoint->transport->handle].events = POLLIN;
75 75 }
76 76
77 77 static
78   -void
  78 +size_t
79 79 TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout)
80 80 {
81 81 TR_CommManagerPoll this = _this;
82 82 TR_CommManager cmgr = _this;
83 83 nfds_t i;
84   - int nevents;
  84 + int nevents, doevents;
  85 +
  86 + for (i = 0; i < cmgr->max_handle+1; i++) {
  87 + printf("[=DEBUG=] handle %ld POLLIN? %s\n", i,
  88 + (this->fds[i].events & POLLIN) == POLLIN ? "YES" : "NO");
  89 + fflush(stdout);
  90 + }
85 91
86   - nevents = poll(this->fds, cmgr->max_handle+1, timeout);
  92 + nevents = doevents = poll(this->fds, cmgr->max_handle+1, timeout);
87 93
88   - if (nevents) {
  94 + if (doevents) {
89 95 for (i = 0; i < cmgr->max_handle+1; i++) {
90 96 if (this->fds[i].revents != 0) {
91 97 TR_CommEndPoint endpoint = cmgr->endpoints[i];
92 98
93   - nevents--;
  99 + doevents--;
94 100
95 101 if ((this->fds[i].revents & POLLIN) == POLLIN) {
96 102 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
97 103 && ((TR_TcpSocket)endpoint->transport)->listen) {
  104 + pthread_mutex_lock(&cmgr->io_triggered_lock);
98 105 TR_hashAdd(cmgr->accept, endpoint);
  106 + pthread_mutex_unlock(&cmgr->io_triggered_lock);
99 107 } else {
100 108 if (! event->subject->fin) {
  109 + pthread_mutex_lock(&cmgr->io_triggered_lock);
101 110 TR_hashAdd(cmgr->read, endpoint);
  111 + pthread_mutex_unlock(&cmgr->io_triggered_lock);
102 112 }
103 113 }
104   - this->fds[endpoint->transport->handle].events &= ~POLLIN;
  114 + //this->fds[endpoint->transport->handle].events &= ~POLLIN;
105 115 }
106 116
107 117 if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
108 118 if (! event->subject->fin) {
  119 + pthread_mutex_lock(&cmgr->io_triggered_lock);
109 120 TR_hashAdd(cmgr->write, endpoint);
  121 + pthread_mutex_unlock(&cmgr->io_triggered_lock);
110 122 }
111   - this->fds[endpoint->transport->handle].events &=
112   - ~(POLLOUT|POLLHUP);
  123 + //this->fds[endpoint->transport->handle].events &=
  124 + // ~(POLLOUT|POLLHUP);
113 125 }
114 126
115 127 if ((this->fds[i].revents & POLLHUP) == POLLHUP) {
... ... @@ -122,10 +134,12 @@ TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout)
122 134 }
123 135
124 136 this->fds[i].revents = 0;
125   - if (nevents <= 0) break;
  137 + if (doevents <= 0) break;
126 138 }
127 139 }
128 140 }
  141 +
  142 + return nevents;
129 143 }
130 144
131 145 static
... ... @@ -144,12 +158,12 @@ static
144 158 void
145 159 TR_commManagerPollEnableRead(void * _this, TR_Event event)
146 160 {
147   - TR_CommManagerPoll this = _this;
148   - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
149   -
150   - if (! TR_socketFinRd(endpoint->transport)) {
151   - this->fds[endpoint->transport->handle].events |= POLLIN;
152   - }
  161 +// TR_CommManagerPoll this = _this;
  162 +// TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  163 +//
  164 +// if (! TR_socketFinRd(endpoint->transport)) {
  165 +// this->fds[endpoint->transport->handle].events |= POLLIN;
  166 +// }
153 167 }
154 168
155 169 static
... ... @@ -166,10 +180,10 @@ static
166 180 void
167 181 TR_commManagerPollDisableRead(void * _this, TR_Event event)
168 182 {
169   - TR_CommManagerPoll this = _this;
170   - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
171   -
172   - this->fds[endpoint->transport->handle].events &= ~POLLIN;
  183 +// TR_CommManagerPoll this = _this;
  184 +// TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  185 +//
  186 +// this->fds[endpoint->transport->handle].events &= ~POLLIN;
173 187 }
174 188
175 189 static
... ...
... ... @@ -37,10 +37,10 @@ static
37 37 int
38 38 connectionCtor(void * _this, va_list * params)
39 39 {
40   - TR_Connection this = _this;
  40 + //TR_Connection this = _this;
41 41
42 42 TR_PARENTCALL(TR_Connection, _this, TR_Class, ctor, params);
43   - this->current_message = NULL;
  43 + //this->current_message = NULL;
44 44
45 45 return 0;
46 46 }
... ...
... ... @@ -76,6 +76,13 @@ connectorAccept(void * _this, TR_Event event)
76 76 socket = TR_socketAccept((TR_TcpSocket)connection->transport);
77 77 }
78 78
  79 + TR_eventHandlerIssueEvent(
  80 + (TR_EventHandler)this,
  81 + TR_eventSubjectEmit(
  82 + (TR_EventSubject)connection,
  83 + TR_CEP_EVENT_IO_DONE,
  84 + NULL));
  85 +
79 86 if (! socket) {
80 87 TR_eventHandlerIssueEvent(
81 88 (TR_EventHandler)this,
... ... @@ -92,8 +99,6 @@ static
92 99 void
93 100 connectorCvInit(TR_class_ptr cls)
94 101 {
95   - TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
96   -
97 102 TR_EVENT_HANDLER_SET_METHOD(
98 103 cls,
99 104 TR_ConnEntryPoint,
... ... @@ -101,6 +106,7 @@ connectorCvInit(TR_class_ptr cls)
101 106 connectorAccept);
102 107 }
103 108
  109 +TR_INIT_HANDLER(TR_Connector);
104 110 TR_INSTANCE(TR_Hash, connectorEventMethods);
105 111 TR_INIT_IFACE(TR_Class, connectorCtor, connectorDtor, NULL);
106 112 TR_CREATE_CLASS(
... ... @@ -108,7 +114,7 @@ TR_CREATE_CLASS(
108 114 TR_EventHandler,
109 115 connectorCvInit,
110 116 TR_IF(TR_Class)) = {
111   - { &(_connectorEventMethods.data) }
  117 + { TR_HANDLER_CVARS(TR_Connector) }
112 118 };
113 119
114 120 // vim: set ts=4 sw=4:
... ...
... ... @@ -20,8 +20,11 @@
20 20 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 21 */
22 22
  23 +#define _GNU_SOURCE
  24 +
23 25 #include <errno.h>
24 26 #include <poll.h>
  27 +#include <pthread.h>
25 28
26 29 #include "trbase.h"
27 30 #include "trevent.h"
... ... @@ -59,6 +62,8 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
59 62 TR_ISSUE_IO_READ_EVENT(this, endpoint);
60 63 }
61 64
  65 + this->io_triggered++;
  66 +
62 67 TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
63 68 }
64 69
... ... @@ -90,22 +95,35 @@ TR_commManagerSelect(void * _this, TR_Event event)
90 95 TR_Timer timer = (TR_Timer)event->data;
91 96 TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject;
92 97 unsigned long timeout; // milliseconds
93   -
94   - if (! (TR_hashEmpty(this->read)
95   - && TR_hashEmpty(this->write)
96   - && TR_hashEmpty(this->accept))) {
97   - timeout = 0;
98   - } else if (NULL == timer) {
99   - timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
100   - } else {
101   - timeout = TR_timerGet(timer, NULL);
  98 + char buffer[17];
  99 +
  100 + pthread_getname_np(pthread_self(), buffer, 17);
  101 +
  102 + if (! this->io_triggered) {
  103 + printf("[DEBUG] [%s] io triggerd was empty\n", buffer);
  104 + fflush(stdout);
  105 + pthread_mutex_lock(&this->io_triggered_lock);
  106 + this->io_triggered = TR_hashEach(this->write, this, commManagerIssueWriteEvents);
  107 + this->io_triggered += TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
  108 + this->io_triggered += TR_hashEach(this->read, this, commManagerIssueReadEvents);
  109 + pthread_mutex_unlock(&this->io_triggered_lock);
102 110 }
103 111
104   - TR_CALL(_this, TR_CommManager, select, event, timeout);
  112 + printf("[DEBUG] [%s] io triggerd: %lu\n", buffer, this->io_triggered);
  113 + fflush(stdout);
  114 +
  115 + if (! this->io_triggered) {
  116 + if (NULL == timer) {
  117 + timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
  118 + } else {
  119 + timeout = TR_timerGet(timer, NULL);
  120 + }
105 121
106   - TR_hashEach(this->write, this, commManagerIssueWriteEvents);
107   - TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
108   - TR_hashEach(this->read, this, commManagerIssueReadEvents);
  122 + printf("[DEBUG] [%s] select timeout: %lu\n", buffer, timeout);
  123 + fflush(stdout);
  124 +
  125 + TR_CALL(_this, TR_CommManager, select, event, timeout);
  126 + }
109 127
110 128 return TR_EVENT_DONE;
111 129 }
... ... @@ -115,9 +133,13 @@ TR_commManagerPollWrite(void * _this, TR_Event event)
115 133 {
116 134 TR_CommManager this = _this;
117 135
118   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
119 136 if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {
  137 + pthread_mutex_lock(&this->io_triggered_lock);
  138 + TR_hashAdd(this->write, event->subject);
120 139 TR_CALL(_this, TR_CommManager, pollWrite, event);
  140 + pthread_mutex_unlock(&this->io_triggered_lock);
  141 + printf("[!DEBUG!] socket added to write hash\n");
  142 + fflush(stdout);
121 143 }
122 144
123 145 return TR_EVENT_DONE;
... ... @@ -129,6 +151,10 @@ TR_commManagerPollRead(void * _this, TR_Event event)
129 151 TR_CommManager this = _this;
130 152 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
131 153
  154 + if (! TR_socketFinRd(endpoint->transport)) {
  155 + TR_CALL(_this, TR_CommManager, pollRead, event);
  156 + }
  157 +
132 158 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
133 159 && ((TR_TcpSocket)endpoint->transport)->listen) {
134 160 TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject));
... ... @@ -136,10 +162,6 @@ TR_commManagerPollRead(void * _this, TR_Event event)
136 162 TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
137 163 }
138 164
139   - if (! TR_socketFinRd(endpoint->transport)) {
140   - TR_CALL(_this, TR_CommManager, pollRead, event);
141   - }
142   -
143 165 return TR_EVENT_DONE;
144 166 }
145 167
... ... @@ -156,13 +178,17 @@ TR_commManagerDisableRead(void * _this, TR_Event event)
156 178 TR_EventDone
157 179 TR_commManagerDisableWrite(void * _this, TR_Event event)
158 180 {
159   - TR_CommManager this = _this;
  181 + TR_CommManager this = _this;
  182 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
160 183
161   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
162   - if (! event->subject->fin) {
163   - TR_hashAdd(this->read, event->subject);
  184 + if (! endpoint->write_buffer->nmsg) {
  185 + // TODO think about a better way...
  186 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  187 + //if (! event->subject->fin) {
  188 + // TR_hashAdd(this->read, event->subject);
  189 + //}
  190 + TR_CALL(_this, TR_CommManager, disableWrite, event);
164 191 }
165   - TR_CALL(_this, TR_CommManager, disableWrite, event);
166 192
167 193 return TR_EVENT_DONE;
168 194 }
... ...
... ... @@ -50,6 +50,7 @@ ioHandlerRead(void * _this, TR_Event event)
50 50 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
51 51 TR_Event revent;
52 52 TR_RemoteData data;
  53 + char ip[16];
53 54
54 55 switch (TR_commEndPointRead(endpoint, &data)) {
55 56 case FALSE: // EAGAIN
... ... @@ -83,6 +84,13 @@ ioHandlerRead(void * _this, TR_Event event)
83 84 return TR_EVENT_DONE;
84 85 }
85 86
  87 + TR_socketAddrIpStr(data->remote, ip, 16);
  88 + printf(
  89 + "DEBUG: remote ip: %s / port: %d\n",
  90 + ip,
  91 + TR_socketAddrPort(data->remote));
  92 + fflush(stdout);
  93 +
86 94 revent = TR_eventSubjectEmit(
87 95 event->subject,
88 96 TR_CEP_EVENT_NEW_DATA,
... ... @@ -90,6 +98,13 @@ ioHandlerRead(void * _this, TR_Event event)
90 98 break;
91 99 }
92 100
  101 + TR_eventHandlerIssueEvent(
  102 + (TR_EventHandler)_this,
  103 + TR_eventSubjectEmit(
  104 + event->subject,
  105 + TR_CEP_EVENT_IO_DONE,
  106 + NULL));
  107 +
93 108 TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
94 109
95 110 return TR_EVENT_DONE;
... ... @@ -147,6 +162,13 @@ ioHandlerWrite(void * _this, TR_Event event)
147 162
148 163 endpoint->write_buffer_size -= written;
149 164
  165 + TR_eventHandlerIssueEvent(
  166 + (TR_EventHandler)_this,
  167 + TR_eventSubjectEmit(
  168 + event->subject,
  169 + TR_CEP_EVENT_IO_DONE,
  170 + NULL));
  171 +
150 172 if (revent) {
151 173 TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
152 174 }
... ... @@ -158,8 +180,6 @@ static
158 180 void
159 181 ioHandlerCvInit(TR_class_ptr cls)
160 182 {
161   - TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
162   -
163 183 TR_EVENT_HANDLER_SET_METHOD(
164 184 cls,
165 185 TR_CommEndPoint,
... ... @@ -172,14 +192,14 @@ ioHandlerCvInit(TR_class_ptr cls)
172 192 ioHandlerWrite);
173 193 }
174 194
175   -TR_INSTANCE(TR_Hash, ioHandlerEventMethods);
  195 +TR_INIT_HANDLER(TR_IoHandler);
176 196 TR_INIT_IFACE(TR_Class, ioHandlerCtor, ioHandlerDtor, NULL);
177 197 TR_CREATE_CLASS(
178 198 TR_IoHandler,
179 199 TR_EventHandler,
180 200 ioHandlerCvInit,
181 201 TR_IF(TR_Class)) = {
182   - { &(_ioHandlerEventMethods.data) }
  202 + { TR_HANDLER_CVARS(TR_IoHandler) }
183 203 };
184 204
185 205 // vim: set ts=4 sw=4:
... ...
... ... @@ -125,8 +125,6 @@ static
125 125 void
126 126 protocolHandlerCvInit(TR_class_ptr cls)
127 127 {
128   - TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
129   -
130 128 TR_EVENT_HANDLER_SET_METHOD(
131 129 cls,
132 130 TR_CommEndPoint,
... ... @@ -144,14 +142,14 @@ protocolHandlerCvInit(TR_class_ptr cls)
144 142 // protocolHandlerUpgrade);
145 143 }
146 144
147   -TR_INSTANCE(TR_Hash, protocolHandlerEventMethods);
  145 +TR_INIT_HANDLER(TR_ProtocolHandler);
148 146 TR_INIT_IFACE(TR_Class, protocolHandlerCtor, protocolHandlerDtor, NULL);
149 147 TR_CREATE_CLASS(
150 148 TR_ProtocolHandler,
151 149 TR_EventHandler,
152 150 protocolHandlerCvInit,
153 151 TR_IF(TR_Class)) = {
154   - { &(_protocolHandlerEventMethods.data) }
  152 + { TR_HANDLER_CVARS(TR_ProtocolHandler) }
155 153 };
156 154
157 155 // vim: set ts=4 sw=4:
... ...
... ... @@ -43,7 +43,7 @@ serverCtor(void * _this, va_list * params)
43 43 {
44 44 TR_Server this = _this;
45 45
46   -#if 1
  46 +#if 0
47 47 this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll);
48 48 #else
49 49 this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerPoll);
... ...
... ... @@ -135,8 +135,6 @@ static
135 135 void
136 136 simpleClientCvInit(TR_class_ptr cls)
137 137 {
138   - TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
139   -
140 138 TR_EVENT_HANDLER_SET_METHOD(
141 139 cls,
142 140 TR_EventDispatcher,
... ... @@ -149,14 +147,14 @@ simpleClientCvInit(TR_class_ptr cls)
149 147 simpleClientHandleData);
150 148 }
151 149
152   -TR_INSTANCE(TR_Hash, simpleClientEventMethods);
  150 +TR_INIT_HANDLER(TR_SimpleClient);
153 151 TR_INIT_IFACE(TR_Class, simpleClientCtor, simpleClientDtor, NULL);
154 152 TR_CREATE_CLASS(
155 153 TR_SimpleClient,
156 154 TR_EventHandler,
157 155 simpleClientCvInit,
158 156 TR_IF(TR_Class)) = {
159   - { &(_simpleClientEventMethods.data) }
  157 + { TR_HANDLER_CVARS(TR_SimpleClient) }
160 158 };
161 159
162 160 // vim: set ts=4 sw=4:
... ...
... ... @@ -36,6 +36,7 @@ threadedServerCtor(void * _this, va_list * params)
36 36 {
37 37 TR_ThreadedServer this = _this;
38 38 int i;
  39 + char buffer[16];
39 40
40 41 TR_PARENTCALL(TR_ThreadedServer, _this, TR_Class, ctor, params);
41 42
... ... @@ -43,9 +44,11 @@ threadedServerCtor(void * _this, va_list * params)
43 44 this->threads = TR_malloc(sizeof(TR_EventThread) * this->n_threads);
44 45
45 46 for (i=0; i<this->n_threads; i++) {
  47 + sprintf(buffer, "test%03d", i);
46 48 this->threads[i] = TR_new(
47 49 TR_EventThread,
48   - ((TR_Server)this)->dispatcher);
  50 + ((TR_Server)this)->dispatcher,
  51 + buffer);
49 52 }
50 53
51 54 return 0;
... ...
1 1 #!/bin/bash
2 2 #TRLIBS="-ltrbase -ltrhashing -ltrio -ltrdata -ltrevent -ltrcomm"
3 3 TRLIBS="/usr/local/lib/libtrcomm.a /usr/local/lib/libtrevent.a /usr/local/lib/libtrdata.a /usr/local/lib/libtrio.a /usr/local/lib/libtrhashing.a /usr/local/lib/libtrbase.a"
4   -LIBS="-lcrypto -lssl -lrt -luuid"
  4 +LIBS="-lcrypto -lssl -lrt -luuid -lpthread"
5 5 gcc ${CFLAGS} -c -o test_handler.o test_handler.c
6 6 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver testserver.c test_handler.o ${TRLIBS}
7 7 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver2 testserver2.c test_handler.o ${TRLIBS}
8 8 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testtcp testclient.c ${TRLIBS}
9 9 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -DUDP=1 -o testudp testclient.c ${TRLIBS}
10   -gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -lpthread -o testserver_thread testserver_thread.c test_handler.o ${TRLIBS}
  10 +gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver_thread testserver_thread.c test_handler.o ${TRLIBS}
... ...
... ... @@ -15,7 +15,7 @@ testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
15 15 double size_msg = ((TestHandler)this)->size
16 16 ? size / ((TestHandler)this)->handled
17 17 : 0.0;
18   - int div_count = 0;
  18 + int div_count = ' ';
19 19
20 20 while (size > 1024. && div_count != 'G') {
21 21 size /= 1024.;
... ... @@ -31,6 +31,7 @@ testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
31 31 ((TR_EventDispatcher)event->subject)->n_beats,
32 32 ((TestHandler)this)->handled,
33 33 size, div_count, size_msg);
  34 + fflush(stdout);
34 35 ((TestHandler)this)->handled = 0;
35 36 ((TestHandler)this)->size = 0;
36 37
... ...
... ... @@ -30,7 +30,7 @@ main (int argc, char * argv[])
30 30 TR_ProtoMessageRaw message;
31 31 int i, j=0;
32 32
33   - TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
  33 + //TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
34 34
35 35 protocol = TR_new(TR_ProtocolRaw);
36 36 #if UDP
... ...
... ... @@ -19,7 +19,7 @@ main (int argc, char * argv[])
19 19 TR_Protocol protocol = TR_new(TR_ProtocolRaw);
20 20 TestHandler test_handler = TR_new(TestHandler);
21 21
22   - TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
  22 + //TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
23 23
24 24 TR_serverAddHandler(server, (TR_EventHandler)test_handler);
25 25 TR_serverBindTcp(server, "0.0.0.0", 5678, protocol);
... ...
Please register or login to post a comment