Commit 4438a03ef6a66dee9e99955f14a86f8e538de6e8

Authored by Georg Hopp
1 parent 5baa2f2d

Change socket hashtables for read, accept and write to set

... ... @@ -3,3 +3,30 @@
3 3 Socket communication layer build upon libtrevent.
4 4
5 5 ## BUGS
  6 +
  7 +## MULTIPLE WORKER
  8 +
  9 +### Stream sockets
  10 +
  11 +For stream socket some new classes might be neede:
  12 +
  13 + * A connector derivate that does not create a TR_CON_EVENT_NEW_CON
  14 + event but instead hands over the socket to one of its worker processes.
  15 +
  16 + * Another connector no doing TR_socketAccept but instead get the
  17 + docket via its socket pair. This one then might isse TR_CON_EVENT_NEW_CON
  18 +
  19 +### Datagram sockets
  20 +
  21 +Currently there is no clear plan to parallelize the communication
  22 +on datagram sockets.
  23 +
  24 +As we only have one socket for all communication it makes no sense
  25 +to share it between multiple processes. This neccicarily will lead
  26 +to race condition.
  27 +
  28 +One possible way might be to share the socket with all worker, but
  29 +do the read only in the master worker. When a packet has arraive
  30 +it will be handed over to the worker via shared memory. This might
  31 +be done as an event message. The worker then will process the packet
  32 +and send a respone.
... ...
... ... @@ -35,9 +35,9 @@ TR_CLASS(TR_CommManager) {
35 35 TR_EXTENDS(TR_EventHandler);
36 36
37 37 TR_CommEndPoint * endpoints;
38   - TR_Hash accept;
39   - TR_Hash write;
40   - TR_Hash read;
  38 + TR_Set accept;
  39 + TR_Set write;
  40 + TR_Set read;
41 41 size_t n_endpoints;
42 42 size_t max_handle;
43 43 };
... ...
... ... @@ -41,8 +41,6 @@ TR_CLASSVARS_DECL(TR_ConnEntryPoint) {
41 41 #define TR_CET_EVENT_ACC_READY (TR_CEP_EVENT_MAX + 1)
42 42 #define TR_CET_EVENT_MAX ((size_t)TR_CET_EVENT_ACC_READY)
43 43
44   -TR_TcpSocket TR_cetAccept(TR_ConnEntryPoint);
45   -
46 44 #endif // __TR_CONNECT_ENTRY_POINT_H__
47 45
48 46 // vim: set ts=4 sw=4:
... ...
... ... @@ -4,8 +4,7 @@ AUTOMAKE_OPTIONS = subdir-objects
4 4 AM_CFLAGS += -I../include/ -std=c99
5 5 AM_LDFLAGS +=
6 6
7   -TRCOMM = cet_accept.c \
8   - cep_write_buffered.c \
  7 +TRCOMM = cep_write_buffered.c \
9 8 comm_end_point_read.c \
10 9 comm_end_point.c \
11 10 conn_entry_point.c \
... ...
1   -/**
2   - * \file
3   - *
4   - * \author Georg Hopp
5   - *
6   - * \copyright
7   - * Copyright © 2014 Georg Hopp
8   - *
9   - * This program is free software: you can redistribute it and/or modify
10   - * it under the terms of the GNU General Public License as published by
11   - * the Free Software Foundation, either version 3 of the License, or
12   - * (at your option) any later version.
13   - *
14   - * This program is distributed in the hope that it will be useful,
15   - * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17   - * GNU General Public License for more details.
18   - *
19   - * You should have received a copy of the GNU General Public License
20   - * along with this program. If not, see <http://www.gnu.org/licenses/>.
21   - */
22   -
23   -#include "trio.h"
24   -
25   -#include "tr/comm_end_point.h"
26   -#include "tr/connect_entry_point.h"
27   -
28   -TR_TcpSocket
29   -TR_cetAccept(TR_ConnEntryPoint cet)
30   -{
31   - return TR_socketAccept((TR_TcpSocket)((TR_CommEndPoint)cet)->transport);
32   -}
33   -
34   -// vim: set ts=4 sw=4:
... ... @@ -40,13 +40,13 @@ commManagerCtor(void * _this, va_list * params)
40 40
41 41 TR_PARENTCALL(TR_CommManager, _this, TR_Class, ctor, params);
42 42
43   - this->accept = TR_new(TR_Hash);
44   - this->write = TR_new(TR_Hash);
45   - this->read = TR_new(TR_Hash);
  43 + this->accept = TR_new(TR_Set);
  44 + this->write = TR_new(TR_Set);
  45 + this->read = TR_new(TR_Set);
46 46
47   - this->accept->cleanup_no_free = TRUE;
48   - this->write->cleanup_no_free = TRUE;
49   - this->read->cleanup_no_free = TRUE;
  47 + this->accept->free_msgs = 0;
  48 + this->write->free_msgs = 0;
  49 + this->read->free_msgs = 0;
50 50
51 51 this->n_endpoints = sysconf(_SC_OPEN_MAX);
52 52 this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints);
... ... @@ -76,7 +76,7 @@ TR_commManagerEnableWrite(void * _this, TR_Event event)
76 76 {
77 77 TR_CommManager this = _this;
78 78
79   - TR_hashAdd(this->write, event->subject);
  79 + TR_setAdd(this->write, event->subject);
80 80
81 81 return TR_EVENT_DONE;
82 82 }
... ...
... ... @@ -101,10 +101,10 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
101 101 if ((events[i].events & EPOLLIN) == EPOLLIN) {
102 102 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
103 103 && ((TR_TcpSocket)endpoint->transport)->listen) {
104   - TR_hashAdd(cmgr->accept, endpoint);
  104 + TR_setAdd(cmgr->accept, endpoint);
105 105 } else {
106 106 if (! event->subject->fin) {
107   - TR_hashAdd(cmgr->read, endpoint);
  107 + TR_setAdd(cmgr->read, endpoint);
108 108 }
109 109 }
110 110
... ... @@ -116,7 +116,7 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout)
116 116
117 117 if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
118 118 if (! event->subject->fin) {
119   - TR_hashAdd(cmgr->write, endpoint);
  119 + TR_setAdd(cmgr->write, endpoint);
120 120 }
121 121 this->events[handle] &= ~EPOLLOUT;
122 122 _event.data.ptr = endpoint;
... ...
... ... @@ -97,10 +97,10 @@ TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout)
97 97 if ((this->fds[i].revents & POLLIN) == POLLIN) {
98 98 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
99 99 && ((TR_TcpSocket)endpoint->transport)->listen) {
100   - TR_hashAdd(cmgr->accept, endpoint);
  100 + TR_setAdd(cmgr->accept, endpoint);
101 101 } else {
102 102 if (! event->subject->fin) {
103   - TR_hashAdd(cmgr->read, endpoint);
  103 + TR_setAdd(cmgr->read, endpoint);
104 104 }
105 105 }
106 106 this->fds[endpoint->transport->handle].events &= ~POLLIN;
... ... @@ -108,7 +108,7 @@ TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout)
108 108
109 109 if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
110 110 if (! event->subject->fin) {
111   - TR_hashAdd(cmgr->write, endpoint);
  111 + TR_setAdd(cmgr->write, endpoint);
112 112 }
113 113 this->fds[endpoint->transport->handle].events &=
114 114 ~(POLLOUT|POLLHUP);
... ...
... ... @@ -47,7 +47,7 @@ TR_commManagerShutdownRead(void * _this, TR_Event event)
47 47 TR_ISSUE_IO_SHUT_WRITE_EVENT(this, event->subject);
48 48 }
49 49
50   - TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
  50 + TR_setDelete(this->read, event->subject);
