# Manager.py
"""
Manage Communication Events.

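The events handled here are:
    data_wait:      select() on the registered handles and issue
                    acc_ready / read_ready / write_ready events
    shutdown:       issue a close event for every registered end point
    new_con:        register a new connection end point
    pending_data:   start watching an end point for writability
    end_data:       stop watching an end point for writability
    close:          unregister an end point and close its transport
    shutdown_read:  stop watching an end point for readability
    shutdown_write: stop watching an end point for writability for good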

@author Georg Hopp <ghopp@spamtitan.com>
""" 
import threading

from EndPoint import CommunicationEndPoint as EndPoint
from Connection import Connection

from Event.EventHandler import EventHandler
from Event.EventDispatcher import EventDispatcher as Dispatcher

class CommunicationManager(EventHandler):
    """
    Keep track of communication end points and turn select() readiness
    into events.

    State kept per instance:
        _cons       handle -> end point for ordinary connections
        _listen     handle -> end point for listening end points
        _rcons      handles that are watched for readability
        _wcons      handles that are watched for writability
        _ready      result of the most recent select() call
        _cons_lock  guards registration, select() and close

    The boolean returned by the event methods is interpreted by the
    EventHandler base class, which is not part of this file.
    """

    def __init__(self):
        super(CommunicationManager, self).__init__()

        self._cons      = {}
        self._listen    = {}
        self._wcons     = []
        self._rcons     = []
        self._ready     = ([],[],[])
        self._cons_lock = threading.Lock()

        # event id -> method that handles it
        self._event_methods = {
            Dispatcher.eventId('data_wait')    : self._select,
            Dispatcher.eventId('shutdown')     : self._shutdown,
            Connection.eventId('new_con')      : self._addCon,
            Connection.eventId('pending_data') : self._enableWrite,
            Connection.eventId('end_data')     : self._disableWrite,
            EndPoint.eventId('close')          : self._close,
            EndPoint.eventId('shutdown_read')  : self._shutdownRead,
            EndPoint.eventId('shutdown_write') : self._shutdownWrite
        }

    def addEndPoint(self, end_point):
        """
        Register end_point and start watching it for readability.

        Listening end points go to _listen, all others to _cons. An end
        point whose handle is already registered is ignored.
        """
        handle = end_point.getHandle()
        # 'with' releases the lock even if the transport query raises.
        with self._cons_lock:
            if handle not in self._listen and handle not in self._cons:
                if end_point.getTransport().isListen():
                    self._listen[handle] = end_point
                else:
                    self._cons[handle] = end_point
                self._rcons.append(handle)

    def _addCon(self, event):
        """Handle 'new_con': register the end point carried by the event."""
        self.addEndPoint(event.subject)
        return True

    def _enableWrite(self, event):
        """
        Handle 'pending_data': watch the end point for writability.

        Nothing is added when the handle is already watched or when bit 2
        of the transport's _fin_state is set.
        """
        handle    = event.subject.getHandle()
        # NOTE(review): bit 2 presumably means "write side shut down" -
        # confirm against the transport implementation.
        fin_state = event.subject.getTransport()._fin_state

        if handle not in self._wcons and 0 == fin_state & 2:
            self._wcons.append(handle)
        return True

    def _disableWrite(self, event):
        """
        Handle 'end_data': stop watching the end point for writability.

        When bit 1 of the transport's _fin_state is set a 'shutdown_write'
        event is issued for the end point.
        """
        handle    = event.subject.getHandle()
        # NOTE(review): bit 1 presumably means "read side shut down" -
        # confirm against the transport implementation.
        fin_state = event.subject.getTransport()._fin_state

        if handle in self._wcons:
            self._wcons.remove(handle)

        if 1 == fin_state & 1:
            self.issueEvent(event.subject, 'shutdown_write')
        return True

    def _select(self, event):
        """
        Handle 'data_wait': select() on all watched handles and issue
        'acc_ready', 'read_ready' and 'write_ready' events.

        The timeout is taken from event.data or, if that is None, from
        event.subject.getDataWaitTime(). A negative (or missing) timeout
        blocks until a handle becomes ready.
        """
        import select

        timeout = event.data
        if timeout is None:
            timeout = event.subject.getDataWaitTime()

        try:
            # The context manager guarantees the lock is released exactly
            # once, whatever select() raises.
            with self._cons_lock:
                if timeout is None or timeout < 0.0:
                    ready = select.select(self._rcons, self._wcons, [])
                else:
                    ready = select.select(
                        self._rcons, self._wcons, [], timeout)
        except select.error:
            # Nothing is known to be ready; do not dispatch the stale
            # result of an earlier call again.
            ready = ([], [], [])

        self._ready = ready

        for handle in self._ready[0]:
            if handle in self._listen:
                self.issueEvent(self._listen[handle], 'acc_ready')
            if handle in self._cons:
                self.issueEvent(self._cons[handle], 'read_ready')

        for handle in self._ready[1]:
            if handle in self._cons:
                self.issueEvent(self._cons[handle], 'write_ready')

        return True

    def _shutdown(self, event):
        """
        Handle 'shutdown': issue 'close' for every registered end point
        and stop watching all handles.
        """
        # Iterate over snapshots: handling 'close' removes entries from
        # these dicts.
        for end_point in list(self._listen.values()):
            self.issueEvent(end_point, 'close')

        for end_point in list(self._cons.values()):
            self.issueEvent(end_point, 'close')

        # Two distinct lists; a chained assignment would make the read
        # and the write set share one list object.
        self._rcons = []
        self._wcons = []

        return False

    # shutdown and close events... these are handled here because the
    # communication end points need to be removed from the according
    # lists here. So this is the highest abstraction level that needs to
    # react on these events.

    def _shutdownRead(self, event):
        """
        Handle 'shutdown_read': stop watching the end point for
        readability and shut down the read side of its transport.
        """
        handle = event.subject.getHandle()
        if handle in self._rcons:
            self._rcons.remove(handle)

        if 3 == event.subject.getTransport().shutdownRead():
            # close in any case
            self.issueEvent(event.subject, 'close')
        elif not event.subject.hasPendingData():
            # Nothing is left to send, so the write side can be shut down
            # right away. With pending data the 'shutdown_write' event is
            # left to _disableWrite.
            self.issueEvent(event.subject, 'shutdown_write')
        else:
            # Flag this end point as subject to close when there is
            # nothing more to do with it. After this is set all pending
            # IO may finish and then a close event should be issued.
            event.subject.setClose()
        return False

    def _shutdownWrite(self, event):
        """
        Handle 'shutdown_write': stop watching the end point for
        writability and shut down the write side of its transport.
        """
        handle = event.subject.getHandle()
        if handle in self._wcons:
            self._wcons.remove(handle)

        if 3 == event.subject.getTransport().shutdownWrite():
            self.issueEvent(event.subject, 'close')
        # a read will be done anyway so no special handling here.
        # As long as the socket is ready for reading we will read from it.
        return False

    def _close(self, event):
        """
        Handle 'close': shut down the transport, forget the end point and
        close the transport.

        Raises KeyError when the end point is not registered; the lock is
        released in that case as well.
        """
        # 'with' instead of acquire()/release(): an exception in here must
        # not leave the lock held, or the next select would block forever.
        with self._cons_lock:
            transport = event.subject.getTransport()
            transport.shutdown()

            handle = event.subject.getHandle()
            if handle in self._rcons:
                self._rcons.remove(handle)
            if handle in self._wcons:
                self._wcons.remove(handle)

            if handle in self._listen:
                del self._listen[handle]
            else:
                del self._cons[handle]

            transport.close()
        return False

# vim: set ft=python et ts=8 sw=4 sts=4: