Commit e3a6d0e91989180ae0b849b6eafd18c98a0566c0

Authored by Georg Hopp
1 parent b1483469

change the whole thing to be more edge triggered no matter wich poll method is u…

…sed. In fact this works now with edge triggered epoll
@@ -40,4 +40,4 @@ test-driver @@ -40,4 +40,4 @@ test-driver
40 /assets/html/_documentation.html 40 /assets/html/_documentation.html
41 tags 41 tags
42 /trcomm.h* 42 /trcomm.h*
43 -/testers/testserver 43 +/testers/testserver*
  1 +/**
  2 + * \file
  3 + *
  4 + * \author Georg Hopp
  5 + *
  6 + * \copyright
  7 + * Copyright © 2014 Georg Hopp
  8 + *
  9 + * This program is free software: you can redistribute it and/or modify
  10 + * it under the terms of the GNU General Public License as published by
  11 + * the Free Software Foundation, either version 3 of the License, or
  12 + * (at your option) any later version.
  13 + *
  14 + * This program is distributed in the hope that it will be useful,
  15 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17 + * GNU General Public License for more details.
  18 + *
  19 + * You should have received a copy of the GNU General Public License
  20 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21 + */
  22 +
  23 +#ifndef __INT_TR_COMM_MANAGER_H__
  24 +#define __INT_TR_COMM_MANAGER_H__
  25 +
  26 +#include "tr/comm_end_point.h"
  27 +#include "tr/connect_entry_point.h"
  28 +
  29 +#define TR_ISSUE_IO_EVENT(this, type, subject) \
  30 + TR_eventHandlerIssueEvent((TR_EventHandler)(this), TR_eventSubjectEmit( \
  31 + (TR_EventSubject)(subject), type, NULL))
  32 +
  33 +#define TR_ISSUE_IO_ACC_EVENT(this, subject) \
  34 + (TR_ISSUE_IO_EVENT(this, TR_CET_EVENT_ACC_READY, subject))
  35 +#define TR_ISSUE_IO_READ_EVENT(this, subject) \
  36 + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_DO_READ, subject))
  37 +#define TR_ISSUE_IO_WRITE_EVENT(this, subject) \
  38 + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_DO_WRITE, subject))
  39 +#define TR_ISSUE_IO_CLOSE_EVENT(this, subject) \
  40 + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_CLOSE, subject))
  41 +#define TR_ISSUE_IO_SHUT_READ_EVENT(this, subject) \
  42 + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_SHUT_READ, subject))
  43 +#define TR_ISSUE_IO_SHUT_WRITE_EVENT(this, subject) \
  44 + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_SHUT_WRITE, subject))
  45 +
  46 +#endif // __INT_TR_COMM_MANAGER_H__
  47 +
  48 +// vim: set ts=4 sw=4:
  49 +
@@ -29,34 +29,40 @@ @@ -29,34 +29,40 @@
29 #include "trevent.h" 29 #include "trevent.h"
30 #include "trdata.h" 30 #include "trdata.h"
31 31
  32 +/*
  33 + * Read ahead limits.
  34 + * These values should be conficurable in the future.
  35 + */
  36 +#define CEP_WRITE_BUFFER_THRESHOLD 128 * 1024
  37 +
32 TR_CLASS(TR_CommEndPoint) { 38 TR_CLASS(TR_CommEndPoint) {
33 TR_EXTENDS(TR_EventSubject); 39 TR_EXTENDS(TR_EventSubject);
34 40
35 void * protocol; // will be type TR_Protocol as soon as it is there. 41 void * protocol; // will be type TR_Protocol as soon as it is there.
36 TR_Socket transport; 42 TR_Socket transport;
37 - size_t read_chunk_size;  
38 int do_close; 43 int do_close;
39 - TR_Queue read_buffer;  
40 TR_Queue write_buffer; 44 TR_Queue write_buffer;
  45 + size_t write_buffer_size;
  46 + size_t read_chunk_size; // bytes
41 }; 47 };
42 TR_INSTANCE_INIT(TR_CommEndPoint); 48 TR_INSTANCE_INIT(TR_CommEndPoint);
43 TR_CLASSVARS_DECL(TR_CommEndPoint) { 49 TR_CLASSVARS_DECL(TR_CommEndPoint) {
44 TR_CV_EXTENDS(TR_EventSubject); 50 TR_CV_EXTENDS(TR_EventSubject);
45 }; 51 };
46 52
47 -#define TR_CEP_EVENT_READ_READY 0  
48 -#define TR_CEP_EVENT_READ_BLOCK 1  
49 -#define TR_CEP_EVENT_WRITE_READY 2  
50 -#define TR_CEP_EVENT_UPGRADE 3  
51 -#define TR_CEP_EVENT_NEW_DATA 4  
52 -#define TR_CEP_EVENT_PENDING_DATA 5  
53 -#define TR_CEP_EVENT_END_DATA 6  
54 -#define TR_CEP_EVENT_NEW_MSG 7  
55 -#define TR_CEP_EVENT_SEND_MSG 8  
56 -#define TR_CEP_EVENT_SHUT_READ 9  
57 -#define TR_CEP_EVENT_SHUT_WRITE 10  
58 -#define TR_CEP_EVENT_CLOSE 11  
59 -#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE) 53 +#define TR_CEP_EVENT_DO_READ 0 // IoHandler
  54 +#define TR_CEP_EVENT_DO_WRITE 1 // IoHandler
  55 +#define TR_CEP_EVENT_READ_BLOCK 2 // CommManager
  56 +#define TR_CEP_EVENT_WRITE_BLOCK 3 // CommManager
  57 +#define TR_CEP_EVENT_NEW_DATA 4 // ProtocolHandler
  58 +#define TR_CEP_EVENT_NEW_MSG 5 // Application
  59 +#define TR_CEP_EVENT_MSG_READY 6 // ProtocolHandler
  60 +#define TR_CEP_EVENT_DATA_READY 7 // CommManager
  61 +#define TR_CEP_EVENT_DATA_END 8 // CommManager
  62 +#define TR_CEP_EVENT_SHUT_READ 9 // CommManager
  63 +#define TR_CEP_EVENT_SHUT_WRITE 10 // CommManager
  64 +#define TR_CEP_EVENT_CLOSE 11 // CommManager
  65 +#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE)
60 66
61 #define TR_cepSetClose(ep) ((ep)->do_close = 1) 67 #define TR_cepSetClose(ep) ((ep)->do_close = 1)
62 #define TR_cepHasProto(ep, proto) (TR_INSTANCE_OF(proto, TR_cepGetProto(ep))) 68 #define TR_cepHasProto(ep, proto) (TR_INSTANCE_OF(proto, TR_cepGetProto(ep)))
@@ -67,8 +73,8 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) { @@ -67,8 +73,8 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) {
67 73
68 void TR_cepAppendReadData(TR_CommEndPoint, TR_RemoteData); 74 void TR_cepAppendReadData(TR_CommEndPoint, TR_RemoteData);
69 void TR_cepAppendWriteData(TR_CommEndPoint, TR_RemoteData); 75 void TR_cepAppendWriteData(TR_CommEndPoint, TR_RemoteData);
70 -int TR_cepBufferRead(TR_CommEndPoint);  
71 -int TR_cepWriteBuffered(TR_CommEndPoint); 76 +int TR_commEndPointRead(TR_CommEndPoint, TR_RemoteData *);
  77 +int TR_cepWriteBuffered(TR_CommEndPoint, size_t *);
72 78
73 #endif // __TR_COMM_END_POINT_H__ 79 #endif // __TR_COMM_END_POINT_H__
74 80
@@ -26,6 +26,7 @@ @@ -26,6 +26,7 @@
26 #include <sys/types.h> 26 #include <sys/types.h>
27 27
28 #include "trbase.h" 28 #include "trbase.h"
  29 +#include "trdata.h"
29 #include "trevent.h" 30 #include "trevent.h"
30 31
31 #include "tr/comm_end_point.h" 32 #include "tr/comm_end_point.h"
@@ -34,6 +35,9 @@ TR_CLASS(TR_CommManager) { @@ -34,6 +35,9 @@ TR_CLASS(TR_CommManager) {
34 TR_EXTENDS(TR_EventHandler); 35 TR_EXTENDS(TR_EventHandler);
35 36
36 TR_CommEndPoint * endpoints; 37 TR_CommEndPoint * endpoints;
  38 + TR_Hash accept;
  39 + TR_Hash write;
  40 + TR_Hash read;
37 size_t n_endpoints; 41 size_t n_endpoints;
38 size_t max_handle; 42 size_t max_handle;
39 }; 43 };
@@ -42,8 +46,7 @@ TR_CLASSVARS_DECL(TR_CommManager) { @@ -42,8 +46,7 @@ TR_CLASSVARS_DECL(TR_CommManager) {
42 TR_CV_EXTENDS(TR_EventHandler); 46 TR_CV_EXTENDS(TR_EventHandler);
43 }; 47 };
44 48
45 -void TR_commManagerAddEndpoint(void *, TR_CommEndPoint);  
46 -TR_EventDone TR_commManagerShutdown(void * _this, TR_Event event); 49 +TR_EventDone TR_commManagerShutdown(TR_CommManager, TR_Event event);
47 50
48 #endif // __TR_COMM_MANAGER_H__ 51 #endif // __TR_COMM_MANAGER_H__
49 52
@@ -31,8 +31,8 @@ @@ -31,8 +31,8 @@
31 #include "tr/comm_end_point.h" 31 #include "tr/comm_end_point.h"
32 #include "tr/proto_message.h" 32 #include "tr/proto_message.h"
33 33
34 -typedef TR_ProtoMessage (* fptr_TR_cepNextMessage)(void *);  
35 -typedef int (* fptr_TR_cepCompose)(void *, TR_ProtoMessage); 34 +typedef TR_ProtoMessage (* fptr_TR_cepNextMessage)(void *, TR_RemoteData *);
  35 +typedef size_t (* fptr_TR_cepCompose)(void *, TR_ProtoMessage);
36 36
37 TR_INTERFACE(TR_CommEndPoint) { 37 TR_INTERFACE(TR_CommEndPoint) {
38 TR_IFID; 38 TR_IFID;
@@ -40,8 +40,8 @@ TR_INTERFACE(TR_CommEndPoint) { @@ -40,8 +40,8 @@ TR_INTERFACE(TR_CommEndPoint) {
40 fptr_TR_cepCompose compose; 40 fptr_TR_cepCompose compose;
41 }; 41 };
42 42
43 -TR_ProtoMessage TR_cepNextMessage(void *);  
44 -int TR_cepCompose(void *, TR_ProtoMessage); 43 +TR_ProtoMessage TR_cepNextMessage(void *, TR_RemoteData *);
  44 +size_t TR_cepCompose(void *, TR_ProtoMessage);
45 45
46 #endif // __TR_INTERFACE_COMM_END_POINT_H__ 46 #endif // __TR_INTERFACE_COMM_END_POINT_H__
47 47
@@ -32,23 +32,17 @@ @@ -32,23 +32,17 @@
32 32
33 typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint); 33 typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint);
34 typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int); 34 typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int);
35 -typedef TR_EventDone (* fptr_TR_commManagerEnableWrite)(void *, TR_Event);  
36 -typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);  
37 -typedef TR_EventDone (* fptr_TR_commManagerEnableRead)(void *, TR_Event); 35 +typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event);
  36 +typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event);
