Commit 17cf44b9b92c27190cfba98b5a10a6a798940984

Authored by Georg Hopp
1 parent 3f704d2f

Revert "fixes and additions for threaded code"

This reverts commit f71cac22.
... ... @@ -64,8 +64,7 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) {
64 64 #define TR_CEP_EVENT_SHUT_READ 10 // CommManager
65 65 #define TR_CEP_EVENT_SHUT_WRITE 11 // CommManager
66 66 #define TR_CEP_EVENT_CLOSE 12 // CommManager
67   -#define TR_CEP_EVENT_IO_DONE 13 // CommManager
68   -#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_IO_DONE)
  67 +#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE)
69 68
70 69 #define TR_cepSetClose(ep) ((ep)->do_close = 1)
71 70 #define TR_cepHasProto(ep, proto) (TR_INSTANCE_OF(proto, TR_cepGetProto(ep)))
... ...
... ... @@ -24,7 +24,6 @@
24 24 #define __TR_COMM_MANAGER_H__
25 25
26 26 #include <sys/types.h>
27   -#include <pthread.h>
28 27
29 28 #include "trbase.h"
30 29 #include "trdata.h"
... ... @@ -41,8 +40,6 @@ TR_CLASS(TR_CommManager) {
41 40 TR_Hash read;
42 41 size_t n_endpoints;
43 42 size_t max_handle;
44   - unsigned long io_triggered;
45   - pthread_mutex_t io_triggered_lock;
46 43 };
47 44 TR_INSTANCE_INIT(TR_CommManager);
48 45 TR_CLASSVARS_DECL(TR_CommManager) {
... ...
... ... @@ -30,13 +30,13 @@
30 30
31 31 #include "tr/comm_end_point.h"
32 32
33   -typedef void (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint);
34   -typedef size_t (* fptr_TR_commManagerSelect)(void *, TR_Event, unsigned long);
35   -typedef void (* fptr_TR_commManagerPollWrite)(void *, TR_Event);
36   -typedef void (* fptr_TR_commManagerPollRead)(void *, TR_Event);
37   -typedef void (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
38   -typedef void (* fptr_TR_commManagerDisableRead)(void *, TR_Event);
39   -typedef void (* fptr_TR_commManagerClose)(void *, TR_Event);
  33 +typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint);
  34 +typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, unsigned long);
  35 +typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event);
  36 +typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event);
  37 +typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
  38 +typedef TR_EventDone (* fptr_TR_commManagerDisableRead)(void *, TR_Event);
  39 +typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event);
