Module osbot_utils.helpers.pubsub.Event__Queue

Expand source code
import time
from enum import Enum
from queue                                      import Queue, Empty
from threading                                  import Thread
from typing import Any

from osbot_utils.helpers.pubsub.schemas.Schema__Event import Schema__Event
from osbot_utils.helpers.pubsub.schemas.Schema__Event__Message import Schema__Event__Message
from osbot_utils.utils                                import Misc
from osbot_utils.base_classes.Kwargs_To_Self          import Kwargs_To_Self
from osbot_utils.utils.Misc import random_text, wait_for, timestamp_utc_now, random_guid

# Default for Event__Queue.queue_timeout: how long (in seconds) the consumer loop blocks in queue.get() before re-checking its running flag
QUEUE_WAIT_TIMEOUT  = 1.0                           # todo: see if this value is a good one to use here

class Event__Queue(Kwargs_To_Self):
    """In-process event queue drained by a background daemon thread.

    Events are enqueued with send_event / send_data / send_message and
    consumed by run_thread, which hands each one to handle_event. When
    log_events is True, handle_event appends every consumed event to `events`.

    The object can be used as a context manager: entering calls start() and
    exiting calls stop(); exceptions raised inside the `with` body are not
    suppressed.
    """
    events       : list                                     # events recorded by handle_event when log_events is True
    event_class  : type                                     # run_thread only handles queue items that are instances of this class
    log_events   : bool   = False
    queue        : Queue
    queue_name   : str    = random_text('event_queue')      # NOTE(review): random_text() runs once, when the class body is evaluated - confirm instances are not meant to get distinct names
    queue_timeout: float  = QUEUE_WAIT_TIMEOUT              # passed as the timeout of queue.get() in run_thread
    running      : bool
    thread       : Thread = None                            # set by start(); None until then


    def __init__(self, **kwargs):
        """Set the default event class, then let the base class apply **kwargs."""
        self.event_class = Schema__Event
        super().__init__(**kwargs)

    def __enter__(self):
        """Start the consumer thread and return this queue."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Request the consumer thread to stop; returns False so exceptions propagate."""
        self.stop()
        return False

    def new_event_obj(self, **kwargs):
        """Return a new instance of self.event_class built from **kwargs."""
        return self.event_class(**kwargs)

    def handle_event(self, event):
        """Process one consumed event.

        Appends the event to self.events when self.log_events is truthy.
        Always returns True.
        """
        if self.log_events:
            self.events.append(event)
        return True

    def send_event(self, event: Schema__Event):
        """Enqueue an event, filling in a missing timestamp and event_id.

        Returns True when the event is a Schema__Event and was put on the
        queue, False otherwise (nothing is queued in that case).
        """
        if isinstance(event, Schema__Event):
            if not event.timestamp:
                event.timestamp = timestamp_utc_now()
            if not event.event_id:
                event.event_id = random_guid()
            self.queue.put(event)
            return True
        return False

    def send_data(self, event_data, **kwargs):
        """Enqueue a Schema__Event__Message carrying event_data.

        A payload that is not exactly a dict is wrapped as {'data': event_data}.
        Extra **kwargs are forwarded to Schema__Event__Message.
        Returns the new event when it was queued, otherwise None.
        """
        if type(event_data) is not dict:
            event_data = {'data': event_data}
        new_event = Schema__Event__Message(event_data=event_data, **kwargs)
        if self.send_event(new_event):
            return new_event

    def send_message(self, message, **kwargs):
        """Enqueue a Schema__Event__Message whose event_message is str(message).

        Extra **kwargs are forwarded to Schema__Event__Message.
        Returns the new event when it was queued, otherwise None.
        """
        new_event = Schema__Event__Message(event_message=str(message), **kwargs)
        if self.send_event(new_event):
            return new_event

    def start(self):
        """Mark the queue as running and launch run_thread in a daemon thread.

        Returns self.
        """
        self.running = True
        self.thread =  Thread(target=self.run_thread, daemon=True)
        self.thread.start()
        return self

    def stop(self):
        """Clear the running flag so run_thread exits on its next loop check.

        Does not wait for the thread; use wait_for_thread_ends() for that.
        Returns self.
        """
        self.running = False
        return self

    def run_thread(self):
        """Consumer loop: pull items from the queue while self.running is truthy.

        Items that are instances of self.event_class are passed to
        handle_event; other items are dropped. Any exception raised while
        getting or handling an item is swallowed and the loop continues.
        """
        while self.running:
            try:
                event = self.queue.get(timeout=self.queue_timeout)
                if isinstance(event, self.event_class):
                    self.handle_event(event)
            except Empty:                                   # nothing arrived within queue_timeout; loop to re-check self.running
                continue
            except Exception:                               # todo: add way to handle this (which are errors in the handle_event), may call an on_event_handler_exceptions method
                continue

    def wait_micro_seconds(self, value=10):
        """Sleep for `value` microseconds (default 10)."""
        time.sleep(0.000001 * value)


    def wait_for_thread_ends(self):
        """Block until the consumer thread has finished, then return self.

        Does nothing when start() was never called (self.thread is None).
        """
        if self.thread is not None:                         # thread defaults to None; joining it would raise AttributeError
            self.thread.join()
        return self

