Commit 473ed763c4b84d59cda1ae91271f0262c9029000

Authored by Georg Hopp
1 parent fbbcbc57

general as well as edge level handling fixes.

... ... @@ -24,6 +24,7 @@
24 24 #define __TR_COMM_MANAGER_EPOLL_H__
25 25
26 26 #include <sys/types.h>
  27 +#include <sys/epoll.h>
27 28
28 29 #include "trbase.h"
29 30 #include "trevent.h"
... ... @@ -31,7 +32,8 @@
31 32 TR_CLASS(TR_CommManagerEpoll) {
32 33 TR_EXTENDS(TR_CommManager);
33 34
34   - int handle;
  35 + int handle;
  36 + struct epoll_event * events;
35 37 };
36 38 TR_INSTANCE_INIT(TR_CommManagerEpoll);
37 39 TR_CLASSVARS_DECL(TR_CommManagerEpoll) {
... ...
... ... @@ -34,9 +34,11 @@
34 34 #include "tr/connection.h"
35 35 #include "tr/connect_entry_point.h"
36 36
37   -#define MAXEVENTS 64
  37 +#define MAXEVENTS 256
38 38
39   -struct epoll_event events[64];
  39 +struct epoll_event events[MAXEVENTS];
  40 +
  41 +extern int count_write_ready;
40 42
41 43 static
42 44 int
... ... @@ -44,9 +46,16 @@ commManagerEpollCtor(void * _this, va_list * params)
44 46 {
45 47 TR_CommManagerEpoll this = _this;
46 48 TR_CommManager cmgr = _this;
  49 + nfds_t i;
47 50
48 51 TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);
49 52 this->handle = epoll_create(cmgr->n_endpoints);
  53 + this->events = TR_malloc(sizeof(struct epoll_event) * cmgr->n_endpoints);
  54 + for (i = 0; i < cmgr->n_endpoints; i++) {
  55 + this->events[i].data.ptr = NULL;
  56 + this->events[i].events = EPOLLET | EPOLLONESHOT;
  57 + }
  58 +
50 59
51 60 return 0;
52 61 }
... ... @@ -58,19 +67,21 @@ commManagerEpollDtor(void * _this)
58 67 TR_CommManagerEpoll this = _this;
59 68
60 69 close(this->handle);
  70 + TR_MEM_FREE(this->events);
  71 + TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
61 72 }
62 73
63 74 static
64 75 void
65 76 TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
66 77 {
67   - TR_CommManagerEpoll this = _this;
68   - struct epoll_event event;
  78 + TR_CommManagerEpoll this = _this;
  79 + int handle = endpoint->transport->handle;
69 80
70   - event.data.ptr = endpoint;
71   - event.events = EPOLLIN | EPOLLET;
  81 + this->events[handle].data.ptr = endpoint;
  82 + this->events[handle].events |= EPOLLIN;
72 83
73   - epoll_ctl(this->handle, EPOLL_CTL_ADD, endpoint->transport->handle, &event);
  84 + epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &(this->events[handle]));