40 40
41 41 TR_INTERFACE(TR_CommManager) {
42 42 TR_IFID;
... ...
... ... @@ -29,7 +29,7 @@ int
29 29 TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size)
30 30 {
31 31 TR_RemoteData data;
32   - size_t send;
  32 + int send;
33 33
34 34 *size = 0;
35 35
... ... @@ -55,9 +55,6 @@ TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size)
55 55 {
56 56 TR_RemoteData new_data = NULL;
57 57
58   - printf("[~DEBUG~] wrote %zd bytes\n", send);
59   - fflush(stdout);
60   -
61 58 if (send != ((TR_SizedData)data)->size) {
62 59 new_data = TR_new(
63 60 TR_RemoteData,
... ...
... ... @@ -41,7 +41,7 @@ commEndPointCtor(void * _this, va_list * params)
41 41 this->transport = va_arg(*params, TR_Socket);
42 42 this->protocol = va_arg(*params, TR_Protocol);
43 43 this->read_chunk_size = va_arg(*params, int);
44   - this->do_close = FALSE;
  44 + this->do_close = 0;
45 45 this->write_buffer = TR_new(TR_Queue);
46 46
47 47 return 0;
... ... @@ -101,7 +101,6 @@ commEndPointCvInit(TR_class_ptr cls)
101 101 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_READ);
102 102 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_WRITE);
103 103 TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE);
104   - TR_EVENT_CREATE(cls, TR_CEP_EVENT_IO_DONE);
105 104 }
106 105
107 106 const char * TR_cepEventStrings[] = {
... ... @@ -118,7 +117,6 @@ const char * TR_cepEventStrings[] = {
118 117 "TR_CEP_EVENT_SHUT_READ",
119 118 "TR_CEP_EVENT_SHUT_WRITE",
120 119 "TR_CEP_EVENT_CLOSE",
121   - "TR_CEP_EVENT_IO_DONE",
122 120 };
123 121
124 122 intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1];
... ...
... ... @@ -30,9 +30,6 @@ TR_commEndPointRead(TR_CommEndPoint this, TR_RemoteData * data_ptr)
30 30 {
31 31 *data_ptr = TR_socketRecv(this->transport, this->read_chunk_size);
32 32
33   - printf("[~DEBUG~] read %zd bytes\n", ((TR_SizedData)*data_ptr)->size);
34   - fflush(stdout);
35   -
36 33 if (! *data_ptr) return -1; // ment to trigger a close
37 34 if (*data_ptr == (void*)-1) return -2; // remote close... shutdown
38 35 if (*data_ptr == TR_emptyRemoteData) return FALSE; // read blocked
... ...
... ... @@ -22,7 +22,6 @@
22 22
23 23 #include <unistd.h>
24 24 #include <poll.h>
25   -#include <pthread.h>
26 25
27 26 #include "trbase.h"
28 27 #include "trdata.h"
... ... @@ -52,8 +51,6 @@ commManagerCtor(void * _this, va_list * params)
52 51 this->n_endpoints = sysconf(_SC_OPEN_MAX);
53 52 this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints);
54 53
55   - pthread_mutex_init(&this->io_triggered_lock, NULL);
56   -
57 54 return 0;
58 55 }
59 56
... ... @@ -64,8 +61,6 @@ commManagerDtor(void * _this)
64 61 TR_CommManager this = _this;
65 62 nfds_t i;
66 63
67   - pthread_mutex_destroy(&this->io_triggered_lock);
68   -
69 64 for (i=0; i<this->n_endpoints; i++) {
70 65 TR_delete(this->endpoints[i]);
71 66 }
... ... @@ -77,22 +72,11 @@ commManagerDtor(void * _this)
77 72
78 73 static
79 74 TR_EventDone
80   -TR_commManagerWriteIsBlocked(void * _this, TR_Event event)
  75 +TR_commManagerEnableWrite(void * _this, TR_Event event)
81 76 {
82 77 TR_CommManager this = _this;
83 78
84   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
85   -
86   - return TR_EVENT_DONE;
87   -}
88   -
89   -static
90   -TR_EventDone
91   -TR_commManagerDecrementIoTriggerd(TR_CommManager this, TR_Event event)
92   -{
93   - pthread_mutex_lock(&this->io_triggered_lock);
94   - this->io_triggered--;
95   - pthread_mutex_unlock(&this->io_triggered_lock);
  79 + TR_hashAdd(this->write, event->subject);
96 80
97 81 return TR_EVENT_DONE;
98 82 }
... ... @@ -119,6 +103,8 @@ static
119 103 void
120 104 commManagerCvInit(TR_class_ptr cls)
121 105 {
  106 + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
  107 +
122 108 TR_EVENT_HANDLER_SET_METHOD(
123 109 cls, TR_EventDispatcher,
124 110 TR_DISPATCHER_EVENT_DATA_WAIT,
... ... @@ -134,7 +120,7 @@ commManagerCvInit(TR_class_ptr cls)
134 120 TR_EVENT_HANDLER_SET_METHOD(
135 121 cls, TR_CommEndPoint,
136 122 TR_CEP_EVENT_WRITE_BLOCK,
137   - TR_commManagerWriteIsBlocked);
  123 + TR_commManagerPollWrite);
138 124 TR_EVENT_HANDLER_SET_METHOD(
139 125 cls, TR_CommEndPoint,
140 126 TR_CEP_EVENT_READ_BLOCK,
... ... @@ -158,18 +144,14 @@ commManagerCvInit(TR_class_ptr cls)
158 144 TR_EVENT_HANDLER_SET_METHOD(
159 145 cls, TR_CommEndPoint,
160 146 TR_CEP_EVENT_DATA_READY,
161   - TR_commManagerPollWrite);
  147 + TR_commManagerEnableWrite);
162 148 TR_EVENT_HANDLER_SET_METHOD(
163 149 cls, TR_CommEndPoint,
164 150 TR_CEP_EVENT_DATA_END,
165 151 TR_commManagerDisableWrite);
166   - TR_EVENT_HANDLER_SET_METHOD(
167   - cls, TR_CommEndPoint,
168   - TR_CEP_EVENT_IO_DONE,
169   - TR_commManagerDecrementIoTriggerd);
170 152 }
171 153
172   -TR_INIT_HANDLER(TR_CommManager);
  154 +TR_INSTANCE(TR_Hash, commManagerEventMethods);