38 typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event); 37 typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event);
39 -typedef TR_EventDone (* fptr_TR_commManagerShutdownRead)(void *, TR_Event);  
40 -typedef TR_EventDone (* fptr_TR_commManagerShutdownWrite)(void *, TR_Event);  
41 38
42 TR_INTERFACE(TR_CommManager) { 39 TR_INTERFACE(TR_CommManager) {
43 TR_IFID; 40 TR_IFID;
44 - fptr_TR_commManagerAddEndpoint addEndpoint;  
45 - fptr_TR_commManagerSelect select;  
46 - fptr_TR_commManagerEnableWrite enableWrite;  
47 - fptr_TR_commManagerDisableWrite disableWrite;  
48 - fptr_TR_commManagerEnableRead enableRead;  
49 - fptr_TR_commManagerClose close;  
50 - fptr_TR_commManagerShutdownWrite shutdownWrite;  
51 - fptr_TR_commManagerShutdownRead shutdownRead; 41 + fptr_TR_commManagerAddEndpoint addEndpoint;
  42 + fptr_TR_commManagerSelect select;
  43 + fptr_TR_commManagerPollWrite pollWrite;
  44 + fptr_TR_commManagerPollRead pollRead;
  45 + fptr_TR_commManagerClose close;
52 }; 46 };
53 47
54 void TR_commManagerAddEndpoint(void *, TR_CommEndPoint); 48 void TR_commManagerAddEndpoint(void *, TR_CommEndPoint);
@@ -24,6 +24,7 @@ @@ -24,6 +24,7 @@
24 #define __TR_IO_HANDLER_H__ 24 #define __TR_IO_HANDLER_H__
25 25
26 #include <sys/types.h> 26 #include <sys/types.h>
  27 +#include <stdint.h>
27 28
28 #include "trbase.h" 29 #include "trbase.h"
29 #include "trevent.h" 30 #include "trevent.h"
@@ -33,6 +33,7 @@ @@ -33,6 +33,7 @@
33 #include "tr/io_handler.h" 33 #include "tr/io_handler.h"
34 #include "tr/protocol_handler.h" 34 #include "tr/protocol_handler.h"
35 #include "tr/protocol.h" 35 #include "tr/protocol.h"
  36 +#include "tr/interface/comm_manager.h"
36 37
37 TR_CLASS(TR_Server) { 38 TR_CLASS(TR_Server) {
38 TR_CommManager comm_manager; 39 TR_CommManager comm_manager;
@@ -3,21 +3,21 @@ AUTOMAKE_OPTIONS = subdir-objects @@ -3,21 +3,21 @@ AUTOMAKE_OPTIONS = subdir-objects
3 3
4 AM_CFLAGS += -I../include/ 4 AM_CFLAGS += -I../include/
5 5
6 -TRCOMM = cep_append_read_data.c \  
7 - cep_append_write_data.c \  
8 - cet_accept.c \  
9 - cep_buffer_read.c \ 6 +TRCOMM = cet_accept.c \
10 cep_write_buffered.c \ 7 cep_write_buffered.c \
  8 + comm_end_point_read.c \
11 comm_end_point.c \ 9 comm_end_point.c \
12 - comm_manager.c \  
13 - comm_manager_poll.c \  
14 - comm_manager_epoll.c \  
15 - comm_manager_shutdown.c \  
16 conn_entry_point.c \ 10 conn_entry_point.c \
17 connection.c \ 11 connection.c \
18 connector.c \ 12 connector.c \
19 datagram_service.c \ 13 datagram_service.c \
20 datagram_entry_point.c \ 14 datagram_entry_point.c \
  15 + comm_manager.c \
  16 + comm_manager_poll.c \
  17 + comm_manager_epoll.c \
  18 + comm_manager_shutdown.c \
  19 + comm_manager_shutdown_read.c \
  20 + comm_manager_shutdown_write.c \
21 io_handler.c \ 21 io_handler.c \
22 proto_message.c \ 22 proto_message.c \
23 protocol.c \ 23 protocol.c \
@@ -26,24 +26,57 @@ @@ -26,24 +26,57 @@
26 #include "tr/comm_end_point.h" 26 #include "tr/comm_end_point.h"
27 27
28 int 28 int
29 -TR_cepWriteBuffered(TR_CommEndPoint this) 29 +TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size)
30 { 30 {
31 TR_RemoteData data; 31 TR_RemoteData data;
32 int send; 32 int send;
33 33
  34 + *size = 0;
  35 +
34 data = TR_cepNextWriteData(this); 36 data = TR_cepNextWriteData(this);
35 - send = TR_socketSend(this->transport, data);  
36 -  
37 - switch (send) {  
38 - case FALSE: // EAGAIN  
39 - TR_queuePutFirst(this->write_buffer, data);  
40 - break;  
41 -  
42 - case -1: // FAILURE  
43 - case -2: // remote close  
44 - default:  
45 - TR_delete(data);  
46 - break; 37 +
  38 + while(data) {
  39 + send = TR_socketSend(this->transport, data);
  40 +
  41 + switch (send) {
  42 + case FALSE: // EAGAIN
  43 + case -3: // remote not ready
  44 + TR_queuePutFirst(this->write_buffer, data);
  45 + break;
  46 +
  47 + case -1: // FAILURE
  48 + case -2: // remote close
  49 + TR_delete(data);
  50 + TR_queueDestroy(this->write_buffer);
  51 + *size = this->write_buffer_size;
  52 + break;
  53 +
  54 + default:
  55 + {
  56 + TR_RemoteData new_data = NULL;
  57 +
  58 + if (send != ((TR_SizedData)data)->size) {
  59 + new_data = TR_new(
  60 + TR_RemoteData,
  61 + ((TR_SizedData)data)->data + send,
  62 + ((TR_SizedData)data)->size - send,
  63 + data->remote);
  64 + } else {
  65 + new_data = TR_cepNextWriteData(this);
  66 + }
  67 +
  68 + *size += send;
  69 + TR_delete(data);
  70 + data = new_data;
  71 + }
  72 + break;
  73 + }
  74 +
  75 + if (send <= 0) break;
  76 + }
  77 +
  78 + if (! data) {
  79 + return -4; // no more data to send
47 } 80 }
48 81
49 return send; 82 return send;
@@ -42,7 +42,6 @@ commEndPointCtor(void * _this, va_list * params) @@ -42,7 +42,6 @@ commEndPointCtor(void * _this, va_list * params)
42 this->protocol = va_arg(*params, TR_Protocol); 42 this->protocol = va_arg(*params, TR_Protocol);
43 this->read_chunk_size = va_arg(*params, int); 43 this->read_chunk_size = va_arg(*params, int);
44 this->do_close = 0; 44 this->do_close = 0;
45 - this->read_buffer = TR_new(TR_Queue);  
46 this->write_buffer = TR_new(TR_Queue); 45 this->write_buffer = TR_new(TR_Queue);
47 46
48 return 0; 47 return 0;
@@ -55,52 +54,80 @@ commEndPointDtor(void * _this) @@ -55,52 +54,80 @@ commEndPointDtor(void * _this)
55 TR_CommEndPoint this = _this; 54 TR_CommEndPoint this = _this;
56 55
57 TR_delete(this->transport); 56 TR_delete(this->transport);
58 - TR_delete(this->read_buffer);  
59 TR_delete(this->write_buffer); 57 TR_delete(this->write_buffer);
60 } 58 }
61 59
62 static 60 static
  61 +unsigned long
  62 +commEndPointGetHash(void * _this)
  63 +{
  64 + return (unsigned long)((TR_CommEndPoint)_this)->transport->handle;
  65 +}
  66 +
  67 +static
  68 +void
  69 +commEndPointHandleDouble(void * _current, void * _new)
  70 +{
  71 + TR_CommEndPoint current = _current;
  72 + TR_CommEndPoint new = _new;
  73 +
  74 + // add will delete _new after this function is processed, so it's
  75 + // neccessary to cleanup _current and reinit it with whatever was
  76 + // in _new.
  77 + // This will only be called if _current and _new are different.
  78 + commEndPointDtor(current);
  79 +
  80 + current->transport = new->transport;
  81 + current->protocol = new->protocol;
  82 + current->read_chunk_size = new->read_chunk_size;
  83 + current->do_close = new->do_close;
  84 + current->write_buffer = new->write_buffer;
  85 +}
  86 +
  87 +static
63 void 88 void
64 commEndPointCvInit(TR_class_ptr cls) 89 commEndPointCvInit(TR_class_ptr cls)
65 { 90 {
66 - TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_READY); 91 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DO_READ);
  92 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DO_WRITE);
67 TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK); 93 TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK);
68 - TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_READY);  
69 - TR_EVENT_CREATE(cls, TR_CEP_EVENT_UPGRADE); 94 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_BLOCK);
70 TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA); 95 TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA);
71 - TR_EVENT_CREATE(cls, TR_CEP_EVENT_PENDING_DATA);  
72 - TR_EVENT_CREATE(cls, TR_CEP_EVENT_END_DATA);  
73 TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_MSG); 96 TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_MSG);
74 - TR_EVENT_CREATE(cls, TR_CEP_EVENT_SEND_MSG); 97 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_MSG_READY);
  98 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DATA_READY);
  99 + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DATA_END);
