Commit 171d5979946aace4d7d5b72676fe678ee4975d22

Authored by Georg Hopp
1 parent e3a6d0e9

Disable read completely when write is failing and buffer is full. Don't add endp…

…oints to read or write when they are already in fin state.
... ... @@ -54,14 +54,15 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) {
54 54 #define TR_CEP_EVENT_DO_WRITE 1 // IoHandler
55 55 #define TR_CEP_EVENT_READ_BLOCK 2 // CommManager
56 56 #define TR_CEP_EVENT_WRITE_BLOCK 3 // CommManager
57   -#define TR_CEP_EVENT_NEW_DATA 4 // ProtocolHandler
58   -#define TR_CEP_EVENT_NEW_MSG 5 // Application
59   -#define TR_CEP_EVENT_MSG_READY 6 // ProtocolHandler
60   -#define TR_CEP_EVENT_DATA_READY 7 // CommManager
61   -#define TR_CEP_EVENT_DATA_END 8 // CommManager
62   -#define TR_CEP_EVENT_SHUT_READ 9 // CommManager
63   -#define TR_CEP_EVENT_SHUT_WRITE 10 // CommManager
64   -#define TR_CEP_EVENT_CLOSE 11 // CommManager
  57 +#define TR_CEP_EVENT_WBUF_FULL 4 // CommManager
  58 +#define TR_CEP_EVENT_NEW_DATA 5 // ProtocolHandler
  59 +#define TR_CEP_EVENT_NEW_MSG 6 // Application
  60 +#define TR_CEP_EVENT_MSG_READY 7 // ProtocolHandler
  61 +#define TR_CEP_EVENT_DATA_READY 8 // CommManager
  62 +#define TR_CEP_EVENT_DATA_END 9 // CommManager
  63 +#define TR_CEP_EVENT_SHUT_READ 10 // CommManager
  64 +#define TR_CEP_EVENT_SHUT_WRITE 11 // CommManager
  65 +#define TR_CEP_EVENT_CLOSE 12 // CommManager
65 66 #define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE)
66 67
67 68 #define TR_cepSetClose(ep) ((ep)->do_close = 1)
... ...
... ... @@ -34,15 +34,19 @@ typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint)
34 34 typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int);
35 35 typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event);
36 36 typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event);
  37 +typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
  38 +typedef TR_EventDone (* fptr_TR_commManagerDisableRead)(void *, TR_Event);
37 39 typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event);
38 40
39 41 TR_INTERFACE(TR_CommManager) {
40 42 TR_IFID;
41   - fptr_TR_commManagerAddEndpoint addEndpoint;
42   - fptr_TR_commManagerSelect select;
43   - fptr_TR_commManagerPollWrite pollWrite;
44   - fptr_TR_commManagerPollRead pollRead;
45   - fptr_TR_commManagerClose close;
  43 + fptr_TR_commManagerAddEndpoint addEndpoint;
  44 + fptr_TR_commManagerSelect select;
  45 + fptr_TR_commManagerPollWrite pollWrite;
  46 + fptr_TR_commManagerPollRead pollRead;
  47 + fptr_TR_commManagerDisableWrite disableWrite;
  48 + fptr_TR_commManagerDisableRead disableRead;
  49 + fptr_TR_commManagerClose close;
46 50 };
47 51
48 52 void TR_commManagerAddEndpoint(void *, TR_CommEndPoint);
... ...
... ... @@ -92,6 +92,7 @@ commEndPointCvInit(TR_class_ptr cls)
92 92 TR_EVENT_CREATE(cls, TR_CEP_EVENT_DO_WRITE);
93 93 TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK);
94 94 TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_BLOCK);
  95 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_WBUF_FULL);