74 85 }
75 86
76 87 static
... ... @@ -85,7 +96,7 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
85 96 for (i=0; i<nevents; i++) {
86 97 TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
87 98
88   - if ((events[i].events & POLLIN) == POLLIN) {
  99 + if ((events[i].events & EPOLLIN) == EPOLLIN) {
89 100 TR_Event event;
90 101
91 102 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
... ... @@ -102,15 +113,18 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
102 113 }
103 114
104 115 TR_eventHandlerIssueEvent((TR_EventHandler)this, event);
  116 + this->events[i].events &= ~EPOLLIN;
105 117 }
106 118
107   - if ((events[i].events & POLLOUT) == POLLOUT) {
108   - TR_eventHandlerIssueEvent(
109   - (TR_EventHandler)this,
110   - TR_eventSubjectEmit(
111   - (TR_EventSubject)endpoint,
112   - TR_CEP_EVENT_WRITE_READY,
113   - NULL));
  119 + if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
  120 + TR_Event _event = TR_eventSubjectEmit(
  121 + (TR_EventSubject)endpoint,
  122 + TR_CEP_EVENT_WRITE_READY,
  123 + NULL);
  124 +
  125 + TR_eventHandlerIssueEvent((TR_EventHandler)this, _event);
  126 + this->events[i].events &= ~EPOLLOUT;
  127 + count_write_ready++;
114 128 }
115 129 }
116 130 }
... ... @@ -123,16 +137,12 @@ TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
123 137 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
124 138
125 139 if (! TR_socketFinWr(endpoint->transport)) {
126   - struct epoll_event epevent;
  140 + int handle = endpoint->transport->handle;
127 141
128   - epevent.data.ptr = endpoint;
129   - epevent.events = EPOLLOUT | EPOLLIN | EPOLLET;
  142 + this->events[handle].data.ptr = endpoint;
  143 + this->events[handle].events |= EPOLLOUT;
130 144
131   - epoll_ctl(
132   - this->handle,
133   - EPOLL_CTL_MOD,
134   - endpoint->transport->handle,
135   - &epevent);
  145 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
136 146 }
137 147 }
138 148
... ... @@ -142,38 +152,57 @@ TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
142 152 {
143 153 TR_CommManagerEpoll this = _this;
144 154 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
145   - struct epoll_event epevent;
  155 + int handle = endpoint->transport->handle;
146 156
147   - epevent.data.ptr = endpoint;
148   - epevent.events = EPOLLIN | EPOLLET;
  157 + this->events[handle].data.ptr = endpoint;
  158 + this->events[handle].events &= ~EPOLLOUT;
149 159
150   - epoll_ctl(
151   - this->handle,
152   - EPOLL_CTL_MOD,
153   - endpoint->transport->handle,
154   - &epevent);
  160 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
155 161 }
156 162
157 163 static
158 164 void
159   -TR_commManagerEpollClose(void * _this, TR_Event event)
  165 +TR_commManagerEpollEnableRead(void * _this, TR_Event event)
160 166 {
161 167 TR_CommManagerEpoll this = _this;
162 168 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
163 169
164   - epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL);
  170 + if (! TR_socketFinRd(endpoint->transport)) {
  171 + int handle = endpoint->transport->handle;
  172 +
  173 + this->events[handle].data.ptr = endpoint;
  174 + this->events[handle].events |= EPOLLIN;
  175 +
  176 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
  177 + }
165 178 }
166 179
167 180 static
168 181 void
169   -TR_commManagerEpollEnableRead(void * _this, TR_Event event)
  182 +TR_commManagerEpollDisableRead(void * _this, TR_Event event)
170 183 {
  184 + TR_CommManagerEpoll this = _this;
  185 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  186 + int handle = endpoint->transport->handle;
  187 +
  188 + this->events[handle].data.ptr = endpoint;
  189 + this->events[handle].events &= ~EPOLLIN;
  190 +
  191 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle]));
171 192 }
172 193
173 194 static
174 195 void
175   -TR_commManagerEpollDisableRead(void * _this, TR_Event event)
  196 +TR_commManagerEpollClose(void * _this, TR_Event event)
176 197 {
  198 + TR_CommManagerEpoll this = _this;
  199 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  200 + int handle = endpoint->transport->handle;
  201 +
  202 + this->events[handle].data.ptr = NULL;
  203 + this->events[handle].events = EPOLLET | EPOLLONESHOT;
  204 +
  205 + epoll_ctl(this->handle, EPOLL_CTL_DEL, handle, NULL);
177 206 }
178 207
179 208 static
... ...
... ... @@ -104,18 +104,20 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
104 104 }
105 105
106 106 TR_eventHandlerIssueEvent((TR_EventHandler)this, event);
107   - this->fds[i].fd = -1; // this deactivates poll...
  107 + // deactivate read poll mimic edge level behaviour
  108 + this->fds[endpoint->transport->handle].events &= ~POLLIN;