75 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_READ); 100 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_READ);
76 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_WRITE); 101 TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_WRITE);
77 TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE); 102 TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE);
78 } 103 }
79 104
80 const char * TR_cepEventStrings[] = { 105 const char * TR_cepEventStrings[] = {
81 - "TR_CEP_EVENT_READ_READY", 106 + "TR_CEP_EVENT_DO_READ",
  107 + "TR_CEP_EVENT_DO_WRITE",
82 "TR_CEP_EVENT_READ_BLOCK", 108 "TR_CEP_EVENT_READ_BLOCK",
83 - "TR_CEP_EVENT_WRITE_READY",  
84 - "TR_CEP_EVENT_UPGRADE", 109 + "TR_CEP_EVENT_WRITE_BLOCK",
85 "TR_CEP_EVENT_NEW_DATA", 110 "TR_CEP_EVENT_NEW_DATA",
86 - "TR_CEP_EVENT_PENDING_DATA",  
87 - "TR_CEP_EVENT_END_DATA",  
88 "TR_CEP_EVENT_NEW_MSG", 111 "TR_CEP_EVENT_NEW_MSG",
89 - "TR_CEP_EVENT_SEND_MSG", 112 + "TR_CEP_EVENT_MSG_READY",
  113 + "TR_CEP_EVENT_DATA_READY",
  114 + "TR_CEP_EVENT_DATA_END",
90 "TR_CEP_EVENT_SHUT_READ", 115 "TR_CEP_EVENT_SHUT_READ",
91 "TR_CEP_EVENT_SHUT_WRITE", 116 "TR_CEP_EVENT_SHUT_WRITE",
92 - "TR_CEP_EVENT_CLOSE" 117 + "TR_CEP_EVENT_CLOSE",
93 }; 118 };
94 119
95 intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1]; 120 intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1];
96 TR_INIT_IFACE(TR_Class, commEndPointCtor, commEndPointDtor, NULL); 121 TR_INIT_IFACE(TR_Class, commEndPointCtor, commEndPointDtor, NULL);
97 TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL); 122 TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL);
  123 +TR_INIT_IFACE(TR_Hashable, commEndPointGetHash, commEndPointHandleDouble);
98 TR_CREATE_CLASS( 124 TR_CREATE_CLASS(
99 TR_CommEndPoint, 125 TR_CommEndPoint,
100 TR_EventSubject, 126 TR_EventSubject,
101 commEndPointCvInit, 127 commEndPointCvInit,
102 TR_IF(TR_Class), 128 TR_IF(TR_Class),
103 - TR_IF(TR_CommEndPoint)) = { 129 + TR_IF(TR_CommEndPoint),
  130 + TR_IF(TR_Hashable)) = {
104 { 131 {
105 TR_cepEventStrings, 132 TR_cepEventStrings,
106 TR_CEP_EVENT_MAX + 1, 133 TR_CEP_EVENT_MAX + 1,
@@ -25,10 +25,16 @@ @@ -25,10 +25,16 @@
25 25
26 #include "tr/comm_end_point.h" 26 #include "tr/comm_end_point.h"
27 27
28 -void  
29 -TR_cepAppendReadData(TR_CommEndPoint this, TR_RemoteData data) 28 +int
  29 +TR_commEndPointRead(TR_CommEndPoint this, TR_RemoteData * data_ptr)
30 { 30 {
31 - TR_queuePut(this->read_buffer, data); 31 + *data_ptr = TR_socketRecv(this->transport, this->read_chunk_size);
  32 +
  33 + if (! *data_ptr) return -1; // ment to trigger a close
  34 + if (*data_ptr == (void*)-1) return -2; // remote close... shutdown
  35 + if (*data_ptr == TR_emptyRemoteData) return FALSE; // read blocked
  36 +
  37 + return TRUE;
32 } 38 }
33 39
34 // vim: set ts=4 sw=4: 40 // vim: set ts=4 sw=4:
@@ -24,6 +24,7 @@ @@ -24,6 +24,7 @@
24 #include <poll.h> 24 #include <poll.h>
25 25
26 #include "trbase.h" 26 #include "trbase.h"
  27 +#include "trdata.h"
27 #include "trevent.h" 28 #include "trevent.h"
28 29
29 #include "tr/comm_end_point.h" 30 #include "tr/comm_end_point.h"
@@ -39,6 +40,9 @@ commManagerCtor(void * _this, va_list * params) @@ -39,6 +40,9 @@ commManagerCtor(void * _this, va_list * params)
39 40
40 TR_PARENTCALL(TR_CommManager, _this, TR_Class, ctor, params); 41 TR_PARENTCALL(TR_CommManager, _this, TR_Class, ctor, params);
41 42
  43 + this->accept = TR_new(TR_Hash);
  44 + this->write = TR_new(TR_Hash);
  45 + this->read = TR_new(TR_Hash);
42 this->n_endpoints = sysconf(_SC_OPEN_MAX); 46 this->n_endpoints = sysconf(_SC_OPEN_MAX);
43 this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints); 47 this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints);
44 48
@@ -56,81 +60,98 @@ commManagerDtor(void * _this) @@ -56,81 +60,98 @@ commManagerDtor(void * _this)
56 TR_delete(this->endpoints[i]); 60 TR_delete(this->endpoints[i]);
57 } 61 }
58 TR_MEM_FREE(this->endpoints); 62 TR_MEM_FREE(this->endpoints);
  63 + TR_delete(this->read);
  64 + TR_delete(this->write);
  65 + TR_delete(this->accept);
59 } 66 }
60 67
61 static 68 static
62 TR_EventDone 69 TR_EventDone
63 -TR__commManagerAddEndpoint(void * _this, TR_Event event) 70 +TR_commManagerEnableWrite(void * _this, TR_Event event)
64 { 71 {
65 - TR_commManagerAddEndpoint(  
66 - (TR_CommManager)_this,  
67 - (TR_CommEndPoint)event->subject); 72 + TR_CommManager this = _this;
  73 +
  74 + TR_hashAdd(this->write, event->subject);
  75 +
  76 + return TR_EVENT_DONE;
  77 +}
  78 +
  79 +static
  80 +TR_EventDone
  81 +TR_commManagerDisableWrite(void * _this, TR_Event event)
  82 +{
  83 + TR_CommManager this = _this;
  84 +
  85 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  86 +
  87 + return TR_EVENT_DONE;
  88 +}
  89 +
  90 +static
  91 +TR_EventDone
  92 +TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event)
  93 +{
  94 + TR_commManagerAddEndpoint(this, (TR_CommEndPoint)event->subject);
68 95
69 return TR_EVENT_DONE; 96 return TR_EVENT_DONE;
70 } 97 }
71 98
72 TR_EventDone TR_commManagerSelect(void *, TR_Event, int); 99 TR_EventDone TR_commManagerSelect(void *, TR_Event, int);
73 -TR_EventDone TR_commManagerEnableWrite(void *, TR_Event);  
74 -TR_EventDone TR_commManagerDisableWrite(void *, TR_Event);  
75 -TR_EventDone TR_commManagerEnableRead(void *, TR_Event); 100 +TR_EventDone TR_commManagerPollWrite(void *, TR_Event);
  101 +TR_EventDone TR_commManagerPollRead(void *, TR_Event);
76 TR_EventDone TR_commManagerClose(void *, TR_Event); 102 TR_EventDone TR_commManagerClose(void *, TR_Event);
77 -TR_EventDone TR_commManagerShutdownRead(void *, TR_Event);  
78 -TR_EventDone TR_commManagerShutdownWrite(void *, TR_Event); 103 +TR_EventDone TR_commManagerShutdownRead(TR_CommManager, TR_Event);
  104 +TR_EventDone TR_commManagerShutdownWrite(TR_CommManager, TR_Event);