95 96 TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA);
96 97 TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_MSG);
97 98 TR_EVENT_CREATE(cls, TR_CEP_EVENT_MSG_READY);
... ... @@ -107,6 +108,7 @@ const char * TR_cepEventStrings[] = {
107 108 "TR_CEP_EVENT_DO_WRITE",
108 109 "TR_CEP_EVENT_READ_BLOCK",
109 110 "TR_CEP_EVENT_WRITE_BLOCK",
  111 + "TR_CEP_EVENT_WBUF_FULL",
110 112 "TR_CEP_EVENT_NEW_DATA",
111 113 "TR_CEP_EVENT_NEW_MSG",
112 114 "TR_CEP_EVENT_MSG_READY",
... ...
... ... @@ -78,17 +78,6 @@ TR_commManagerEnableWrite(void * _this, TR_Event event)
78 78
79 79 static
80 80 TR_EventDone
81   -TR_commManagerDisableWrite(void * _this, TR_Event event)
82   -{
83   - TR_CommManager this = _this;
84   -
85   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
86   -
87   - return TR_EVENT_DONE;
88   -}
89   -
90   -static
91   -TR_EventDone
92 81 TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event)
93 82 {
94 83 TR_commManagerAddEndpoint(this, (TR_CommEndPoint)event->subject);
... ... @@ -99,6 +88,8 @@ TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event)
99 88 TR_EventDone TR_commManagerSelect(void *, TR_Event, int);
100 89 TR_EventDone TR_commManagerPollWrite(void *, TR_Event);
101 90 TR_EventDone TR_commManagerPollRead(void *, TR_Event);
  91 +TR_EventDone TR_commManagerDisableRead(void *, TR_Event);
  92 +TR_EventDone TR_commManagerDisableWrite(void *, TR_Event);
102 93 TR_EventDone TR_commManagerClose(void *, TR_Event);
103 94 TR_EventDone TR_commManagerShutdownRead(TR_CommManager, TR_Event);
104 95 TR_EventDone TR_commManagerShutdownWrite(TR_CommManager, TR_Event);
... ... @@ -129,6 +120,10 @@ commManagerCvInit(TR_class_ptr cls)
129 120 TR_commManagerPollRead);
130 121 TR_EVENT_HANDLER_SET_METHOD(
131 122 cls, TR_CommEndPoint,
  123 + TR_CEP_EVENT_WBUF_FULL,
  124 + TR_commManagerDisableRead);
  125 + TR_EVENT_HANDLER_SET_METHOD(
  126 + cls, TR_CommEndPoint,
132 127 TR_CEP_EVENT_CLOSE,
133 128 TR_commManagerClose);
134 129 TR_EVENT_HANDLER_SET_METHOD(
... ... @@ -151,7 +146,7 @@ commManagerCvInit(TR_class_ptr cls)
151 146
152 147 TR_INSTANCE(TR_Hash, commManagerEventMethods);
153 148 TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
154   -TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL);
  149 +TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
155 150 TR_CREATE_CLASS(
156 151 TR_CommManager,
157 152 TR_EventHandler,
... ...
... ... @@ -73,7 +73,8 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
73 73 int handle = endpoint->transport->handle;
74 74 struct epoll_event event;
75 75
76   - this->events[handle] = EPOLLET;
  76 + //this->events[handle] = EPOLLET;
  77 + this->events[handle] = 0;
77 78 event.data.ptr = endpoint;
78 79 event.events = this->events[handle];
79 80
... ... @@ -100,8 +101,9 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
100 101 && ((TR_TcpSocket)endpoint->transport)->listen) {
101 102 TR_hashAdd(cmgr->accept, endpoint);
102 103 } else {
103   - TR_hashAdd(cmgr->read, endpoint);
104   -
  104 + if (! event->subject->fin) {
  105 + TR_hashAdd(cmgr->read, endpoint);
  106 + }
105 107 }
106 108
107 109 this->events[handle] &= ~EPOLLIN;
... ... @@ -111,11 +113,22 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
111 113 }
112 114
113 115 if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
  116 + if (! event->subject->fin) {
114 117 TR_hashAdd(cmgr->write, endpoint);
115   - this->events[handle] &= ~EPOLLOUT;
116   - _event.data.ptr = endpoint;
117   - _event.events = this->events[handle];
118   - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  118 + }
  119 + this->events[handle] &= ~EPOLLOUT;
  120 + _event.data.ptr = endpoint;
  121 + _event.events = this->events[handle];
  122 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  123 + }
  124 +
  125 + if ((events[i].events & EPOLLHUP) == EPOLLHUP) {
  126 + TR_eventHandlerIssueEvent(
  127 + (TR_EventHandler)_this,
  128 + TR_eventSubjectEmit(
  129 + (TR_EventSubject)endpoint,
  130 + TR_CEP_EVENT_SHUT_WRITE,
  131 + NULL));
119 132 }
120 133 }
121 134 }
... ... @@ -123,6 +136,23 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
123 136 static
124 137 inline
125 138 void
  139 +TR_commManagerEpollDisable(void * _this, uint32_t mask, TR_Event event)
  140 +{
  141 + TR_CommManagerEpoll this = _this;
  142 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  143 + int handle = endpoint->transport->handle;
  144 + struct epoll_event _event;
  145 +
  146 + this->events[handle] &= ~mask;
  147 + _event.data.ptr = endpoint;
  148 + _event.events = this->events[handle];
  149 +
  150 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
  151 +}
  152 +
  153 +static
  154 +inline
  155 +void
