Module osbot_utils.helpers.pubsub.PubSub__Server
Expand source code
import queue
from threading import Thread
from queue import Queue
from typing import Set, Dict
from osbot_utils.helpers.pubsub.Event__Queue import Event__Queue
from osbot_utils.helpers.pubsub.PubSub__Client import PubSub__Client
from osbot_utils.helpers.pubsub.PubSub__Room import PubSub__Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event import Schema__Event
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Connect import Schema__Event__Connect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Leave_Room import Schema__Event__Leave_Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Disconnect import Schema__Event__Disconnect
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Join_Room import Schema__Event__Join_Room
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Message import Schema__Event__Message
from osbot_utils.testing.Logging import Logging
from osbot_utils.utils.Dev import pprint
class PubSub__Server(Event__Queue):
#pubsub_db: PubSub__Sqlite
clients : Dict
clients_connected: Set[PubSub__Client]
rooms : Dict[str, PubSub__Room]
logging : Logging
def __init__ (self):
super().__init__()
# def db_table_clients(self):
# return self.pubsub_db.table_clients() # todo refactor to class that uses this as a base and uses sqlite to capture connections
def add_client(self, client: PubSub__Client):
client_id = client.client_id
if client_id:
self.clients[client_id] = client
return self
def client_connect(self, client):
self.clients_connected.add(client)
def client_disconnect(self, client):
self.clients_connected.discard(client)
def client_join_room(self, client, event):
room_name = event.room_name
if room_name:
self.room(room_name).clients.add(client)
def client_message(self, client, event):
pass
def client_leave_room(self, client, event):
room_name = event.room_name
if room_name:
self.room(room_name).clients.discard(client)
def get_client(self, client_id):
return self.clients.get(client_id)
def handle_event(self, event: Schema__Event):
event_type = type(event)
client = self.clients.get(event.connection_id)
if client:
if event_type is Schema__Event__Connect : self.client_connect (client)
elif event_type is Schema__Event__Disconnect : self.client_disconnect(client)
elif event_type is Schema__Event__Join_Room : self.client_join_room (client, event)
elif event_type is Schema__Event__Leave_Room : self.client_leave_room(client, event)
elif event_type is Schema__Event__Message : self.client_message (client, event)
else:
return False
if self.log_events:
self.events.append(event)
return True
def log(self, message):
self.logging.debug(message)
return self
def new_client(self):
client = PubSub__Client(event_queue = self)
self.add_client(client)
return client
def room(self, room_name):
if room_name not in self.rooms:
new_room = PubSub__Room(room_name=room_name)
self.rooms[room_name] = new_room
return self.rooms.get(room_name)
Classes
class PubSub__Server
-
A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.
This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.
This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.
Usage
class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42
# Other methods can be added here
Correctly override default values by passing keyword arguments
instance = MyConfigurableClass(attribute1='new_value', attribute2=False)
This will raise an exception as 'attribute3' is not predefined
instance = MyConfigurableClass(attribute3='invalid_attribute')
this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class
Note
It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.
Methods
init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.
Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.
Parameters
**kwargs: Variable length keyword arguments.
Raises
Exception
- If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class PubSub__Server(Event__Queue): #pubsub_db: PubSub__Sqlite clients : Dict clients_connected: Set[PubSub__Client] rooms : Dict[str, PubSub__Room] logging : Logging def __init__ (self): super().__init__() # def db_table_clients(self): # return self.pubsub_db.table_clients() # todo refactor to class that uses this as a base and uses sqlite to capture connections def add_client(self, client: PubSub__Client): client_id = client.client_id if client_id: self.clients[client_id] = client return self def client_connect(self, client): self.clients_connected.add(client) def client_disconnect(self, client): self.clients_connected.discard(client) def client_join_room(self, client, event): room_name = event.room_name if room_name: self.room(room_name).clients.add(client) def client_message(self, client, event): pass def client_leave_room(self, client, event): room_name = event.room_name if room_name: self.room(room_name).clients.discard(client) def get_client(self, client_id): return self.clients.get(client_id) def handle_event(self, event: Schema__Event): event_type = type(event) client = self.clients.get(event.connection_id) if client: if event_type is Schema__Event__Connect : self.client_connect (client) elif event_type is Schema__Event__Disconnect : self.client_disconnect(client) elif event_type is Schema__Event__Join_Room : self.client_join_room (client, event) elif event_type is Schema__Event__Leave_Room : self.client_leave_room(client, event) elif event_type is Schema__Event__Message : self.client_message (client, event) else: return False if self.log_events: self.events.append(event) return True def log(self, message): self.logging.debug(message) return self def new_client(self): client = PubSub__Client(event_queue = self) self.add_client(client) return client def room(self, room_name): if room_name not in self.rooms: new_room = PubSub__Room(room_name=room_name) self.rooms[room_name] = new_room return self.rooms.get(room_name)
Ancestors
Class variables
var clients : Dict
var clients_connected : Set[PubSub__Client]
var logging : Logging
var rooms : Dict[str, PubSub__Room]
Methods
def add_client(self, client: PubSub__Client)
-
Expand source code
def add_client(self, client: PubSub__Client): client_id = client.client_id if client_id: self.clients[client_id] = client return self
def client_connect(self, client)
-
Expand source code
def client_connect(self, client): self.clients_connected.add(client)
def client_disconnect(self, client)
-
Expand source code
def client_disconnect(self, client): self.clients_connected.discard(client)
def client_join_room(self, client, event)
-
Expand source code
def client_join_room(self, client, event): room_name = event.room_name if room_name: self.room(room_name).clients.add(client)
def client_leave_room(self, client, event)
-
Expand source code
def client_leave_room(self, client, event): room_name = event.room_name if room_name: self.room(room_name).clients.discard(client)
def client_message(self, client, event)
-
Expand source code
def client_message(self, client, event): pass
def get_client(self, client_id)
-
Expand source code
def get_client(self, client_id): return self.clients.get(client_id)
def handle_event(self, event: Schema__Event)
-
Expand source code
def handle_event(self, event: Schema__Event): event_type = type(event) client = self.clients.get(event.connection_id) if client: if event_type is Schema__Event__Connect : self.client_connect (client) elif event_type is Schema__Event__Disconnect : self.client_disconnect(client) elif event_type is Schema__Event__Join_Room : self.client_join_room (client, event) elif event_type is Schema__Event__Leave_Room : self.client_leave_room(client, event) elif event_type is Schema__Event__Message : self.client_message (client, event) else: return False if self.log_events: self.events.append(event) return True
def log(self, message)
-
Expand source code
def log(self, message): self.logging.debug(message) return self
def new_client(self)
-
Expand source code
def new_client(self): client = PubSub__Client(event_queue = self) self.add_client(client) return client
def room(self, room_name)
-
Expand source code
def room(self, room_name): if room_name not in self.rooms: new_room = PubSub__Room(room_name=room_name) self.rooms[room_name] = new_room return self.rooms.get(room_name)
Inherited members