79 105
80 static 106 static
81 void 107 void
82 commManagerCvInit(TR_class_ptr cls) 108 commManagerCvInit(TR_class_ptr cls)
83 { 109 {
84 TR_EVENT_HANDLER_SET_METHOD( 110 TR_EVENT_HANDLER_SET_METHOD(
85 - cls,  
86 - TR_EventDispatcher, 111 + cls, TR_EventDispatcher,
87 TR_DISPATCHER_EVENT_DATA_WAIT, 112 TR_DISPATCHER_EVENT_DATA_WAIT,
88 TR_commManagerSelect); 113 TR_commManagerSelect);
89 TR_EVENT_HANDLER_SET_METHOD( 114 TR_EVENT_HANDLER_SET_METHOD(
90 - cls,  
91 - TR_EventDispatcher, 115 + cls, TR_EventDispatcher,
92 TR_DISPATCHER_EVENT_SHUTDOWN, 116 TR_DISPATCHER_EVENT_SHUTDOWN,
93 TR_commManagerShutdown); 117 TR_commManagerShutdown);
94 TR_EVENT_HANDLER_SET_METHOD( 118 TR_EVENT_HANDLER_SET_METHOD(
95 - cls,  
96 - TR_Connection, 119 + cls, TR_Connection,
97 TR_CON_EVENT_NEW_CON, 120 TR_CON_EVENT_NEW_CON,
98 - TR__commManagerAddEndpoint);  
99 - TR_EVENT_HANDLER_SET_METHOD(  
100 - cls,  
101 - TR_CommEndPoint,  
102 - TR_CEP_EVENT_PENDING_DATA,  
103 - TR_commManagerEnableWrite); 121 + TR_commManagerAddEndpointEvt);
104 TR_EVENT_HANDLER_SET_METHOD( 122 TR_EVENT_HANDLER_SET_METHOD(
105 - cls,  
106 - TR_CommEndPoint,  
107 - TR_CEP_EVENT_END_DATA,  
108 - TR_commManagerDisableWrite); 123 + cls, TR_CommEndPoint,
  124 + TR_CEP_EVENT_WRITE_BLOCK,
  125 + TR_commManagerPollWrite);
109 TR_EVENT_HANDLER_SET_METHOD( 126 TR_EVENT_HANDLER_SET_METHOD(
110 - cls,  
111 - TR_CommEndPoint, 127 + cls, TR_CommEndPoint,
112 TR_CEP_EVENT_READ_BLOCK, 128 TR_CEP_EVENT_READ_BLOCK,
113 - TR_commManagerEnableRead); 129 + TR_commManagerPollRead);
114 TR_EVENT_HANDLER_SET_METHOD( 130 TR_EVENT_HANDLER_SET_METHOD(
115 - cls,  
116 - TR_CommEndPoint, 131 + cls, TR_CommEndPoint,
117 TR_CEP_EVENT_CLOSE, 132 TR_CEP_EVENT_CLOSE,
118 TR_commManagerClose); 133 TR_commManagerClose);
119 TR_EVENT_HANDLER_SET_METHOD( 134 TR_EVENT_HANDLER_SET_METHOD(
120 - cls,  
121 - TR_CommEndPoint, 135 + cls, TR_CommEndPoint,
122 TR_CEP_EVENT_SHUT_READ, 136 TR_CEP_EVENT_SHUT_READ,
123 TR_commManagerShutdownRead); 137 TR_commManagerShutdownRead);
124 TR_EVENT_HANDLER_SET_METHOD( 138 TR_EVENT_HANDLER_SET_METHOD(
125 - cls,  
126 - TR_CommEndPoint, 139 + cls, TR_CommEndPoint,
127 TR_CEP_EVENT_SHUT_WRITE, 140 TR_CEP_EVENT_SHUT_WRITE,
128 TR_commManagerShutdownWrite); 141 TR_commManagerShutdownWrite);
  142 + TR_EVENT_HANDLER_SET_METHOD(
  143 + cls, TR_CommEndPoint,
  144 + TR_CEP_EVENT_DATA_READY,
  145 + TR_commManagerEnableWrite);
  146 + TR_EVENT_HANDLER_SET_METHOD(
  147 + cls, TR_CommEndPoint,
  148 + TR_CEP_EVENT_DATA_END,
  149 + TR_commManagerDisableWrite);
129 } 150 }
130 151
131 TR_INSTANCE(TR_Hash, commManagerEventMethods); 152 TR_INSTANCE(TR_Hash, commManagerEventMethods);
132 TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL); 153 TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
133 -TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); 154 +TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL);
134 TR_CREATE_CLASS( 155 TR_CREATE_CLASS(
135 TR_CommManager, 156 TR_CommManager,
136 TR_EventHandler, 157 TR_EventHandler,
@@ -34,8 +34,9 @@ @@ -34,8 +34,9 @@
34 #include "tr/comm_end_point.h" 34 #include "tr/comm_end_point.h"
35 #include "tr/connection.h" 35 #include "tr/connection.h"
36 #include "tr/connect_entry_point.h" 36 #include "tr/connect_entry_point.h"
  37 +#include "tr/_comm_manager.h"
37 38
38 -#define MAXEVENTS 256 39 +#define MAXEVENTS 1024
39 40
40 struct epoll_event events[MAXEVENTS]; 41 struct epoll_event events[MAXEVENTS];
41 42
@@ -72,7 +73,7 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) @@ -72,7 +73,7 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
72 int handle = endpoint->transport->handle; 73 int handle = endpoint->transport->handle;
73 struct epoll_event event; 74 struct epoll_event event;
74 75
75 - this->events[handle] = EPOLLIN; 76 + this->events[handle] = EPOLLET;
76 event.data.ptr = endpoint; 77 event.data.ptr = endpoint;
77 event.events = this->events[handle]; 78 event.events = this->events[handle];
78 79
@@ -84,69 +85,52 @@ void @@ -84,69 +85,52 @@ void
84 TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) 85 TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
85 { 86 {
86 TR_CommManagerEpoll this = _this; 87 TR_CommManagerEpoll this = _this;
  88 + TR_CommManager cmgr = _this;
87 int i, nevents; 89 int i, nevents;
  90 + struct epoll_event _event;
88 91
89 nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout); 92 nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);
90 93
91 for (i=0; i<nevents; i++) { 94 for (i=0; i<nevents; i++) {
92 TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr; 95 TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr;
  96 + int handle = endpoint->transport->handle;
93 97
94 if ((events[i].events & EPOLLIN) == EPOLLIN) { 98 if ((events[i].events & EPOLLIN) == EPOLLIN) {
95 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) 99 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
96 && ((TR_TcpSocket)endpoint->transport)->listen) { 100 && ((TR_TcpSocket)endpoint->transport)->listen) {
97 - TR_eventHandlerIssueEvent((TR_EventHandler)this,  
98 - TR_eventSubjectEmit(  
99 - (TR_EventSubject)endpoint,  
100 - TR_CET_EVENT_ACC_READY,  
101 - NULL)); 101 + TR_hashAdd(cmgr->accept, endpoint);
102 } else { 102 } else {
103 - TR_eventHandlerIssueEvent((TR_EventHandler)this,  
104 - TR_eventSubjectEmit(  
105 - (TR_EventSubject)endpoint,  
106 - TR_CEP_EVENT_READ_READY,  
107 - NULL)); 103 + TR_hashAdd(cmgr->read, endpoint);
  104 +
108 } 105 }
  106 +
  107 + this->events[handle] &= ~EPOLLIN;
  108 + _event.data.ptr = endpoint;
  109 + _event.events = this->events[handle];
  110 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
109 } 111 }
110 112
111 if ((events[i].events & EPOLLOUT) == EPOLLOUT) { 113 if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
112 - TR_eventHandlerIssueEvent((TR_EventHandler)this,  
113 - TR_eventSubjectEmit(  
114 - (TR_EventSubject)endpoint,  
115 - TR_CEP_EVENT_WRITE_READY,  
116 - NULL)); 114 + TR_hashAdd(cmgr->write, endpoint);
  115 + this->events[handle] &= ~EPOLLOUT;
  116 + _event.data.ptr = endpoint;
  117 + _event.events = this->events[handle];
  118 + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
117 } 119 }
118 } 120 }
119 } 121 }
120 122
121 static 123 static
  124 +inline
122 void 125 void
123 -TR_commManagerEpollEnableWrite(void * _this, TR_Event event)  
124 -{  
125 - TR_CommManagerEpoll this = _this;  
126 - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;  
127 -  
128 - if (! TR_socketFinWr(endpoint->transport)) {  
129 - int handle = endpoint->transport->handle;  
130 - struct epoll_event _event;  
131 -  
132 - this->events[handle] |= EPOLLOUT;  
133 - _event.data.ptr = endpoint;  
134 - _event.events = this->events[handle];  
135 -  
136 - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);  
137 - }  
138 -}  
139 -  
140 -static  
141 -void  
142 -TR_commManagerEpollDisableWrite(void * _this, TR_Event event) 126 +TR_commManagerEpollEnable(void * _this, uint32_t mask, TR_Event event)
143 { 127 {
144 TR_CommManagerEpoll this = _this; 128 TR_CommManagerEpoll this = _this;
145 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 129 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
146 int handle = endpoint->transport->handle; 130 int handle = endpoint->transport->handle;
147 struct epoll_event _event; 131 struct epoll_event _event;
148 132
149 - this->events[handle] &= ~EPOLLOUT; 133 + this->events[handle] |= mask;
150 _event.data.ptr = endpoint; 134 _event.data.ptr = endpoint;
151 _event.events = this->events[handle]; 135 _event.events = this->events[handle];
152 136
@@ -155,37 +139,20 @@ TR_commManagerEpollDisableWrite(void * _this, TR_Event event) @@ -155,37 +139,20 @@ TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
155 139
156 static 140 static
157 void 141 void
158 -TR_commManagerEpollEnableRead(void * _this, TR_Event event) 142 +TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
159 { 143 {
160 - TR_CommManagerEpoll this = _this;  
161 - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;  
162 -  
163 - if (! TR_socketFinRd(endpoint->transport)) {  
164 - int handle = endpoint->transport->handle;  
165 - struct epoll_event _event;  
166 -  
167 - this->events[handle] |= EPOLLIN;  
168 - _event.data.ptr = endpoint;  
169 - _event.events = this->events[handle];  
170 -  
171 - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); 144 + if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {
  145 + TR_commManagerEpollEnable(_this, EPOLLOUT, event);
172 } 146 }
173 } 147 }
174 148
175 static 149 static
176 void 150 void
177 -TR_commManagerEpollDisableRead(void * _this, TR_Event event) 151 +TR_commManagerEpollEnableRead(void * _this, TR_Event event)
178 { 152 {
179 - TR_CommManagerEpoll this = _this;  
180 - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;  
181 - int handle = endpoint->transport->handle;  
182 - struct epoll_event _event;  
183 -  
184 - this->events[handle] &= ~EPOLLIN;  
185 - _event.data.ptr = endpoint;  
186 - _event.events = this->events[handle];  
187 -  
188 - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); 153 + if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
  154 + TR_commManagerEpollEnable(_this, EPOLLIN, event);
  155 + }
189 } 156 }
190 157
191 static 158 static
@@ -207,14 +174,11 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) { @@ -207,14 +174,11 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) {
207 TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL); 174 TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
208 TR_INIT_IFACE( 175 TR_INIT_IFACE(
209 TR_CommManager, 176 TR_CommManager,
210 - TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON  
211 - TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT  
212 - TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK  
213 - TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_END_DATA  
214 - TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK  
215 - TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE  
216 - TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_SHUT_READ  
217 - TR_commManagerEpollEnableRead); // TR_CEP_EVENT_SHUT_WRITE 177 + TR_commManagerEpollAddEndpoint,
  178 + TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT
  179 + TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
  180 + TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK
  181 + TR_commManagerEpollClose); // TR_CEP_EVENT_CLOSE