126 156 TR_commManagerEpollEnable(void * _this, uint32_t mask, TR_Event event)
127 157 {
128 158 TR_CommManagerEpoll this = _this;
... ... @@ -157,6 +187,20 @@ TR_commManagerEpollEnableRead(void * _this, TR_Event event)
157 187
158 188 static
159 189 void
  190 +TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
  191 +{
  192 + TR_commManagerEpollDisable(_this, EPOLLOUT, event);
  193 +}
  194 +
  195 +static
  196 +void
  197 +TR_commManagerEpollDisableRead(void * _this, TR_Event event)
  198 +{
  199 + TR_commManagerEpollDisable(_this, EPOLLIN, event);
  200 +}
  201 +
  202 +static
  203 +void
160 204 TR_commManagerEpollClose(void * _this, TR_Event event)
161 205 {
162 206 TR_CommManagerEpoll this = _this;
... ... @@ -178,6 +222,8 @@ TR_INIT_IFACE(
178 222 TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT
179 223 TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
180 224 TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK
  225 + TR_commManagerEpollDisableWrite,
  226 + TR_commManagerEpollDisableRead,
181 227 TR_commManagerEpollClose); // TR_CEP_EVENT_CLOSE
182 228 TR_CREATE_CLASS(
183 229 TR_CommManagerEpoll,
... ...
... ... @@ -96,16 +96,31 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
96 96 && ((TR_TcpSocket)endpoint->transport)->listen) {
97 97 TR_hashAdd(cmgr->accept, endpoint);
98 98 } else {
99   - TR_hashAdd(cmgr->read, endpoint);
  99 + if (! event->subject->fin) {
  100 + TR_hashAdd(cmgr->read, endpoint);
  101 + }
100 102 }
101 103 this->fds[endpoint->transport->handle].events &= ~POLLIN;
102 104 }
103 105
104 106 if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
105   - TR_hashAdd(cmgr->write, endpoint);
106   - this->fds[endpoint->transport->handle].events &= ~POLLOUT;
  107 + if (! event->subject->fin) {
  108 + TR_hashAdd(cmgr->write, endpoint);
  109 + }
  110 + this->fds[endpoint->transport->handle].events &=
  111 + ~(POLLOUT|POLLHUP);
  112 + }
  113 +
  114 + if ((this->fds[i].revents & POLLHUP) == POLLHUP) {
  115 + TR_eventHandlerIssueEvent(
  116 + (TR_EventHandler)_this,
  117 + TR_eventSubjectEmit(
  118 + (TR_EventSubject)endpoint,
  119 + TR_CEP_EVENT_SHUT_WRITE,
  120 + NULL));
107 121 }
108 122
  123 + this->fds[i].revents = 0;
