Manager.py
5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
Manage Communication Events.
The events handled here are:
new_con:
@author Georg Hopp <ghopp@spamtitan.com>
"""
import threading
from EndPoint import CommunicationEndPoint as EndPoint
from Connection import Connection
from Event.EventHandler import EventHandler
from Event.EventDispatcher import EventDispatcher as Dispatcher
class CommunicationManager(EventHandler):
def __init__(self):
super(CommunicationManager, self).__init__()
self._cons = {}
self._listen = {}
self._wcons = []
self._rcons = []
self._ready = ([],[],[])
self._cons_lock = threading.Lock()
self._event_methods = {
Dispatcher.eventId('data_wait') : self._select,
Dispatcher.eventId('shutdown') : self._shutdown,
Connection.eventId('new_con') : self._addCon,
Connection.eventId('pending_data') : self._enableWrite,
Connection.eventId('end_data') : self._disableWrite,
EndPoint.eventId('close') : self._close,
EndPoint.eventId('shutdown_read') : self._shutdownRead,
EndPoint.eventId('shutdown_write') : self._shutdownWrite
}
def addEndPoint(self, end_point):
handle = end_point.getHandle()
self._cons_lock.acquire()
if handle not in self._listen and handle not in self._cons:
if end_point.getTransport().isListen():
self._listen[handle] = end_point
else:
self._cons[handle] = end_point
self._rcons.append(handle)
self._cons_lock.release()
def _addCon(self, event):
self.addEndPoint(event.subject)
return True
def _enableWrite(self, event):
handle = event.subject.getHandle()
fin_state = event.subject.getTransport()._fin_state
if handle not in self._wcons and 0 == fin_state & 2:
self._wcons.append(handle)
return True
def _disableWrite(self, event):
handle = event.subject.getHandle()
fin_state = event.subject.getTransport()._fin_state
if handle in self._wcons:
self._wcons.remove(handle)
if 1 == fin_state & 1:
self.issueEvent(event.subject, 'shutdown_write')
return True
def _select(self, event):
import select
try:
timeout = event.data
if timeout is None:
timeout = event.subject.getDataWaitTime()
self._cons_lock.acquire()
if timeout < 0.0:
self._ready = select.select(self._rcons, self._wcons, [])
else:
self._ready = select.select(self._rcons, self._wcons, [], timeout)
self._cons_lock.release()
except select.error:
self._cons_lock.release()
pass
for handle in self._ready[0]:
if handle in self._listen:
self.issueEvent(self._listen[handle], 'acc_ready')
if handle in self._cons:
self.issueEvent(self._cons[handle], 'read_ready')
for handle in self._ready[1]:
if handle in self._cons:
self.issueEvent(self._cons[handle], 'write_ready')
return True
def _shutdown(self, event):
for handle in self._listen:
self.issueEvent(self._listen[handle], 'close')
for handle in self._cons:
self.issueEvent(self._cons[handle], 'close')
self._rcons = self._wcons = []
return False
"""
shutdown and close events...these are handled here because the communication
end points need to be remove for the according lists here. So this is the
highest abstraction level that needs to react on this event.
"""
def _shutdownRead(self, event):
handle = event.subject.getHandle()
if handle in self._rcons:
self._rcons.remove(handle)
if 3 == event.subject.getTransport().shutdownRead():
"""
close in any case
"""
self.issueEvent(event.subject, 'close')
elif not event.subject.hasPendingData():
"""
If there is pending data we will handle a disable_write later on.
There this event will be fired. In that case.
"""
self.issueEvent(event.subject, 'shutdown_write')
else:
"""
Flag this endpoint as subject to close when there is nothing more
to do with it. After this is set all pending IO may finish and then
a close event should be issued
"""
event.subject.setClose()
return False
def _shutdownWrite(self, event):
handle = event.subject.getHandle()
if handle in self._wcons:
self._wcons.remove(handle)
if 3 == event.subject.getTransport().shutdownWrite():
self.issueEvent(event.subject, 'close')
# a read will be done anyway so no special handling here.
# As long as the socket is ready for reading we will read from it.
return False
def _close(self, event):
self._cons_lock.acquire()
event.subject.getTransport().shutdown()
handle = event.subject.getHandle()
if handle in self._rcons:
self._rcons.remove(handle)
if handle in self._wcons:
self._wcons.remove(handle)
if handle in self._listen:
del(self._listen[handle])
else:
del(self._cons[handle])
event.subject.getTransport().close()
self._cons_lock.release()
return False
# vim: set ft=python et ts=8 sw=4 sts=4: