Commit 521003633534d3e7ce8dd9eaa9ea65f9205c67e0

Authored by Georg Hopp
1 parent 8cab362d

generally epoll is working... sadly I removed the is_writing flag in the CommEnd…

…Point which was neccessary... I have to add it again.
... ... @@ -24,7 +24,6 @@
24 24 #define __TR_COMM_MANAGER_H__
25 25
26 26 #include <sys/types.h>
27   -#include <poll.h>
28 27
29 28 #include "trbase.h"
30 29 #include "trevent.h"
... ... @@ -35,8 +34,8 @@ TR_CLASS(TR_CommManager) {
35 34 TR_EXTENDS(TR_EventHandler);
36 35
37 36 TR_CommEndPoint * endpoints;
38   - nfds_t n_endpoints;
39   - nfds_t max_handle;
  37 + size_t n_endpoints;
  38 + size_t max_handle;
40 39 };
41 40 TR_INSTANCE_INIT(TR_CommManager);
42 41 TR_CLASSVARS_DECL(TR_CommManager) {
... ...
... ... @@ -27,6 +27,7 @@
27 27 #include <sys/epoll.h>
28 28
29 29 #include "trbase.h"
  30 +#include "trdata.h"
30 31 #include "trevent.h"
31 32
32 33 TR_CLASS(TR_CommManagerEpoll) {
... ... @@ -34,6 +35,8 @@ TR_CLASS(TR_CommManagerEpoll) {
34 35
35 36 int handle;
36 37 struct epoll_event * events;
  38 + TR_Queue read_ready;
  39 + TR_Queue write_ready;
37 40 };
38 41 TR_INSTANCE_INIT(TR_CommManagerEpoll);
39 42 TR_CLASSVARS_DECL(TR_CommManagerEpoll) {
... ...
... ... @@ -32,7 +32,7 @@
32 32 TR_CLASS(TR_CommManagerPoll) {
33 33 TR_EXTENDS(TR_CommManager);
34 34
35   - struct pollfd * fds;
  35 + struct pollfd * fds;
36 36 };
37 37 TR_INSTANCE_INIT(TR_CommManagerPoll);
38 38 TR_CLASSVARS_DECL(TR_CommManagerPoll) {
... ... @@ -42,4 +42,3 @@ TR_CLASSVARS_DECL(TR_CommManagerPoll) {
42 42 #endif // __TR_COMM_MANAGER_POLL_H__
43 43
44 44 // vim: set ts=4 sw=4:
45   -
... ...
... ... @@ -47,8 +47,8 @@ TR_INTERFACE(TR_CommManager) {
47 47 fptr_TR_commManagerDisableWrite disableWrite;
48 48 fptr_TR_commManagerEnableRead enableRead;
49 49 fptr_TR_commManagerClose close;
50   - fptr_TR_commManagerShutdownRead shutdownWrite;
51   - fptr_TR_commManagerShutdownWrite shutdownRead;
  50 + fptr_TR_commManagerShutdownWrite shutdownWrite;
  51 + fptr_TR_commManagerShutdownRead shutdownRead;
52 52 };
53 53
54 54 void TR_commManagerAddEndpoint(void *, TR_CommEndPoint);
... ...
... ... @@ -28,12 +28,24 @@
28 28 int
29 29 TR_cepWriteBuffered(TR_CommEndPoint this)
30 30 {
31   - TR_RemoteData data = TR_cepNextWriteData(this);
32   - int send = TR_socketSend(this->transport, data);
  31 + TR_RemoteData data;
  32 + int send;
  33 +
  34 +// fprintf(stderr, "%s(%p): before get write data: %p / %zd messages\n",
  35 +// __func__, this, data, this->write_buffer->nmsg);
  36 + data = TR_cepNextWriteData(this);
  37 +// fprintf(stderr, "%s(%p): get write data: %p / %zd messages\n",
  38 +// __func__, this, data, this->write_buffer->nmsg);
  39 +// fflush(stderr);
  40 +
  41 + send = TR_socketSend(this->transport, data);
33 42
34 43 switch (send) {
35 44 case FALSE: // EAGAIN
36 45 TR_queuePutFirst(this->write_buffer, data);
  46 +// fprintf(stderr, "%s(%p): put first write data: %p / %zd messages\n",
  47 +// __func__, this, data, this->write_buffer->nmsg);
  48 +// fflush(stderr);
37 49 break;
38 50
39 51 case -1: // FAILURE
... ...
... ... @@ -77,6 +77,21 @@ commEndPointCvInit(TR_class_ptr cls)
77 77 TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE);
78 78 }
79 79
  80 +const char * TR_cepEventStrings[] = {
  81 + "TR_CEP_EVENT_READ_READY",
  82 + "TR_CEP_EVENT_READ_BLOCK",
  83 + "TR_CEP_EVENT_WRITE_READY",
  84 + "TR_CEP_EVENT_UPGRADE",
  85 + "TR_CEP_EVENT_NEW_DATA",
  86 + "TR_CEP_EVENT_PENDING_DATA",
  87 + "TR_CEP_EVENT_END_DATA",
  88 + "TR_CEP_EVENT_NEW_MSG",
  89 + "TR_CEP_EVENT_SEND_MSG",
  90 + "TR_CEP_EVENT_SHUT_READ",
  91 + "TR_CEP_EVENT_SHUT_WRITE",
  92 + "TR_CEP_EVENT_CLOSE"
  93 +};
  94 +
80 95 intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1];
81 96 TR_INIT_IFACE(TR_Class, commEndPointCtor, commEndPointDtor, NULL);
82 97 TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL);
... ... @@ -87,6 +102,7 @@ TR_CREATE_CLASS(
87 102 TR_IF(TR_Class),
88 103 TR_IF(TR_CommEndPoint)) = {
89 104 {
  105 + TR_cepEventStrings,
90 106 TR_CEP_EVENT_MAX + 1,
91 107 comm_end_point_events
92 108 }
... ...
... ... @@ -25,6 +25,7 @@
25 25 #include <sys/epoll.h>
26 26
27 27 #include "trbase.h"
  28 +#include "trdata.h"
28 29 #include "trevent.h"
29 30
30 31 #include "tr/comm_manager.h"
... ... @@ -44,16 +45,14 @@ commManagerEpollCtor(void * _this, va_list * params)
44 45 {
45 46 TR_CommManagerEpoll this = _this;
46 47 TR_CommManager cmgr = _this;
47   - nfds_t i;
48 48
49 49 TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);
50   - this->handle = epoll_create(cmgr->n_endpoints);
51   - this->events = TR_malloc(sizeof(struct epoll_event) * cmgr->n_endpoints);
52   - for (i = 0; i < cmgr->n_endpoints; i++) {
53   - this->events[i].data.ptr = NULL;
54   - this->events[i].events = EPOLLET | EPOLLONESHOT;
55   - }
  50 + this->handle = epoll_create(cmgr->n_endpoints);
  51 + this->read_ready = TR_new(TR_Queue);
  52 + this->write_ready = TR_new(TR_Queue);
56 53
  54 + this->read_ready->free_msgs = 0;
  55 + this->write_ready->free_msgs = 0;
57 56
58 57 return 0;
59 58 }
... ... @@ -64,8 +63,10 @@ commManagerEpollDtor(void * _this)
64 63 {
65 64 TR_CommManagerEpoll this = _this;
66 65
  66 + TR_delete(this->read_ready);
  67 + TR_delete(this->write_ready);
  68 +
67 69 close(this->handle);
68   - TR_MEM_FREE(this->events);
69 70 TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
70 71 }
71 72
... ... @@ -75,11 +76,12 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
75 76 {
76 77 TR_CommManagerEpoll this = _this;
77 78 int handle = endpoint->transport->handle;
  79 + struct epoll_event event;
78 80
79   - this->events[handle].data.ptr = endpoint;
80   - this->events[handle].events |= EPOLLIN;
  81 + event.data.ptr = endpoint;
  82 + event.events = EPOLLIN | EPOLLOUT | EPOLLET;
81 83
82   - epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &(this->events[handle]));
  84 + epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &event);
83 85 }
84 86
85 87 static
... ... @@ -88,6 +90,11 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
88 90 {
89 91 TR_CommManagerEpoll this = _this;
90 92 int i, nevents;
  93 + TR_Queue node;
  94 +
  95 + if (0 != (this->read_ready->nmsg & this->write_ready->nmsg)) {
  96 + timeout = 0;
  97 + }
91 98
92 99 nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);
93 100
... ... @@ -95,113 +102,132 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
95 102 TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
96 103
97 104 if ((events[i].events & EPOLLIN) == EPOLLIN) {
98   - TR_Event event;
99   -
100 105 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
101 106 && ((TR_TcpSocket)endpoint->transport)->listen) {
102   - event = TR_eventSubjectEmit(
103   - (TR_EventSubject)endpoint,
104   - TR_CET_EVENT_ACC_READY,
105   - NULL);
  107 + TR_eventHandlerIssueEvent((TR_EventHandler)this,
  108 + TR_eventSubjectEmit(
  109 + (TR_EventSubject)endpoint,
  110 + TR_CET_EVENT_ACC_READY,
  111 + NULL));
106 112 } else {
107   - event = TR_eventSubjectEmit(
108   - (TR_EventSubject)endpoint,
109   - TR_CEP_EVENT_READ_READY,
110   - NULL);
  113 + if (! ((TR_EventSubject)endpoint)->fin) {
  114 + TR_queuePut(this->read_ready, endpoint);
  115 + }
111 116 }
112   -
113   - TR_eventHandlerIssueEvent((TR_EventHandler)this, event);
114   - this->events[i].events &= ~EPOLLIN;
115 117 }
116 118
117 119 if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
118   - TR_Event _event = TR_eventSubjectEmit(
119   - (TR_EventSubject)endpoint,
120   - TR_CEP_EVENT_WRITE_READY,
121   - NULL);
  120 + if (TR_cepHasPendingData(endpoint) &&
  121 + ! ((TR_EventSubject)endpoint)->fin) {
  122 + TR_queuePut(this->write_ready, endpoint);
  123 + }
  124 + }
  125 + }
  126 +
  127 + /* now issue reads and write events */
  128 + for (node=this->read_ready->first; node; node=node->next) {
  129 + TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg;
  130 +
  131 + if (! TR_socketFinRd(endpoint->transport)) {
  132 + TR_eventHandlerIssueEvent(
  133 + (TR_EventHandler)this,
  134 + TR_eventSubjectEmit(
  135 + (TR_EventSubject)endpoint,
  136 + TR_CEP_EVENT_READ_READY,
  137 + NULL));
  138 + }
  139 + }
122 140
123   - TR_eventHandlerIssueEvent((TR_EventHandler)this, _event);
124   - this->events[i].events &= ~EPOLLOUT;
  141 + for (node=this->write_ready->first; node; node=node->next) {
  142 + TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg;
  143 +
  144 + if (! TR_socketFinWr(endpoint->transport)) {
  145 + TR_eventHandlerIssueEvent(
  146 + (TR_EventHandler)this,
  147 + TR_eventSubjectEmit(
  148 + (TR_EventSubject)endpoint,
  149 + TR_CEP_EVENT_WRITE_READY,
  150 + NULL));
125 151 }
126 152 }
127 153 }
128 154
129 155 static
130 156 void
131   -TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
  157 +TR_commManagerEpollRemoveWrite(void * _this, TR_Event event)
132 158 {
133 159 TR_CommManagerEpoll this = _this;
134 160 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
135 161
136   - if (! TR_socketFinWr(endpoint->transport)) {
137   - int handle = endpoint->transport->handle;
138   -
139   - this->events[handle].data.ptr = endpoint;
140   - this->events[handle].events |= EPOLLOUT;
141   -
142   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
143   - }
  162 + TR_queueDelete(this->write_ready, endpoint);
144 163 }
145 164
146 165 static
147 166 void
148   -TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
  167 +TR_commManagerEpollRemoveRead(void * _this, TR_Event event)