109 124 if (nevents <= 0) break;
110 125 }
111 126 }
... ... @@ -120,7 +135,7 @@ TR_commManagerPollEnableWrite(void * _this, TR_Event event)
120 135 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
121 136
122 137 if (! TR_socketFinWr(endpoint->transport)) {
123   - this->fds[endpoint->transport->handle].events |= POLLOUT;
  138 + this->fds[endpoint->transport->handle].events |= POLLOUT|POLLHUP;
124 139 }
125 140 }
126 141
... ... @@ -138,6 +153,26 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event)
138 153
139 154 static
140 155 void
  156 +TR_commManagerPollDisableWrite(void * _this, TR_Event event)
  157 +{
  158 + TR_CommManagerPoll this = _this;
  159 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  160 +
  161 + this->fds[endpoint->transport->handle].events &= ~(POLLOUT|POLLHUP);
  162 +}
  163 +
  164 +static
  165 +void
  166 +TR_commManagerPollDisableRead(void * _this, TR_Event event)
  167 +{
  168 + TR_CommManagerPoll this = _this;
  169 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  170 +
  171 + this->fds[endpoint->transport->handle].events &= ~POLLIN;
  172 +}
  173 +
  174 +static
  175 +void
141 176 TR_commManagerPollClose(void * _this, TR_Event event)
142 177 {
143 178 TR_CommManagerPoll this = _this;
... ... @@ -160,6 +195,8 @@ TR_INIT_IFACE(
160 195 TR_commManagerPollSelect,
161 196 TR_commManagerPollEnableWrite,
162 197 TR_commManagerPollEnableRead,
  198 + TR_commManagerPollDisableWrite,
  199 + TR_commManagerPollDisableRead,
163 200 TR_commManagerPollClose);
164 201 TR_CREATE_CLASS(
165 202 TR_CommManagerPoll,
... ...
... ... @@ -144,6 +144,30 @@ TR_commManagerPollRead(void * _this, TR_Event event)
144 144 }
145 145
146 146 TR_EventDone
  147 +TR_commManagerDisableRead(void * _this, TR_Event event)
  148 +{
  149 + TR_CommManager this = _this;
  150 + TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
  151 + TR_CALL(_this, TR_CommManager, disableRead, event);
  152 +
  153 + return TR_EVENT_DONE;
  154 +}
  155 +
  156 +TR_EventDone
  157 +TR_commManagerDisableWrite(void * _this, TR_Event event)
  158 +{
  159 + TR_CommManager this = _this;
  160 +
  161 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  162 + if (! event->subject->fin) {
  163 + TR_hashAdd(this->read, event->subject);
  164 + }
  165 + TR_CALL(_this, TR_CommManager, disableWrite, event);
  166 +
  167 + return TR_EVENT_DONE;
  168 +}
  169 +
  170 +TR_EventDone
147 171 TR_commManagerClose(void * _this, TR_Event event)
148 172 {
149 173 TR_CommManager this = _this;
... ... @@ -155,12 +179,14 @@ TR_commManagerClose(void * _this, TR_Event event)
155 179 }
156 180
157 181 if (handle == this->max_handle) {
158   - while (! this->endpoints[--this->max_handle]);
  182 + while (! this->endpoints[--this->max_handle] &&
  183 + this->max_handle > 0);
159 184 }
160 185
161 186 if (this->endpoints[handle]) {
162 187 TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]);
163   - this->endpoints[handle] = NULL;
  188 + TR_CALL(_this, TR_CommManager, disableWrite, event);
  189 + TR_CALL(_this, TR_CommManager, disableRead, event);
