Module osbot_utils.helpers.pubsub.PubSub__Client
Expand source code
from queue import Queue
from typing import List
from osbot_utils.base_classes.Kwargs_To_Self import Kwargs_To_Self
from osbot_utils.helpers.pubsub.Event__Queue import Event__Queue
from osbot_utils.helpers.pubsub.schemas.Schema__Event import Schema__Event
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Connect import Schema__Event__Connect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Disconnect import Schema__Event__Disconnect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Join_Room import Schema__Event__Join_Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Leave_Room import Schema__Event__Leave_Room
from osbot_utils.utils.Misc import random_guid
class PubSub__Client(Kwargs_To_Self):
event_queue : Event__Queue
client_id : str
received_messages : List[str] # todo: fix this to be Events/Messages received via event_queue
def __init__(self, **kwargs):
self.client_id = kwargs.get('client_id') or random_guid()
super().__init__(**kwargs)
def connect(self):
event_connect = Schema__Event__Connect(connection_id=self.client_id)
self.send_event(event_connect)
return self
def disconnect(self):
event_connect = Schema__Event__Disconnect(connection_id=self.client_id)
self.send_event(event_connect)
return self
def join_room(self, room_name):
event = Schema__Event__Join_Room(connection_id=self.client_id, room_name=room_name)
self.send_event(event)
return self
def leave_room(self, room_name):
event = Schema__Event__Leave_Room(connection_id=self.client_id, room_name=room_name)
self.send_event(event)
return self
def send_data(self, event_data, **kwargs):
return self.event_queue.send_data(event_data, connection_id=self.client_id, **kwargs)
def send_event(self, event : Schema__Event):
event.connection_id = self.client_id
return self.event_queue.send_event(event)
def send_message(self, message, **kwargs):
return self.event_queue.send_message(message, connection_id=self.client_id, **kwargs)
def receive_message(self, message):
self.received_messages.append(message) # todo: fix this to be Events/Messages received via event_queue
Classes
class PubSub__Client (**kwargs)
-
A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.
This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.
This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.
Usage
class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42
# Other methods can be added here
Correctly override default values by passing keyword arguments
instance = MyConfigurableClass(attribute1='new_value', attribute2=False)
This will raise an exception as 'attribute3' is not predefined
instance = MyConfigurableClass(attribute3='invalid_attribute')
this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class
Note
It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.
Methods
init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.
Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.
Parameters
**kwargs: Variable length keyword arguments.
Raises
Exception
- If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class PubSub__Client(Kwargs_To_Self): event_queue : Event__Queue client_id : str received_messages : List[str] # todo: fix this to be Events/Messages received via event_queue def __init__(self, **kwargs): self.client_id = kwargs.get('client_id') or random_guid() super().__init__(**kwargs) def connect(self): event_connect = Schema__Event__Connect(connection_id=self.client_id) self.send_event(event_connect) return self def disconnect(self): event_connect = Schema__Event__Disconnect(connection_id=self.client_id) self.send_event(event_connect) return self def join_room(self, room_name): event = Schema__Event__Join_Room(connection_id=self.client_id, room_name=room_name) self.send_event(event) return self def leave_room(self, room_name): event = Schema__Event__Leave_Room(connection_id=self.client_id, room_name=room_name) self.send_event(event) return self def send_data(self, event_data, **kwargs): return self.event_queue.send_data(event_data, connection_id=self.client_id, **kwargs) def send_event(self, event : Schema__Event): event.connection_id = self.client_id return self.event_queue.send_event(event) def send_message(self, message, **kwargs): return self.event_queue.send_message(message, connection_id=self.client_id, **kwargs) def receive_message(self, message): self.received_messages.append(message) # todo: fix this to be Events/Messages received via event_queue
Ancestors
Class variables
var client_id : str
var event_queue : Event__Queue
var received_messages : List[str]
Methods
def connect(self)
-
Expand source code
def connect(self): event_connect = Schema__Event__Connect(connection_id=self.client_id) self.send_event(event_connect) return self
def disconnect(self)
-
Expand source code
def disconnect(self): event_connect = Schema__Event__Disconnect(connection_id=self.client_id) self.send_event(event_connect) return self
def join_room(self, room_name)
-
Expand source code
def join_room(self, room_name): event = Schema__Event__Join_Room(connection_id=self.client_id, room_name=room_name) self.send_event(event) return self
def leave_room(self, room_name)
-
Expand source code
def leave_room(self, room_name): event = Schema__Event__Leave_Room(connection_id=self.client_id, room_name=room_name) self.send_event(event) return self
def receive_message(self, message)
-
Expand source code
def receive_message(self, message): self.received_messages.append(message) # todo: fix this to be Events/Messages received via event_queue
def send_data(self, event_data, **kwargs)
-
Expand source code
def send_data(self, event_data, **kwargs): return self.event_queue.send_data(event_data, connection_id=self.client_id, **kwargs)
def send_event(self, event: Schema__Event)
-
Expand source code
def send_event(self, event : Schema__Event): event.connection_id = self.client_id return self.event_queue.send_event(event)
def send_message(self, message, **kwargs)
-
Expand source code
def send_message(self, message, **kwargs): return self.event_queue.send_message(message, connection_id=self.client_id, **kwargs)
Inherited members