108 109 }
109 110
110 111 if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
  112 + TR_Event _event = TR_eventSubjectEmit(
  113 + (TR_EventSubject)endpoint,
  114 + TR_CEP_EVENT_WRITE_READY,
  115 + NULL);
  116 +
  117 + TR_eventHandlerIssueEvent((TR_EventHandler)this, _event);
111 118 nevents--;
112   - TR_eventHandlerIssueEvent(
113   - (TR_EventHandler)this,
114   - TR_eventSubjectEmit(
115   - (TR_EventSubject)endpoint,
116   - TR_CEP_EVENT_WRITE_READY,
117   - NULL));
118   - // deactivate write poll...
  119 +
  120 + // deactivate write poll mimic edge level behaviour
119 121 this->fds[endpoint->transport->handle].events &= ~POLLOUT;
120 122 }
121 123
... ... @@ -153,28 +155,30 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event)
153 155 TR_CommManagerPoll this = _this;
154 156 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
155 157
156   - this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
  158 + if (! TR_socketFinRd(endpoint->transport)) {
  159 + this->fds[endpoint->transport->handle].events |= POLLIN;
  160 + }
157 161 }
158 162
159 163 static
160 164 void
161   -TR_commManagerPollClose(void * _this, TR_Event event)
  165 +TR_commManagerPollDisableRead(void * _this, TR_Event event)
162 166 {
163 167 TR_CommManagerPoll this = _this;
164 168 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
165 169
166   - this->fds[endpoint->transport->handle].events = 0;
167   - this->fds[endpoint->transport->handle].fd = -1;
  170 + this->fds[endpoint->transport->handle].events &= ~POLLIN;
168 171 }
169 172
170 173 static
171 174 void
172   -TR_commManagerPollDisableRead(void * _this, TR_Event event)
  175 +TR_commManagerPollClose(void * _this, TR_Event event)
173 176 {
174 177 TR_CommManagerPoll this = _this;
175 178 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
176 179
177   - this->fds[endpoint->transport->handle].events &= ~POLLIN;
  180 + this->fds[endpoint->transport->handle].events = 0;
  181 + this->fds[endpoint->transport->handle].fd = -1;
178 182 }
179 183
180 184 static
... ...
... ... @@ -112,12 +112,18 @@ connectionNextMessage(void * _this)
112 112 }
113 113
114 114 static
115   -void
  115 +int
116 116 connectionCompose(void * _this, TR_ProtoMessage message)
117 117 {
118   - TR_queuePut(
119   - ((TR_CommEndPoint)_this)->write_buffer,
120   - TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message));
  118 + TR_RemoteData data =
  119 + TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message);
  120 +
  121 + if (! data) {
  122 + return FALSE;
  123 + }
  124 +
  125 + TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
  126 + return TRUE;
121 127 }
122 128
123 129 static
... ...
... ... @@ -99,12 +99,18 @@ datagramServiceNextMessage(void * _this)
99 99 }
100 100
101 101 static
102   -void
  102 +int
103 103 datagramServiceCompose(void * _this, TR_ProtoMessage message)
104 104 {
105   - TR_queuePut(
106   - ((TR_CommEndPoint)_this)->write_buffer,
107   - TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message));
  105 + TR_RemoteData data =
  106 + TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message);
  107 +
  108 + if (! data) {
  109 + return FALSE;
  110 + }
  111 +
  112 + TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
  113 + return TRUE;
108 114 }
109 115
110 116 intptr_t datagramService_events[TR_CEP_EVENT_MAX + 1];
... ...
... ... @@ -112,13 +112,19 @@ TR_commManagerClose(void * _this, TR_Event event)
112 112 TR_CommManager this = _this;
113 113 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
114 114
115   - TR_socketShutdown(endpoint->transport);
116 115 TR_CALL(_this, TR_CommManager, close, event);
117 116
  117 + if (! TR_socketFinRdWr(endpoint->transport)) {
  118 + TR_socketShutdown(endpoint->transport);
  119 + }
  120 +