173 155 TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
174 156 TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
175 157 TR_CREATE_CLASS(
... ... @@ -178,7 +160,7 @@ TR_CREATE_CLASS(
178 160 commManagerCvInit,
179 161 TR_IF(TR_Class),
180 162 TR_IF(TR_CommManager)) = {
181   - { TR_HANDLER_CVARS(TR_CommManager) }
  163 + { &(_commManagerEventMethods.data) }
182 164 };
183 165
184 166 // vim: set ts=4 sw=4:
... ...
... ... @@ -73,8 +73,8 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
73 73 int handle = endpoint->transport->handle;
74 74 struct epoll_event event;
75 75
76   - //this->events[handle] = EPOLLIN | EPOLLET;
77   - this->events[handle] = EPOLLIN;
  76 + //this->events[handle] = EPOLLET;
  77 + this->events[handle] = 0;
78 78 event.data.ptr = endpoint;
79 79 event.events = this->events[handle];
80 80
... ... @@ -82,19 +82,19 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
82 82 }
83 83
84 84 static
85   -size_t
  85 +void
86 86 TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
87 87 {
88 88 TR_CommManagerEpoll this = _this;
89 89 TR_CommManager cmgr = _this;
90 90 int i, nevents;
91   - //struct epoll_event _event;
  91 + struct epoll_event _event;
92 92
93 93 nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);
94 94
95 95 for (i=0; i<nevents; i++) {
96 96 TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
97   - //int handle = endpoint->transport->handle;
  97 + int handle = endpoint->transport->handle;
98 98
99 99 if ((events[i].events & EPOLLIN) == EPOLLIN) {
100 100 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
... ... @@ -106,20 +106,20 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
106 106 }
107 107 }
108 108
109   - //this->events[handle] &= ~EPOLLIN;
110   - //_event.data.ptr = endpoint;
111   - //_event.events = this->events[handle];
112   - //epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  109 + this->events[handle] &= ~EPOLLIN;
  110 + _event.data.ptr = endpoint;
  111 + _event.events = this->events[handle];
  112 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
113 113 }
114 114
115 115 if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
116 116 if (! event->subject->fin) {
117 117 TR_hashAdd(cmgr->write, endpoint);
118 118 }
119   - //this->events[handle] &= ~EPOLLOUT;
120   - //_event.data.ptr = endpoint;
121   - //_event.events = this->events[handle];
122   - //epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  119 + this->events[handle] &= ~EPOLLOUT;
  120 + _event.data.ptr = endpoint;
  121 + _event.events = this->events[handle];
  122 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
123 123 }
124 124
125 125 if ((events[i].events & EPOLLHUP) == EPOLLHUP) {
... ... @@ -131,15 +131,6 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
131 131 NULL));
132 132 }
133 133 }
134   -
135   - if (nevents >= 0) {
136   - return nevents;
137   - } else {
138   - perror("epoll");
139   - fflush(stderr);
140   - fflush(stdout);
141   - return 0;
142   - }
143 134 }
144 135
145 136 static
... ... @@ -189,9 +180,9 @@ static
189 180 void
190 181 TR_commManagerEpollEnableRead(void * _this, TR_Event event)
191 182 {
192   -// if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
193   -// TR_commManagerEpollEnable(_this, EPOLLIN, event);
194   -// }
  183 + if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
  184 + TR_commManagerEpollEnable(_this, EPOLLIN, event);
  185 + }
195 186 }
196 187
197 188 static
... ... @@ -205,7 +196,7 @@ static
205 196 void
206 197 TR_commManagerEpollDisableRead(void * _this, TR_Event event)
207 198 {
208   -// TR_commManagerEpollDisable(_this, EPOLLIN, event);
  199 + TR_commManagerEpollDisable(_this, EPOLLIN, event);
209 200 }
210 201
211 202 static
... ...
... ... @@ -71,57 +71,45 @@ TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
71 71 TR_CommManagerPoll this = _this;
72 72
73 73 this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
74   - this->fds[endpoint->transport->handle].events = POLLIN;
  74 + this->fds[endpoint->transport->handle].events = 0;