149 168 {
150 169 TR_CommManagerEpoll this = _this;
151 170 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
152   - int handle = endpoint->transport->handle;
153   -
154   - this->events[handle].data.ptr = endpoint;
155   - this->events[handle].events &= ~EPOLLOUT;
156 171
157   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
  172 + TR_queueDelete(this->read_ready, endpoint);
158 173 }
159 174
160 175 static
161 176 void
162   -TR_commManagerEpollEnableRead(void * _this, TR_Event event)
  177 +TR_commManagerEpollClose(void * _this, TR_Event event)
163 178 {
164 179 TR_CommManagerEpoll this = _this;
165 180 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
166 181
167   - if (! TR_socketFinRd(endpoint->transport)) {
168   - int handle = endpoint->transport->handle;
169   -
170   - this->events[handle].data.ptr = endpoint;
171   - this->events[handle].events |= EPOLLIN;
  182 + TR_queueDelete(this->read_ready, endpoint);
  183 + TR_queueDelete(this->write_ready, endpoint);
172 184
173   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
174   - }
  185 + epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL);
175 186 }
176 187
177 188 static
178 189 void
179   -TR_commManagerEpollDisableRead(void * _this, TR_Event event)
  190 +TR_commManagerEpollShutRead(void * _this, TR_Event event)
