# Source code for socktools.base_sock

"""
Python Sock tools: base_sock.py - implementation of the base socket classes
Copyright (C) 2016 GarethNelson

This file is part of python-sock-tools

python-sock-tools is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.

python-sock-tools is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with python-sock-tools.  If not, see <http://www.gnu.org/licenses/>.

This module provides the base classes used by the other modules; you can probably ignore it unless you plan to implement a new socket type.

"""
import eventlet
eventlet.monkey_patch()

import socket
import traceback
import time
import logging

class DummySocket(socket.socket):
    """A placeholder socket object that performs no real networking

    Note:
       Only recvfrom() is overridden here. Every other socket method is inherited
       unchanged, and is likely to fail because the base initialiser never ran.
    """

    def __init__(self, sock_family=socket.AF_INET, sock_type=socket.SOCK_DGRAM, proto=0):
        """Hackish constructor for the dummy socket

        It takes the same parameters as socket.socket but ignores all of them. For the
        curious, the defaults are (socket.AF_INET, socket.SOCK_DGRAM, 0).

        Keyword args:
           sock_family (int): ignored
           sock_type (int):   ignored
           proto (int):       ignored

        Note:
           You should NOT normally inherit from this class. Instead, override the
           create_socket() method of BaseSock and return a socket.socket.
        """
        # socket.socket.__init__ is deliberately not called, so no OS-level socket is opened.
        return None

    def recvfrom(self, buflen=8192, flags=0):
        """Dummy stand-in for socket.recvfrom()

        Both arguments are ignored. The call always yields an empty string that
        appears to come from ('127.0.0.1', 1337).
        """
        fake_source = ('127.0.0.1', 1337)
        return '', fake_source
class BaseSock(object):
    """The base class from which specific socket classes inherit

    This class is the base socket class which handles queuing of messages and the main event loop.
    It should not be used directly; subclass it or use one of the existing child classes instead.

    Keyword Args:
       bind (tuple):            if not None, specifies a TCP/IP endpoint as (ip,port) to bind to and listen on
       connect (tuple):         if not None, specifies a TCP/IP endpoint as (ip,port) to connect to
       handlers (dict):         maps message types to lists of message handler functions; None means no extra handlers
       timeout (int):           time in seconds before a peer is considered to have timed out
       tick_interval (float):   time in seconds to wait between ticks
       sock (socket.socket):    if not None, specifies a socket object to be used - should be used only for testing
       meta_sock (meta_sock.MetaSock): if not None, specifies the meta socket this socket belongs to, see meta_sock.py
       logger (logging.Logger): a logger object
       no_async (bool):         if set True, no attempts will be made to go async except for sending messages

    Attributes:
       known_peers (dict):  all currently connected peers have an entry in this dict, the value is another dict with metadata
       pool (eventlet.GreenPool): all greenlets spawned by the socket belong to this pool, with a concurrency of 1000
       parse_q (eventlet.queue.LightQueue): raw packet data is added to this queue before being parsed, each item is a tuple of (addr,data)
       in_q (eventlet.queue.LightQueue): parsed messages are added to this queue after being parsed, each item is a tuple of (addr,msg_type,msg_data)
       timeout (int):       the timeout interval in seconds - peers must talk to us on a regular basis to avoid being timed out
       tick_interval (float): the tick interval - what exactly a tick does is up to the application
       handlers (dict):     maps message types to lists of message handler functions
       sock:                arbitrary object representing the physical socket, defaults to whatever create_socket() returns
                            (a DummySocket in this base class)
       meta_sock:           see meta_sock.py
       logger (logging.Logger): the logger to use for this socket
       active (bool):       indicates whether this socket is active and working
       no_async (bool):     True if the socket runs without its parser/handler/receive/timeout greenthreads
    """

    def __init__(self, bind=None, connect=None, handlers=None, timeout=10, tick_interval=0.25,
                 sock=None, meta_sock=None, logger=None, no_async=False):
        self.known_peers = {}
        self.logger = logger
        self.pool = eventlet.GreenPool(1000)
        if sock is None:
            self.sock = self.create_socket()
        else:
            self.sock = sock
        self.handlers = self.get_default_handlers()
        if handlers is not None:  # None replaces the old shared mutable {} default
            self.handlers.update(handlers)
        self.parse_q = eventlet.queue.LightQueue(100)  # raw packets ready to be parsed
        self.in_q = eventlet.queue.LightQueue(100)     # parsed packets ready to be handled
        self.timeout = timeout
        self.tick_interval = tick_interval
        self.meta_sock = meta_sock
        if bind is not None:
            self.sock.bind(bind)
        if connect is not None:
            self.connect_to(connect)
        self.child_setup()
        self.active = True
        self.no_async = no_async
        if not no_async:
            for _ in range(10):
                self.pool.spawn_n(self.parser_thread)
            for _ in range(10):
                self.pool.spawn_n(self.handler_thread)
            self.pool.spawn_n(self.recv_thread)
            self.pool.spawn_n(self.timeout_thread)

    def setup_logger(self, logger):
        """Setup a logger object to use instead of stdout

        Often it makes sense to log to a logger object instead of dumping output to stdout using print.
        This is especially useful for daemon processes, where you might otherwise not see the output at all.

        Args:
           logger (logging.Logger): The logger object to use
        """
        self.logger = logger

    def child_setup(self):
        """Convenient setup hook for child classes

        To avoid the need to rewrite __init__ when inheriting from this class, __init__ calls this method
        before spawning threads and going active.
        Without this hook, any child class that adds extra params to __init__ would need its own copy of the
        implementation, because certain things cannot be modified after the threads are spawned.

        See msgtype_mixin.py for a good example of how to use this.

        The default implementation does nothing.
        """
        pass

    def get_default_handlers(self):
        """Get a default handlers dict

        When inheriting from this class, set up the default handlers needed by the protocol by overriding this method.
        Children should always call the parent implementation of this method.

        Returns:
           dict: a dict mapping message types to handlers, can be 0-length but must be a dict
        """
        return {}

    def timeout_thread(self):
        """Used internally - removes peers from the known_peers dict if they haven't sent us a packet within the last timeout interval

        Warning:
           This method must not be called from anywhere except inside the class, and only one instance should run at a time
        """
        while self.active:
            peer_count = len(self.known_peers)
            if peer_count == 0:
                # NOTE(review): with no peers this only yields via sleep(0), i.e. it busy-waits
                eventlet.greenthread.sleep(0)
            else:
                # the delay shrinks as more peers are tracked; float() avoids Python 2 integer division
                eventlet.greenthread.sleep(float(self.timeout) / peer_count)
            cur_time = time.time()
            # iterate over a snapshot because close_peer() deletes entries from known_peers
            for peer, meta in list(self.known_peers.items()):
                if 'last' not in meta:
                    meta['last'] = time.time()
                if (cur_time - meta['last']) >= self.timeout:
                    self.log_debug('%s:%s timed out' % peer)
                    self.close_peer(peer)

    def tick(self, diff=0):
        """Perform application-specific tick

        The default implementation does absolutely nothing; override it if you want tick functionality.
        While debugging, it may help to insert a sleep here to see what happens when ticks are missed.

        Args:
           diff (float): the recorded time in seconds the last tick took to run, 0 on the first iteration
        """
        pass

    def tick_thread(self):
        """Used internally to call the tick() method in a loop with accurate timing

        Many network protocols, 3D gaming ones in particular, require a fixed-frequency "tick"; that is what is implemented here.
        This thread runs tick() in a loop, measures how long it took, and adjusts the delay to compensate.

        Note:
           This code makes no guarantees at all that the specified tick interval is actually achievable - if you get missed ticks,
           optimise your tick() to be more async
        """
        diff = 0
        while self.active:
            start_time = time.time()
            self.tick(diff)
            diff = time.time() - start_time
            if diff < self.tick_interval:
                # finished early: sleep for the remainder of the interval
                eventlet.greenthread.sleep(self.tick_interval - diff)
            else:
                # missed the deadline: just yield and start the next tick straight away
                eventlet.greenthread.sleep(0)

    def close_peer(self, peer):
        """Removes a peer from the known_peers dict and does any required cleanup

        Args:
           peer (tuple): the peer to close as a TCP/IP endpoint tuple, i.e (ip,port)

        Notes:
           The default implementation simply removes the peer from known_peers (doing nothing if it is absent);
           any child class should do the same when overriding this method
        """
        self.known_peers.pop(peer, None)

    def recv_thread(self):
        """Used internally - reads from the socket and puts the raw packets into parse_q

        In no_async mode the packet is instead parsed and dispatched to handlers directly in this greenthread.

        Warning:
           This method absolutely must not be called from anywhere except inside the class or bad things will happen
        """
        while self.active:
            eventlet.greenthread.sleep(0)
            data, addr = None, None
            while (data is None) and self.active:
                eventlet.greenthread.sleep(0)
                try:
                    raw, src = self.read_raw()
                    decoded = self.decode_msg(raw)  # decode it if needed
                except Exception as e:
                    # keep reading; never fall through with raw, undecoded data
                    self.log_error('Error reading from socket', exc=e)
                else:
                    data, addr = decoded, src
            if data is None:
                continue
            if len(data) == 0 or not self.good_peer(addr):
                continue
            if addr in self.known_peers:
                self.known_peers[addr]['last'] = time.time()
            self.log_debug('Got raw data: %s' % str(data))
            if self.no_async:
                parsed = self._parse_and_transform(addr, data)
                if parsed is not None:  # drop packets that failed to parse
                    self.run_handler(*parsed)
            else:
                self.parse_q.put((addr, data))

    def run_handler(self, addr=None, msg_type=None, msg_data=None):
        """Dispatch one message to every handler registered for its type

        If msg_data is None and the socket is async, a message is taken from in_q first (and offered to the meta
        socket, if any). In no_async mode nothing is ever queued, so a call without a message simply returns.

        Args:
           addr (tuple): TCP/IP endpoint of the peer that sent the message
           msg_type: the message type used to look up handlers
           msg_data: the parsed message data, or None to fetch the next message from in_q
        """
        eventlet.greenthread.sleep(0)
        while (msg_data is None) and self.active:
            if self.no_async:
                # waiting here would spin forever because nothing feeds in_q in no_async mode
                return
            eventlet.greenthread.sleep(0)
            addr, msg_type, msg_data = self.in_q.get()
            if self.meta_sock is not None:
                self.meta_sock.add_msg(self, addr, msg_type, msg_data)
                if self.meta_sock.override_mode:
                    # NOTE(review): the old `continue` here was a no-op (the loop exited and the handlers ran anyway);
                    # returning skips our own handlers, which appears to be the intent of override mode - confirm
                    return
        if msg_data is None:
            return  # deactivated while waiting; nothing to dispatch
        for handler in self.handlers.get(msg_type, ()):
            if self.no_async:
                self.handler_wrapper(handler, addr, msg_type, msg_data)
            else:
                self.pool.spawn_n(self.handler_wrapper, handler, addr, msg_type, msg_data)

    def handler_thread(self):
        """Used internally - reads from in_q and passes to the appropriate handler

        Note:
           This method should only be run from inside the class and inside a greenthread
        """
        while self.active:
            self.run_handler()

    def handler_wrapper(self, handler, addr, msg_type, msg_data):
        """Invokes the specified handler while catching exceptions

        This method invokes a handler function while catching and logging any exceptions.
        If the handler raises an exception, BaseSock.log_error() is called to notify the end user.

        Args:
           handler (function): a function accepting params (addr,msg_type,msg_data)
           addr (tuple): TCP/IP endpoint for the peer that originated the message
           msg_type: the message type - this depends on the application but is usually an int
           msg_data: the message data - this depends on the application but is usually a tuple or dict
        """
        try:
            handler(addr, msg_type, msg_data)
        except Exception as e:
            self.log_error('Handler for message type %s failed' % msg_type, exc=e)

    def log_debug(self, msg):
        """Logs debug info - if debug mode is off, this method should do nothing

        The default implementation prints the message to stdout and, if a logger is set, also logs it at debug level.

        Args:
           msg (str): message to log
        """
        print(msg)
        if self.logger is not None:
            self.logger.debug(msg)

    def log_error(self, msg, exc=None):
        """Log an error with exception data

        This method logs errors. Override it to use whatever logging mechanism is appropriate for the end user application.
        The default implementation prints the message and exception data to stdout, and also sends them to the logger if one is set.

        Args:
           msg (str): error message

        Keyword args:
           exc (Exception): a python exception related to the error
        """
        if exc is None:
            print('Error: %s' % msg)
        else:
            print('Error: %s, Exception: %s' % (msg, self._format_exception(exc)))
        if self.logger is not None:
            self.logger.error(msg)
            if exc is not None:  # previously a bogus "None" traceback was logged when exc was None
                self.logger.error(self._format_exception(exc))

    def _format_exception(self, exc):
        """Render exc with its traceback if it is the exception currently being handled, otherwise just its message"""
        import sys
        if sys.exc_info()[1] is exc:
            return traceback.format_exc()
        return ''.join(traceback.format_exception_only(type(exc), exc))

    def _parse_and_transform(self, addr, data):
        """Parse raw data and pass it through handle_all(); returns (addr,msg_type,msg_data), or None if parsing failed"""
        try:
            msg_type, msg_data = self.parse_msg(data)
        except Exception as e:
            self.log_error('Error parsing packet from %s:%s' % addr, exc=e)
            return None
        return self.handle_all(addr, msg_type, msg_data)

    def parser_thread(self):
        """Used internally - reads from parse_q and puts parsed messages into in_q

        Packets that fail to parse are logged and dropped rather than queued.

        Note:
           This method should only be run from inside the class and inside a greenthread
        """
        while self.active:
            eventlet.greenthread.sleep(0)
            addr, data = self.parse_q.get()
            parsed = self._parse_and_transform(addr, data)
            if parsed is None:
                continue
            addr, msg_type, msg_data = parsed
            self.log_debug('Putting parsed message type %s from %s:%s on queue: %s' % (str(msg_type), str(addr[0]), str(addr[1]), str(msg_data)))
            self.in_q.put((addr, msg_type, msg_data))

    def add_handler(self, msg_type, handler, exclusive=False):
        """Add a handler for the specified message type

        This method adds message handlers to the socket after it has been set up, allowing dynamic handlers.
        Prefer static handlers whenever possible.

        Args:
           msg_type: the message type the handler is for, this depends on application but is usually an int
           handler: the handler function to add, must accept params (addr,msg_type,msg_data)

        Keyword args:
           exclusive (bool): if True, deletes all previously set handlers for this message type
        """
        if exclusive or msg_type not in self.handlers:
            self.handlers[msg_type] = []
        self.handlers[msg_type].append(handler)

    def parse_msg(self, data):
        """Parse a raw message into a format usable by the application

        Override this method as appropriate for the application; it handles message parsing from raw packets.

        Args:
           data (str): the raw packet to be parsed, this should be one whole message

        Returns:
           tuple: a tuple of (msg_type,msg_data) - the specific types of msg_type and msg_data depend on the application

        Note:
           If not overridden, this method returns a message of type 0 with the raw data (as str) as msg_data.
           This method should NOT implement anything beyond parsing; see good_peer() and handle_all() for higher-level logic.
        """
        return (0, str(data))

    def good_peer(self, peer):
        """Check if the specified peer is one we want to talk to

        If this method returns False for a peer as it sends us a message, that message will be dropped and not parsed.
        The default implementation returns True for every peer.

        Args:
           peer (tuple): the TCP/IP endpoint of the peer as an (ip,port) tuple

        Returns:
           bool: True if the peer is good, otherwise False
        """
        return True

    def handle_all(self, from_addr, msg_type, msg_data):
        """Called on every decoded/parsed packet before all other handlers

        This can be used as a hook to implement whatever you want with decoded packets; things like encryption and
        compression belong in parse_msg() though. After being parsed, all messages pass through this method, so any
        required transformations can be applied here.

        Most applications will not need this, and the default implementation (a simple identity function) will work fine.

        One possible use is sending "unknown peer" type messages when the address is not in the known peers dict.

        Args:
           from_addr (tuple): The remote peer's TCP/IP endpoint as an (ip,port) tuple
           msg_type: The message type, what this is depends on the application but is usually an int
           msg_data: The message data, what this is depends on the application

        Returns:
           tuple: (from_addr,msg_type,msg_data) - the message, modified or not; by default identical to the params
        """
        return (from_addr, msg_type, msg_data)

    def read_raw(self):
        """Read a single raw packet from the socket or sockets

        If the underlying physical socket is UDP, this method should simply read from it and return as quickly as possible.

        If the underlying physical socket is a TCP socket connected to one single peer, this method should read from it
        and return as quickly as possible.

        If the underlying physical socket is a TCP server, this method should return a raw packet from the next available client.

        In all cases, this method should return a raw packet without parsing of any kind beyond size checking.

        If no data is available, this method should block using eventlet.greenthread.sleep(0) or equivalent until data is available.

        Note:
           The default implementation together with DummySocket always returns a null-length string from localhost:1337.
           Using the default implementation with another physical socket type probably won't work.

        Warning:
           This method must NOT block except via eventlet; ignoring this warning will cripple performance

        Returns:
           tuple: (data,from_addr) - data should be a string or byte buffer, from_addr should be a TCP/IP endpoint tuple in the form (ip,port)
        """
        return self.sock.recvfrom(8192)

    def serialise_msg(self, msg_type, msg_data):
        """Serialise a single message

        This method is where you should implement your message serialisation code for custom protocols.
        The default implementation just returns str(msg_data).

        Args:
           msg_type (int): the message type
           msg_data: a python object to be serialised into a message

        Returns:
           str: the serialised message
        """
        return str(msg_data)

    def encode_msg(self, data):
        """Encode a serialised message

        This method is where you should implement any form of encoding required by the physical socket,
        for example prefixing a length for TCP sockets. You may also handle compression, encryption etc here.

        The default implementation is an identity function.

        Args:
           data (str): the message serialised as a string

        Returns:
           str: the encoded message
        """
        return data

    def decode_msg(self, data):
        """Used internally - decodes raw messages

        If encode_msg() is implemented, this should do the inverse.

        Args:
           data (str): the message serialised as a string and still encoded

        Returns:
           str: the decoded but still serialised message
        """
        return data

    def send_msg(self, msg_type, msg_data, to_peer=None):
        """Encode and then send a single message to the specified peer

        This method encodes and then sends a single message to the specified peer, or optionally to all connected peers.

        Args:
           msg_type (int): the type of message to send
           msg_data: format depends on the application, usually a tuple, list or dict

        Note:
           This method relies on the serialise_msg() and encode_msg() methods - make sure these are defined appropriately

        Keyword args:
           to_peer (tuple): the TCP/IP endpoint as an (ip,port) tuple to transmit to; if None, all peers get the message
        """
        serialised_msg = self.serialise_msg(msg_type, msg_data)
        encoded_msg = self.encode_msg(serialised_msg)
        self.send_raw(encoded_msg, to_peer=to_peer)

    def send_raw(self, data, to_peer=None):
        """Send a single raw packet from the socket to the specified peer

        This method sends a single raw packet (already encoded as appropriate) to the specified peer, or optionally to all connected peers.

        The whole packet must be transmitted before this method returns; if it needs to block, it must do so via eventlet.greenthread.sleep(0) or equivalent.

        Args:
           data (str): the raw data to send, already encoded and ready for transmission on the physical socket - including length if required

        Keyword args:
           to_peer (tuple): the TCP/IP endpoint as an (ip,port) tuple to transmit to; if None, all peers are sent the packet

        Warnings:
           This method must NOT block except via eventlet; ignoring this warning will cripple performance.
           When broadcasting, each transmission is wrapped in its own try/except so that a failure to transmit to one
           client does not prevent transmission to the others.
        """
        if to_peer is None:  # broadcast
            # snapshot the peers: other greenthreads may remove entries while sendto() yields
            for peer in list(self.known_peers):
                try:
                    self.sock.sendto(data, peer)
                except Exception as e:
                    self.log_error('Error transmitting to %s:%s' % peer, exc=e)
        else:
            self.sock.sendto(data, to_peer)

    def create_socket(self):
        """Create the physical socket object

        To be of any practical use, this should be overridden.

        Returns:
           socket.socket: The physical socket object; the default implementation returns a DummySocket instance
        """
        return DummySocket()

    def connect_to(self, endpoint):
        """Connect to a specified endpoint

        Override this method to implement any authentication etc before adding the specified endpoint to the known peers dict.
        The default implementation does nothing except add the peer to the known peers dict with empty metadata.

        Args:
           endpoint (tuple): the TCP/IP endpoint as (ip,port)
        """
        self.known_peers[endpoint] = {}