218 TR_CREATE_CLASS( 182 TR_CREATE_CLASS(
219 TR_CommManagerEpoll, 183 TR_CommManagerEpoll,
220 TR_CommManager, 184 TR_CommManager,
@@ -32,6 +32,7 @@ @@ -32,6 +32,7 @@
32 #include "tr/comm_end_point.h" 32 #include "tr/comm_end_point.h"
33 #include "tr/connection.h" 33 #include "tr/connection.h"
34 #include "tr/connect_entry_point.h" 34 #include "tr/connect_entry_point.h"
  35 +#include "tr/_comm_manager.h"
35 36
36 static 37 static
37 int 38 int
@@ -69,9 +70,9 @@ TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint) @@ -69,9 +70,9 @@ TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
69 TR_CommManagerPoll this = _this; 70 TR_CommManagerPoll this = _this;
70 71
71 this->fds[endpoint->transport->handle].fd = endpoint->transport->handle; 72 this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
72 - this->fds[endpoint->transport->handle].events = POLLIN; 73 + this->fds[endpoint->transport->handle].events = 0;
73 } 74 }
74 - 75 +
75 static 76 static
76 void 77 void
77 TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) 78 TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
@@ -85,39 +86,28 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) @@ -85,39 +86,28 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
85 86
86 if (nevents) { 87 if (nevents) {
87 for (i = 0; i < cmgr->max_handle+1; i++) { 88 for (i = 0; i < cmgr->max_handle+1; i++) {
88 - TR_CommEndPoint endpoint = cmgr->endpoints[i];  
89 -  
90 - if ((this->fds[i].revents & POLLIN) == POLLIN) {  
91 - TR_Event event; 89 + if (this->fds[i].revents != 0) {
  90 + TR_CommEndPoint endpoint = cmgr->endpoints[i];
92 91
93 nevents--; 92 nevents--;
94 - if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)  
95 - && ((TR_TcpSocket)endpoint->transport)->listen) {  
96 - event = TR_eventSubjectEmit(  
97 - (TR_EventSubject)endpoint,  
98 - TR_CET_EVENT_ACC_READY,  
99 - NULL);  
100 - } else {  
101 - event = TR_eventSubjectEmit(  
102 - (TR_EventSubject)endpoint,  
103 - TR_CEP_EVENT_READ_READY,  
104 - NULL);  
105 - }  
106 93
107 - TR_eventHandlerIssueEvent((TR_EventHandler)this, event);  
108 - } 94 + if ((this->fds[i].revents & POLLIN) == POLLIN) {
  95 + if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
  96 + && ((TR_TcpSocket)endpoint->transport)->listen) {
  97 + TR_hashAdd(cmgr->accept, endpoint);
  98 + } else {
  99 + TR_hashAdd(cmgr->read, endpoint);
  100 + }
  101 + this->fds[endpoint->transport->handle].events &= ~POLLIN;
  102 + }
109 103
110 - if ((this->fds[i].revents & POLLOUT) == POLLOUT) {  
111 - nevents--;  
112 - TR_Event _event = TR_eventSubjectEmit(  
113 - (TR_EventSubject)endpoint,  
114 - TR_CEP_EVENT_WRITE_READY,  
115 - NULL); 104 + if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
  105 + TR_hashAdd(cmgr->write, endpoint);
  106 + this->fds[endpoint->transport->handle].events &= ~POLLOUT;
  107 + }
116 108
117 - TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); 109 + if (nevents <= 0) break;
118 } 110 }
119 -  
120 - if (nevents <= 0) break;  
121 } 111 }
122 } 112 }
123 } 113 }
@@ -136,16 +126,6 @@ TR_commManagerPollEnableWrite(void * _this, TR_Event event) @@ -136,16 +126,6 @@ TR_commManagerPollEnableWrite(void * _this, TR_Event event)
136 126
137 static 127 static
138 void 128 void
139 -TR_commManagerPollDisableWrite(void * _this, TR_Event event)  
140 -{  
141 - TR_CommManagerPoll this = _this;  
142 - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;  
143 -  
144 - this->fds[endpoint->transport->handle].events &= ~POLLOUT;  
145 -}  
146 -  
147 -static  
148 -void  
149 TR_commManagerPollEnableRead(void * _this, TR_Event event) 129 TR_commManagerPollEnableRead(void * _this, TR_Event event)
150 { 130 {
151 TR_CommManagerPoll this = _this; 131 TR_CommManagerPoll this = _this;
@@ -158,16 +138,6 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event) @@ -158,16 +138,6 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event)
158 138
159 static 139 static
160 void 140 void
161 -TR_commManagerPollDisableRead(void * _this, TR_Event event)  
162 -{  
163 - TR_CommManagerPoll this = _this;  
164 - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;  
165 -  
166 - this->fds[endpoint->transport->handle].events &= ~POLLIN;  
167 -}  
168 -  
169 -static  
170 -void  
171 TR_commManagerPollClose(void * _this, TR_Event event) 141 TR_commManagerPollClose(void * _this, TR_Event event)
172 { 142 {
173 TR_CommManagerPoll this = _this; 143 TR_CommManagerPoll this = _this;
@@ -189,11 +159,8 @@ TR_INIT_IFACE( @@ -189,11 +159,8 @@ TR_INIT_IFACE(
189 TR_commManagerPollAddEndpoint, 159 TR_commManagerPollAddEndpoint,
190 TR_commManagerPollSelect, 160 TR_commManagerPollSelect,
191 TR_commManagerPollEnableWrite, 161 TR_commManagerPollEnableWrite,
192 - TR_commManagerPollDisableWrite,  
193 TR_commManagerPollEnableRead, 162 TR_commManagerPollEnableRead,
194 - TR_commManagerPollClose,  
195 - TR_commManagerPollDisableWrite,  
196 - TR_commManagerPollDisableRead); 163 + TR_commManagerPollClose);
197 TR_CREATE_CLASS( 164 TR_CREATE_CLASS(
198 TR_CommManagerPoll, 165 TR_CommManagerPoll,
199 TR_CommManager, 166 TR_CommManager,
@@ -27,22 +27,17 @@ @@ -27,22 +27,17 @@
27 #include "trevent.h" 27 #include "trevent.h"
28 28
29 #include "tr/comm_manager.h" 29 #include "tr/comm_manager.h"
  30 +#include "tr/_comm_manager.h"
30 31
31 TR_EventDone 32 TR_EventDone
32 -TR_commManagerShutdown(void * _this, TR_Event event) 33 +TR_commManagerShutdown(TR_CommManager this, TR_Event event)
33 { 34 {
34 - TR_CommManager this = _this;  
35 - nfds_t i; 35 + nfds_t i;
36 36
37 for (i=0; i<=this->max_handle; i++) { 37 for (i=0; i<=this->max_handle; i++) {
38 if (this->endpoints[i]) { 38 if (this->endpoints[i]) {
39 - TR_eventHandlerIssueEvent(  
40 - (TR_EventHandler)_this,  
41 - TR_eventSubjectEmit(  
42 - (TR_EventSubject)this->endpoints[i],  
43 - TR_CEP_EVENT_CLOSE,  
44 - NULL));  
45 - } 39 + TR_ISSUE_IO_CLOSE_EVENT(this, this->endpoints[i]);
  40 + }
46 } 41 }
47 42
48 return TR_EVENT_DONE; 43 return TR_EVENT_DONE;
@@ -21,21 +21,34 @@ @@ -21,21 +21,34 @@
21 */ 21 */
22 22
23 #include "trbase.h" 23 #include "trbase.h"
24 -#include "trio.h" 24 +#include "trdata.h"
  25 +#include "trevent.h"
25 26
26 #include "tr/comm_end_point.h" 27 #include "tr/comm_end_point.h"
  28 +#include "tr/comm_manager.h"
  29 +#include "tr/_comm_manager.h"
27 30
28 -int  
29 -TR_cepBufferRead(TR_CommEndPoint this) 31 +TR_EventDone
  32 +TR_commManagerShutdownRead(TR_CommManager this, TR_Event event)
30 { 33 {
31 - TR_RemoteData data = TR_socketRecv(this->transport, this->read_chunk_size); 34 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
32 35
33 - if (! data) return -1; // ment to trigger a close  
34 - if (data == (void*)-1) return -2; // remote close... shutdown  
35 - if (data == TR_emptyRemoteData) return FALSE; 36 + if (! TR_socketFinRd(endpoint->transport)) {
  37 + TR_socketShutdownRead(endpoint->transport);
  38 + }
36 39
37 - TR_cepAppendReadData(this, data);  
38 - return TRUE; 40 + if (TR_socketFinRdWr(endpoint->transport)) {
  41 + // close
  42 + TR_ISSUE_IO_CLOSE_EVENT(this, event->subject);
  43 + }
  44 +
  45 + if (! TR_cepHasPendingData(endpoint)) {
  46 + TR_ISSUE_IO_SHUT_WRITE_EVENT(this, event->subject);
  47 + }
  48 +
  49 + TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
  50 +
  51 + return TR_EVENT_DONE;
39 } 52 }
40 53
41 // vim: set ts=4 sw=4: 54 // vim: set ts=4 sw=4:
@@ -21,14 +21,26 @@ @@ -21,14 +21,26 @@
21 */ 21 */
22 22
23 #include "trbase.h" 23 #include "trbase.h"
24 -#include "trio.h" 24 +#include "trevent.h"
25 25
26 #include "tr/comm_end_point.h" 26 #include "tr/comm_end_point.h"
  27 +#include "tr/comm_manager.h"
  28 +#include "tr/_comm_manager.h"
27 29
28 -void  
29 -TR_cepAppendWriteData(TR_CommEndPoint this, TR_RemoteData data) 30 +TR_EventDone
  31 +TR_commManagerShutdownWrite(TR_CommManager this, TR_Event event)
30 { 32 {
31 - TR_queuePut(this->write_buffer, data); 33 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  34 +
  35 + if (! TR_socketFinWr(endpoint->transport)) {
  36 + TR_socketShutdownWrite(endpoint->transport);
  37 + }
  38 +
  39 + TR_ISSUE_IO_CLOSE_EVENT(this, event->subject);
  40 +
  41 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  42 +
  43 + return TR_EVENT_DONE;
32 } 44 }
33 45
34 // vim: set ts=4 sw=4: 46 // vim: set ts=4 sw=4:
@@ -57,62 +57,74 @@ connectionDtor(void * _this) @@ -57,62 +57,74 @@ connectionDtor(void * _this)
57 57
58 static 58 static
59 TR_ProtoMessage 59 TR_ProtoMessage
60 -connectionNextMessage(void * _this) 60 +connectionNextMessage(void * _this, TR_RemoteData * data)
61 { 61 {
62 TR_Connection this = _this; 62 TR_Connection this = _this;
63 TR_CommEndPoint comm = _this; 63 TR_CommEndPoint comm = _this;
64 - TR_RemoteData data = TR_queueGet(comm->read_buffer);  
65 TR_ProtoMessage ret_message = NULL; 64 TR_ProtoMessage ret_message = NULL;
  65 + TR_RemoteData new_data = NULL;
66 size_t end; 66 size_t end;
67 67
68 - if (NULL == data) return ret_message;  
69 -  
70 - if (! this->current_message || this->current_message->ready)  
71 - {  
72 - this->current_message =  
73 - TR_protoCreateMessage(comm->protocol, data->remote);  
74 - } 68 + if (*data) {
  69 + if (! this->current_message || this->current_message->ready)
  70 + {
  71 + this->current_message =
  72 + TR_protoCreateMessage(comm->protocol, (*data)->remote);
  73 + }
75 74
76 - end = TR_protoParse(comm->protocol, this->current_message, data); 75 + end = TR_protoParse(comm->protocol, this->current_message, *data);
77 76
78 - if (end != ((TR_SizedData)data)->size) {  
79 /** 77 /**
80 - * TODO  
81 - * This means that the parser has not consumed all of the data.  
82 - * We do not know the reason, but with HTTP this should only occur  
83 - * when the message is complete... anyway, to prevent us from  
84 - * looping forever because a protocol implementation is buggy  
85 - * we should close the connection after end was 0 the second time.  
86 - * This can be done by firing a close event. 78 + * We define that the only valid reason for a protocol parser to not
  79 + * consume all data is, that the current message is complete.
  80 + * When a parser returns a not completely consumed data then we first
  81 + * check if the current message is ready. If it is we create a
  82 + * new data object from the remaining data and return it to the caller
  83 + * along with the message. The caller (which is the protocol handler)
  84 + * can then call this again with the remaining data.
  85 + * If the message is not ready we drop the data silently because there
  86 + * is either wrong data or a bug in the parser and the data will never
  87 + * be consumed correctly.
  88 + * INFO: Usually we do not free data here at all. We leave this to the
  89 + * protocol implementation. The protocol might take the data without
  90 + * copying it at all or if it copies it is responsible for the free too.
  91 + * Only if we got a wrong behaviour of the protocol we free the data
  92 + * ourself.
  93 + * IMPORTANT: The protocol should never free the data when it does not
  94 + * consume it completely.
  95 + * IMPORTANT: To keep this maintainable we must write a log message here
  96 + * when we drop data. This message should be a WARNING as the protocol
  97 + * might want to drop data intentionally... (probably this is not
  98 + * true and we should make it an ERROR).
87 */ 99 */
88 - switch(end) {  
89 - default:  
90 - {  
91 - TR_RemoteData new_data = TR_new(  
92 - TR_RemoteData,  
93 - ((TR_SizedData)data)->data + end,  
94 - ((TR_SizedData)data)->size - end,  
95 - data->remote);  
96 - TR_delete(data);  
97 - data = new_data;  
98 - }  
99 - // intended drop through  
100 -  
101 - case 0:  
102 - TR_queuePutFirst(comm->read_buffer, data); 100 + if (this->current_message->ready) {
  101 + if (end != ((TR_SizedData)*data)->size) {
  102 + new_data = TR_new(
  103 + TR_RemoteData,
  104 + ((TR_SizedData)*data)->data + end,
  105 + ((TR_SizedData)*data)->size - end,
  106 + (*data)->remote);
  107 + }
  108 + ret_message = this->current_message;
  109 + this->current_message = NULL;
  110 + } else {
  111 + if (end != ((TR_SizedData)*data)->size) {
  112 + TR_delete(*data);
  113 + TR_loggerLog(
  114 + TR_logger,
  115 + TR_LOGGER_WARNING,
  116 + "Drop data not consumed by protocol.");
  117 + }
103 } 118 }
104 - }  
105 119
106 - if (this->current_message->ready) {  
107 - ret_message = this->current_message;  
108 - this->current_message = NULL; 120 + *data = new_data;
109 } 121 }
110 122
111 return ret_message; 123 return ret_message;
112 } 124 }
113 125
114 static 126 static
115 -int 127 +size_t
116 connectionCompose(void * _this, TR_ProtoMessage message) 128 connectionCompose(void * _this, TR_ProtoMessage message)
117 { 129 {
118 TR_RemoteData data = 130 TR_RemoteData data =
@@ -123,7 +135,7 @@ connectionCompose(void * _this, TR_ProtoMessage message) @@ -123,7 +135,7 @@ connectionCompose(void * _this, TR_ProtoMessage message)
123 } 135 }
124 136
125 TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); 137 TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
126 - return TRUE; 138 + return ((TR_SizedData)data)->size;
127 } 139 }
128 140
129 static 141 static
@@ -76,6 +76,15 @@ connectorAccept(void * _this, TR_Event event) @@ -76,6 +76,15 @@ connectorAccept(void * _this, TR_Event event)
76 socket = TR_socketAccept((TR_TcpSocket)connection->transport); 76 socket = TR_socketAccept((TR_TcpSocket)connection->transport);
77 } 77 }
78 78
  79 + if (! socket) {
  80 + TR_eventHandlerIssueEvent(
  81 + (TR_EventHandler)this,
  82 + TR_eventSubjectEmit(
  83 + (TR_EventSubject)connection,
  84 + TR_CEP_EVENT_READ_BLOCK,
  85 + NULL));
  86 + }
  87 +
79 return TR_EVENT_DONE; 88 return TR_EVENT_DONE;
80 } 89 }
81 90
@@ -51,48 +51,31 @@ datagramServiceDtor(void * _this) @@ -51,48 +51,31 @@ datagramServiceDtor(void * _this)
51 51
52 static 52 static
53 TR_ProtoMessage 53 TR_ProtoMessage
54 -datagramServiceNextMessage(void * _this) 54 +datagramServiceNextMessage(void * _this, TR_RemoteData * data)
55 { 55 {
56 - TR_CommEndPoint comm = _this;  
57 - TR_RemoteData data = TR_queueGet(comm->read_buffer);  
58 - TR_ProtoMessage ret_message = NULL;  
59 - size_t end;  
60 -  
61 - if (NULL == data) return ret_message;  
62 -  
63 - ret_message = TR_protoCreateMessage(comm->protocol, data->remote);  
64 - end = TR_protoParse(comm->protocol, ret_message, data);  
65 -  
66 - if (end != ((TR_SizedData)data)->size) {  
67 - /**  
68 - * TODO  
69 - * This means that the parser has not consumed all of the data.  
70 - * We do not know the reason, but with HTTP this should only occur  
71 - * when the message is complete... anyway, to prevent us from  
72 - * looping forever because a protocol implementation is buggy  
73 - * we should close the connection after end was 0 the second time.  
74 - * This can be done by firing a close event. 56 + TR_CommEndPoint comm = _this;
  57 + TR_ProtoMessage ret_message = NULL;
  58 +
  59 + if (*data) {
  60 + ret_message = TR_protoCreateMessage(comm->protocol, (*data)->remote);
  61 + TR_protoParse(comm->protocol, ret_message, *data);
  62 +
  63 + /*
  64 + * In UDP I don't care about remaining data. UDP is an all or nothing
  65 + * approach. If the parser is unable to create a message from the data I
  66 + * drop the data.
  67 + * Here the protocol must not free the data if it does not create a
  68 + * complete message from it. If a message was created the original data
  69 + * might or might not traval with the created message. In any case the
  70 + * protocol is then responsible for freeing the data.
75 */ 71 */
76 - switch(end) {  
77 - default:  
78 - {  
79 - TR_RemoteData new_data = TR_new(  
80 - TR_RemoteData,  
81 - ((TR_SizedData)data)->data + end,  
82 - ((TR_SizedData)data)->size - end,  
83 - data->remote);  
84 - TR_delete(data);  
85 - data = new_data;  
86 - }  
87 - // intended drop through  
88 -  
89 - case 0:  
90 - TR_queuePutFirst(comm->read_buffer, data); 72 +
  73 + if (! ret_message->ready) {
  74 + TR_delete(*data);
  75 + TR_delete(ret_message);
91 } 76 }
92 - }  
93 77
94 - if (! ret_message->ready) {  
95 - TR_delete(ret_message); 78 + *data = NULL;
96 } 79 }
97 80
98 return ret_message; 81 return ret_message;
@@ -110,7 +93,7 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message) @@ -110,7 +93,7 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message)
110 } 93 }
111 94
112 TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); 95 TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
113 - return TRUE; 96 + return ((TR_SizedData)data)->size;
114 } 97 }
115 98
116 intptr_t datagramService_events[TR_CEP_EVENT_MAX + 1]; 99 intptr_t datagramService_events[TR_CEP_EVENT_MAX + 1];
@@ -31,17 +31,17 @@ @@ -31,17 +31,17 @@
31 TR_CREATE_INTERFACE(TR_CommEndPoint, 2); 31 TR_CREATE_INTERFACE(TR_CommEndPoint, 2);
32 32
33 TR_ProtoMessage 33 TR_ProtoMessage
34 -TR_cepNextMessage(void * _this) 34 +TR_cepNextMessage(void * _this, TR_RemoteData * data)
35 { 35 {
36 TR_ProtoMessage callret; 36 TR_ProtoMessage callret;
37 - TR_RETCALL(_this, TR_CommEndPoint, nextMessage, callret); 37 + TR_RETCALL(_this, TR_CommEndPoint, nextMessage, callret, data);
38 return callret; 38 return callret;
39 } 39 }
40 40
41 -int 41 +size_t
42 TR_cepCompose(void * _this, TR_ProtoMessage message) 42 TR_cepCompose(void * _this, TR_ProtoMessage message)
43 { 43 {
44 - int callret; 44 + size_t callret;
45 TR_RETCALL(_this, TR_CommEndPoint, compose, callret, message); 45 TR_RETCALL(_this, TR_CommEndPoint, compose, callret, message);
46 return callret; 46 return callret;
47 } 47 }
@@ -29,8 +29,9 @@ @@ -29,8 +29,9 @@
29 #include "tr/interface/comm_manager.h" 29 #include "tr/interface/comm_manager.h"
30 #include "tr/comm_end_point.h" 30 #include "tr/comm_end_point.h"
31 #include "tr/comm_manager.h" 31 #include "tr/comm_manager.h"
  32 +#include "tr/_comm_manager.h"