180 191 {
181 192 TR_CommManagerEpoll this = _this;
182 193 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
183   - int handle = endpoint->transport->handle;
  194 + struct epoll_event _event;
  195 +
  196 + TR_queueDelete(this->read_ready, endpoint);
184 197
185   - this->events[handle].data.ptr = endpoint;
186   - this->events[handle].events &= ~EPOLLIN;
  198 + _event.data.ptr = endpoint;
  199 + _event.events = EPOLLOUT | EPOLLET;
187 200
188   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
  201 + epoll_ctl(
  202 + this->handle,
  203 + EPOLL_CTL_MOD,
  204 + endpoint->transport->handle,
  205 + &_event);
189 206 }
190 207
191 208 static
192 209 void
193   -TR_commManagerEpollClose(void * _this, TR_Event event)
  210 +TR_commManagerEpollShutWrite(void * _this, TR_Event event)
194 211 {
195 212 TR_CommManagerEpoll this = _this;
196 213 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
197   - int handle = endpoint->transport->handle;
  214 + struct epoll_event _event;
  215 +
  216 + TR_queueDelete(this->write_ready, endpoint);
198 217
199   - this->events[handle].data.ptr = NULL;
200   - this->events[handle].events = EPOLLET | EPOLLONESHOT;
  218 + _event.data.ptr = endpoint;
  219 + _event.events = EPOLLIN | EPOLLET;
201 220
202   - epoll_ctl(this->handle, EPOLL_CTL_DEL, handle, NULL);
  221 + epoll_ctl(
  222 + this->handle,
  223 + EPOLL_CTL_MOD,
  224 + endpoint->transport->handle,
  225 + &_event);
203 226 }
204 227
  228 +
  229 +static void TR_commManagerEpollNoop(void * _this, TR_Event event) {}
  230 +