75 75 }
76 76
77 77 static
78   -size_t
  78 +void
79 79 TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout)
80 80 {
81 81 TR_CommManagerPoll this = _this;
82 82 TR_CommManager cmgr = _this;
83 83 nfds_t i;
84   - int nevents, doevents;
85   -
86   - for (i = 0; i < cmgr->max_handle+1; i++) {
87   - printf("[=DEBUG=] handle %ld POLLIN? %s\n", i,
88   - (this->fds[i].events & POLLIN) == POLLIN ? "YES" : "NO");
89   - fflush(stdout);
90   - }
  84 + int nevents;
91 85
92   - nevents = doevents = poll(this->fds, cmgr->max_handle+1, timeout);
  86 + nevents = poll(this->fds, cmgr->max_handle+1, timeout);
93 87
94   - if (doevents) {
  88 + if (nevents) {
95 89 for (i = 0; i < cmgr->max_handle+1; i++) {
96 90 if (this->fds[i].revents != 0) {
97 91 TR_CommEndPoint endpoint = cmgr->endpoints[i];
98 92
99   - doevents--;
  93 + nevents--;
100 94
101 95 if ((this->fds[i].revents & POLLIN) == POLLIN) {
102 96 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
103 97 && ((TR_TcpSocket)endpoint->transport)->listen) {
104   - pthread_mutex_lock(&cmgr->io_triggered_lock);
105 98 TR_hashAdd(cmgr->accept, endpoint);
106   - pthread_mutex_unlock(&cmgr->io_triggered_lock);
107 99 } else {
108 100 if (! event->subject->fin) {
109   - pthread_mutex_lock(&cmgr->io_triggered_lock);
110 101 TR_hashAdd(cmgr->read, endpoint);
111   - pthread_mutex_unlock(&cmgr->io_triggered_lock);
112 102 }
113 103 }
114   - //this->fds[endpoint->transport->handle].events &= ~POLLIN;
  104 + this->fds[endpoint->transport->handle].events &= ~POLLIN;
115 105 }
116 106
117 107 if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
118 108 if (! event->subject->fin) {
119   - pthread_mutex_lock(&cmgr->io_triggered_lock);
120 109 TR_hashAdd(cmgr->write, endpoint);
121   - pthread_mutex_unlock(&cmgr->io_triggered_lock);
122 110 }
123   - //this->fds[endpoint->transport->handle].events &=
124   - // ~(POLLOUT|POLLHUP);
  111 + this->fds[endpoint->transport->handle].events &=
  112 + ~(POLLOUT|POLLHUP);
125 113 }
126 114
127 115 if ((this->fds[i].revents & POLLHUP) == POLLHUP) {
... ... @@ -134,12 +122,10 @@ TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout)
134 122 }
135 123
136 124 this->fds[i].revents = 0;
137   - if (doevents <= 0) break;
  125 + if (nevents <= 0) break;
138 126 }
139 127 }
140 128 }
141   -
142   - return nevents;
143 129 }
144 130
145 131 static
... ... @@ -158,12 +144,12 @@ static
158 144 void
159 145 TR_commManagerPollEnableRead(void * _this, TR_Event event)
160 146 {
161   -// TR_CommManagerPoll this = _this;
162   -// TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
163   -//
164   -// if (! TR_socketFinRd(endpoint->transport)) {
165   -// this->fds[endpoint->transport->handle].events |= POLLIN;
166   -// }
  147 + TR_CommManagerPoll this = _this;
  148 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  149 +
  150 + if (! TR_socketFinRd(endpoint->transport)) {
  151 + this->fds[endpoint->transport->handle].events |= POLLIN;
  152 + }
167 153 }
168 154
169 155 static
... ... @@ -180,10 +166,10 @@ static
180 166 void
181 167 TR_commManagerPollDisableRead(void * _this, TR_Event event)
182 168 {
183   -// TR_CommManagerPoll this = _this;
184   -// TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
185   -//
186   -// this->fds[endpoint->transport->handle].events &= ~POLLIN;
  169 + TR_CommManagerPoll this = _this;
  170 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  171 +
  172 + this->fds[endpoint->transport->handle].events &= ~POLLIN;
187 173 }
188 174
189 175 static
... ...
... ... @@ -37,10 +37,10 @@ static
37 37 int
38 38 connectionCtor(void * _this, va_list * params)
39 39 {
40   - //TR_Connection this = _this;
  40 + TR_Connection this = _this;
41 41
42 42 TR_PARENTCALL(TR_Connection, _this, TR_Class, ctor, params);
43   - //this->current_message = NULL;
  43 + this->current_message = NULL;
44 44
45 45 return 0;
46 46 }
... ...
... ... @@ -76,13 +76,6 @@ connectorAccept(void * _this, TR_Event event)
76 76 socket = TR_socketAccept((TR_TcpSocket)connection->transport);
77 77 }
78 78
79   - TR_eventHandlerIssueEvent(
80   - (TR_EventHandler)this,
81   - TR_eventSubjectEmit(
82   - (TR_EventSubject)connection,
83   - TR_CEP_EVENT_IO_DONE,
84   - NULL));
85   -
86 79 if (! socket) {
87 80 TR_eventHandlerIssueEvent(
88 81 (TR_EventHandler)this,
... ... @@ -99,6 +92,8 @@ static
99 92 void
100 93 connectorCvInit(TR_class_ptr cls)
101 94 {
  95 + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
  96 +
102 97 TR_EVENT_HANDLER_SET_METHOD(
103 98 cls,
104 99 TR_ConnEntryPoint,
... ... @@ -106,7 +101,6 @@ connectorCvInit(TR_class_ptr cls)
106 101 connectorAccept);
107 102 }
108 103
109   -TR_INIT_HANDLER(TR_Connector);
110 104 TR_INSTANCE(TR_Hash, connectorEventMethods);
111 105 TR_INIT_IFACE(TR_Class, connectorCtor, connectorDtor, NULL);
112 106 TR_CREATE_CLASS(
... ... @@ -114,7 +108,7 @@ TR_CREATE_CLASS(
114 108 TR_EventHandler,
115 109 connectorCvInit,
116 110 TR_IF(TR_Class)) = {
117   - { TR_HANDLER_CVARS(TR_Connector) }
  111 + { &(_connectorEventMethods.data) }
118 112 };
119 113
120 114 // vim: set ts=4 sw=4:
... ...
... ... @@ -20,11 +20,8 @@
20 20 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 21 */
22 22
23   -#define _GNU_SOURCE
24   -
25 23 #include <errno.h>
26 24 #include <poll.h>
27   -#include <pthread.h>
28 25
29 26 #include "trbase.h"
30 27 #include "trevent.h"
... ... @@ -62,8 +59,6 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
62 59 TR_ISSUE_IO_READ_EVENT(this, endpoint);
63 60 }
64 61
65   - this->io_triggered++;
66   -
67 62 TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
68 63 }
69 64
... ... @@ -95,35 +90,22 @@ TR_commManagerSelect(void * _this, TR_Event event)
95 90 TR_Timer timer = (TR_Timer)event->data;
96 91 TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject;
97 92 unsigned long timeout; // milliseconds
98   - char buffer[17];
99   -
100   - pthread_getname_np(pthread_self(), buffer, 17);
101   -
102   - if (! this->io_triggered) {
103   - printf("[DEBUG] [%s] io triggerd was empty\n", buffer);
104   - fflush(stdout);
105   - pthread_mutex_lock(&this->io_triggered_lock);
106   - this->io_triggered = TR_hashEach(this->write, this, commManagerIssueWriteEvents);
107   - this->io_triggered += TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
108   - this->io_triggered += TR_hashEach(this->read, this, commManagerIssueReadEvents);
109   - pthread_mutex_unlock(&this->io_triggered_lock);
110   - }
111 93
112   - printf("[DEBUG] [%s] io triggerd: %lu\n", buffer, this->io_triggered);
113   - fflush(stdout);
114   -
115   - if (! this->io_triggered) {
116   - if (NULL == timer) {
117   - timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
118   - } else {
119   - timeout = TR_timerGet(timer, NULL);
120   - }
  94 + if (! (TR_hashEmpty(this->read)
  95 + && TR_hashEmpty(this->write)
  96 + && TR_hashEmpty(this->accept))) {
  97 + timeout = 0;
  98 + } else if (NULL == timer) {
  99 + timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
  100 + } else {
  101 + timeout = TR_timerGet(timer, NULL);
  102 + }
121 103
122   - printf("[DEBUG] [%s] select timeout: %lu\n", buffer, timeout);
123   - fflush(stdout);
  104 + TR_CALL(_this, TR_CommManager, select, event, timeout);
124 105
125   - TR_CALL(_this, TR_CommManager, select, event, timeout);
126   - }
  106 + TR_hashEach(this->write, this, commManagerIssueWriteEvents);
  107 + TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
  108 + TR_hashEach(this->read, this, commManagerIssueReadEvents);
127 109
128 110 return TR_EVENT_DONE;
129 111 }
... ... @@ -133,13 +115,9 @@ TR_commManagerPollWrite(void * _this, TR_Event event)
133 115 {
134 116 TR_CommManager this = _this;
135 117
  118 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
136 119 if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {
137   - pthread_mutex_lock(&this->io_triggered_lock);
138   - TR_hashAdd(this->write, event->subject);
139 120 TR_CALL(_this, TR_CommManager, pollWrite, event);
140   - pthread_mutex_unlock(&this->io_triggered_lock);
141   - printf("[!DEBUG!] socket added to write hash\n");
142   - fflush(stdout);
143 121 }
144 122
145 123 return TR_EVENT_DONE;
... ... @@ -151,10 +129,6 @@ TR_commManagerPollRead(void * _this, TR_Event event)
151 129 TR_CommManager this = _this;
152 130 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
153 131
154   - if (! TR_socketFinRd(endpoint->transport)) {
155   - TR_CALL(_this, TR_CommManager, pollRead, event);
156   - }
157   -
158 132 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
159 133 && ((TR_TcpSocket)endpoint->transport)->listen) {
160 134 TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject));
... ... @@ -162,6 +136,10 @@ TR_commManagerPollRead(void * _this, TR_Event event)
162 136 TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
163 137 }
164 138
  139 + if (! TR_socketFinRd(endpoint->transport)) {
  140 + TR_CALL(_this, TR_CommManager, pollRead, event);
  141 + }
  142 +