51 51
52 52 return TR_EVENT_DONE;
53 53 }
... ...
... ... @@ -39,7 +39,7 @@ TR_commManagerShutdownWrite(void * _this, TR_Event event)
39 39
40 40 TR_ISSUE_IO_CLOSE_EVENT(this, event->subject);
41 41
42   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  42 + TR_setDelete(this->write, event->subject);
43 43
44 44 return TR_EVENT_DONE;
45 45 }
... ...
... ... @@ -72,6 +72,16 @@ connectorAccept(void * _this, TR_Event event)
72 72 (TR_EventSubject)new_con,
73 73 TR_CON_EVENT_NEW_CON,
74 74 NULL));
  75 + /*
  76 + * TODO The break is fatal when using edge triggered events...
  77 + * so either make sure to only use level trigged events with
  78 + * the accept socket or remove the break...
  79 + * Currently I think the break can be removed...
  80 + *
  81 + * TODO Prevent malicious or broken clients from doing a connection
  82 + * bomb here... well, doing this might mean that the break needs to be
  83 + * here.
  84 + */
75 85 if (++count > 100) break;
76 86 socket = TR_socketAccept((TR_TcpSocket)connection->transport);
77 87 }
... ...
... ... @@ -52,37 +52,16 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
52 52
53 53 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
54 54 && ((TR_TcpSocket)endpoint->transport)->listen) {
55   - TR_hashAdd(this->accept, endpoint);
  55 + TR_setAdd(this->accept, endpoint);
56 56 TR_ISSUE_IO_ACC_EVENT(this, endpoint);
57 57 } else {
58   - TR_hashAdd(this->read, endpoint);
  58 + TR_setAdd(this->read, endpoint);
59 59 TR_ISSUE_IO_READ_EVENT(this, endpoint);
60 60 }
61 61
62 62 TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
63 63 }
64 64
65   -static
66   -void
67   -commManagerIssueAcceptEvents(const void * endpoint, const void * comm_manager)
68   -{
69   - TR_ISSUE_IO_ACC_EVENT(comm_manager, endpoint);
70   -}
71   -
72   -static
73   -void
74   -commManagerIssueWriteEvents(const void * endpoint, const void * comm_manager)
75   -{
76   - TR_ISSUE_IO_WRITE_EVENT(comm_manager, endpoint);
77   -}
78   -
79   -static
80   -void
81   -commManagerIssueReadEvents(const void * endpoint, const void * comm_manager)
82   -{
83   - TR_ISSUE_IO_READ_EVENT(comm_manager, endpoint);
84   -}
85   -
86 65 TR_EventDone
87 66 TR_commManagerSelect(void * _this, TR_Event event)
88 67 {
... ... @@ -90,13 +69,23 @@ TR_commManagerSelect(void * _this, TR_Event event)
90 69 TR_Timer timer = (TR_Timer)event->data;
91 70 TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject;
92 71 unsigned long timeout; // milliseconds
93   - unsigned long io_triggerd;
94 72
95   - io_triggerd = TR_hashEach(this->write, this, commManagerIssueWriteEvents);
96   - io_triggerd += TR_hashEach(this->accept, this, commManagerIssueAcceptEvents);
97   - io_triggerd += TR_hashEach(this->read, this, commManagerIssueReadEvents);
  73 +#define IO_TRIGGERED \
  74 + (TR_setSize(this->write) || \
  75 + TR_setSize(this->accept) || \
  76 + TR_setSize(this->read))
  77 +
  78 + TR_iterableForeach(this->write) {
  79 + TR_ISSUE_IO_WRITE_EVENT(this, TR_iterableCurrent(this->write));
  80 + }
  81 + TR_iterableForeach(this->accept) {
  82 + TR_ISSUE_IO_ACC_EVENT(this, TR_iterableCurrent(this->accept));
  83 + }
  84 + TR_iterableForeach(this->read) {
  85 + TR_ISSUE_IO_READ_EVENT(this, TR_iterableCurrent(this->read));
  86 + }
98 87
99   - if (io_triggerd) {
  88 + if (IO_TRIGGERED) {
100 89 timeout = 0;
101 90 } else if (NULL == timer) {
102 91 timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
... ... @@ -114,7 +103,7 @@ TR_commManagerPollWrite(void * _this, TR_Event event)
114 103 {
115 104 TR_CommManager this = _this;
116 105
117   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  106 + TR_setDelete(this->write, event->subject);
118 107 if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) {
119 108 TR_CALL(_this, TR_CommManager, pollWrite, event);
120 109 }
... ... @@ -130,9 +119,9 @@ TR_commManagerPollRead(void * _this, TR_Event event)
130 119
131 120 if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
132 121 && ((TR_TcpSocket)endpoint->transport)->listen) {
133   - TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject));
  122 + TR_setDelete(this->accept, event->subject);
134 123 } else {
135   - TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
  124 + TR_setDelete(this->read, event->subject);
136 125 }
137 126
138 127 if (! TR_socketFinRd(endpoint->transport)) {
... ... @@ -146,7 +135,7 @@ TR_EventDone
146 135 TR_commManagerDisableRead(void * _this, TR_Event event)
147 136 {
148 137 TR_CommManager this = _this;
149   - TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject));
  138 + TR_setDelete(this->read, event->subject);
