Commit f79801180c52d57d2d73f18221e4d6193539eb97

Authored by Georg Hopp
1 parent 67eaeba4

fix leaks, close handling and things.

@@ -30,13 +30,11 @@ TR_cepBufferRead(TR_CommEndPoint this) @@ -30,13 +30,11 @@ TR_cepBufferRead(TR_CommEndPoint this)
30 { 30 {
31 TR_RemoteData data = TR_socketRecv(this->transport, this->read_chunk_size); 31 TR_RemoteData data = TR_socketRecv(this->transport, this->read_chunk_size);
32 32
33 - if (! data) return FALSE;  
34 -  
35 - while (data) {  
36 - TR_cepAppendReadData(this, data);  
37 - data = TR_socketRecv(this->transport, this->read_chunk_size);  
38 - } 33 + if (! data) return -1; // ment to trigger a close
  34 + if (data == (void*)-1) return -2; // remote close... shutdown
  35 + if (data == TR_emptyRemoteData) return FALSE;
39 36
  37 + TR_cepAppendReadData(this, data);
40 return TRUE; 38 return TRUE;
41 } 39 }
42 40
@@ -55,6 +55,7 @@ commManagerDtor(void * _this) @@ -55,6 +55,7 @@ commManagerDtor(void * _this)
55 for (i = 0; i < this->n_endpoints; i++) { 55 for (i = 0; i < this->n_endpoints; i++) {
56 TR_delete(this->endpoints[i]); 56 TR_delete(this->endpoints[i]);
57 } 57 }
  58 + TR_MEM_FREE(this->endpoints);