118 121 if (endpoint->transport->handle == this->max_handle) {
119 122 while (! this->endpoints[--this->max_handle]);
120 123 }
121   - TR_delete(this->endpoints[endpoint->transport->handle]);
  124 +
  125 + TR_eventSubjectFinalize(
  126 + (TR_EventSubject)this->endpoints[endpoint->transport->handle]);
  127 + this->endpoints[endpoint->transport->handle] = NULL;
122 128
123 129 return TR_EVENT_DONE;
124 130 }
... ... @@ -128,6 +134,10 @@ TR_commManagerShutdownRead(void * _this, TR_Event event)
128 134 {
129 135 TR_CALL(_this, TR_CommManager, shutdownRead, event);
130 136
  137 + if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
  138 + TR_socketShutdownRead(((TR_CommEndPoint)event->subject)->transport);
  139 + }
  140 +
131 141 if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) {
132 142 // close
133 143 TR_eventHandlerIssueEvent(
... ... @@ -136,16 +146,15 @@ TR_commManagerShutdownRead(void * _this, TR_Event event)
136 146 event->subject,
137 147 TR_CEP_EVENT_CLOSE,
138 148 NULL));
139   - } else if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
140   - // handle pending data... close is issued from disableWrite
  149 + }
  150 +
  151 + if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
141 152 TR_eventHandlerIssueEvent(
142 153 (TR_EventHandler)_this,
143 154 TR_eventSubjectEmit(
144 155 event->subject,
145   - TR_CEP_EVENT_CLOSE,
  156 + TR_CEP_EVENT_SHUT_WRITE,
146 157 NULL));
147   - } else {
148   - TR_cepSetClose((TR_CommEndPoint)event->subject);
149 158 }
150 159
151 160 return TR_EVENT_DONE;
... ... @@ -156,7 +165,11 @@ TR_commManagerShutdownWrite(void * _this, TR_Event event)
156 165 {
157 166 TR_CALL(_this, TR_CommManager, shutdownWrite, event);
158 167
159   - if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
  168 + if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {
  169 + TR_socketShutdownWrite(((TR_CommEndPoint)event->subject)->transport);
  170 + }
  171 +
  172 + if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) {
160 173 TR_eventHandlerIssueEvent(
161 174 (TR_EventHandler)_this,
162 175 TR_eventSubjectEmit(
... ...
... ... @@ -29,6 +29,9 @@
29 29 #include "tr/comm_end_point.h"
30 30 #include "tr/interface/comm_end_point.h"
31 31
  32 +extern int count_write_ready;
  33 +int count_write_ready_handle = 0;
  34 +
32 35 static
33 36 int
34 37 ioHandlerCtor(void * _this, va_list * params)
... ... @@ -89,14 +92,22 @@ TR_EventDone
89 92 ioHandlerWrite(void * _this, TR_Event event)
90 93 {
91 94 TR_Event revent, close_event = NULL;
92   - TR_EventDone done = TR_EVENT_DONE;
  95 +
  96 + count_write_ready_handle++;
93 97
94 98 switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) {
95 99 case FALSE: // EAGAIN
96   - revent = TR_eventSubjectEmit(
97   - event->subject,
98   - TR_CEP_EVENT_PENDING_DATA,
99   - NULL);
  100 + if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
  101 + revent = TR_eventSubjectEmit(
  102 + event->subject,
  103 + TR_CEP_EVENT_PENDING_DATA,
  104 + NULL);
  105 + } else {
  106 + revent = TR_eventSubjectEmit(
  107 + event->subject,
  108 + TR_CEP_EVENT_END_DATA,
  109 + NULL);
  110 + }
100 111 break;
101 112
102 113 case -1: // FAILURE
... ... @@ -114,9 +125,7 @@ ioHandlerWrite(void * _this, TR_Event event)
114 125 break;
115 126
116 127 default:
117   - if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
118   - done = TR_EVENT_PENDING;
119   - } else {
  128 + if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
120 129 revent = TR_eventSubjectEmit(
121 130 event->subject,
122 131 TR_CEP_EVENT_END_DATA,
... ... @@ -135,7 +144,8 @@ ioHandlerWrite(void * _this, TR_Event event)
135 144 if (close_event) {
136 145 TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event);
137 146 }
138   - return done;
  147 +
  148 + return TR_EVENT_DONE;
139 149 }
140 150
141 151 static
... ...
... ... @@ -32,6 +32,8 @@
32 32 #include "tr/comm_end_point.h"
33 33 #include "tr/interface/comm_end_point.h"
34 34
  35 +int count_write_ready = 0;
  36 +
35 37 static
36 38 int
37 39 protocolHandlerCtor(void * _this, va_list * params)
... ... @@ -85,14 +87,18 @@ protocolHandlerCompose(void * _this, TR_Event event)
85 87 }
86 88
87 89 if (TR_cepCompose(endpoint, message)) {
88   - TR_eventHandlerIssueEvent(
89   - (TR_EventHandler)_this,
90   - TR_eventSubjectEmit(
91   - event->subject,
92   - TR_CEP_EVENT_WRITE_READY,
93   - NULL));
94   - TR_delete(message);
  90 + TR_Event _event = TR_eventSubjectEmit(
  91 + event->subject,
  92 + TR_CEP_EVENT_WRITE_READY,
  93 + NULL);
  94 +
  95 + TR_eventHandlerIssueEvent((TR_EventHandler)_this, _event);
  96 +
  97 + count_write_ready++;
  98 + } else {
  99 + //printf("%s: compose failed\n", __func__);
95 100 }
  101 + TR_delete(message);
96 102
97 103 return TR_EVENT_DONE;
98 104 }
... ...
... ... @@ -43,8 +43,11 @@ serverCtor(void * _this, va_list * params)
43 43 {
44 44 TR_Server this = _this;
45 45
46   - //this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll);
  46 +#if 1
  47 + this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll);
  48 +#else