150 139 TR_CALL(_this, TR_CommManager, disableRead, event);
151 140
152 141 return TR_EVENT_DONE;
... ... @@ -157,9 +146,9 @@ TR_commManagerDisableWrite(void * _this, TR_Event event)
157 146 {
158 147 TR_CommManager this = _this;
159 148
160   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
  149 + TR_setDelete(this->write, event->subject);
161 150 if (! event->subject->fin) {
162   - TR_hashAdd(this->read, event->subject);
  151 + TR_setAdd(this->read, event->subject);
163 152 }
164 153 TR_CALL(_this, TR_CommManager, disableWrite, event);
165 154
... ... @@ -186,8 +175,8 @@ TR_commManagerClose(void * _this, TR_Event event)
186 175 TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]);
187 176 TR_CALL(_this, TR_CommManager, disableWrite, event);
188 177 TR_CALL(_this, TR_CommManager, disableRead, event);
189   - TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint));
190   - TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint));
  178 + TR_setDelete(this->write, endpoint);
  179 + TR_setDelete(this->read, endpoint);
191 180 }
192 181
193 182 TR_CALL(_this, TR_CommManager, close, event);
... ...
... ... @@ -41,7 +41,7 @@ TR_serverBindTcp(
41 41
42 42 TR_serverAddEndpoint(
43 43 this,
44   - TR_new(TR_ConnEntryPoint, socket, proto, 2048));
  44 + TR_new(TR_ConnEntryPoint, socket, proto, CEP_DEFAULT_READ_SIZE));