58 } 59 }
59 60
60 static 61 static
@@ -59,56 +59,56 @@ static @@ -59,56 +59,56 @@ static
59 TR_ProtoMessage 59 TR_ProtoMessage
60 connectionNextMessage(void * _this) 60 connectionNextMessage(void * _this)
61 { 61 {
62 - TR_Connection this = _this;  
63 - TR_CommEndPoint comm = _this;  
64 - TR_RemoteData data = TR_queueGet(comm->read_buffer); 62 + TR_Connection this = _this;
  63 + TR_CommEndPoint comm = _this;
  64 + TR_RemoteData data = TR_queueGet(comm->read_buffer);
  65 + TR_ProtoMessage ret_message = NULL;
65 size_t end; 66 size_t end;
66 67
67 - if (data && (! this->current_message || this->current_message->ready)) 68 + if (NULL == data) return ret_message;
  69 +
  70 + if (! this->current_message || this->current_message->ready)
68 { 71 {
69 this->current_message = 72 this->current_message =
70 TR_protoCreateMessage(comm->protocol, data->remote); 73 TR_protoCreateMessage(comm->protocol, data->remote);
71 } 74 }
72 75
73 - while (NULL != data) {  
74 - end = TR_protoParse(comm->protocol, this->current_message, data);  
75 -  
76 - if (end != ((TR_SizedData)data)->size) {  
77 - /**  
78 - * TODO  
79 - * This means that the parser has not consumed all of the data.  
80 - * We do not know the reason, but with HTTP this should only occur  
81 - * when the message is complete... anyway, to prevent us from  
82 - * looping forever because a protocol implementation is buggy  
83 - * we should close the connection after end was 0 the second time.  
84 - * This can be done by firing a close event.  
85 - */  
86 - switch(end) {  
87 - default:  
88 - {  
89 - TR_RemoteData new_data = TR_new(  
90 - TR_RemoteData,  
91 - ((TR_SizedData)data)->data + end,  
92 - ((TR_SizedData)data)->size - end,  
93 - data->remote);  
94 - TR_delete(data);  
95 - data = new_data;  
96 - }  
97 - // intended drop through  
98 -  
99 - case 0:  
100 - TR_queuePutFirst(comm->read_buffer, data);  
101 - }  
102 - }  
103 -  
104 - if (this->current_message->ready) {  
105 - return this->current_message; 76 + end = TR_protoParse(comm->protocol, this->current_message, data);
  77 +
  78 + if (end != ((TR_SizedData)data)->size) {
  79 + /**
  80 + * TODO
  81 + * This means that the parser has not consumed all of the data.
  82 + * We do not know the reason, but with HTTP this should only occur
  83 + * when the message is complete... anyway, to prevent us from
  84 + * looping forever because a protocol implementation is buggy
  85 + * we should close the connection after end was 0 the second time.
  86 + * This can be done by firing a close event.
  87 + */
  88 + switch(end) {
  89 + default:
  90 + {
  91 + TR_RemoteData new_data = TR_new(
  92 + TR_RemoteData,
  93 + ((TR_SizedData)data)->data + end,
  94 + ((TR_SizedData)data)->size - end,
  95 + data->remote);
  96 + TR_delete(data);
  97 + data = new_data;
  98 + }
  99 + // intended drop through
  100 +
  101 + case 0:
  102 + TR_queuePutFirst(comm->read_buffer, data);
106 } 103 }
  104 + }
107 105
108 - data = TR_queueGet(comm->read_buffer); 106 + if (this->current_message->ready) {
  107 + ret_message = this->current_message;
  108 + this->current_message = NULL;
109 } 109 }
110 110
111 - return NULL; 111 + return ret_message;
112 } 112 }
113 113
114 static 114 static
@@ -46,12 +46,34 @@ ioHandlerRead(void * _this, TR_Event event) @@ -46,12 +46,34 @@ ioHandlerRead(void * _this, TR_Event event)
46 { 46 {
47 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 47 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
48 48
49 - if (TR_cepBufferRead(endpoint)) {  
50 - TR_eventHandlerIssueEvent(  
51 - (TR_EventHandler)_this,  
52 - event->subject,  
53 - TR_CEP_EVENT_NEW_DATA,  
54 - NULL); 49 + switch (TR_cepBufferRead(endpoint)) {
  50 + default:
  51 + case FALSE:
  52 + break;
  53 +
  54 + case -1:
  55 + TR_eventHandlerIssueEvent(
  56 + (TR_EventHandler)_this,
  57 + event->subject,
  58 + TR_CEP_EVENT_CLOSE,
  59 + NULL);
  60 + break;
  61 +
  62 + case -2:
  63 + TR_eventHandlerIssueEvent(
  64 + (TR_EventHandler)_this,
  65 + event->subject,
  66 + TR_CEP_EVENT_SHUT_READ,
  67 + NULL);
  68 + break;
  69 +
  70 + case TRUE:
  71 + TR_eventHandlerIssueEvent(
  72 + (TR_EventHandler)_this,
  73 + event->subject,
  74 + TR_CEP_EVENT_NEW_DATA,
  75 + NULL);
  76 + break;
55 } 77 }
56 78
57 return TRUE; 79 return TRUE;
@@ -53,7 +53,7 @@ protocolHandlerParse(void * _this, TR_Event event) @@ -53,7 +53,7 @@ protocolHandlerParse(void * _this, TR_Event event)
53 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 53 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
54 TR_ProtoMessage message = TR_cepNextMessage(endpoint); 54 TR_ProtoMessage message = TR_cepNextMessage(endpoint);
55 55
56 - while (message) { 56 + if (message) {
57 TR_eventHandlerIssueEvent( 57 TR_eventHandlerIssueEvent(
58 (TR_EventHandler)_this, 58 (TR_EventHandler)_this,
59 event->subject, 59 event->subject,
@@ -65,8 +65,6 @@ protocolHandlerParse(void * _this, TR_Event event) @@ -65,8 +65,6 @@ protocolHandlerParse(void * _this, TR_Event event)
65 // in the python code... 65 // in the python code...
66 TR_cepSetClose(endpoint); 66 TR_cepSetClose(endpoint);
67 } 67 }
68 -  
69 - message = TR_cepNextMessage(endpoint);  
70 } 68 }
71 69
72 return TRUE; 70 return TRUE;
@@ -79,18 +77,19 @@ protocolHandlerCompose(void * _this, TR_Event event) @@ -79,18 +77,19 @@ protocolHandlerCompose(void * _this, TR_Event event)
79 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 77 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
80 TR_ProtoMessage message = (TR_ProtoMessage)event->data; 78 TR_ProtoMessage message = (TR_ProtoMessage)event->data;
81 79
  80 + if (message->close) {
  81 + // also check that we are a response. Well this is how it is done
  82 + // in the python code...
  83 + TR_cepSetClose(endpoint);
  84 + }
  85 +
82 if (TR_cepCompose(endpoint, message)) { 86 if (TR_cepCompose(endpoint, message)) {
83 TR_eventHandlerIssueEvent( 87 TR_eventHandlerIssueEvent(
84 (TR_EventHandler)_this, 88 (TR_EventHandler)_this,
85 event->subject, 89 event->subject,
86 TR_CEP_EVENT_WRITE_READY, 90 TR_CEP_EVENT_WRITE_READY,
87 NULL); 91 NULL);
88 - }  
89 -  
90 - if (message->close) {  
91 - // also check that we are a response. Well this is how it is done  
92 - // in the python code...  
93 - TR_cepSetClose(endpoint); 92 + TR_delete(message);
94 } 93 }
95 94
96 return TRUE; 95 return TRUE;
  1 +==25008== Memcheck, a memory error detector
  2 +==25008== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
  3 +==25008== Using Valgrind-3.9.0 and LibVEX; rerun with -h for copyright info
  4 +==25008== Command: ./testserver
  5 +==25008==
  6 +==25008== Conditional jump or move depends on uninitialised value(s)
  7 +==25008== at 0x4017A74: index (strchr.S:77)
  8 +==25008== by 0x400743D: expand_dynamic_string_token (dl-load.c:425)
  9 +==25008== by 0x400802D: _dl_map_object (dl-load.c:2302)
  10 +==25008== by 0x400138D: map_doit (rtld.c:626)
  11 +==25008== by 0x400E993: _dl_catch_error (dl-error.c:187)
  12 +==25008== by 0x4000B30: do_preload (rtld.c:815)
  13 +==25008== by 0x4004122: dl_main (rtld.c:1632)
  14 +==25008== by 0x401533B: _dl_sysdep_start (dl-sysdep.c:249)
  15 +==25008== by 0x4004A4C: _dl_start (rtld.c:331)
  16 +==25008== by 0x40011A7: ??? (in /lib64/ld-2.19.so)
  17 +==25008==
  18 +==25008==
  19 +==25008== HEAP SUMMARY:
  20 +==25008== in use at exit: 1,024 bytes in 4 blocks
  21 +==25008== total heap usage: 12,272 allocs, 12,268 frees, 1,601,216 bytes allocated
  22 +==25008==
  23 +==25008== 256 bytes in 1 blocks are definitely lost in loss record 1 of 4
  24 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291)
  25 +==25008== by 0x4E3497C: newElement (memory.c:82)
  26 +==25008== by 0x4E34B19: TR_malloc (memory.c:442)
  27 +==25008== by 0x4E34B50: TR_calloc (memory.c:460)
  28 +==25008== by 0x4E35144: TR_classNewv (i_class.c:55)
  29 +==25008== by 0x4E3532E: TR_classNew (i_class.c:81)
  30 +==25008== by 0x4018DA: main (testserver.c:123)
  31 +==25008==
  32 +==25008== 256 bytes in 1 blocks are definitely lost in loss record 2 of 4
  33 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291)
  34 +==25008== by 0x4E3497C: newElement (memory.c:82)
  35 +==25008== by 0x4E34B19: TR_malloc (memory.c:442)
  36 +==25008== by 0x4E34B50: TR_calloc (memory.c:460)
  37 +==25008== by 0x4E35144: TR_classNewv (i_class.c:55)
  38 +==25008== by 0x4E3532E: TR_classNew (i_class.c:81)
  39 +==25008== by 0x4018F2: main (testserver.c:124)
  40 +==25008==
  41 +==25008== 256 bytes in 1 blocks are definitely lost in loss record 3 of 4
  42 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291)
  43 +==25008== by 0x4E3497C: newElement (memory.c:82)
  44 +==25008== by 0x4E34B19: TR_malloc (memory.c:442)
  45 +==25008== by 0x4E34B50: TR_calloc (memory.c:460)
  46 +==25008== by 0x4E35144: TR_classNewv (i_class.c:55)
  47 +==25008== by 0x4E3532E: TR_classNew (i_class.c:81)
  48 +==25008== by 0x40190A: main (testserver.c:125)
  49 +==25008==
  50 +==25008== 256 bytes in 1 blocks are definitely lost in loss record 4 of 4
  51 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291)
  52 +==25008== by 0x4E3497C: newElement (memory.c:82)
  53 +==25008== by 0x4E34B19: TR_malloc (memory.c:442)
  54 +==25008== by 0x4E34B50: TR_calloc (memory.c:460)
  55 +==25008== by 0x4E35144: TR_classNewv (i_class.c:55)
  56 +==25008== by 0x4E3532E: TR_classNew (i_class.c:81)
  57 +==25008== by 0x401920: main (testserver.c:126)
  58 +==25008==
  59 +==25008== LEAK SUMMARY:
  60 +==25008== definitely lost: 1,024 bytes in 4 blocks
  61 +==25008== indirectly lost: 0 bytes in 0 blocks
  62 +==25008== possibly lost: 0 bytes in 0 blocks
  63 +==25008== still reachable: 0 bytes in 0 blocks
  64 +==25008== suppressed: 0 bytes in 0 blocks
  65 +==25008==
  66 +==25008== For counts of detected and suppressed errors, rerun with: -v
  67 +==25008== Use --track-origins=yes to see where uninitialised values come from
  68 +==25008== ERROR SUMMARY: 5 errors from 5 contexts (suppressed: 0 from 0)