47 49 this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerPoll);
  50 +#endif
48 51 this->dispatcher = TR_new(TR_EventDispatcher, TR_EVD_SERVER, NULL, 100);
49 52 this->connector = TR_new(TR_Connector);
50 53 this->io_handler = TR_new(TR_IoHandler);
... ...
... ... @@ -21,18 +21,19 @@ static
21 21 TR_EventDone
22 22 testHandlerNewMessage(TR_EventHandler this, TR_Event event)
23 23 {
24   - TR_ProtoMessageRaw msg = event->data;
25   - TR_SizedData data = (TR_SizedData)msg->data;
26   - char buf[data->size + 1];
27   - int i;
  24 +// TR_ProtoMessageRaw msg = event->data;
  25 +// TR_SizedData data = (TR_SizedData)msg->data;
  26 +// char buf[data->size + 1];
  27 +// int i;
28 28
29 29 ((TestHandler)this)->handled++;
30 30
31   - memcpy(buf, data->data, data->size);
32   - buf[data->size] = 0;
33   - for (i = 0; buf[i]; i++) {
34   - if (! isprint(buf[i])) buf[i] = '.';
35   - }
  31 +// printf("handled data %p\n", event->data);
  32 +// memcpy(buf, data->data, data->size);
  33 +// buf[data->size] = 0;
  34 +// for (i = 0; buf[i]; i++) {
  35 +// if (! isprint(buf[i])) buf[i] = '.';
  36 +// }
36 37 // printf("echo message: %s(%zd)\n", buf, data->size);
37 38
38 39 TR_eventHandlerIssueEvent(
... ...
Please register or login to post a comment