205 231 static
206 232 void
207 233 TR_commManagerEpollCvInit(TR_class_ptr cls) {
... ... @@ -211,14 +237,14 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) {
211 237 TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
212 238 TR_INIT_IFACE(
213 239 TR_CommManager,
214   - TR_commManagerEpollAddEndpoint,
215   - TR_commManagerEpollSelect,
216   - TR_commManagerEpollEnableWrite,
217   - TR_commManagerEpollDisableWrite,
218   - TR_commManagerEpollEnableRead,
219   - TR_commManagerEpollClose,
220   - TR_commManagerEpollDisableRead,
221   - TR_commManagerEpollDisableWrite);
  240 + TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON
  241 + TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT
  242 + TR_commManagerEpollRemoveWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
  243 + TR_commManagerEpollNoop, // TR_CEP_EVENT_END_DATA
  244 + TR_commManagerEpollRemoveRead, // TR_CEP_EVENT_READ_BLOCK
  245 + TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE
  246 + TR_commManagerEpollShutWrite, // TR_CEP_EVENT_SHUT_READ
  247 + TR_commManagerEpollShutRead); // TR_CEP_EVENT_SHUT_WRITE
222 248 TR_CREATE_CLASS(
223 249 TR_CommManagerEpoll,
224 250 TR_CommManager,
... ...
... ... @@ -42,6 +42,7 @@ commManagerPollCtor(void * _this, va_list * params)
42 42 nfds_t i;
43 43
44 44 TR_PARENTCALL(TR_CommManagerPoll, _this, TR_Class, ctor, params);
  45 +
45 46 this->fds = TR_malloc(sizeof(struct pollfd) * cmgr->n_endpoints);
46 47 for (i = 0; i < cmgr->n_endpoints; i++) {
47 48 this->fds[i].fd = -1;
... ... @@ -59,7 +60,6 @@ commManagerPollDtor(void * _this)
59 60 TR_CommManagerPoll this = _this;
60 61
61 62 TR_MEM_FREE(this->fds);
62   - TR_PARENTCALL(TR_CommManagerPoll, _this, TR_Class, dtor);
63 63 }
64 64
65 65 static
... ... @@ -67,6 +67,7 @@ void
67 67 TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
68 68 {
69 69 TR_CommManagerPoll this = _this;
  70 +
70 71 this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
71 72 this->fds[endpoint->transport->handle].events = POLLIN;
72 73 }
... ... @@ -104,21 +105,16 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
104 105 }
105 106
106 107 TR_eventHandlerIssueEvent((TR_EventHandler)this, event);
107   - // deactivate read poll mimic edge level behaviour
108   - this->fds[endpoint->transport->handle].events &= ~POLLIN;
109 108 }
110 109
111 110 if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
  111 + nevents--;
112 112 TR_Event _event = TR_eventSubjectEmit(
113 113 (TR_EventSubject)endpoint,
114 114 TR_CEP_EVENT_WRITE_READY,
115 115 NULL);
116 116
117 117 TR_eventHandlerIssueEvent((TR_EventHandler)this, _event);
118   - nevents--;
119   -
120   - // deactivate write poll mimic edge level behaviour
121   - this->fds[endpoint->transport->handle].events &= ~POLLOUT;
122 118 }
123 119
124 120 if (nevents <= 0) break;
... ... @@ -196,8 +192,8 @@ TR_INIT_IFACE(
196 192 TR_commManagerPollDisableWrite,
197 193 TR_commManagerPollEnableRead,
198 194 TR_commManagerPollClose,
199   - TR_commManagerPollDisableRead,
200   - TR_commManagerPollDisableWrite);
  195 + TR_commManagerPollDisableWrite,
  196 + TR_commManagerPollDisableRead);