1 #include <stdio.h> 1 #include <stdio.h>
  2 +#include <string.h>
2 3
3 #include "trbase.h" 4 #include "trbase.h"
4 #include "trcomm.h" 5 #include "trcomm.h"
@@ -6,6 +7,7 @@ @@ -6,6 +7,7 @@
6 7
7 TR_CLASS(TestHandler) { 8 TR_CLASS(TestHandler) {
8 TR_EXTENDS(TR_EventHandler); 9 TR_EXTENDS(TR_EventHandler);
  10 + unsigned long long handled;
9 }; 11 };
10 TR_INSTANCE_INIT(TestHandler); 12 TR_INSTANCE_INIT(TestHandler);
11 TR_CLASSVARS_DECL(TestHandler) { 13 TR_CLASSVARS_DECL(TestHandler) {
@@ -16,7 +18,8 @@ static @@ -16,7 +18,8 @@ static
16 int 18 int
17 testHandlerHeartbeat(TR_EventHandler this, TR_Event event) 19 testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
18 { 20 {
19 - puts("heartbeat"); 21 + printf("handled: %llu/s\n", ((TestHandler)this)->handled);
  22 + ((TestHandler)this)->handled = 0;
20 return FALSE; 23 return FALSE;
21 } 24 }
22 25
@@ -24,7 +27,26 @@ static @@ -24,7 +27,26 @@ static
24 int 27 int
25 testHandlerNewMessage(TR_EventHandler this, TR_Event event) 28 testHandlerNewMessage(TR_EventHandler this, TR_Event event)
26 { 29 {
27 - puts("new message"); 30 + TR_ProtoMessageRaw msg = event->data;
  31 + TR_SizedData data = (TR_SizedData)msg->data;
  32 + char buf[data->size + 1];
  33 + int i;
  34 +
  35 + ((TestHandler)this)->handled++;
  36 +
  37 + memcpy(buf, data->data, data->size);
  38 + buf[data->size] = 0;
  39 + for (i = 0; buf[i]; i++) {
  40 + if (! isprint(buf[i])) buf[i] = '.';
  41 + }
  42 +// printf("echo message: %s(%zd)\n", buf, data->size);
  43 +
  44 + TR_eventHandlerIssueEvent(
  45 + (TR_EventHandler)this,
  46 + event->subject,
  47 + TR_CEP_EVENT_SEND_MSG,
  48 + event->data);
  49 +
28 return FALSE; 50 return FALSE;
29 } 51 }
30 52
@@ -49,6 +71,8 @@ int @@ -49,6 +71,8 @@ int
49 testHandlerCtor(void * _this, va_list * params) 71 testHandlerCtor(void * _this, va_list * params)
50 { 72 {
51 TR_PARENTCALL(TestHandler, _this, TR_Class, ctor, params); 73 TR_PARENTCALL(TestHandler, _this, TR_Class, ctor, params);
  74 + ((TestHandler)_this)->handled = 0;
  75 +
52 return 0; 76 return 0;
53 } 77 }
54 78
@@ -102,6 +126,10 @@ main (int argc, char * argv[]) @@ -102,6 +126,10 @@ main (int argc, char * argv[])
102 { 126 {
103 TR_CommManager cmgr = (TR_CommManager)TR_new(TR_CommManagerPoll); 127 TR_CommManager cmgr = (TR_CommManager)TR_new(TR_CommManagerPoll);
104 TR_EventDispatcher dispatcher = TR_new(TR_EventDispatcher); 128 TR_EventDispatcher dispatcher = TR_new(TR_EventDispatcher);
  129 + TR_Connector connector = TR_new(TR_Connector);
  130 + TR_IoHandler io_handler = TR_new(TR_IoHandler);
  131 + TR_ProtocolHandler protocol_handler = TR_new(TR_ProtocolHandler);
  132 + TestHandler test_handler = TR_new(TestHandler);
105 TR_ConnEntryPoint ep; 133 TR_ConnEntryPoint ep;
106 TR_TcpSocket ep_sock; 134 TR_TcpSocket ep_sock;
107 TR_Protocol protocol; 135 TR_Protocol protocol;
@@ -109,14 +137,14 @@ main (int argc, char * argv[]) @@ -109,14 +137,14 @@ main (int argc, char * argv[])
109 TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger); 137 TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger);
110 138
111 TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)cmgr); 139 TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)cmgr);
112 - TR_eventDispatcherRegisterHandler(dispatcher,  
113 - (TR_EventHandler)TR_new(TR_Connector));  
114 - TR_eventDispatcherRegisterHandler(dispatcher,  
115 - (TR_EventHandler)TR_new(TR_IoHandler));  
116 - TR_eventDispatcherRegisterHandler(dispatcher,  
117 - (TR_EventHandler)TR_new(TR_ProtocolHandler));  
118 - TR_eventDispatcherRegisterHandler(dispatcher,  
119 - (TR_EventHandler)TR_new(TestHandler)); 140 + TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)connector);
  141 + TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)io_handler);
  142 + TR_eventDispatcherRegisterHandler(
  143 + dispatcher,
  144 + (TR_EventHandler)protocol_handler);
  145 + TR_eventDispatcherRegisterHandler(
  146 + dispatcher,
  147 + (TR_EventHandler)test_handler);
