
"""Select channels

This module contains general purpose, non-abstract select channel
objects to use with SelectManager.SelectManager. Basically, everything
that can be made into a file (in the Unix sense of the word) can be a
select channel. """

import SelectManager
import os
import string
from thread import allocate_lock

class FileChannel( SelectManager.BufferedChannel ):

    """ Buffered channel over a raw file descriptor

    Wraps an already-open OS-level file descriptor so that it can be
    used as a buffered channel. All I/O goes through os.read / os.write
    directly on the descriptor, so it should not be shared with a Python
    file object that is still in use; duplicate the descriptor first:

    FileChannel( os.dup( myfile.fileno() ), myhandler ) """

    def __init__( s, fd, handler ):

        """ Store the descriptor, then initialise the buffered base class. """

        # The descriptor is stored before the base initialiser runs, so
        # fileno() already works if the base class needs it.
        s.fd = fd
        SelectManager.BufferedChannel.__init__( s, handler )

    def fileno( s ):

        """ Return the wrapped file descriptor. """

        return s.fd

    def make_receive( s ):

        """ Read at most as many bytes as still fit into the read buffer. """

        room = s.max_buffersize - len( s.rbuf )
        return os.read( s.fd, room )

    def make_send( s ):

        """ Write pending output, keeping whatever was not accepted. """

        if s.wbuf:
            written = os.write( s.fd, s.wbuf )
            # os.write may accept only a prefix; the rest stays queued.
            s.wbuf = s.wbuf[written:]

    def close( s ):

        """ Close the wrapped file descriptor. """

        os.close( s.fd )

def ChanneledPipe( rhandler, whandler ):

    """ Channel-wrappers for pipes

    Creates an OS pipe and wraps each end in a FileChannel. Returns the
    tuple ( <readable channel>, <writeable channel> ); rhandler is
    attached to the readable end and whandler to the writeable end.
    Everything written to the writeable channel will be available for
    read in the readable channel. Good for inter-thread communications. """

    rfd, wfd = os.pipe()
    # Build the pair explicitly: map() gives a list (or, on newer
    # interpreters, a lazy iterator that cannot be indexed) instead of
    # the tuple promised above.
    return ( FileChannel( rfd, rhandler ), FileChannel( wfd, whandler ))

class DescentRecurseChannel( SelectManager.BufferedChannel ):

    """ DescentRecurseChannel - channel for dr-parseable protocols

    This class implements parsing of message boundaries for protocols where
    for every message there are open and close markers, and these can be
    nested.

    Parser state kept on the instance:

    recurselevel -- current nesting depth (0 means "between messages")
    scanpoint    -- index into rbuf at which the next marker search starts

    Text in front of an opening marker at depth 0 is discarded. A closing
    marker met at depth 0 ends a message as well: everything up to and
    including it is delivered to the handler. """

    openblock = '{'
    closeblock = '}'

    def __init__( s, handler ):
        s.recurselevel = 0
        s.scanpoint = 0
        # The base class is only reachable through the SelectManager module.
        SelectManager.BufferedChannel.__init__( s, handler )

    def collect( s ):

        """ Deliver every complete message currently held in rbuf.

        Scans rbuf from scanpoint for markers and tracks the nesting
        depth. Each time the depth drops back to zero, the message is cut
        off the front of rbuf and passed to s.handler.receive(). Returns
        when no further marker is found; if the buffer has then reached
        max_buffersize, handle_buffer_over() is called. """

        while 1:
            op = s.rbuf.find( s.openblock, s.scanpoint )
            cl = s.rbuf.find( s.closeblock, s.scanpoint )
            if op < 0 and cl < 0:
                # Nothing more to parse until further data arrives.
                if len( s.rbuf ) >= s.max_buffersize: s.handle_buffer_over()
                return
            if cl >= 0 and ( op < 0 or cl <= op ):
                # A closing marker comes first. It must really have been
                # found: a "not found" index of -1 is not a position.
                s.recurselevel = s.recurselevel - 1
                s.scanpoint = cl + len( s.closeblock )
                if s.recurselevel <= 0:
                    data = s.rbuf[:s.scanpoint]
                    s.rbuf = s.rbuf[s.scanpoint:]
                    # Reset the state before calling out, so the next
                    # message starts from a clean parser.
                    s.scanpoint = 0
                    s.recurselevel = 0
                    s.handler.receive( data )
            else:
                # An opening marker comes first.
                if s.recurselevel == 0:
                    # Start of a message: drop whatever preceded it.
                    s.rbuf = s.rbuf[op:]
                    op = 0
                s.scanpoint = op + len( s.openblock )
                s.recurselevel = s.recurselevel + 1

    def handle_buffer_over( s ):

        """ Discard the buffer and reset the parser after an overflow. """

        # NOTE(review): `log` is not defined or imported in this module as
        # far as visible here -- confirm where it is expected to come from.
        log( 4, "Over %d bytes w/o terminator, discarding" % s.max_buffersize )
        s.rbuf = ''
        s.scanpoint = 0
        s.recurselevel = 0

class SelectableCondition( SelectManager.SelectChannel ):

    """ A selectable version of threading.Condition

    This class implements a selectable message queue, which is suited
    for inter-thread communications. It is most appropriate for
    one-thread-producer-one-thread-consumer situations, but can be used
    with multiple threads as producers or consumers, too.

    Messages travel through an OS pipe as strings, each followed by
    `terminator`; a message must therefore not contain the terminator
    itself, and empty messages are dropped on the receiving side.

    The locking mechanism of threading.Condition is not supported.  As
    far as I understand, it was only needed to ensure the integrity of
    the Condition; but now, atomicity is enforced by system calls.
    NOTE(review): POSIX guarantees atomic pipe writes only up to PIPE_BUF
    bytes -- confirm that messages stay below that limit.

    Don't mix Condition-type calls (notify* and wait) with queue-type
    calls (*_message). """

    max_msg_size = 4096
    terminator = "]/["

    # Unterminated tail of the most recent read. Kept as a class-level
    # default so instances always have the attribute.
    _partial = ""

    def __init__( s, server ):

        """ Create the pipe and register this channel with `server`. """

        s.rfd, s.wfd = os.pipe()
        s.server = server
        server.add_channel( s )
        s.msgs = []
        s.qlock = allocate_lock()

    def fileno( s ):

        """ Return the read end of the pipe. """

        return s.rfd

    def can_handle_read( s ):

        """ Always report readiness to handle read events. """

        return 1

    def handle_read_event( s, server ):

        """ Move the messages available on the pipe into the queue.

        Reads up to max_msg_size bytes. A read may end in the middle of a
        message or of a terminator, so the part after the last complete
        terminator is held back and prepended to the next read. """

        s.qlock.acquire()
        try:
            data = s._partial + os.read( s.rfd, s.max_msg_size )
            parts = data.split( s.terminator )
            # The last piece is not terminated yet (it is empty when the
            # data ended exactly on a terminator).
            s._partial = parts[-1]
            s.msgs.extend( parts[:-1] )
        finally:
            s.qlock.release()

    def send_message( s, msg ):

        """ Put a message on the queue. """

        os.write( s.wfd, msg + s.terminator )

    def get_message( s ):

        """ Get a message from the queue.

        Does not block. If there are no messages, None is returned.
        Empty messages are skipped. """

        s.qlock.acquire()
        try:
            while s.msgs:
                rs = s.msgs.pop( 0 )
                if rs: return rs
            return None
        finally:
            s.qlock.release()

    def wait_message( s ):

        """ Wait for a message to become available on the queue.

        Other channels on the server can process data while the message
        is waited for. (This is the point of the whole class.) """

        while 1:
            rs = s.get_message()
            if rs is not None: return rs
            # Let the server run once; this is what eventually calls
            # handle_read_event() and fills the queue.
            s.server.once()

    def acquire( s ):

        """ For conformance with threading.Condition """

        pass

    def release( s ):

        """ For conformance with threading.Condition """

        pass

    def wait( s, to = None ):

        """ Wait for notification.

        For conformance with threading.Condition. The timeout `to` is
        accepted but ignored: the call returns only after a notification
        has arrived. """

        m = s.wait_message()
        # A notifyAll marker is passed on so that further waiters see it.
        # NOTE(review): every waiter that consumes "@" writes a new one,
        # so one marker stays queued after notifyAll() -- confirm that
        # this is intended.
        if m == "@": s.send_message( m )

    def notify( s ):

        """ Wake up one thread in wait() """

        s.send_message( "!" )

    def notifyAll( s ):

        """ Wake up all threads in wait() """

        s.send_message( "@" )