201 197 TR_CREATE_CLASS(
202 198 TR_CommManagerPoll,
203 199 TR_CommManager,
... ...
... ... @@ -34,7 +34,7 @@ TR_commManagerShutdown(void * _this, TR_Event event)
34 34 TR_CommManager this = _this;
35 35 nfds_t i;
36 36
37   - for (i=0; i<this->n_endpoints; i++) {
  37 + for (i=0; i<=this->max_handle; i++) {
38 38 if (this->endpoints[i]) {
39 39 TR_eventHandlerIssueEvent(
40 40 (TR_EventHandler)_this,
... ...
... ... @@ -60,6 +60,10 @@ connEntryPointCvInit(TR_class_ptr cls)
60 60 TR_EVENT_CREATE(cls, TR_CET_EVENT_ACC_READY);
61 61 }
62 62
  63 +const char * TR_cetEventStrings[] = {
  64 + "TR_CET_EVENT_ACC_READY"
  65 +};
  66 +
63 67 intptr_t connEntryPoint_events[TR_CET_EVENT_MAX + 1];
64 68 TR_INIT_IFACE(TR_Class, connEntryPointCtor, connEntryPointDtor, NULL);
65 69 TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL);
... ... @@ -70,8 +74,9 @@ TR_CREATE_CLASS(
70 74 TR_IF(TR_Class),
71 75 TR_IF(TR_CommEndPoint)) = {
72 76 {{
73   - TR_CET_EVENT_MAX + 1,
74   - connEntryPoint_events
  77 + TR_cetEventStrings,
  78 + TR_CET_EVENT_MAX + 1,
  79 + connEntryPoint_events
75 80 }}
76 81 };
77 82
... ...
... ... @@ -123,6 +123,11 @@ connectionCompose(void * _this, TR_ProtoMessage message)
123 123 }
124 124
125 125 TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
  126 +// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n",
  127 +// __func__, (TR_CommEndPoint)_this, data,
  128 +// ((TR_CommEndPoint)_this)->write_buffer->nmsg);
  129 +// fflush(stderr);
  130 +
126 131 return TRUE;
127 132 }
128 133
... ... @@ -133,6 +138,10 @@ connectionCvInit(TR_class_ptr cls)
133 138 TR_EVENT_CREATE(cls, TR_CON_EVENT_NEW_CON);
134 139 }
135 140
  141 +const char * TR_connectionEventStrings[] = {
  142 + "TR_CON_EVENT_NEW_CON"
  143 +};
  144 +
136 145 intptr_t connection_events[TR_CON_EVENT_MAX + 1];
137 146 TR_INIT_IFACE(TR_Class, connectionCtor, connectionDtor, NULL);
138 147 TR_INIT_IFACE(
... ... @@ -146,8 +155,9 @@ TR_CREATE_CLASS(
146 155 TR_IF(TR_Class),
147 156 TR_IF(TR_CommEndPoint)) = {
148 157 {{
149   - TR_CON_EVENT_MAX + 1,
150   - connection_events
  158 + TR_connectionEventStrings,
  159 + TR_CON_EVENT_MAX + 1,
  160 + connection_events
151 161 }}
152 162 };
153 163
... ...
... ... @@ -56,8 +56,9 @@ TR_CREATE_CLASS(
56 56 NULL,
57 57 TR_IF(TR_Class)) = {
58 58 {{
59   - TR_CEP_EVENT_MAX + 1,
60   - datagramEntryPoint_events
  59 + NULL,
  60 + TR_CEP_EVENT_MAX + 1,
  61 + datagramEntryPoint_events
61 62 }}
62 63 };
63 64
... ...
... ... @@ -110,6 +110,10 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message)
110 110 }
111 111
112 112 TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
  113 +// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n",
  114 +// __func__, (TR_CommEndPoint)_this, data,
  115 +// ((TR_CommEndPoint)_this)->write_buffer->nmsg);
  116 +// fflush(stderr);
