Module osbot_utils.helpers.pubsub.Event__Queue
Expand source code
import time
from enum import Enum
from queue import Queue, Empty
from threading import Thread
from typing import Any
from osbot_utils.helpers.pubsub.schemas.Schema__Event import Schema__Event
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Message import Schema__Event__Message
from osbot_utils.utils import Misc
from osbot_utils.base_classes.Kwargs_To_Self import Kwargs_To_Self
from osbot_utils.utils.Misc import random_text, wait_for, timestamp_utc_now, random_guid
# Default for Event__Queue.queue_timeout: the number of seconds the worker
# thread blocks on an empty queue before it re-checks the `running` flag.
QUEUE_WAIT_TIMEOUT = 1.0 # todo: see if this value is a good one to use here
class Event__Queue(Kwargs_To_Self):
    """Queue of events that is drained by a background worker thread.

    The ``send_*`` methods put events on ``queue``; ``run_thread`` (started
    by ``start``) takes them off and passes each one to ``handle_event``.
    The instance also works as a context manager: entering calls ``start``
    and exiting calls ``stop``.
    """
    events       : list                                 # events recorded by handle_event while log_events is True
    event_class  : type                                 # run_thread only dispatches instances of this class
    log_events   : bool   = False
    queue        : Queue
    queue_name   : str    = random_text('event_queue')  # NOTE(review): evaluated once when the class body runs - confirm a shared default name is intended
    queue_timeout: float  = QUEUE_WAIT_TIMEOUT          # seconds run_thread blocks on an empty queue
    running      : bool
    thread       : Thread = None                        # set by start(); None until then

    def __init__(self, **kwargs):
        """Set the default ``event_class`` and hand ``kwargs`` to the base initialiser."""
        self.event_class = Schema__Event                # assigned before Kwargs_To_Self.__init__ runs
        super().__init__(**kwargs)

    def __enter__(self):
        """Start the worker thread and return this queue."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Ask the worker thread to stop; exceptions from the ``with`` body are not suppressed."""
        self.stop()
        return False

    def new_event_obj(self, **kwargs):
        """Return a new ``event_class`` instance built from ``kwargs``."""
        return self.event_class(**kwargs)

    def handle_event(self, event):
        """Process one dequeued event.

        Appends ``event`` to ``events`` when ``log_events`` is set.
        Always returns True.
        """
        if self.log_events:
            self.events.append(event)
        return True

    def send_event(self, event: Schema__Event):
        """Put ``event`` on the queue.

        A missing ``timestamp`` or ``event_id`` is filled in first.

        Returns:
            True when the event was queued, False when ``event`` is not a
            ``Schema__Event`` instance (nothing is queued in that case).
        """
        if not isinstance(event, Schema__Event):
            return False
        if not event.timestamp:
            event.timestamp = timestamp_utc_now()
        if not event.event_id:
            event.event_id = random_guid()
        self.queue.put(event)
        return True

    def send_data(self, event_data, **kwargs):
        """Wrap ``event_data`` in a ``Schema__Event__Message`` and queue it.

        Anything that is not exactly a ``dict`` is wrapped as
        ``{'data': event_data}``. Returns the new event, or None when
        ``send_event`` rejected it.
        """
        if type(event_data) is not dict:
            event_data = {'data': event_data}
        new_event = Schema__Event__Message(event_data=event_data, **kwargs)
        if self.send_event(new_event):
            return new_event
        return None

    def send_message(self, message, **kwargs):
        """Queue a ``Schema__Event__Message`` carrying ``str(message)``.

        Returns the new event, or None when ``send_event`` rejected it.
        """
        new_event = Schema__Event__Message(event_message=str(message), **kwargs)
        if self.send_event(new_event):
            return new_event
        return None

    def start(self):
        """Set ``running`` and launch ``run_thread`` in a daemon thread. Returns self."""
        self.running = True
        self.thread = Thread(target=self.run_thread, daemon=True)
        self.thread.start()
        return self

    def stop(self):
        """Clear ``running`` so the worker loop exits on its next check. Returns self.

        This does not join the thread; use ``wait_for_thread_ends`` for that.
        """
        self.running = False
        return self

    def run_thread(self):
        """Worker loop: dequeue events and dispatch them until ``running`` is falsy.

        Items that are not instances of ``event_class`` are dropped. Any
        exception is swallowed so the loop keeps running.
        """
        while self.running:
            try:
                event = self.queue.get(timeout=self.queue_timeout)
            except Empty:
                continue                                # nothing arrived within the timeout; re-check `running`
            except Exception:
                continue
            try:
                if isinstance(event, self.event_class):
                    self.handle_event(event)
            except Exception:                           # todo: add way to handle this (which are errors in the handle_event), may call an on_event_handler_exceptions method
                continue

    def wait_micro_seconds(self, value=10):
        """Sleep for ``value`` microseconds."""
        time.sleep(0.000001 * value)

    def wait_for_thread_ends(self):
        """Block until the worker thread has finished. Returns self.

        Does nothing when no thread has been started (``thread`` is None),
        instead of failing with AttributeError.
        """
        if self.thread is not None:
            self.thread.join()
        return self