165 143 return TR_EVENT_DONE;
166 144 }
167 145
... ... @@ -178,17 +156,13 @@ TR_commManagerDisableRead(void * _this, TR_Event event)
178 156 TR_EventDone
179 157 TR_commManagerDisableWrite(void * _this, TR_Event event)
180 158 {
181   - TR_CommManager this = _this;
182   - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  159 + TR_CommManager this = _this;
183 160
184   - if (! endpoint->write_buffer->nmsg) {
185   - // TODO think about a better way...
186   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
187   - //if (! event->subject->fin) {
188   - // TR_hashAdd(this->read, event->subject);
189   - //}
190   - TR_CALL(_this, TR_CommManager, disableWrite, event);
  161 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  162 + if (! event->subject->fin) {
  163 + TR_hashAdd(this->read, event->subject);
191 164 }
  165 + TR_CALL(_this, TR_CommManager, disableWrite, event);
192 166
193 167 return TR_EVENT_DONE;
194 168 }
... ...
... ... @@ -50,7 +50,6 @@ ioHandlerRead(void * _this, TR_Event event)
50 50 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
51 51 TR_Event revent;
52 52 TR_RemoteData data;
53   - char ip[16];
54 53
55 54 switch (TR_commEndPointRead(endpoint, &data)) {
56 55 case FALSE: // EAGAIN
... ... @@ -84,13 +83,6 @@ ioHandlerRead(void * _this, TR_Event event)
84 83 return TR_EVENT_DONE;
85 84 }
86 85
87   - TR_socketAddrIpStr(data->remote, ip, 16);
88   - printf(
89   - "DEBUG: remote ip: %s / port: %d\n",
90   - ip,
91   - TR_socketAddrPort(data->remote));
92   - fflush(stdout);
93   -
94 86 revent = TR_eventSubjectEmit(
95 87 event->subject,
96 88 TR_CEP_EVENT_NEW_DATA,
... ... @@ -98,13 +90,6 @@ ioHandlerRead(void * _this, TR_Event event)
98 90 break;
99 91 }
100 92
101   - TR_eventHandlerIssueEvent(
102   - (TR_EventHandler)_this,
103   - TR_eventSubjectEmit(
104   - event->subject,
105   - TR_CEP_EVENT_IO_DONE,
106   - NULL));
107   -
108 93 TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
109 94
110 95 return TR_EVENT_DONE;
... ... @@ -162,13 +147,6 @@ ioHandlerWrite(void * _this, TR_Event event)
162 147
163 148 endpoint->write_buffer_size -= written;
164 149
165   - TR_eventHandlerIssueEvent(
166   - (TR_EventHandler)_this,
167   - TR_eventSubjectEmit(
168   - event->subject,
169   - TR_CEP_EVENT_IO_DONE,
170   - NULL));
171   -
172 150 if (revent) {
173 151 TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
174 152 }
... ... @@ -180,6 +158,8 @@ static
180 158 void
181 159 ioHandlerCvInit(TR_class_ptr cls)
182 160 {
  161 + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
  162 +
183 163 TR_EVENT_HANDLER_SET_METHOD(
184 164 cls,
185 165 TR_CommEndPoint,
... ... @@ -192,14 +172,14 @@ ioHandlerCvInit(TR_class_ptr cls)
192 172 ioHandlerWrite);
193 173 }
194 174
195   -TR_INIT_HANDLER(TR_IoHandler);
  175 +TR_INSTANCE(TR_Hash, ioHandlerEventMethods);
196 176 TR_INIT_IFACE(TR_Class, ioHandlerCtor, ioHandlerDtor, NULL);
197 177 TR_CREATE_CLASS(
198 178 TR_IoHandler,
199 179 TR_EventHandler,
200 180 ioHandlerCvInit,
201 181 TR_IF(TR_Class)) = {
202   - { TR_HANDLER_CVARS(TR_IoHandler) }
  182 + { &(_ioHandlerEventMethods.data) }
203 183 };
204 184
205 185 // vim: set ts=4 sw=4:
... ...
... ... @@ -125,6 +125,8 @@ static
125 125 void
126 126 protocolHandlerCvInit(TR_class_ptr cls)
127 127 {
  128 + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
  129 +
128 130 TR_EVENT_HANDLER_SET_METHOD(
129 131 cls,
130 132 TR_CommEndPoint,
... ... @@ -142,14 +144,14 @@ protocolHandlerCvInit(TR_class_ptr cls)
142 144 // protocolHandlerUpgrade);
143 145 }
144 146
145   -TR_INIT_HANDLER(TR_ProtocolHandler);
  147 +TR_INSTANCE(TR_Hash, protocolHandlerEventMethods);
146 148 TR_INIT_IFACE(TR_Class, protocolHandlerCtor, protocolHandlerDtor, NULL);
147 149 TR_CREATE_CLASS(
148 150 TR_ProtocolHandler,
149 151 TR_EventHandler,
150 152 protocolHandlerCvInit,
151 153 TR_IF(TR_Class)) = {
152   - { TR_HANDLER_CVARS(TR_ProtocolHandler) }
  154 + { &(_protocolHandlerEventMethods.data) }
153 155 };
154 156
155 157 // vim: set ts=4 sw=4:
... ...
... ... @@ -43,7 +43,7 @@ serverCtor(void * _this, va_list * params)
43 43 {
44 44 TR_Server this = _this;
45 45
46   -#if 0
  46 +#if 1
47 47 this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll);
48 48 #else
49 49 this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerPoll);
... ...
... ... @@ -135,6 +135,8 @@ static
135 135 void
136 136 simpleClientCvInit(TR_class_ptr cls)
137 137 {
  138 + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree);
  139 +
138 140 TR_EVENT_HANDLER_SET_METHOD(
139 141 cls,
140 142 TR_EventDispatcher,
... ... @@ -147,14 +149,14 @@ simpleClientCvInit(TR_class_ptr cls)
147 149 simpleClientHandleData);
148 150 }
149 151
150   -TR_INIT_HANDLER(TR_SimpleClient);
  152 +TR_INSTANCE(TR_Hash, simpleClientEventMethods);
151 153 TR_INIT_IFACE(TR_Class, simpleClientCtor, simpleClientDtor, NULL);
152 154 TR_CREATE_CLASS(
153 155 TR_SimpleClient,
154 156 TR_EventHandler,
155 157 simpleClientCvInit,
156 158 TR_IF(TR_Class)) = {
157   - { TR_HANDLER_CVARS(TR_SimpleClient) }
  159 + { &(_simpleClientEventMethods.data) }
158 160 };
159 161
160 162 // vim: set ts=4 sw=4:
... ...
... ... @@ -36,7 +36,6 @@ threadedServerCtor(void * _this, va_list * params)
36 36 {
37 37 TR_ThreadedServer this = _this;
38 38 int i;
39   - char buffer[16];
40 39
41 40 TR_PARENTCALL(TR_ThreadedServer, _this, TR_Class, ctor, params);
42 41
... ... @@ -44,11 +43,9 @@ threadedServerCtor(void * _this, va_list * params)
44 43 this->threads = TR_malloc(sizeof(TR_EventThread) * this->n_threads);
45 44
46 45 for (i=0; i<this->n_threads; i++) {
47   - sprintf(buffer, "test%03d", i);
48 46 this->threads[i] = TR_new(
49 47 TR_EventThread,
50   - ((TR_Server)this)->dispatcher,
51   - buffer);
  48 + ((TR_Server)this)->dispatcher);
52 49 }
53 50
54 51 return 0;
... ...
1 1 #!/bin/bash
2 2 #TRLIBS="-ltrbase -ltrhashing -ltrio -ltrdata -ltrevent -ltrcomm"
3 3 TRLIBS="/usr/local/lib/libtrcomm.a /usr/local/lib/libtrevent.a /usr/local/lib/libtrdata.a /usr/local/lib/libtrio.a /usr/local/lib/libtrhashing.a /usr/local/lib/libtrbase.a"
4   -LIBS="-lcrypto -lssl -lrt -luuid -lpthread"
  4 +LIBS="-lcrypto -lssl -lrt -luuid"
5 5 gcc ${CFLAGS} -c -o test_handler.o test_handler.c
6 6 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver testserver.c test_handler.o ${TRLIBS}
7 7 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver2 testserver2.c test_handler.o ${TRLIBS}
8 8 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testtcp testclient.c ${TRLIBS}
9 9 gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -DUDP=1 -o testudp testclient.c ${TRLIBS}
10   -gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver_thread testserver_thread.c test_handler.o ${TRLIBS}
  10 +gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -lpthread -o testserver_thread testserver_thread.c test_handler.o ${TRLIBS}
... ...
... ... @@ -15,7 +15,7 @@ testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
15 15 double size_msg = ((TestHandler)this)->size
16 16 ? size / ((TestHandler)this)->handled
17 17 : 0.0;
18   - int div_count = ' ';
  18 + int div_count = 0;
19 19
20 20 while (size > 1024. && div_count != 'G') {
21 21 size /= 1024.;
... ... @@ -31,7 +31,6 @@ testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
31 31 ((TR_EventDispatcher)event->subject)->n_beats,
32 32 ((TestHandler)this)->handled,
33 33 size, div_count, size_msg);
34   - fflush(stdout);
35 34 ((TestHandler)this)->handled = 0;
36 35 ((TestHandler)this)->size = 0;
37 36
... ...
... ... @@ -30,7 +30,7 @@ main (int argc, char * argv[])
30 30 TR_ProtoMessageRaw message;
31 31 int i, j=0;
32 32
33   - //TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
  33 + TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
34 34
35 35 protocol = TR_new(TR_ProtocolRaw);
36 36 #if UDP
... ...
... ... @@ -19,7 +19,7 @@ main (int argc, char * argv[])
19 19 TR_Protocol protocol = TR_new(TR_ProtocolRaw);
20 20 TestHandler test_handler = TR_new(TestHandler);
21 21
22   - //TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
  22 + TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2);
23 23
24 24 TR_serverAddHandler(server, (TR_EventHandler)test_handler);
25 25 TR_serverBindTcp(server, "0.0.0.0", 5678, protocol);
... ...
Please register or login to post a comment