Showing
9 changed files
with
57 additions
and
54 deletions
@@ -35,7 +35,8 @@ typedef TR_ProtoMessage (* fptr_TR_protoCreateMessage)(void *); | @@ -35,7 +35,8 @@ typedef TR_ProtoMessage (* fptr_TR_protoCreateMessage)(void *); | ||
35 | typedef TR_ProtoMessage (* fptr_TR_protoCreateRequest)(void *, va_list *); | 35 | typedef TR_ProtoMessage (* fptr_TR_protoCreateRequest)(void *, va_list *); |
36 | typedef TR_ProtoMessage (* fptr_TR_protoCreateResponse)(void *, va_list *); | 36 | typedef TR_ProtoMessage (* fptr_TR_protoCreateResponse)(void *, va_list *); |
37 | typedef TR_RemoteData (* fptr_TR_protoCompose)(void *, TR_ProtoMessage); | 37 | typedef TR_RemoteData (* fptr_TR_protoCompose)(void *, TR_ProtoMessage); |
38 | -typedef size_t (* fptr_TR_protoParse)(void *, TR_ProtoMessage, TR_RemoteData); | 38 | +typedef TR_RemoteData (* fptr_TR_protoParse)( |
39 | + void *, TR_ProtoMessage, TR_RemoteData); | ||
39 | 40 | ||
40 | TR_INTERFACE(TR_Protocol) { | 41 | TR_INTERFACE(TR_Protocol) { |
41 | TR_IFID; | 42 | TR_IFID; |
@@ -51,7 +52,7 @@ TR_ProtoMessage TR_vprotoCreateRequest(void *, TR_Socket, va_list*); | @@ -51,7 +52,7 @@ TR_ProtoMessage TR_vprotoCreateRequest(void *, TR_Socket, va_list*); | ||
51 | TR_ProtoMessage TR_protoCreateRequest(void *, TR_Socket, ...); | 52 | TR_ProtoMessage TR_protoCreateRequest(void *, TR_Socket, ...); |
52 | TR_ProtoMessage TR_vprotoCreateResponse(void *, TR_Socket, va_list*); | 53 | TR_ProtoMessage TR_vprotoCreateResponse(void *, TR_Socket, va_list*); |
53 | TR_ProtoMessage TR_protoCreateResponse(void *, TR_Socket, ...); | 54 | TR_ProtoMessage TR_protoCreateResponse(void *, TR_Socket, ...); |
54 | -size_t TR_protoParse(void *, TR_ProtoMessage, TR_RemoteData); | 55 | +TR_RemoteData TR_protoParse(void *, TR_ProtoMessage, TR_RemoteData); |
55 | TR_RemoteData TR_protoCompose(void *, TR_ProtoMessage); | 56 | TR_RemoteData TR_protoCompose(void *, TR_ProtoMessage); |
56 | 57 | ||
57 | #endif // __TR_INTERFACE_PROTOCOL_H__ | 58 | #endif // __TR_INTERFACE_PROTOCOL_H__ |
@@ -63,7 +63,6 @@ connectionNextMessage(void * _this, TR_RemoteData * data) | @@ -63,7 +63,6 @@ connectionNextMessage(void * _this, TR_RemoteData * data) | ||
63 | TR_CommEndPoint comm = _this; | 63 | TR_CommEndPoint comm = _this; |
64 | TR_ProtoMessage ret_message = NULL; | 64 | TR_ProtoMessage ret_message = NULL; |
65 | TR_RemoteData new_data = NULL; | 65 | TR_RemoteData new_data = NULL; |
66 | - size_t end; | ||
67 | 66 | ||
68 | if (*data) { | 67 | if (*data) { |
69 | if (! this->current_message || this->current_message->ready) | 68 | if (! this->current_message || this->current_message->ready) |
@@ -72,7 +71,15 @@ connectionNextMessage(void * _this, TR_RemoteData * data) | @@ -72,7 +71,15 @@ connectionNextMessage(void * _this, TR_RemoteData * data) | ||
72 | TR_protoCreateMessage(comm->protocol, (*data)->remote); | 71 | TR_protoCreateMessage(comm->protocol, (*data)->remote); |
73 | } | 72 | } |
74 | 73 | ||
75 | - end = TR_protoParse(comm->protocol, this->current_message, *data); | 74 | + /* |
75 | + * This will return NULL if all data was consumed or a new TR_RemoteData | ||
76 | + * with the remaining data. | ||
77 | + * In other words, the protocol implementatio is completely responsible | ||
78 | + * for the data management here. It can decide to copy the data, but | ||
79 | + * then has to free the given TR_RemoteData by itself. This gives most | ||
80 | + * flexibility when writing a protocol. | ||
81 | + */ | ||
82 | + new_data = TR_protoParse(comm->protocol, this->current_message, *data); | ||
76 | 83 | ||
77 | /** | 84 | /** |
78 | * We define that the only valid reason for a protocol parser to not | 85 | * We define that the only valid reason for a protocol parser to not |
@@ -98,18 +105,12 @@ connectionNextMessage(void * _this, TR_RemoteData * data) | @@ -98,18 +105,12 @@ connectionNextMessage(void * _this, TR_RemoteData * data) | ||
98 | * true and we should make it an ERROR). | 105 | * true and we should make it an ERROR). |
99 | */ | 106 | */ |
100 | if (this->current_message->ready) { | 107 | if (this->current_message->ready) { |
101 | - if (end != ((TR_SizedData)*data)->size) { | ||
102 | - new_data = TR_new( | ||
103 | - TR_RemoteData, | ||
104 | - ((TR_SizedData)*data)->data + end, | ||
105 | - ((TR_SizedData)*data)->size - end, | ||
106 | - (*data)->remote); | ||
107 | - } | ||
108 | ret_message = this->current_message; | 108 | ret_message = this->current_message; |
109 | this->current_message = NULL; | 109 | this->current_message = NULL; |
110 | } else { | 110 | } else { |
111 | - if (end != ((TR_SizedData)*data)->size) { | 111 | + if (new_data) { |
112 | TR_delete(*data); | 112 | TR_delete(*data); |
113 | + TR_delete(new_data); | ||
113 | TR_loggerLog( | 114 | TR_loggerLog( |
114 | TR_logger, | 115 | TR_logger, |
115 | TR_LOGGER_WARNING, | 116 | TR_LOGGER_WARNING, |
@@ -85,10 +85,10 @@ TR_protoCreateResponse(void * _this, TR_Socket remote, ...) | @@ -85,10 +85,10 @@ TR_protoCreateResponse(void * _this, TR_Socket remote, ...) | ||
85 | return callret; | 85 | return callret; |
86 | } | 86 | } |
87 | 87 | ||
88 | -size_t | 88 | +TR_RemoteData |
89 | TR_protoParse(void * _this, TR_ProtoMessage message, TR_RemoteData data) | 89 | TR_protoParse(void * _this, TR_ProtoMessage message, TR_RemoteData data) |
90 | { | 90 | { |
91 | - size_t callret; | 91 | + TR_RemoteData callret; |
92 | TR_RETCALL(_this, TR_Protocol, parse, callret, message, data); | 92 | TR_RETCALL(_this, TR_Protocol, parse, callret, message, data); |
93 | return callret; | 93 | return callret; |
94 | } | 94 | } |
@@ -78,6 +78,11 @@ ioHandlerRead(void * _this, TR_Event event) | @@ -78,6 +78,11 @@ ioHandlerRead(void * _this, TR_Event event) | ||
78 | return TR_EVENT_DONE; | 78 | return TR_EVENT_DONE; |
79 | 79 | ||
80 | case TRUE: | 80 | case TRUE: |
81 | + if (event->subject->fin) { | ||
82 | + TR_delete(data); | ||
83 | + return TR_EVENT_DONE; | ||
84 | + } | ||
85 | + | ||
81 | revent = TR_eventSubjectEmit( | 86 | revent = TR_eventSubjectEmit( |
82 | event->subject, | 87 | event->subject, |
83 | TR_CEP_EVENT_NEW_DATA, | 88 | TR_CEP_EVENT_NEW_DATA, |
@@ -21,6 +21,7 @@ | @@ -21,6 +21,7 @@ | ||
21 | */ | 21 | */ |
22 | 22 | ||
23 | #include <unistd.h> | 23 | #include <unistd.h> |
24 | +#include <inttypes.h> | ||
24 | 25 | ||
25 | #include "trbase.h" | 26 | #include "trbase.h" |
26 | #include "trevent.h" | 27 | #include "trevent.h" |
@@ -55,17 +56,19 @@ protocolHandlerParse(void * _this, TR_Event event) | @@ -55,17 +56,19 @@ protocolHandlerParse(void * _this, TR_Event event) | ||
55 | TR_ProtoMessage message; | 56 | TR_ProtoMessage message; |
56 | 57 | ||
57 | while ((message = TR_cepNextMessage(endpoint, &data))) { | 58 | while ((message = TR_cepNextMessage(endpoint, &data))) { |
58 | - TR_eventHandlerIssueEvent( | 59 | + if (! TR_eventHandlerIssueEvent( |
59 | (TR_EventHandler)_this, | 60 | (TR_EventHandler)_this, |
60 | TR_eventSubjectEmit( | 61 | TR_eventSubjectEmit( |
61 | event->subject, | 62 | event->subject, |
62 | TR_CEP_EVENT_NEW_MSG, | 63 | TR_CEP_EVENT_NEW_MSG, |
63 | - message)); | ||
64 | - | ||
65 | - if (message->close) { | ||
66 | - // also check that we are a response. Well this is | ||
67 | - // how it is done in the python code... | ||
68 | - TR_cepSetClose(endpoint); | 64 | + message))) { |
65 | + TR_delete(message); | ||
66 | + } else { | ||
67 | + if (message->close) { | ||
68 | + // also check that we are a response. Well this is | ||
69 | + // how it is done in the python code... | ||
70 | + TR_cepSetClose(endpoint); | ||
71 | + } | ||
69 | } | 72 | } |
70 | } | 73 | } |
71 | 74 |
@@ -48,7 +48,7 @@ protocolRawCreateMessage(void * _this, TR_Socket remote) | @@ -48,7 +48,7 @@ protocolRawCreateMessage(void * _this, TR_Socket remote) | ||
48 | } | 48 | } |
49 | 49 | ||
50 | static | 50 | static |
51 | -size_t | 51 | +TR_RemoteData |
52 | protocolRawParse(void * _this, TR_ProtoMessage _message, TR_RemoteData data) | 52 | protocolRawParse(void * _this, TR_ProtoMessage _message, TR_RemoteData data) |
53 | { | 53 | { |
54 | TR_ProtoMessageRaw message = (TR_ProtoMessageRaw)_message; | 54 | TR_ProtoMessageRaw message = (TR_ProtoMessageRaw)_message; |
@@ -56,7 +56,7 @@ protocolRawParse(void * _this, TR_ProtoMessage _message, TR_RemoteData data) | @@ -56,7 +56,7 @@ protocolRawParse(void * _this, TR_ProtoMessage _message, TR_RemoteData data) | ||
56 | message->data = data; | 56 | message->data = data; |
57 | _message->ready = 1; | 57 | _message->ready = 1; |
58 | 58 | ||
59 | - return ((TR_SizedData)data)->size; | 59 | + return NULL; |
60 | } | 60 | } |
61 | 61 | ||
62 | static | 62 | static |
@@ -11,7 +11,7 @@ static | @@ -11,7 +11,7 @@ static | ||
11 | TR_EventDone | 11 | TR_EventDone |
12 | testHandlerHeartbeat(TR_EventHandler this, TR_Event event) | 12 | testHandlerHeartbeat(TR_EventHandler this, TR_Event event) |
13 | { | 13 | { |
14 | - printf("%zd beats since last beat / handled: %llu/s\n", | 14 | + printf("%zd beat(s) since last beat / handled: %llu/s\n", |
15 | ((TR_EventDispatcher)event->subject)->n_beats, | 15 | ((TR_EventDispatcher)event->subject)->n_beats, |
16 | ((TestHandler)this)->handled); | 16 | ((TestHandler)this)->handled); |
17 | ((TestHandler)this)->handled = 0; | 17 | ((TestHandler)this)->handled = 0; |
@@ -23,22 +23,10 @@ static | @@ -23,22 +23,10 @@ static | ||
23 | TR_EventDone | 23 | TR_EventDone |
24 | testHandlerNewMessage(TR_EventHandler this, TR_Event event) | 24 | testHandlerNewMessage(TR_EventHandler this, TR_Event event) |
25 | { | 25 | { |
26 | -// TR_ProtoMessageRaw msg = event->data; | ||
27 | -// TR_SizedData data = (TR_SizedData)msg->data; | ||
28 | -// char buf[data->size + 1]; | ||
29 | -// int i; | ||
30 | TR_Event _event; | 26 | TR_Event _event; |
31 | 27 | ||
32 | ((TestHandler)this)->handled++; | 28 | ((TestHandler)this)->handled++; |
33 | 29 | ||
34 | -// printf("handled data %p\n", event->data); | ||
35 | -// memcpy(buf, data->data, data->size); | ||
36 | -// buf[data->size] = 0; | ||
37 | -// for (i = 0; buf[i]; i++) { | ||
38 | -// if (! isprint(buf[i])) buf[i] = '.'; | ||
39 | -// } | ||
40 | -// printf("echo message: %s(%zd)\n", buf, data->size); | ||
41 | - | ||
42 | _event = TR_eventSubjectEmit( | 30 | _event = TR_eventSubjectEmit( |
43 | event->subject, | 31 | event->subject, |
44 | TR_CEP_EVENT_MSG_READY, | 32 | TR_CEP_EVENT_MSG_READY, |
@@ -58,14 +46,16 @@ testHandlerClose(TR_EventHandler this, TR_Event event) | @@ -58,14 +46,16 @@ testHandlerClose(TR_EventHandler this, TR_Event event) | ||
58 | return TR_EVENT_PENDING; | 46 | return TR_EVENT_PENDING; |
59 | } | 47 | } |
60 | 48 | ||
61 | -//static | ||
62 | -//TR_EventDone | ||
63 | -//testHandlerUpgrade(TR_EventHandler this, TR_Event event) | ||
64 | -//{ | ||
65 | -// printf("upgrade: %"PRIdPTR"\n", event->id); | ||
66 | -// | ||
67 | -// return TR_EVENT_PENDING; | ||
68 | -//} | 49 | +#if 0 |
50 | +static | ||
51 | +TR_EventDone | ||
52 | +testHandlerUpgrade(TR_EventHandler this, TR_Event event) | ||
53 | +{ | ||
54 | + printf("upgrade: %"PRIdPTR"\n", event->id); | ||
55 | + | ||
56 | + return TR_EVENT_PENDING; | ||
57 | +} | ||
58 | +#endif | ||
69 | 59 | ||
70 | static | 60 | static |
71 | int | 61 | int |
@@ -103,11 +93,13 @@ testHandlerCvInit(TR_class_ptr class) | @@ -103,11 +93,13 @@ testHandlerCvInit(TR_class_ptr class) | ||
103 | TR_CommEndPoint, | 93 | TR_CommEndPoint, |
104 | TR_CEP_EVENT_CLOSE, | 94 | TR_CEP_EVENT_CLOSE, |
105 | testHandlerClose); | 95 | testHandlerClose); |
106 | -// TR_EVENT_HANDLER_SET_METHOD( | ||
107 | -// class, | ||
108 | -// TR_CommEndPoint, | ||
109 | -// TR_CEP_EVENT_UPGRADE, | ||
110 | -// testHandlerUpgrade); | 96 | +#if 0 |
97 | + TR_EVENT_HANDLER_SET_METHOD( | ||
98 | + class, | ||
99 | + TR_CommEndPoint, | ||
100 | + TR_CEP_EVENT_UPGRADE, | ||
101 | + testHandlerUpgrade); | ||
102 | +#endif | ||
111 | } | 103 | } |
112 | 104 | ||
113 | TR_INSTANCE(TR_Hash, testHandlerEventMethods); | 105 | TR_INSTANCE(TR_Hash, testHandlerEventMethods); |
1 | #!/bin/sh | 1 | #!/bin/sh |
2 | 2 | ||
3 | BS=8192 | 3 | BS=8192 |
4 | -COUNT=10000 | ||
5 | -CONCURENT=200 | ||
6 | -IP="192.168.2.13" | 4 | +COUNT=1000000 |
5 | +CONCURENT=20 | ||
6 | +IP="localhost" | ||
7 | pids="" | 7 | pids="" |
8 | i=0 | 8 | i=0 |
9 | 9 | ||
@@ -12,7 +12,7 @@ MESSAGE="GET / HTTP/1.1\r\nConnection: keep-alive\r\n\r\n" | @@ -12,7 +12,7 @@ MESSAGE="GET / HTTP/1.1\r\nConnection: keep-alive\r\n\r\n" | ||
12 | while [ $i -lt ${CONCURENT} ] | 12 | while [ $i -lt ${CONCURENT} ] |
13 | do | 13 | do |
14 | dd if=/dev/zero bs=${BS} count=${COUNT} | nc -q 1 ${IP} 5678 >/dev/null & | 14 | dd if=/dev/zero bs=${BS} count=${COUNT} | nc -q 1 ${IP} 5678 >/dev/null & |
15 | - #echo -en "${MESSAGE}" | nc -q 1 ${IP} 5678 & | 15 | + #echo -en "${MESSAGE}" | nc -q 1 -u ${IP} 5678 & |
16 | 16 | ||
17 | pids="${pids} $!" | 17 | pids="${pids} $!" |
18 | i=$((i + 1)) | 18 | i=$((i + 1)) |
@@ -10,6 +10,7 @@ | @@ -10,6 +10,7 @@ | ||
10 | #include "test_handler.h" | 10 | #include "test_handler.h" |
11 | 11 | ||
12 | TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_INFO}); | 12 | TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_INFO}); |
13 | +TR_INSTANCE(TR_LoggerStderr, mylogger2, {TR_LOGGER_INFO}); | ||
13 | 14 | ||
14 | int | 15 | int |
15 | main (int argc, char * argv[]) | 16 | main (int argc, char * argv[]) |
@@ -18,7 +19,7 @@ main (int argc, char * argv[]) | @@ -18,7 +19,7 @@ main (int argc, char * argv[]) | ||
18 | TR_Protocol protocol = TR_new(TR_ProtocolRaw); | 19 | TR_Protocol protocol = TR_new(TR_ProtocolRaw); |
19 | TestHandler test_handler = TR_new(TestHandler); | 20 | TestHandler test_handler = TR_new(TestHandler); |
20 | 21 | ||
21 | - TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger); | 22 | + TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2); |
22 | 23 | ||
23 | TR_serverAddHandler(server, (TR_EventHandler)test_handler); | 24 | TR_serverAddHandler(server, (TR_EventHandler)test_handler); |
24 | TR_serverBindTcp(server, "0.0.0.0", 5678, protocol); | 25 | TR_serverBindTcp(server, "0.0.0.0", 5678, protocol); |
Please
register
or
login
to post a comment