113 117 return TRUE;
114 118 }
115 119
... ... @@ -126,8 +130,9 @@ TR_CREATE_CLASS(
126 130 TR_IF(TR_Class),
127 131 TR_IF(TR_CommEndPoint)) = {
128 132 {{
129   - TR_CEP_EVENT_MAX + 1,
130   - datagramService_events
  133 + NULL,
  134 + TR_CEP_EVENT_MAX + 1,
  135 + datagramService_events
131 136 }}
132 137 };
133 138
... ...
... ... @@ -44,8 +44,7 @@ static
44 44 TR_EventDone
45 45 ioHandlerRead(void * _this, TR_Event event)
46 46 {
47   - TR_Event revent;
48   - TR_EventDone done = TR_EVENT_DONE;
  47 + TR_Event revent;
49 48
50 49 switch (TR_cepBufferRead((TR_CommEndPoint)event->subject)) {
51 50 case FALSE: // EAGAIN
... ... @@ -75,26 +74,26 @@ ioHandlerRead(void * _this, TR_Event event)
75 74 event->subject,
76 75 TR_CEP_EVENT_NEW_DATA,
77 76 NULL);
78   -
79   - done = TR_EVENT_PENDING;
80 77 break;
81 78 }
82 79
83 80 TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
84   - return done;
  81 +
  82 + return TR_EVENT_DONE;
85 83 }
86 84
87 85 static
88 86 TR_EventDone
89 87 ioHandlerWrite(void * _this, TR_Event event)
90 88 {
91   - TR_Event revent, close_event = NULL;
  89 + TR_Event revent = NULL,
  90 + close_event = NULL;
92 91
93 92 switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) {
94 93 case FALSE: // EAGAIN
95 94 revent = TR_eventSubjectEmit(
96 95 event->subject,
97   - TR_CEP_EVENT_PENDING_DATA,
  96 + TR_CEP_EVENT_PENDING_DATA, // is WRITE_BLOCK
98 97 NULL);
99 98 break;
100 99
... ... @@ -129,9 +128,7 @@ ioHandlerWrite(void * _this, TR_Event event)
129 128 }
130 129
131 130 TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
132   - if (close_event) {
133   - TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event);
134   - }
  131 + TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event);
135 132
136 133 return TR_EVENT_DONE;
137 134 }
... ...
... ... @@ -91,8 +91,6 @@ protocolHandlerCompose(void * _this, TR_Event event)
91 91 NULL);
92 92
93 93 TR_eventHandlerIssueEvent((TR_EventHandler)_this, _event);
94   - } else {
95   - //printf("%s: compose failed\n", __func__);
96 94 }
97 95 TR_delete(message);
98 96
... ...
... ... @@ -25,6 +25,7 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event)
25 25 // TR_SizedData data = (TR_SizedData)msg->data;
26 26 // char buf[data->size + 1];
27 27 // int i;
  28 + TR_Event _event;
28 29
29 30 ((TestHandler)this)->handled++;
30 31
... ... @@ -36,12 +37,12 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event)
36 37 // }
37 38 // printf("echo message: %s(%zd)\n", buf, data->size);
38 39
39   - TR_eventHandlerIssueEvent(
40   - (TR_EventHandler)this,
41   - TR_eventSubjectEmit(
42   - event->subject,
43   - TR_CEP_EVENT_SEND_MSG,
44   - event->data));
  40 + _event = TR_eventSubjectEmit(
  41 + event->subject,
  42 + TR_CEP_EVENT_SEND_MSG,
  43 + event->data);
  44 +
  45 + TR_eventHandlerIssueEvent((TR_EventHandler)this, _event);
45 46
46 47 return TR_EVENT_DONE;
47 48 }
... ...
... ... @@ -2,9 +2,9 @@
2 2
3 3 pids=""
4 4 i=0
5   -while [ $i -lt 100 ]
  5 +while [ $i -lt 120 ]
6 6 do
7   - dd if=/dev/zero bs=8192 count=25000 | nc 192.168.2.13 5678 &
  7 + dd if=/dev/zero bs=8192 count=2500 | nc 192.168.2.13 5678 &
8 8 pids="${pids} $!"
9 9 i=$((i + 1))
10 10 done
... ...
... ... @@ -9,7 +9,7 @@
9 9
10 10 #include "test_handler.h"
11 11
12   -TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_DEBUG});
  12 +TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_INFO});
13 13
14 14 int
15 15 main (int argc, char * argv[])
... ...
Please register or login to post a comment