120 148
121 protocol = TR_new(TR_ProtocolRaw); 149 protocol = TR_new(TR_ProtocolRaw);
122 ep_sock = TR_new(TR_TcpSocket, TR_logger, "0.0.0.0", 5678, 0); 150 ep_sock = TR_new(TR_TcpSocket, TR_logger, "0.0.0.0", 5678, 0);
@@ -126,10 +154,25 @@ main (int argc, char * argv[]) @@ -126,10 +154,25 @@ main (int argc, char * argv[])
126 154
127 TR_eventDispatcherSetHeartbeat(dispatcher, 1000); 155 TR_eventDispatcherSetHeartbeat(dispatcher, 1000);
128 TR_eventDispatcherStart(dispatcher); 156 TR_eventDispatcherStart(dispatcher);
129 - TR_eventHandlerClassCleanup(TestHandler); 157 +
  158 + puts("cleanup...");
130 159
131 TR_delete(cmgr); 160 TR_delete(cmgr);
132 TR_delete(dispatcher); 161 TR_delete(dispatcher);
  162 + TR_delete(connector);
  163 + TR_delete(io_handler);
  164 + TR_delete(protocol_handler);
  165 + TR_delete(test_handler);
  166 + TR_delete(protocol);
  167 + //TR_delete(ep);
  168 +
  169 + TR_eventHandlerClassCleanup(TestHandler);
  170 + TR_eventHandlerClassCleanup(TR_ProtocolHandler);
  171 + TR_eventHandlerClassCleanup(TR_IoHandler);
  172 + TR_eventHandlerClassCleanup(TR_Connector);
  173 + TR_eventHandlerClassCleanup(TR_CommManagerPoll);
  174 +
  175 + TR_cleanup();
133 176
134 return 0; 177 return 0;
135 } 178 }
Please register or login to post a comment