Classes
class Event__Queue (**kwargs)
-
A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.
This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.
This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.
Usage
class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute5 : int = 42
# Other methods can be added here
Correctly override default values by passing keyword arguments:
instance = MyConfigurableClass(attribute1='new_value', attribute2=False)
This will raise an exception as 'attribute6' is not predefined:
instance = MyConfigurableClass(attribute6='invalid_attribute')
This will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute5 = 42 # defined value in the class
Note
It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.
Methods
__init__(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.
Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.
Parameters
**kwargs: Variable length keyword arguments.
Raises
Exception
- If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class Event__Queue(Kwargs_To_Self):
    """Queue of events that is drained by a background worker thread.

    The ``send_*`` methods put events on ``queue``; ``run_thread`` (started
    by ``start``) takes them off and passes each one to ``handle_event``.
    The instance also works as a context manager: entering calls ``start``
    and exiting calls ``stop``.
    """
    events       : list                                 # events recorded by handle_event while log_events is True
    event_class  : type                                 # run_thread only dispatches instances of this class
    log_events   : bool   = False
    queue        : Queue
    queue_name   : str    = random_text('event_queue')  # NOTE(review): evaluated once when the class body runs - confirm a shared default name is intended
    queue_timeout: float  = QUEUE_WAIT_TIMEOUT          # seconds run_thread blocks on an empty queue
    running      : bool
    thread       : Thread = None                        # set by start(); None until then

    def __init__(self, **kwargs):
        """Set the default ``event_class`` and hand ``kwargs`` to the base initialiser."""
        self.event_class = Schema__Event                # assigned before Kwargs_To_Self.__init__ runs
        super().__init__(**kwargs)

    def __enter__(self):
        """Start the worker thread and return this queue."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Ask the worker thread to stop; exceptions from the ``with`` body are not suppressed."""
        self.stop()
        return False

    def new_event_obj(self, **kwargs):
        """Return a new ``event_class`` instance built from ``kwargs``."""
        return self.event_class(**kwargs)

    def handle_event(self, event):
        """Process one dequeued event.

        Appends ``event`` to ``events`` when ``log_events`` is set.
        Always returns True.
        """
        if self.log_events:
            self.events.append(event)
        return True

    def send_event(self, event: Schema__Event):
        """Put ``event`` on the queue.

        A missing ``timestamp`` or ``event_id`` is filled in first.

        Returns:
            True when the event was queued, False when ``event`` is not a
            ``Schema__Event`` instance (nothing is queued in that case).
        """
        if not isinstance(event, Schema__Event):
            return False
        if not event.timestamp:
            event.timestamp = timestamp_utc_now()
        if not event.event_id:
            event.event_id = random_guid()
        self.queue.put(event)
        return True

    def send_data(self, event_data, **kwargs):
        """Wrap ``event_data`` in a ``Schema__Event__Message`` and queue it.

        Anything that is not exactly a ``dict`` is wrapped as
        ``{'data': event_data}``. Returns the new event, or None when
        ``send_event`` rejected it.
        """
        if type(event_data) is not dict:
            event_data = {'data': event_data}
        new_event = Schema__Event__Message(event_data=event_data, **kwargs)
        if self.send_event(new_event):
            return new_event
        return None

    def send_message(self, message, **kwargs):
        """Queue a ``Schema__Event__Message`` carrying ``str(message)``.

        Returns the new event, or None when ``send_event`` rejected it.
        """
        new_event = Schema__Event__Message(event_message=str(message), **kwargs)
        if self.send_event(new_event):
            return new_event
        return None

    def start(self):
        """Set ``running`` and launch ``run_thread`` in a daemon thread. Returns self."""
        self.running = True
        self.thread = Thread(target=self.run_thread, daemon=True)
        self.thread.start()
        return self

    def stop(self):
        """Clear ``running`` so the worker loop exits on its next check. Returns self.

        This does not join the thread; use ``wait_for_thread_ends`` for that.
        """
        self.running = False
        return self

    def run_thread(self):
        """Worker loop: dequeue events and dispatch them until ``running`` is falsy.

        Items that are not instances of ``event_class`` are dropped. Any
        exception is swallowed so the loop keeps running.
        """
        while self.running:
            try:
                event = self.queue.get(timeout=self.queue_timeout)
            except Empty:
                continue                                # nothing arrived within the timeout; re-check `running`
            except Exception:
                continue
            try:
                if isinstance(event, self.event_class):
                    self.handle_event(event)
            except Exception:                           # todo: add way to handle this (which are errors in the handle_event), may call an on_event_handler_exceptions method
                continue

    def wait_micro_seconds(self, value=10):
        """Sleep for ``value`` microseconds."""
        time.sleep(0.000001 * value)

    def wait_for_thread_ends(self):
        """Block until the worker thread has finished. Returns self.

        Does nothing when no thread has been started (``thread`` is None),
        instead of failing with AttributeError.
        """
        if self.thread is not None:
            self.thread.join()
        return self
Ancestors
Subclasses
Class variables
var event_class : type
var events : list
var log_events : bool
var queue : queue.Queue
var queue_name : str
var queue_timeout : float
var running : bool
var thread : threading.Thread
Methods
def handle_event(self, event)
-
Expand source code
def handle_event(self, event):
    """Process one dequeued event and report success.

    When ``self.log_events`` is truthy the event is appended to
    ``self.events``; otherwise nothing is recorded. Always returns True.
    """
    recording_enabled = self.log_events
    if not recording_enabled:
        return True
    self.events.append(event)
    return True
def new_event_obj(self, **kwargs)
-
Expand source code
def new_event_obj(self, **kwargs):
    """Build and return an instance of ``self.event_class`` from ``kwargs``."""
    event_factory = self.event_class
    return event_factory(**kwargs)
def run_thread(self)
-
Expand source code
def run_thread(self):
    """Worker loop: dequeue and dispatch events while ``self.running`` is truthy.

    Each pass waits up to ``self.queue_timeout`` seconds for an item.
    Items that are instances of ``self.event_class`` go to
    ``self.handle_event``; anything else is dropped. Every exception is
    swallowed so that the loop keeps going.
    """
    while self.running:
        # Step 1: fetch the next item, or go round again if none arrived.
        try:
            event = self.queue.get(timeout=self.queue_timeout)
        except Empty:
            continue
        except Exception:
            continue
        # Step 2: dispatch it; failures here must not end the loop.
        try:
            if isinstance(event, self.event_class):
                self.handle_event(event)
        except Exception:   # todo: add way to handle this (which are errors in the handle_event), may call an on_event_handler_exceptions method
            continue
def send_data(self, event_data, **kwargs)
-
Expand source code
def send_data(self, event_data, **kwargs):
    """Queue a ``Schema__Event__Message`` carrying ``event_data``.

    A value whose type is not exactly ``dict`` is wrapped as
    ``{'data': event_data}`` first. Returns the new event when
    ``send_event`` accepted it, otherwise None.
    """
    payload = event_data if type(event_data) is dict else {'data': event_data}
    message_event = Schema__Event__Message(event_data=payload, **kwargs)
    return message_event if self.send_event(message_event) else None
def send_event(self, event: Schema__Event)
-
Expand source code
def send_event(self, event: Schema__Event):
    """Put ``event`` on ``self.queue``.

    A falsy ``timestamp`` or ``event_id`` is filled in before queueing.
    Returns True when queued, or False (queueing nothing) when ``event``
    is not a ``Schema__Event`` instance.
    """
    if not isinstance(event, Schema__Event):
        return False
    if not event.timestamp:
        event.timestamp = timestamp_utc_now()
    if not event.event_id:
        event.event_id = random_guid()
    self.queue.put(event)
    return True
def send_message(self, message, **kwargs)
-
Expand source code
def send_message(self, message, **kwargs):
    """Queue a ``Schema__Event__Message`` whose ``event_message`` is ``str(message)``.

    Returns the new event when ``send_event`` accepted it, otherwise None.
    """
    message_text = str(message)
    message_event = Schema__Event__Message(event_message=message_text, **kwargs)
    return message_event if self.send_event(message_event) else None
def start(self)
-
Expand source code
def start(self):
    """Mark the queue as running and launch ``run_thread`` in a daemon thread.

    The new thread is stored on ``self.thread``. Returns self.
    """
    self.running = True
    worker = Thread(target=self.run_thread, daemon=True)
    self.thread = worker
    worker.start()
    return self
def stop(self)
-
Expand source code
def stop(self):
    """Clear the ``running`` flag and return self.

    Only the flag is changed here; the thread is neither joined nor
    interrupted.
    """
    self.running = False
    return self
def wait_for_thread_ends(self)
-
Expand source code
def wait_for_thread_ends(self):
    """Block until the worker thread has finished, then return self.

    When ``self.thread`` is None there is nothing to wait for, so the call
    returns immediately instead of raising AttributeError.
    """
    worker = self.thread
    if worker is not None:
        worker.join()
    return self
def wait_micro_seconds(self, value=10)
-
Expand source code
def wait_micro_seconds(self, value=10):
    """Sleep for ``value`` microseconds (10 by default)."""
    seconds = 0.000001 * value
    time.sleep(seconds)
Inherited members