EventDispatcher.py
4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Dispatch Events to registered handlers.
Author: Georg Hopp <ghopp@spamtitan.com>
"""
import sys
import time
import threading
from collections import deque
from Event import Event
from EventHandler import EventHandler
from EventSubject import EventSubject
class DefaultHandler(EventHandler):
def handleEvent(self, event):
return True
SERVER = 0x00
CLIENT = 0x01
class EventDispatcher(EventSubject):
_EVENTS = {
'heartbeat' : 0x01,
'user_wait' : 0x02,
'data_wait' : 0x03,
'shutdown' : 0x04
}
def __init__(self, mode = SERVER, default_handler = DefaultHandler()):
super(EventDispatcher, self).__init__()
self._events = deque([])
self._handler = {}
self._default_handler = default_handler
self._running = False
self._heartbeat = 0.0
self._nextbeat = 0.0
self._mode = mode
self._queue_lock = threading.Lock()
self._event_wait = threading.Condition()
self._data_wait_id = EventDispatcher.eventId('data_wait')
self._user_wait_id = EventDispatcher.eventId('user_wait')
def registerHandler(self, handler):
for eid in handler.getHandledIds():
if eid in self._handler:
self._handler[eid].append(handler)
else:
self._handler[eid] = [handler]
handler.setDispatcher(self)
def setHeartbeat(self, heartbeat):
self._heartbeat = heartbeat
if self._heartbeat:
self._nextbeat = time.time() + self._heartbeat
else:
self._nextbeat = 0.0
def getBeattime(self):
return self._nextbeat - time.time()
def getDataWaitTime(self):
if self._mode == SERVER:
return self.getBeattime()
# here comes a timeout into play.... currently I expect
# the stuff to work...
# TODO add timeout
return 0.0
def queueEvent(self, event):
self._queue_lock.acquire()
self._events.append(event)
self._queue_lock.release()
self._event_wait.acquire()
self._event_wait.notify_all()
self._event_wait.release()
def start(self, name):
self._running = True
while self._running or self._events:
now = time.time()
if self._nextbeat and self._nextbeat <= now:
self._nextbeat += self._heartbeat
self.queueEvent(self.emit('heartbeat'))
current = None
if not self._events:
if not name:
if self._mode == CLIENT:
current = self.emit('user_wait')
else:
current = self.emit('data_wait')
else:
self._event_wait.acquire()
self._event_wait.wait()
self._event_wait.release()
self._queue_lock.acquire()
if (not current) and self._events:
current = self._events.popleft()
self._queue_lock.release()
if current:
if current.type not in self._handler:
#print '[%s] handle: %s(%d) on %s: %s' % (
# name, current.name, current.sno, hex(id(current.subject)), 'default')
self._default_handler.handleEvent(current)
else:
for handler in self._handler[current.type]:
#print '[%s] handle: %s(%d) on %s: %s' % (
# name, current.name, current.sno, hex(id(current.subject)),
# handler.__class__.__name__)
if handler.handleEvent(current):
break
# if we leave the loop eventually inform all other threads
# so they can quit too.
self._event_wait.acquire()
self._event_wait.notify_all()
self._event_wait.release()
def stop(self):
self._running = False
def shutdown(self):
self.queueEvent(self.emit('shutdown'))
self.stop()
# vim: set ft=python et ts=8 sw=4 sts=4: