Module osbot_utils.helpers.pubsub.PubSub__Server

Expand source code
import queue
from threading import Thread
from queue import Queue
from typing import Set, Dict

from osbot_utils.helpers.pubsub.Event__Queue                        import Event__Queue
from osbot_utils.helpers.pubsub.PubSub__Client                      import PubSub__Client
from osbot_utils.helpers.pubsub.PubSub__Room import PubSub__Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event               import Schema__Event
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Connect      import Schema__Event__Connect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Leave_Room   import Schema__Event__Leave_Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Disconnect   import Schema__Event__Disconnect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Join_Room    import Schema__Event__Join_Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Message import Schema__Event__Message
from osbot_utils.testing.Logging                                    import Logging
from osbot_utils.utils.Dev                                          import pprint


class PubSub__Server(Event__Queue):
    #pubsub_db: PubSub__Sqlite
    clients          : Dict
    clients_connected: Set[PubSub__Client]
    rooms            : Dict[str, PubSub__Room]
    logging          : Logging

    def __init__ (self):
        super().__init__()

    # def db_table_clients(self):
    #     return self.pubsub_db.table_clients()     # todo refactor to class that uses this as a base and uses sqlite to capture connections

    def add_client(self, client: PubSub__Client):
        client_id = client.client_id
        if client_id:
            self.clients[client_id] = client
        return self

    def client_connect(self, client):
        self.clients_connected.add(client)

    def client_disconnect(self, client):
        self.clients_connected.discard(client)

    def client_join_room(self, client, event):
        room_name = event.room_name
        if room_name:
            self.room(room_name).clients.add(client)

    def client_message(self, client, event):
        pass

    def client_leave_room(self, client, event):
        room_name = event.room_name
        if room_name:
            self.room(room_name).clients.discard(client)

    def get_client(self, client_id):
        return self.clients.get(client_id)

    def handle_event(self, event: Schema__Event):
        event_type = type(event)
        client     = self.clients.get(event.connection_id)
        if client:
            if   event_type is Schema__Event__Connect    : self.client_connect   (client)
            elif event_type is Schema__Event__Disconnect : self.client_disconnect(client)
            elif event_type is Schema__Event__Join_Room  : self.client_join_room (client, event)
            elif event_type is Schema__Event__Leave_Room : self.client_leave_room(client, event)
            elif event_type is Schema__Event__Message    : self.client_message   (client, event)
            else:
                return False

        if self.log_events:
            self.events.append(event)
        return True

    def log(self, message):
        self.logging.debug(message)
        return self

    def new_client(self):
        client = PubSub__Client(event_queue = self)
        self.add_client(client)
        return client

    def room(self, room_name):
        if room_name not in self.rooms:
            new_room = PubSub__Room(room_name=room_name)
            self.rooms[room_name] = new_room

        return self.rooms.get(room_name)

Classes

class PubSub__Server

A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.

This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.

This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.

Usage

class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42

# Other methods can be added here

Correctly override default values by passing keyword arguments

instance = MyConfigurableClass(attribute1='new_value', attribute2=False)

This will raise an exception as 'attribute3' is not predefined

instance = MyConfigurableClass(attribute3='invalid_attribute')

this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class

Note

It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.

Methods

init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.

Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.

Parameters

**kwargs: Variable length keyword arguments.

Raises

Exception
If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class PubSub__Server(Event__Queue):
    #pubsub_db: PubSub__Sqlite
    clients          : Dict
    clients_connected: Set[PubSub__Client]
    rooms            : Dict[str, PubSub__Room]
    logging          : Logging

    def __init__ (self):
        super().__init__()

    # def db_table_clients(self):
    #     return self.pubsub_db.table_clients()     # todo refactor to class that uses this as a base and uses sqlite to capture connections

    def add_client(self, client: PubSub__Client):
        client_id = client.client_id
        if client_id:
            self.clients[client_id] = client
        return self

    def client_connect(self, client):
        self.clients_connected.add(client)

    def client_disconnect(self, client):
        self.clients_connected.discard(client)

    def client_join_room(self, client, event):
        room_name = event.room_name
        if room_name:
            self.room(room_name).clients.add(client)

    def client_message(self, client, event):
        pass

    def client_leave_room(self, client, event):
        room_name = event.room_name
        if room_name:
            self.room(room_name).clients.discard(client)

    def get_client(self, client_id):
        return self.clients.get(client_id)

    def handle_event(self, event: Schema__Event):
        event_type = type(event)
        client     = self.clients.get(event.connection_id)
        if client:
            if   event_type is Schema__Event__Connect    : self.client_connect   (client)
            elif event_type is Schema__Event__Disconnect : self.client_disconnect(client)
            elif event_type is Schema__Event__Join_Room  : self.client_join_room (client, event)
            elif event_type is Schema__Event__Leave_Room : self.client_leave_room(client, event)
            elif event_type is Schema__Event__Message    : self.client_message   (client, event)
            else:
                return False

        if self.log_events:
            self.events.append(event)
        return True

    def log(self, message):
        self.logging.debug(message)
        return self

    def new_client(self):
        client = PubSub__Client(event_queue = self)
        self.add_client(client)
        return client

    def room(self, room_name):
        if room_name not in self.rooms:
            new_room = PubSub__Room(room_name=room_name)
            self.rooms[room_name] = new_room

        return self.rooms.get(room_name)

Ancestors

Class variables

var clients : Dict
var clients_connected : Set[PubSub__Client]
var loggingLogging
var rooms : Dict[str, PubSub__Room]

Methods

def add_client(self, client: PubSub__Client)
Expand source code
def add_client(self, client: PubSub__Client):
    client_id = client.client_id
    if client_id:
        self.clients[client_id] = client
    return self
def client_connect(self, client)
Expand source code
def client_connect(self, client):
    self.clients_connected.add(client)
def client_disconnect(self, client)
Expand source code
def client_disconnect(self, client):
    self.clients_connected.discard(client)
def client_join_room(self, client, event)
Expand source code
def client_join_room(self, client, event):
    room_name = event.room_name
    if room_name:
        self.room(room_name).clients.add(client)
def client_leave_room(self, client, event)
Expand source code
def client_leave_room(self, client, event):
    room_name = event.room_name
    if room_name:
        self.room(room_name).clients.discard(client)
def client_message(self, client, event)
Expand source code
def client_message(self, client, event):
    pass
def get_client(self, client_id)
Expand source code
def get_client(self, client_id):
    return self.clients.get(client_id)
def handle_event(self, event: Schema__Event)
Expand source code
def handle_event(self, event: Schema__Event):
    event_type = type(event)
    client     = self.clients.get(event.connection_id)
    if client:
        if   event_type is Schema__Event__Connect    : self.client_connect   (client)
        elif event_type is Schema__Event__Disconnect : self.client_disconnect(client)
        elif event_type is Schema__Event__Join_Room  : self.client_join_room (client, event)
        elif event_type is Schema__Event__Leave_Room : self.client_leave_room(client, event)
        elif event_type is Schema__Event__Message    : self.client_message   (client, event)
        else:
            return False

    if self.log_events:
        self.events.append(event)
    return True
def log(self, message)
Expand source code
def log(self, message):
    self.logging.debug(message)
    return self
def new_client(self)
Expand source code
def new_client(self):
    client = PubSub__Client(event_queue = self)
    self.add_client(client)
    return client
def room(self, room_name)
Expand source code
def room(self, room_name):
    if room_name not in self.rooms:
        new_room = PubSub__Room(room_name=room_name)
        self.rooms[room_name] = new_room

    return self.rooms.get(room_name)

Inherited members