Classes

class Event__Queue (**kwargs)

A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.

This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.

This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.

Usage

class MyConfigurableClass(Kwargs_To_Self):
    attribute1 = 'default_value'
    attribute2 = True
    attribute3 : str
    attribute4 : list
    attribute4 : int = 42

# Other methods can be added here

Correctly override default values by passing keyword arguments

instance = MyConfigurableClass(attribute1='new_value', attribute2=False)

This will raise an exception as 'attribute3' is not predefined

instance = MyConfigurableClass(attribute3='invalid_attribute')

This will also assign the default value to any variable that has a type defined. In the example above, the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value'; attribute2 = True; attribute3 = '' (default value of str); attribute4 = [] (default value of list); attribute4 = 42 (defined value in the class).

Note

It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.

Methods

__init__(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.

Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.

Parameters

**kwargs: Variable length keyword arguments.

Raises

Exception
If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class Event__Queue(Kwargs_To_Self):
    """In-process event queue drained by a background daemon thread.

    Events are enqueued with send_event / send_data / send_message and
    consumed by run_thread, which hands each one to handle_event. When
    log_events is True, handle_event appends every consumed event to `events`.

    The object can be used as a context manager: entering calls start() and
    exiting calls stop(); exceptions raised inside the `with` body are not
    suppressed.
    """
    events       : list                                     # events recorded by handle_event when log_events is True
    event_class  : type                                     # run_thread only handles queue items that are instances of this class
    log_events   : bool   = False
    queue        : Queue
    queue_name   : str    = random_text('event_queue')      # NOTE(review): random_text() runs once, when the class body is evaluated - confirm instances are not meant to get distinct names
    queue_timeout: float  = QUEUE_WAIT_TIMEOUT              # passed as the timeout of queue.get() in run_thread
    running      : bool
    thread       : Thread = None                            # set by start(); None until then


    def __init__(self, **kwargs):
        """Set the default event class, then let the base class apply **kwargs."""
        self.event_class = Schema__Event
        super().__init__(**kwargs)

    def __enter__(self):
        """Start the consumer thread and return this queue."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Request the consumer thread to stop; returns False so exceptions propagate."""
        self.stop()
        return False

    def new_event_obj(self, **kwargs):
        """Return a new instance of self.event_class built from **kwargs."""
        return self.event_class(**kwargs)

    def handle_event(self, event):
        """Process one consumed event.

        Appends the event to self.events when self.log_events is truthy.
        Always returns True.
        """
        if self.log_events:
            self.events.append(event)
        return True

    def send_event(self, event: Schema__Event):
        """Enqueue an event, filling in a missing timestamp and event_id.

        Returns True when the event is a Schema__Event and was put on the
        queue, False otherwise (nothing is queued in that case).
        """
        if isinstance(event, Schema__Event):
            if not event.timestamp:
                event.timestamp = timestamp_utc_now()
            if not event.event_id:
                event.event_id = random_guid()
            self.queue.put(event)
            return True
        return False

    def send_data(self, event_data, **kwargs):
        """Enqueue a Schema__Event__Message carrying event_data.

        A payload that is not exactly a dict is wrapped as {'data': event_data}.
        Extra **kwargs are forwarded to Schema__Event__Message.
        Returns the new event when it was queued, otherwise None.
        """
        if type(event_data) is not dict:
            event_data = {'data': event_data}
        new_event = Schema__Event__Message(event_data=event_data, **kwargs)
        if self.send_event(new_event):
            return new_event

    def send_message(self, message, **kwargs):
        """Enqueue a Schema__Event__Message whose event_message is str(message).

        Extra **kwargs are forwarded to Schema__Event__Message.
        Returns the new event when it was queued, otherwise None.
        """
        new_event = Schema__Event__Message(event_message=str(message), **kwargs)
        if self.send_event(new_event):
            return new_event

    def start(self):
        """Mark the queue as running and launch run_thread in a daemon thread.

        Returns self.
        """
        self.running = True
        self.thread =  Thread(target=self.run_thread, daemon=True)
        self.thread.start()
        return self

    def stop(self):
        """Clear the running flag so run_thread exits on its next loop check.

        Does not wait for the thread; use wait_for_thread_ends() for that.
        Returns self.
        """
        self.running = False
        return self

    def run_thread(self):
        """Consumer loop: pull items from the queue while self.running is truthy.

        Items that are instances of self.event_class are passed to
        handle_event; other items are dropped. Any exception raised while
        getting or handling an item is swallowed and the loop continues.
        """
        while self.running:
            try:
                event = self.queue.get(timeout=self.queue_timeout)
                if isinstance(event, self.event_class):
                    self.handle_event(event)
            except Empty:                                   # nothing arrived within queue_timeout; loop to re-check self.running
                continue
            except Exception:                               # todo: add way to handle this (which are errors in the handle_event), may call an on_event_handler_exceptions method
                continue

    def wait_micro_seconds(self, value=10):
        """Sleep for `value` microseconds (default 10)."""
        time.sleep(0.000001 * value)


    def wait_for_thread_ends(self):
        """Block until the consumer thread has finished, then return self.

        Does nothing when start() was never called (self.thread is None).
        """
        if self.thread is not None:                         # thread defaults to None; joining it would raise AttributeError
            self.thread.join()
        return self

Ancestors

Subclasses

Class variables

var event_class : type
var events : list
var log_events : bool
var queue : queue.Queue
var queue_name : str
var queue_timeout : float
var running : bool
var thread : threading.Thread

Methods

def handle_event(self, event)
Expand source code
def handle_event(self, event):
    """Process one consumed event and report success.

    The event is appended to self.events only when self.log_events is
    truthy. The return value is always True.
    """
    if not self.log_events:
        return True
    self.events.append(event)
    return True
def new_event_obj(self, **kwargs)
Expand source code
def new_event_obj(self, **kwargs):
    """Create an instance of the configured event class from keyword arguments."""
    event_factory = self.event_class
    return event_factory(**kwargs)
def run_thread(self)
Expand source code
def run_thread(self):
    """Consumer loop that drains the queue while self.running is truthy.

    Each iteration waits up to self.queue_timeout for an item. Items that are
    instances of self.event_class go to handle_event; anything else is
    dropped. Every exception raised in the iteration is swallowed so the loop
    keeps running.
    """
    while self.running:
        try:
            item = self.queue.get(timeout=self.queue_timeout)
            if not isinstance(item, self.event_class):
                continue
            self.handle_event(item)
        except Empty:
            pass                                            # no item within the timeout; go back and re-check self.running
        except Exception:                                   # todo: add way to handle this (which are errors in the handle_event), may call an on_event_handler_exceptions method
            pass
def send_data(self, event_data, **kwargs)
Expand source code
def send_data(self, event_data, **kwargs):
    """Queue a Schema__Event__Message that carries event_data.

    A payload whose type is not exactly dict is wrapped as
    {'data': event_data}. Extra keyword arguments are forwarded to
    Schema__Event__Message. Returns the created event when send_event
    accepted it, otherwise None.
    """
    payload = event_data if type(event_data) is dict else {'data': event_data}
    message_event = Schema__Event__Message(event_data=payload, **kwargs)
    if not self.send_event(message_event):
        return None
    return message_event
def send_event(self, event: Schema__Event)
Expand source code
def send_event(self, event: Schema__Event):
    """Put an event on the queue, completing its timestamp and event_id.

    Returns False without queuing anything when `event` is not a
    Schema__Event. Otherwise a falsy timestamp is replaced with
    timestamp_utc_now(), a falsy event_id with random_guid(), the event is
    queued, and True is returned.
    """
    if not isinstance(event, Schema__Event):
        return False
    needs_timestamp = not event.timestamp
    if needs_timestamp:
        event.timestamp = timestamp_utc_now()
    needs_event_id = not event.event_id
    if needs_event_id:
        event.event_id = random_guid()
    self.queue.put(event)
    return True
def send_message(self, message, **kwargs)
Expand source code
def send_message(self, message, **kwargs):
    """Queue a Schema__Event__Message whose event_message is str(message).

    Extra keyword arguments are forwarded to Schema__Event__Message. Returns
    the created event when send_event accepted it, otherwise None.
    """
    text = str(message)
    message_event = Schema__Event__Message(event_message=text, **kwargs)
    if not self.send_event(message_event):
        return None
    return message_event
def start(self)
Expand source code
def start(self):
    """Flag the queue as running and launch run_thread on a daemon thread.

    The new thread is stored in self.thread before it is started.
    Returns self.
    """
    self.running = True
    worker = Thread(target=self.run_thread, daemon=True)
    self.thread = worker
    worker.start()
    return self
def stop(self)
Expand source code
def stop(self):
    """Clear the running flag and return self.

    Only the flag is changed here; the thread is not joined.
    """
    setattr(self, 'running', False)
    return self
def wait_for_thread_ends(self)
Expand source code
def wait_for_thread_ends(self):
    """Block until the consumer thread has finished, then return self.

    Does nothing when no thread has been created yet (self.thread is None),
    instead of failing with AttributeError.
    """
    worker = self.thread
    if worker is not None:
        worker.join()
    return self
def wait_micro_seconds(self, value=10)
Expand source code
def wait_micro_seconds(self, value=10):
    """Sleep for `value` microseconds (10 by default)."""
    seconds = 0.000001 * value
    time.sleep(seconds)

Inherited members