32 33
33 -TR_CREATE_INTERFACE(TR_CommManager, 8); 34 +TR_CREATE_INTERFACE(TR_CommManager, 5);
34 35
35 void 36 void
36 TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint) 37 TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
@@ -48,17 +49,53 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint) @@ -48,17 +49,53 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
48 : this->max_handle; 49 : this->max_handle;
49 50
50 this->endpoints[endpoint->transport->handle] = endpoint; 51 this->endpoints[endpoint->transport->handle] = endpoint;
  52 +
  53 + if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
  54 + && ((TR_TcpSocket)endpoint->transport)->listen) {
  55 + TR_hashAdd(this->accept, endpoint);
  56 + TR_ISSUE_IO_ACC_EVENT(this, endpoint);
  57 + } else {
  58 + TR_hashAdd(this->read, endpoint);
  59 + TR_ISSUE_IO_READ_EVENT(this, endpoint);
  60 + }
  61 +
51 TR_CALL(_this, TR_CommManager, addEndpoint, endpoint); 62 TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
52 } 63 }
53 64
  65 +static
  66 +void
  67 +commManagerIssueAcceptEvents(const void * endpoint, const void * comm_manager)
  68 +{
  69 + TR_ISSUE_IO_ACC_EVENT(comm_manager, endpoint);
  70 +}
  71 +
  72 +static
  73 +void
  74 +commManagerIssueWriteEvents(const void * endpoint, const void * comm_manager)
  75 +{
  76 + TR_ISSUE_IO_WRITE_EVENT(comm_manager, endpoint);
  77 +}
  78 +
  79 +static
  80 +void
  81 +commManagerIssueReadEvents(const void * endpoint, const void * comm_manager)
  82 +{
  83 + TR_ISSUE_IO_READ_EVENT(comm_manager, endpoint);
  84 +}
  85 +
