Module osbot_utils.helpers.pubsub.PubSub__Client

Expand source code
from queue import Queue
from typing import List

from osbot_utils.base_classes.Kwargs_To_Self import Kwargs_To_Self
from osbot_utils.helpers.pubsub.Event__Queue import Event__Queue
from osbot_utils.helpers.pubsub.schemas.Schema__Event import Schema__Event
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Connect import Schema__Event__Connect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Disconnect import Schema__Event__Disconnect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Join_Room import Schema__Event__Join_Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Leave_Room import Schema__Event__Leave_Room
from osbot_utils.utils.Misc import random_guid


class PubSub__Client(Kwargs_To_Self):
    event_queue       : Event__Queue
    client_id         : str
    received_messages : List[str]           # todo: fix this to be Events/Messages received via event_queue

    def __init__(self, **kwargs):
        self.client_id = kwargs.get('client_id') or random_guid()
        super().__init__(**kwargs)

    def connect(self):
        event_connect = Schema__Event__Connect(connection_id=self.client_id)
        self.send_event(event_connect)
        return self

    def disconnect(self):
        event_connect = Schema__Event__Disconnect(connection_id=self.client_id)
        self.send_event(event_connect)
        return self

    def join_room(self, room_name):
        event  = Schema__Event__Join_Room(connection_id=self.client_id, room_name=room_name)
        self.send_event(event)
        return self

    def leave_room(self, room_name):
        event  = Schema__Event__Leave_Room(connection_id=self.client_id, room_name=room_name)
        self.send_event(event)
        return self

    def send_data(self, event_data, **kwargs):
        return self.event_queue.send_data(event_data, connection_id=self.client_id, **kwargs)

    def send_event(self, event : Schema__Event):
        event.connection_id = self.client_id
        return self.event_queue.send_event(event)

    def send_message(self, message, **kwargs):
        return self.event_queue.send_message(message, connection_id=self.client_id, **kwargs)

    def receive_message(self, message):
        self.received_messages.append(message)      # todo: fix this to be Events/Messages received via event_queue

Classes

class PubSub__Client (**kwargs)

A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.

This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.

This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.

Usage

class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42

# Other methods can be added here

Correctly override default values by passing keyword arguments

instance = MyConfigurableClass(attribute1='new_value', attribute2=False)

This will raise an exception as 'attribute3' is not predefined

instance = MyConfigurableClass(attribute3='invalid_attribute')

this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class

Note

It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.

Methods

init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.

Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.

Parameters

**kwargs: Variable length keyword arguments.

Raises

Exception
If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class PubSub__Client(Kwargs_To_Self):
    event_queue       : Event__Queue
    client_id         : str
    received_messages : List[str]           # todo: fix this to be Events/Messages received via event_queue

    def __init__(self, **kwargs):
        self.client_id = kwargs.get('client_id') or random_guid()
        super().__init__(**kwargs)

    def connect(self):
        event_connect = Schema__Event__Connect(connection_id=self.client_id)
        self.send_event(event_connect)
        return self

    def disconnect(self):
        event_connect = Schema__Event__Disconnect(connection_id=self.client_id)
        self.send_event(event_connect)
        return self

    def join_room(self, room_name):
        event  = Schema__Event__Join_Room(connection_id=self.client_id, room_name=room_name)
        self.send_event(event)
        return self

    def leave_room(self, room_name):
        event  = Schema__Event__Leave_Room(connection_id=self.client_id, room_name=room_name)
        self.send_event(event)
        return self

    def send_data(self, event_data, **kwargs):
        return self.event_queue.send_data(event_data, connection_id=self.client_id, **kwargs)

    def send_event(self, event : Schema__Event):
        event.connection_id = self.client_id
        return self.event_queue.send_event(event)

    def send_message(self, message, **kwargs):
        return self.event_queue.send_message(message, connection_id=self.client_id, **kwargs)

    def receive_message(self, message):
        self.received_messages.append(message)      # todo: fix this to be Events/Messages received via event_queue

Ancestors

Class variables

var client_id : str
var event_queueEvent__Queue
var received_messages : List[str]

Methods

def connect(self)
Expand source code
def connect(self):
    event_connect = Schema__Event__Connect(connection_id=self.client_id)
    self.send_event(event_connect)
    return self
def disconnect(self)
Expand source code
def disconnect(self):
    event_connect = Schema__Event__Disconnect(connection_id=self.client_id)
    self.send_event(event_connect)
    return self
def join_room(self, room_name)
Expand source code
def join_room(self, room_name):
    event  = Schema__Event__Join_Room(connection_id=self.client_id, room_name=room_name)
    self.send_event(event)
    return self
def leave_room(self, room_name)
Expand source code
def leave_room(self, room_name):
    event  = Schema__Event__Leave_Room(connection_id=self.client_id, room_name=room_name)
    self.send_event(event)
    return self
def receive_message(self, message)
Expand source code
def receive_message(self, message):
    self.received_messages.append(message)      # todo: fix this to be Events/Messages received via event_queue
def send_data(self, event_data, **kwargs)
Expand source code
def send_data(self, event_data, **kwargs):
    return self.event_queue.send_data(event_data, connection_id=self.client_id, **kwargs)
def send_event(self, event: Schema__Event)
Expand source code
def send_event(self, event : Schema__Event):
    event.connection_id = self.client_id
    return self.event_queue.send_event(event)
def send_message(self, message, **kwargs)
Expand source code
def send_message(self, message, **kwargs):
    return self.event_queue.send_message(message, connection_id=self.client_id, **kwargs)

Inherited members