45 45 }
46 46
47 47 // vim: set ts=4 sw=4:
... ...
... ... @@ -40,7 +40,8 @@ TR_serverBindUdp(
40 40 port, 0);
41 41
42 42 TR_serverAddEndpoint(
43   - this, TR_new(TR_DatagramEntryPoint, socket, proto, 2048));
  43 + this, TR_new(
  44 + TR_DatagramEntryPoint, socket, proto, CEP_DEFAULT_READ_SIZE));
44 45 }
45 46
46 47 // vim: set ts=4 sw=4:
... ...
... ... @@ -14,7 +14,8 @@ LIBS = $(TRLIBS) \
14 14 PROGRAMS = testserver2 \
15 15 testtcp \
16 16 testudp \
17   - testiterator
  17 + testiterator \
  18 + testset
18 19
19 20 all: $(PROGRAMS)
20 21
... ... @@ -27,11 +28,14 @@ testtcp: testclient.o
27 28 testudp: testclient.o
28 29 $(CC) $(LDFLAGS) -std=c99 $(LIBS) -o $@ $<
29 30
  31 +testudp.o: testclient.c
  32 + $(CC) $(CFLAGS) -DUDP=1 -std=c99 -c -o $@ $<
  33 +
30 34 testiterator: testiterator.o
31 35 $(CC) $(LDFLAGS) -std=c99 $(LIBS) -o $@ $<
32 36
33   -testudp.o: testclient.c
34   - $(CC) $(CFLAGS) -DUDP=1 -std=c99 -c -o $@ $<
  37 +testset: testset.o
  38 + $(CC) $(LDFLAGS) -std=c99 $(LIBS) -o $@ $<
35 39
36 40 %.o: %.c
37 41 $(CC) $(CFLAGS) -std=c99 -c -o $@ $<
... ...
  1 +The testclient throws a segfault when interruption on a waiting connect.
... ...
... ... @@ -34,11 +34,19 @@ main (int argc, char * argv[])
34 34 protocol = TR_new(TR_ProtocolRaw);
35 35 #if UDP
36 36 socket = TR_new(TR_UdpSocket, TR_logger, "127.0.0.1", 5678, 0);
37   - connection = TR_new(TR_DatagramService, socket, protocol, 2048);
  37 + connection = TR_new(
  38 + TR_DatagramService,
  39 + socket,
  40 + protocol,
  41 + CEP_DEFAULT_READ_SIZE);
38 42 TR_socketOpen((TR_Socket)socket);
39 43 #else
40 44 socket = TR_new(TR_TcpSocket, TR_logger, "127.0.0.1", 5678, 0);
41   - connection = TR_new(TR_Connection, socket, protocol, 2048);
  45 + connection = TR_new(
  46 + TR_Connection,
  47 + socket,
  48 + protocol,
  49 + CEP_DEFAULT_READ_SIZE);
42 50 TR_socketConnect((TR_Socket)socket);
43 51 #endif
44 52
... ... @@ -56,7 +64,7 @@ main (int argc, char * argv[])
56 64 message = (TR_ProtoMessageRaw)TR_simpleClientIssue(
57 65 client,
58 66 (TR_ProtoMessage)message,
59   - 1000);
  67 + 100000);
60 68
61 69 if (! message) break;
62 70 #if 0
... ...
No preview for this file type
  1 +#include <stdio.h>
  2 +
  3 +#include "trbase.h"
  4 +#include "trdata.h"
  5 +
  6 +int
  7 +main (int argc, char * argv[])
  8 +{
  9 + TR_Set set = TR_new(TR_Set);
  10 +
  11 + set->free_msgs = 0;
  12 +
  13 + TR_setAdd(set, "a");
  14 + TR_setAdd(set, "b");
  15 + TR_setAdd(set, "c");
  16 +
  17 + TR_iterableForeach(set) {
  18 + printf("%s\n", (char *)TR_iterableCurrent(set));
  19 + }
  20 +
  21 + TR_setDelete(set, "a");
  22 + TR_setDelete(set, "b");
  23 + TR_setAdd(set, "b");
  24 + TR_setAdd(set, "a");
  25 +
  26 + TR_iterableForeach(set) {
  27 + printf("%s\n", (char *)TR_iterableCurrent(set));
  28 + }
  29 +
  30 + TR_delete(set);
  31 + TR_cleanup();
  32 +
  33 + return 0;
  34 +}
  35 +
  36 +// vim: set ts=4 sw=4:
... ...
Please register or login to post a comment