164 190 TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));
165 191 TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));
166 192 }
... ...
... ... @@ -51,44 +51,42 @@ ioHandlerRead(void * _this, TR_Event event)
51 51 TR_Event revent;
52 52 TR_RemoteData data;
53 53
54   - if (endpoint->write_buffer_size < CEP_WRITE_BUFFER_THRESHOLD) {
55   - switch (TR_commEndPointRead(endpoint, &data)) {
56   - case FALSE: // EAGAIN
57   - revent = TR_eventSubjectEmit(
58   - event->subject,
59   - TR_CEP_EVENT_READ_BLOCK,
60   - NULL);
61   - break;
62   -
63   - case -1: // error
64   - revent = TR_eventSubjectEmit(
65   - event->subject,
66   - TR_CEP_EVENT_CLOSE,
67   - NULL);
68   - break;
  54 + switch (TR_commEndPointRead(endpoint, &data)) {
  55 + case FALSE: // EAGAIN
  56 + revent = TR_eventSubjectEmit(
  57 + event->subject,
  58 + TR_CEP_EVENT_READ_BLOCK,
  59 + NULL);
  60 + break;
69 61
70   - default:
71   - case -2: // remote close
72   - revent = TR_eventSubjectEmit(
73   - event->subject,
74   - TR_CEP_EVENT_SHUT_READ,
75   - NULL);
76   - break;
  62 + case -1: // error
  63 + revent = TR_eventSubjectEmit(
  64 + event->subject,
  65 + TR_CEP_EVENT_CLOSE,
  66 + NULL);
  67 + break;
77 68
78   - case -3: // read limit
79   - return TR_EVENT_DONE;
  69 + default:
  70 + case -2: // remote close
  71 + revent = TR_eventSubjectEmit(
  72 + event->subject,
  73 + TR_CEP_EVENT_SHUT_READ,
  74 + NULL);
  75 + break;
80 76
81   - case TRUE:
82   - revent = TR_eventSubjectEmit(
83   - event->subject,
84   - TR_CEP_EVENT_NEW_DATA,
85   - data);
86   - break;
87   - }
  77 + case -3: // read limit
  78 + return TR_EVENT_DONE;
88 79
89   - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
  80 + case TRUE:
  81 + revent = TR_eventSubjectEmit(
  82 + event->subject,
  83 + TR_CEP_EVENT_NEW_DATA,
  84 + data);
  85 + break;
90 86 }
91 87
  88 + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
  89 +
92 90 return TR_EVENT_DONE;
93 91 }
94 92
... ...
... ... @@ -89,6 +89,15 @@ protocolHandlerCompose(void * _this, TR_Event event)
89 89 if ((message_size = TR_cepCompose(endpoint, message))) {
90 90 endpoint->write_buffer_size += message_size;
91 91
  92 + if (endpoint->write_buffer_size >= CEP_WRITE_BUFFER_THRESHOLD) {
  93 + TR_eventHandlerIssueEvent(
  94 + (TR_EventHandler)_this,
  95 + TR_eventSubjectEmit(
  96 + event->subject,
  97 + TR_CEP_EVENT_WBUF_FULL,
  98 + NULL));
  99 + }
  100 +
92 101 if (endpoint->write_buffer->nmsg == 1) {
93 102 TR_eventHandlerIssueEvent(
94 103 (TR_EventHandler)_this,
... ...
... ... @@ -11,7 +11,9 @@ static
11 11 TR_EventDone
12 12 testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
13 13 {
14   - printf("handled: %llu/s\n", ((TestHandler)this)->handled);
  14 + printf("%zd beats since last beat / handled: %llu/s\n",
  15 + ((TR_EventDispatcher)event->subject)->n_beats,
  16 + ((TestHandler)this)->handled);
15 17 ((TestHandler)this)->handled = 0;
16 18
17 19 return TR_EVENT_DONE;
... ... @@ -51,7 +53,7 @@ static
51 53 TR_EventDone
52 54 testHandlerClose(TR_EventHandler this, TR_Event event)
53 55 {
54   - puts("close");
  56 +// puts("close");
55 57
56 58 return TR_EVENT_PENDING;
57 59 }
... ...
1 1 #!/bin/sh
2 2
3 3 BS=8192
4   -COUNT=25000
  4 +COUNT=10000
5 5 CONCURENT=200
6 6 IP="192.168.2.13"
7 7 pids=""
8 8 i=0
9 9
  10 +MESSAGE="GET / HTTP/1.1\r\nConnection: keep-alive\r\n\r\n"
  11 +
10 12 while [ $i -lt ${CONCURENT} ]
11 13 do
12   - dd if=/dev/zero bs=${BS} count=${COUNT} | nc ${IP} 5678 >/dev/null &
  14 + dd if=/dev/zero bs=${BS} count=${COUNT} | nc -q 1 ${IP} 5678 >/dev/null &
  15 + #echo -en "${MESSAGE}" | nc -q 1 ${IP} 5678 &
  16 +
13 17 pids="${pids} $!"
14 18 i=$((i + 1))
15 19 done
... ...
Please register or login to post a comment