54 TR_EventDone 86 TR_EventDone
55 TR_commManagerSelect(void * _this, TR_Event event) 87 TR_commManagerSelect(void * _this, TR_Event event)
56 { 88 {
  89 + TR_CommManager this = _this;
57 int timeout; // milliseconds 90 int timeout; // milliseconds
58 int * timeoutptr = event->data; 91 int * timeoutptr = event->data;
59 TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject; 92 TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject;
60 93
61 - if (NULL == timeoutptr) { 94 + if (! (TR_hashEmpty(this->read)
  95 + && TR_hashEmpty(this->write)
  96 + && TR_hashEmpty(this->accept))) {
  97 + timeout = 0;
  98 + } else if (NULL == timeoutptr) {
62 timeout = TR_eventDispatcherGetDataWaitTime(dispatcher); 99 timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
63 } else { 100 } else {
64 timeout = *timeoutptr; 101 timeout = *timeoutptr;
@@ -66,41 +103,42 @@ TR_commManagerSelect(void * _this, TR_Event event) @@ -66,41 +103,42 @@ TR_commManagerSelect(void * _this, TR_Event event)
66 103
67 TR_CALL(_this, TR_CommManager, select, event, timeout); 104 TR_CALL(_this, TR_CommManager, select, event, timeout);
68 105
  106 + TR_hashEach(this->write, this, commManagerIssueWriteEvents);
  107 + TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
  108 + TR_hashEach(this->read, this, commManagerIssueReadEvents);
  109 +
69 return TR_EVENT_DONE; 110 return TR_EVENT_DONE;
70 } 111 }
71 112
72 TR_EventDone 113 TR_EventDone
73 -TR_commManagerEnableWrite(void * _this, TR_Event event) 114 +TR_commManagerPollWrite(void * _this, TR_Event event)
74 { 115 {
75 - TR_CALL(_this, TR_CommManager, enableWrite, event); 116 + TR_CommManager this = _this;
  117 +
  118 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  119 + if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {
  120 + TR_CALL(_this, TR_CommManager, pollWrite, event);
  121 + }
76 122
77 return TR_EVENT_DONE; 123 return TR_EVENT_DONE;
78 } 124 }
79 125
80 TR_EventDone 126 TR_EventDone
81 -TR_commManagerDisableWrite(void * _this, TR_Event event) 127 +TR_commManagerPollRead(void * _this, TR_Event event)
82 { 128 {
83 - TR_EventHandler this = _this; 129 + TR_CommManager this = _this;
84 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 130 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
85 131
86 - TR_CALL(_this, TR_CommManager, disableWrite, event);  
87 -  
88 - if (TR_socketFinRd(endpoint->transport)) {  
89 - TR_eventHandlerIssueEvent(  
90 - this,  
91 - TR_eventSubjectEmit(  
92 - event->subject,  
93 - TR_CEP_EVENT_SHUT_READ,  
94 - NULL)); 132 + if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
  133 + && ((TR_TcpSocket)endpoint->transport)->listen) {
  134 + TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject));
  135 + } else {
  136 + TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
95 } 137 }
96 138
97 - return TR_EVENT_DONE;  
98 -}  
99 -  
100 -TR_EventDone  
101 -TR_commManagerEnableRead(void * _this, TR_Event event)  
102 -{  
103 - TR_CALL(_this, TR_CommManager, enableRead, event); 139 + if (! TR_socketFinRd(endpoint->transport)) {
  140 + TR_CALL(_this, TR_CommManager, pollRead, event);
  141 + }
104 142
105 return TR_EVENT_DONE; 143 return TR_EVENT_DONE;
106 } 144 }
@@ -110,74 +148,26 @@ TR_commManagerClose(void * _this, TR_Event event) @@ -110,74 +148,26 @@ TR_commManagerClose(void * _this, TR_Event event)
110 { 148 {
111 TR_CommManager this = _this; 149 TR_CommManager this = _this;
112 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 150 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
113 -  
114 - TR_CALL(_this, TR_CommManager, close, event); 151 + int handle = endpoint->transport->handle;
115 152
116 if (! TR_socketFinRdWr(endpoint->transport)) { 153 if (! TR_socketFinRdWr(endpoint->transport)) {
117 TR_socketShutdown(endpoint->transport); 154 TR_socketShutdown(endpoint->transport);
118 } 155 }
119 156
120 - if (endpoint->transport->handle == this->max_handle) { 157 + if (handle == this->max_handle) {
121 while (! this->endpoints[--this->max_handle]); 158 while (! this->endpoints[--this->max_handle]);
122 } 159 }
123 160
124 - if (this->endpoints[endpoint->transport->handle]) {  
125 - TR_eventSubjectFinalize(  
126 - (TR_EventSubject)this->endpoints[endpoint->transport->handle]);  
127 - this->endpoints[endpoint->transport->handle] = NULL;  
128 - }  
129 -  
130 - return TR_EVENT_DONE;  
131 -}  
132 -  
133 -TR_EventDone  
134 -TR_commManagerShutdownRead(void * _this, TR_Event event)  
135 -{  
136 - TR_CALL(_this, TR_CommManager, shutdownRead, event);  
137 -  
138 - if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {  
139 - TR_socketShutdownRead(((TR_CommEndPoint)event->subject)->transport); 161 + if (this->endpoints[handle]) {
  162 + TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]);
  163 + this->endpoints[handle] = NULL;
  164 + TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));
  165 + TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));
140 } 166 }
141 167
142 - if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) {  
143 - // close  
144 - TR_eventHandlerIssueEvent(  
145 - (TR_EventHandler)_this,  
146 - TR_eventSubjectEmit(  
147 - event->subject,  
148 - TR_CEP_EVENT_CLOSE,  
149 - NULL));  
150 - }  
151 -  
152 - if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {  
153 - TR_eventHandlerIssueEvent(  
154 - (TR_EventHandler)_this,  
155 - TR_eventSubjectEmit(  
156 - event->subject,  
157 - TR_CEP_EVENT_SHUT_WRITE,  
158 - NULL));  
159 - }  
160 -  
161 - return TR_EVENT_DONE;  
162 -}  
163 -  
164 -TR_EventDone  
165 -TR_commManagerShutdownWrite(void * _this, TR_Event event)  
166 -{  
167 - TR_CALL(_this, TR_CommManager, shutdownWrite, event);  
168 -  
169 - if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {  
170 - TR_socketShutdownWrite(((TR_CommEndPoint)event->subject)->transport);  
171 - } 168 + TR_CALL(_this, TR_CommManager, close, event);
172 169
173 - if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) {  
174 - TR_eventHandlerIssueEvent(  
175 - (TR_EventHandler)_this,  
176 - TR_eventSubjectEmit(  
177 - event->subject,  
178 - TR_CEP_EVENT_CLOSE,  
179 - NULL));  
180 - } 170 + this->endpoints[handle] = NULL;
181 171
182 return TR_EVENT_DONE; 172 return TR_EVENT_DONE;
183 } 173 }
@@ -21,9 +21,12 @@ @@ -21,9 +21,12 @@
21 */ 21 */
22 22
23 #include <unistd.h> 23 #include <unistd.h>
  24 +#include <inttypes.h>
  25 +#include <stdio.h>
24 26
25 #include "trbase.h" 27 #include "trbase.h"
26 #include "trevent.h" 28 #include "trevent.h"
  29 +#include "trio.h"
27 30
28 #include "tr/io_handler.h" 31 #include "tr/io_handler.h"
29 #include "tr/comm_end_point.h" 32 #include "tr/comm_end_point.h"
@@ -44,40 +47,47 @@ static @@ -44,40 +47,47 @@ static
44 TR_EventDone 47 TR_EventDone
45 ioHandlerRead(void * _this, TR_Event event) 48 ioHandlerRead(void * _this, TR_Event event)
46 { 49 {
47 - TR_Event revent; 50 + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  51 + TR_Event revent;
  52 + TR_RemoteData data;
48 53
49 - switch (TR_cepBufferRead((TR_CommEndPoint)event->subject)) {  
50 - case FALSE: // EAGAIN  
51 - revent = TR_eventSubjectEmit(  
52 - event->subject,  
53 - TR_CEP_EVENT_READ_BLOCK,  
54 - NULL);  
55 - break; 54 + if (endpoint->write_buffer_size < CEP_WRITE_BUFFER_THRESHOLD) {
  55 + switch (TR_commEndPointRead(endpoint, &data)) {
  56 + case FALSE: // EAGAIN
  57 + revent = TR_eventSubjectEmit(
  58 + event->subject,
  59 + TR_CEP_EVENT_READ_BLOCK,
  60 + NULL);
  61 + break;
56 62
57 - case -1: // error  
58 - revent = TR_eventSubjectEmit(  
59 - event->subject,  
60 - TR_CEP_EVENT_CLOSE,  
61 - NULL);  
62 - break; 63 + case -1: // error
  64 + revent = TR_eventSubjectEmit(
  65 + event->subject,
  66 + TR_CEP_EVENT_CLOSE,
  67 + NULL);
  68 + break;
63 69
64 - default:  
65 - case -2: // remote close  
66 - revent = TR_eventSubjectEmit(  
67 - event->subject,  
68 - TR_CEP_EVENT_SHUT_READ,  
69 - NULL);  
70 - break; 70 + default:
  71 + case -2: // remote close
  72 + revent = TR_eventSubjectEmit(
  73 + event->subject,
  74 + TR_CEP_EVENT_SHUT_READ,
  75 + NULL);
  76 + break;
71 77
72 - case TRUE:  
73 - revent = TR_eventSubjectEmit(  
74 - event->subject,  
75 - TR_CEP_EVENT_NEW_DATA,  
76 - NULL);  
77 - break;  
78 - } 78 + case -3: // read limit
  79 + return TR_EVENT_DONE;
79 80
80 - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); 81 + case TRUE:
  82 + revent = TR_eventSubjectEmit(
  83 + event->subject,
  84 + TR_CEP_EVENT_NEW_DATA,
  85 + data);
  86 + break;
  87 + }
  88 +
  89 + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
  90 + }
81 91
82 return TR_EVENT_DONE; 92 return TR_EVENT_DONE;
83 } 93 }
@@ -86,15 +96,15 @@ static @@ -86,15 +96,15 @@ static
86 TR_EventDone 96 TR_EventDone
87 ioHandlerWrite(void * _this, TR_Event event) 97 ioHandlerWrite(void * _this, TR_Event event)
88 { 98 {
89 - TR_Event revent = NULL,  
90 - close_event = NULL; 99 + TR_Event revent = NULL;
91 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 100 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
  101 + size_t written;
92 102
93 - switch (TR_cepWriteBuffered(endpoint)) { 103 + switch (TR_cepWriteBuffered(endpoint, &written)) {
94 case FALSE: // EAGAIN 104 case FALSE: // EAGAIN
95 revent = TR_eventSubjectEmit( 105 revent = TR_eventSubjectEmit(
96 event->subject, 106 event->subject,
97 - TR_CEP_EVENT_PENDING_DATA, // is WRITE_BLOCK 107 + TR_CEP_EVENT_WRITE_BLOCK,
98 NULL); 108 NULL);
99 break; 109 break;
100 110
@@ -112,24 +122,31 @@ ioHandlerWrite(void * _this, TR_Event event) @@ -112,24 +122,31 @@ ioHandlerWrite(void * _this, TR_Event event)
112 NULL); 122 NULL);
113 break; 123 break;
114 124
  125 + case -3: // remote end not ready
  126 + break;
  127 +
  128 + case -4: // no more data to send
  129 + revent = TR_eventSubjectEmit(
  130 + event->subject,
  131 + TR_CEP_EVENT_DATA_END,
  132 + NULL);
  133 + break;
  134 +
115 default: 135 default:
116 - if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { 136 + // TODO This still looks wrong...
  137 + if (TRUE == endpoint->do_close) {
117 revent = TR_eventSubjectEmit( 138 revent = TR_eventSubjectEmit(
118 event->subject, 139 event->subject,
119 - TR_CEP_EVENT_END_DATA, 140 + TR_CEP_EVENT_CLOSE,
120 NULL); 141 NULL);
121 -  
122 - if (TRUE == ((TR_CommEndPoint)event->subject)->do_close) {  
123 - close_event = TR_eventSubjectEmit(  
124 - event->subject,  
125 - TR_CEP_EVENT_CLOSE,  
126 - NULL);  
127 - }  
128 } 142 }
129 } 143 }
130 144
131 - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);  
132 - TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event); 145 + endpoint->write_buffer_size -= written;
  146 +
  147 + if (revent) {
  148 + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
  149 + }
133 150
134 return TR_EVENT_DONE; 151 return TR_EVENT_DONE;
135 } 152 }
@@ -141,13 +158,12 @@ ioHandlerCvInit(TR_class_ptr cls) @@ -141,13 +158,12 @@ ioHandlerCvInit(TR_class_ptr cls)
141 TR_EVENT_HANDLER_SET_METHOD( 158 TR_EVENT_HANDLER_SET_METHOD(
142 cls, 159 cls,
143 TR_CommEndPoint, 160 TR_CommEndPoint,
144 - TR_CEP_EVENT_READ_READY, 161 + TR_CEP_EVENT_DO_READ,
145 ioHandlerRead); 162 ioHandlerRead);
146 -  
147 TR_EVENT_HANDLER_SET_METHOD( 163 TR_EVENT_HANDLER_SET_METHOD(
148 cls, 164 cls,
149 TR_CommEndPoint, 165 TR_CommEndPoint,
150 - TR_CEP_EVENT_WRITE_READY, 166 + TR_CEP_EVENT_DO_WRITE,
151 ioHandlerWrite); 167 ioHandlerWrite);
152 } 168 }
153 169
@@ -51,9 +51,10 @@ protocolHandlerParse(void * _this, TR_Event event) @@ -51,9 +51,10 @@ protocolHandlerParse(void * _this, TR_Event event)
51 * TODO No upgrade for now. Add it later on. 51 * TODO No upgrade for now. Add it later on.
52 */ 52 */
53 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 53 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
54 - TR_ProtoMessage message = TR_cepNextMessage(endpoint); 54 + TR_RemoteData data = event->data;
  55 + TR_ProtoMessage message;
55 56
56 - if (message) { 57 + while ((message = TR_cepNextMessage(endpoint, &data))) {
57 TR_eventHandlerIssueEvent( 58 TR_eventHandlerIssueEvent(
58 (TR_EventHandler)_this, 59 (TR_EventHandler)_this,
59 TR_eventSubjectEmit( 60 TR_eventSubjectEmit(
@@ -62,8 +63,8 @@ protocolHandlerParse(void * _this, TR_Event event) @@ -62,8 +63,8 @@ protocolHandlerParse(void * _this, TR_Event event)
62 message)); 63 message));
63 64
64 if (message->close) { 65 if (message->close) {
65 - // also check that we are a response. Well this is how it is done  
66 - // in the python code... 66 + // also check that we are a response. Well this is
  67 + // how it is done in the python code...
67 TR_cepSetClose(endpoint); 68 TR_cepSetClose(endpoint);
68 } 69 }
69 } 70 }
@@ -77,6 +78,7 @@ protocolHandlerCompose(void * _this, TR_Event event) @@ -77,6 +78,7 @@ protocolHandlerCompose(void * _this, TR_Event event)
77 { 78 {
78 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; 79 TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
79 TR_ProtoMessage message = (TR_ProtoMessage)event->data; 80 TR_ProtoMessage message = (TR_ProtoMessage)event->data;
  81 + size_t message_size;
80 82
81 if (message->close) { 83 if (message->close) {
82 // also check that we are a response. Well this is how it is done 84 // also check that we are a response. Well this is how it is done
@@ -84,13 +86,17 @@ protocolHandlerCompose(void * _this, TR_Event event) @@ -84,13 +86,17 @@ protocolHandlerCompose(void * _this, TR_Event event)
84 TR_cepSetClose(endpoint); 86 TR_cepSetClose(endpoint);
85 } 87 }
86 88
87 - if (TR_cepCompose(endpoint, message)) {  
88 - TR_Event _event = TR_eventSubjectEmit(  
89 - event->subject,  
90 - TR_CEP_EVENT_WRITE_READY,  
91 - NULL); 89 + if ((message_size = TR_cepCompose(endpoint, message))) {
  90 + endpoint->write_buffer_size += message_size;
92 91
93 - TR_eventHandlerIssueEvent((TR_EventHandler)_this, _event); 92 + if (endpoint->write_buffer->nmsg == 1) {
  93 + TR_eventHandlerIssueEvent(
  94 + (TR_EventHandler)_this,
  95 + TR_eventSubjectEmit(
  96 + event->subject,
  97 + TR_CEP_EVENT_DATA_READY,
  98 + NULL));
  99 + }
94 } 100 }
95 TR_delete(message); 101 TR_delete(message);
96 102
@@ -112,18 +118,16 @@ protocolHandlerCvInit(TR_class_ptr cls) @@ -112,18 +118,16 @@ protocolHandlerCvInit(TR_class_ptr cls)
112 TR_CommEndPoint, 118 TR_CommEndPoint,
113 TR_CEP_EVENT_NEW_DATA, 119 TR_CEP_EVENT_NEW_DATA,
114 protocolHandlerParse); 120 protocolHandlerParse);
115 -  
116 TR_EVENT_HANDLER_SET_METHOD( 121 TR_EVENT_HANDLER_SET_METHOD(
117 cls, 122 cls,
118 TR_CommEndPoint, 123 TR_CommEndPoint,
119 - TR_CEP_EVENT_SEND_MSG, 124 + TR_CEP_EVENT_MSG_READY,
120 protocolHandlerCompose); 125 protocolHandlerCompose);
121 -  
122 - TR_EVENT_HANDLER_SET_METHOD(  
123 - cls,  
124 - TR_CommEndPoint,  
125 - TR_CEP_EVENT_UPGRADE,  
126 - protocolHandlerUpgrade); 126 +// TR_EVENT_HANDLER_SET_METHOD(
  127 +// cls,
  128 +// TR_CommEndPoint,
  129 +// TR_CEP_EVENT_UPGRADE,
  130 +// protocolHandlerUpgrade);
127 } 131 }
128 132
129 TR_INSTANCE(TR_Hash, protocolHandlerEventMethods); 133 TR_INSTANCE(TR_Hash, protocolHandlerEventMethods);
@@ -39,7 +39,7 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event) @@ -39,7 +39,7 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event)
39 39
40 _event = TR_eventSubjectEmit( 40 _event = TR_eventSubjectEmit(
41 event->subject, 41 event->subject,
42 - TR_CEP_EVENT_SEND_MSG, 42 + TR_CEP_EVENT_MSG_READY,
43 event->data); 43 event->data);
44 44
45 TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); 45 TR_eventHandlerIssueEvent((TR_EventHandler)this, _event);
@@ -56,14 +56,14 @@ testHandlerClose(TR_EventHandler this, TR_Event event) @@ -56,14 +56,14 @@ testHandlerClose(TR_EventHandler this, TR_Event event)
56 return TR_EVENT_PENDING; 56 return TR_EVENT_PENDING;
57 } 57 }
58 58
59 -static  
60 -TR_EventDone  
61 -testHandlerUpgrade(TR_EventHandler this, TR_Event event)  
62 -{  
63 - printf("upgrade: %"PRIdPTR"\n", event->id);  
64 -  
65 - return TR_EVENT_PENDING;  
66 -} 59 +//static
  60 +//TR_EventDone
  61 +//testHandlerUpgrade(TR_EventHandler this, TR_Event event)
  62 +//{
  63 +// printf("upgrade: %"PRIdPTR"\n", event->id);
  64 +//
  65 +// return TR_EVENT_PENDING;
  66 +//}
67 67
68 static 68 static
69 int 69 int
@@ -101,11 +101,11 @@ testHandlerCvInit(TR_class_ptr class) @@ -101,11 +101,11 @@ testHandlerCvInit(TR_class_ptr class)
101 TR_CommEndPoint, 101 TR_CommEndPoint,
102 TR_CEP_EVENT_CLOSE, 102 TR_CEP_EVENT_CLOSE,
103 testHandlerClose); 103 testHandlerClose);
104 - TR_EVENT_HANDLER_SET_METHOD(  
105 - class,  
106 - TR_CommEndPoint,  
107 - TR_CEP_EVENT_UPGRADE,  
108 - testHandlerUpgrade); 104 +// TR_EVENT_HANDLER_SET_METHOD(
  105 +// class,
  106 +// TR_CommEndPoint,
  107 +// TR_CEP_EVENT_UPGRADE,
  108 +// testHandlerUpgrade);
109 } 109 }
110 110
111 TR_INSTANCE(TR_Hash, testHandlerEventMethods); 111 TR_INSTANCE(TR_Hash, testHandlerEventMethods);
1 #!/bin/sh 1 #!/bin/sh
2 2
  3 +BS=8192
  4 +COUNT=25000
  5 +CONCURENT=200
  6 +IP="192.168.2.13"
3 pids="" 7 pids=""
4 i=0 8 i=0
5 -while [ $i -lt 800 ] 9 +
  10 +while [ $i -lt ${CONCURENT} ]
6 do 11 do
7 - dd if=/dev/zero bs=8192 count=2500 | nc 192.168.2.13 5678 & 12 + dd if=/dev/zero bs=${BS} count=${COUNT} | nc ${IP} 5678 >/dev/null &
8 pids="${pids} $!" 13 pids="${pids} $!"
9 i=$((i + 1)) 14 i=$((i + 1))
10 done 15 done
Please register or login to post a comment