Module osbot_utils.helpers.trace.Trace_Call__Stack

Expand source code
import linecache
import time
from copy                                               import deepcopy
from osbot_utils.base_classes.Kwargs_To_Self            import Kwargs_To_Self
from osbot_utils.helpers.trace.Trace_Call__Config       import Trace_Call__Config
from osbot_utils.helpers.trace.Trace_Call__Stack_Node   import Trace_Call__Stack_Node


class Trace_Call__Stack(Kwargs_To_Self):
    """Stack of Trace_Call__Stack_Node objects built while tracing calls.

    Every node that is added is also appended to the ``children`` of the node
    that was on top at that moment, so the nodes form a tree while
    ``stack_data`` only holds the calls that are currently open.
    """
    call_index : int                        # incremented by add_frame for every accepted frame
    stack_data : list                       # currently open nodes, bottom (oldest) first
    config     : Trace_Call__Config         # capture options read by the methods below
    root_node  : Trace_Call__Stack_Node     # most recent node that was added while the stack was empty
    line_index : int                        # not read or written by any method of this class

    def __eq__(self, target):
        """Return True for the same object, otherwise compare ``stack_data`` with ``target``."""
        if self is target:
            return True
        return self.stack_data == target

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the Kwargs_To_Self initialiser."""
        super().__init__(**kwargs)

    def __iter__(self):
        """Iterate over the open nodes, bottom first."""
        return iter(self.stack_data)

    def __getitem__(self, index):
        """Return the node at ``index`` (negative values allowed), or None when out of range."""
        if -len(self.stack_data) <= index < len(self.stack_data):
            return self.stack_data[index]
        return None

    def __len__(self):
        """Return the number of open nodes."""
        return self.size()

    def add_frame(self, frame):
        """Create a node for ``frame`` and push it on the stack.

        Only objects whose class is named ``frame`` are accepted; anything else
        is ignored and None is returned. Returns the new node when it was added.
        """
        if frame and frame.__class__.__name__=='frame':
            self.call_index += 1                                                            # Increment the call index
            code        = frame.f_code                                                      # Get code object from frame
            func_name   = code.co_name                                                      # Get function name
            module      = frame.f_globals.get("__name__", "")                               # Get module name

            source_code = self.map_source_code(frame)
            full_name   = self.map_full_name(frame, module, func_name)
            new_node    = self.create_stack_node(frame, full_name, source_code, self.call_index)
            if self.add_stack_node(new_node, frame):
                return new_node
        return None

    def add_node(self, title: str):
        """Push a node named ``title`` that is not tied to a frame; return it when added."""
        new_node = self.new_stack_node(title)
        if self.config.capture_duration:
            new_node.call_start = time.perf_counter()
        if self.add_stack_node(new_node):
            return new_node
        return None

    def add_stack_node(self, stack_node : Trace_Call__Stack_Node, frame=None):
        """Push ``stack_node`` and link it as a child of the current top node.

        Returns True when the node was added and False when ``stack_node`` is
        not exactly a Trace_Call__Stack_Node. ``frame`` is accepted but not used.
        """
        if type(stack_node) is Trace_Call__Stack_Node:
            if self.stack_data:                                             # if there are items in the stack
                self.top().children.append(stack_node)                      # add an xref to the new node to the children of the top node
            else:
                self.root_node = stack_node                                 # if not this is the first node and capture it as a root node
            self.stack_data.append(stack_node)                              # append the new node to the stack
            return True
        return False

    def bottom(self):
        """Return the oldest open node, or None when the stack is empty."""
        if self.stack_data:
            return self.stack_data[0]
        return None

    def create_stack_node(self, frame, full_name, source_code, call_index):
        """Build a Trace_Call__Stack_Node for ``frame`` without pushing it.

        ``source_code`` is the dict produced by map_source_code. Which extra
        details (frame, locals, start time) are stored depends on ``self.config``.
        When ``frame`` is falsy only ``call_index`` and ``name`` are set.
        """
        new_node = Trace_Call__Stack_Node(call_index=call_index, name=full_name)
        if frame:
            code      = frame.f_code
            new_node.func_name = code.co_name                           # Get function name
            new_node.module    = frame.f_globals.get("__name__", "")    # Get module name
            if source_code:
                new_node.source_code           = source_code.get('source_code'          )
                new_node.source_code_caller    = source_code.get('source_code_caller'   )
                new_node.source_code_location  = source_code.get('source_code_location' )

            if self.config.capture_frame:
                new_node.frame = frame
            if self.config.capture_locals:
                if self.config.deep_copy_locals:
                    try:
                        new_node.locals = deepcopy(frame.f_locals)
                    except Exception as error:                          # not every local can be deep-copied; record why instead of failing the trace
                        new_node.locals = {'error': f'error in deepcopy: {error}'}
                else:
                    new_node.locals = frame.f_locals

            if self.config.capture_duration:
                new_node.call_start = time.perf_counter()
        return new_node

    def empty_stack(self):              # use to make sure the stack is empty (usually called at the end of a trace session)
        """Pop every open node, top first, and return ``self``."""
        for node in reversed(list(self.stack_data)):                    # iterate over a copy because pop() mutates stack_data
            self.pop(node)
        return self

    def map_source_code(self, frame):
        """Return the source line, caller source line and location for ``frame``.

        The result always has the keys ``source_code``, ``source_code_caller``
        and ``source_code_location``; all are empty strings when
        ``config.trace_capture_source_code`` is off. For a frame without a
        caller (``f_back`` is None) the caller line is an empty string.
        """
        source_code          = ''
        source_code_caller   = ''
        source_code_location = ''
        if self.config.trace_capture_source_code:
            filename             = frame.f_code.co_filename
            lineno               = frame.f_lineno
            source_code          = linecache.getline(filename, lineno).strip()
            source_code_location = f'{filename}:{lineno}'

            caller_frame = frame.f_back
            if caller_frame is not None:                                # the outermost frame has no caller
                caller_filename    = caller_frame.f_code.co_filename
                caller_lineno      = caller_frame.f_lineno
                source_code_caller = linecache.getline(caller_filename, caller_lineno).strip()

        return dict(source_code          = source_code          ,
                    source_code_caller   = source_code_caller   ,
                    source_code_location = source_code_location )

    def map_full_name(self, frame, module, func_name):
        """Return ``module.Class.func_name`` (or ``module.func_name`` without a ``self`` local).

        Returns None when ``frame``, ``module`` or ``func_name`` is falsy.
        """
        if frame and module and func_name:
            instance   = frame.f_locals.get("self", None)               # Get instance if available
            class_name = ""
            if instance is not None:                                    # identity check: truthiness would call __bool__/__len__ on the traced object
                try:
                    class_name = instance.__class__.__name__
                except Exception:                                       # attribute access on the traced object can itself raise
                    class_name = "<unavailable>"
            if class_name:
                full_name = f"{module}.{class_name}.{func_name}"
            else:
                full_name = f"{module}.{func_name}"
            return full_name
        return None

    def new_stack_node(self, name):
        """Create (without pushing) a node called ``name`` using the current ``call_index``."""
        return Trace_Call__Stack_Node(call_index=self.call_index, name=name)

    def nodes(self):
        """Return the underlying list of open nodes (not a copy)."""
        return self.stack_data

    def remove_from_top(self, top_node, extra_data: dict):
        """Finalise ``top_node`` and remove the last entry of the stack.

        Records end time and duration when ``config.capture_duration`` is on and
        merges ``extra_data`` into the node when it is exactly a dict.
        Always returns True.
        """
        if self.config.capture_duration:
            top_node.call_end     = time.perf_counter()
            top_node.call_duration = top_node.call_end - top_node.call_start
        if type(extra_data) is dict:
            top_node.extra_data.update(extra_data)
        self.stack_data.pop()
        return True

    def pop(self, target, extra_data: dict = None):
        """Pop the top node when ``target`` matches it; return True on success.

        ``target`` is either a Trace_Call__Stack_Node (compared with ``==``) or
        a frame (compared by identity with the top node's ``frame``).
        """
        top_node = self.top()
        if target and top_node :
            if type(target) is Trace_Call__Stack_Node:                  # handle the case when target is Trace_Call__Stack_Node
                if target == top_node:                                  # if they match, pop the stack (since we are only capturing a subset of the stack)
                    return self.remove_from_top(top_node, extra_data)
            elif target is top_node.frame:                              # if not assume target is a frame
                return self.remove_from_top(top_node, extra_data)       # if they match, pop the stack (since we are only capturing a subset of the stack)
        return False

    def push(self, frame):
        """Alias of add_frame."""
        return self.add_frame(frame)

    def top(self):
        """Return the most recently pushed open node, or None when the stack is empty."""
        if self.stack_data:
            return self.stack_data[-1]
        return None

    def size(self):
        """Return the number of open nodes."""
        return len(self.stack_data)

Classes

class Trace_Call__Stack (**kwargs)

A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.

This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.

This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.

Usage

class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42

# Other methods can be added here

Correctly override default values by passing keyword arguments

instance = MyConfigurableClass(attribute1='new_value', attribute2=False)

This will raise an exception as 'attribute3' is not predefined

instance = MyConfigurableClass(attribute3='invalid_attribute')

this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class

Note

It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.

Methods

__init__(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.

Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.

Parameters

**kwargs: Variable length keyword arguments.

Raises

Exception
If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class Trace_Call__Stack(Kwargs_To_Self):
    """Stack of Trace_Call__Stack_Node objects built while tracing calls.

    Every node that is added is also appended to the ``children`` of the node
    that was on top at that moment, so the nodes form a tree while
    ``stack_data`` only holds the calls that are currently open.
    """
    call_index : int                        # incremented by add_frame for every accepted frame
    stack_data : list                       # currently open nodes, bottom (oldest) first
    config     : Trace_Call__Config         # capture options read by the methods below
    root_node  : Trace_Call__Stack_Node     # most recent node that was added while the stack was empty
    line_index : int                        # not read or written by any method of this class

    def __eq__(self, target):
        """Return True for the same object, otherwise compare ``stack_data`` with ``target``."""
        if self is target:
            return True
        return self.stack_data == target

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the Kwargs_To_Self initialiser."""
        super().__init__(**kwargs)

    def __iter__(self):
        """Iterate over the open nodes, bottom first."""
        return iter(self.stack_data)

    def __getitem__(self, index):
        """Return the node at ``index`` (negative values allowed), or None when out of range."""
        if -len(self.stack_data) <= index < len(self.stack_data):
            return self.stack_data[index]
        return None

    def __len__(self):
        """Return the number of open nodes."""
        return self.size()

    def add_frame(self, frame):
        """Create a node for ``frame`` and push it on the stack.

        Only objects whose class is named ``frame`` are accepted; anything else
        is ignored and None is returned. Returns the new node when it was added.
        """
        if frame and frame.__class__.__name__=='frame':
            self.call_index += 1                                                            # Increment the call index
            code        = frame.f_code                                                      # Get code object from frame
            func_name   = code.co_name                                                      # Get function name
            module      = frame.f_globals.get("__name__", "")                               # Get module name

            source_code = self.map_source_code(frame)
            full_name   = self.map_full_name(frame, module, func_name)
            new_node    = self.create_stack_node(frame, full_name, source_code, self.call_index)
            if self.add_stack_node(new_node, frame):
                return new_node
        return None

    def add_node(self, title: str):
        """Push a node named ``title`` that is not tied to a frame; return it when added."""
        new_node = self.new_stack_node(title)
        if self.config.capture_duration:
            new_node.call_start = time.perf_counter()
        if self.add_stack_node(new_node):
            return new_node
        return None

    def add_stack_node(self, stack_node : Trace_Call__Stack_Node, frame=None):
        """Push ``stack_node`` and link it as a child of the current top node.

        Returns True when the node was added and False when ``stack_node`` is
        not exactly a Trace_Call__Stack_Node. ``frame`` is accepted but not used.
        """
        if type(stack_node) is Trace_Call__Stack_Node:
            if self.stack_data:                                             # if there are items in the stack
                self.top().children.append(stack_node)                      # add an xref to the new node to the children of the top node
            else:
                self.root_node = stack_node                                 # if not this is the first node and capture it as a root node
            self.stack_data.append(stack_node)                              # append the new node to the stack
            return True
        return False

    def bottom(self):
        """Return the oldest open node, or None when the stack is empty."""
        if self.stack_data:
            return self.stack_data[0]
        return None

    def create_stack_node(self, frame, full_name, source_code, call_index):
        """Build a Trace_Call__Stack_Node for ``frame`` without pushing it.

        ``source_code`` is the dict produced by map_source_code. Which extra
        details (frame, locals, start time) are stored depends on ``self.config``.
        When ``frame`` is falsy only ``call_index`` and ``name`` are set.
        """
        new_node = Trace_Call__Stack_Node(call_index=call_index, name=full_name)
        if frame:
            code      = frame.f_code
            new_node.func_name = code.co_name                           # Get function name
            new_node.module    = frame.f_globals.get("__name__", "")    # Get module name
            if source_code:
                new_node.source_code           = source_code.get('source_code'          )
                new_node.source_code_caller    = source_code.get('source_code_caller'   )
                new_node.source_code_location  = source_code.get('source_code_location' )

            if self.config.capture_frame:
                new_node.frame = frame
            if self.config.capture_locals:
                if self.config.deep_copy_locals:
                    try:
                        new_node.locals = deepcopy(frame.f_locals)
                    except Exception as error:                          # not every local can be deep-copied; record why instead of failing the trace
                        new_node.locals = {'error': f'error in deepcopy: {error}'}
                else:
                    new_node.locals = frame.f_locals

            if self.config.capture_duration:
                new_node.call_start = time.perf_counter()
        return new_node

    def empty_stack(self):              # use to make sure the stack is empty (usually called at the end of a trace session)
        """Pop every open node, top first, and return ``self``."""
        for node in reversed(list(self.stack_data)):                    # iterate over a copy because pop() mutates stack_data
            self.pop(node)
        return self

    def map_source_code(self, frame):
        """Return the source line, caller source line and location for ``frame``.

        The result always has the keys ``source_code``, ``source_code_caller``
        and ``source_code_location``; all are empty strings when
        ``config.trace_capture_source_code`` is off. For a frame without a
        caller (``f_back`` is None) the caller line is an empty string.
        """
        source_code          = ''
        source_code_caller   = ''
        source_code_location = ''
        if self.config.trace_capture_source_code:
            filename             = frame.f_code.co_filename
            lineno               = frame.f_lineno
            source_code          = linecache.getline(filename, lineno).strip()
            source_code_location = f'{filename}:{lineno}'

            caller_frame = frame.f_back
            if caller_frame is not None:                                # the outermost frame has no caller
                caller_filename    = caller_frame.f_code.co_filename
                caller_lineno      = caller_frame.f_lineno
                source_code_caller = linecache.getline(caller_filename, caller_lineno).strip()

        return dict(source_code          = source_code          ,
                    source_code_caller   = source_code_caller   ,
                    source_code_location = source_code_location )

    def map_full_name(self, frame, module, func_name):
        """Return ``module.Class.func_name`` (or ``module.func_name`` without a ``self`` local).

        Returns None when ``frame``, ``module`` or ``func_name`` is falsy.
        """
        if frame and module and func_name:
            instance   = frame.f_locals.get("self", None)               # Get instance if available
            class_name = ""
            if instance is not None:                                    # identity check: truthiness would call __bool__/__len__ on the traced object
                try:
                    class_name = instance.__class__.__name__
                except Exception:                                       # attribute access on the traced object can itself raise
                    class_name = "<unavailable>"
            if class_name:
                full_name = f"{module}.{class_name}.{func_name}"
            else:
                full_name = f"{module}.{func_name}"
            return full_name
        return None

    def new_stack_node(self, name):
        """Create (without pushing) a node called ``name`` using the current ``call_index``."""
        return Trace_Call__Stack_Node(call_index=self.call_index, name=name)

    def nodes(self):
        """Return the underlying list of open nodes (not a copy)."""
        return self.stack_data

    def remove_from_top(self, top_node, extra_data: dict):
        """Finalise ``top_node`` and remove the last entry of the stack.

        Records end time and duration when ``config.capture_duration`` is on and
        merges ``extra_data`` into the node when it is exactly a dict.
        Always returns True.
        """
        if self.config.capture_duration:
            top_node.call_end     = time.perf_counter()
            top_node.call_duration = top_node.call_end - top_node.call_start
        if type(extra_data) is dict:
            top_node.extra_data.update(extra_data)
        self.stack_data.pop()
        return True

    def pop(self, target, extra_data: dict = None):
        """Pop the top node when ``target`` matches it; return True on success.

        ``target`` is either a Trace_Call__Stack_Node (compared with ``==``) or
        a frame (compared by identity with the top node's ``frame``).
        """
        top_node = self.top()
        if target and top_node :
            if type(target) is Trace_Call__Stack_Node:                  # handle the case when target is Trace_Call__Stack_Node
                if target == top_node:                                  # if they match, pop the stack (since we are only capturing a subset of the stack)
                    return self.remove_from_top(top_node, extra_data)
            elif target is top_node.frame:                              # if not assume target is a frame
                return self.remove_from_top(top_node, extra_data)       # if they match, pop the stack (since we are only capturing a subset of the stack)
        return False

    def push(self, frame):
        """Alias of add_frame."""
        return self.add_frame(frame)

    def top(self):
        """Return the most recently pushed open node, or None when the stack is empty."""
        if self.stack_data:
            return self.stack_data[-1]
        return None

    def size(self):
        """Return the number of open nodes."""
        return len(self.stack_data)

Ancestors

Class variables

var call_index : int
var config : Trace_Call__Config
var line_index : int
var root_node : Trace_Call__Stack_Node
var stack_data : list

Methods

def add_frame(self, frame)
Expand source code
def add_frame(self, frame):
    """Create a node describing ``frame`` and push it on the stack.

    Objects whose class is not named ``frame`` (and falsy values) are ignored.
    Returns the new node when add_stack_node accepted it, otherwise None.
    """
    if not frame or frame.__class__.__name__ != 'frame':
        return None

    self.call_index += 1                                        # every accepted frame gets the next call index
    function_name   = frame.f_code.co_name
    module_name     = frame.f_globals.get("__name__", "")

    captured_source = self.map_source_code(frame)
    qualified_name  = self.map_full_name(frame, module_name, function_name)
    node            = self.create_stack_node(frame, qualified_name, captured_source, self.call_index)
    return node if self.add_stack_node(node, frame) else None
def add_node(self, title: str)
Expand source code
def add_node(self, title: str):
    """Push a node named ``title`` that is not tied to a frame.

    The start time is recorded when ``config.capture_duration`` is on.
    Returns the node when add_stack_node accepted it, otherwise None.
    """
    node = self.new_stack_node(title)
    if self.config.capture_duration:
        node.call_start = time.perf_counter()
    was_added = self.add_stack_node(node)
    return node if was_added else None
def add_stack_node(self, stack_node: Trace_Call__Stack_Node, frame=None)
Expand source code
def add_stack_node(self, stack_node : Trace_Call__Stack_Node, frame=None):
    """Push ``stack_node`` and link it to the node currently on top.

    Returns False (and changes nothing) unless ``stack_node`` is exactly a
    Trace_Call__Stack_Node; otherwise returns True. ``frame`` is not used.
    """
    if type(stack_node) is not Trace_Call__Stack_Node:
        return False

    if not self.stack_data:
        self.root_node = stack_node                 # nothing open yet: this node becomes the root
    else:
        self.top().children.append(stack_node)      # otherwise it is a child of the current top node
    self.stack_data.append(stack_node)
    return True
def bottom(self)
Expand source code
def bottom(self):
    """Return the first (oldest) entry of the stack, or None when it is empty."""
    return self.stack_data[0] if self.stack_data else None
def create_stack_node(self, frame, full_name, source_code, call_index)
Expand source code
def create_stack_node(self, frame, full_name, source_code, call_index):
    """Build (without pushing) a Trace_Call__Stack_Node for ``frame``.

    ``source_code`` is a dict with the keys ``source_code``,
    ``source_code_caller`` and ``source_code_location``. The frame, its locals
    and the start time are stored only when the matching ``self.config`` flag
    is on. With a falsy ``frame`` only ``call_index`` and ``name`` are set.
    """
    node = Trace_Call__Stack_Node(call_index=call_index, name=full_name)
    if not frame:
        return node

    node.func_name = frame.f_code.co_name
    node.module    = frame.f_globals.get("__name__", "")
    if source_code:
        for field in ('source_code', 'source_code_caller', 'source_code_location'):
            setattr(node, field, source_code.get(field))

    if self.config.capture_frame:
        node.frame = frame

    if self.config.capture_locals:
        if not self.config.deep_copy_locals:
            node.locals = frame.f_locals
        else:
            try:
                node.locals = deepcopy(frame.f_locals)
            except Exception as error:              # some locals cannot be deep-copied; store the reason instead
                node.locals = {'error': f'error in deepcopy: {error}'}

    if self.config.capture_duration:
        node.call_start = time.perf_counter()
    return node
def empty_stack(self)
Expand source code
def empty_stack(self):
    """Pop every open node, newest first, and return ``self``.

    Intended to be called at the end of a trace session to make sure no node
    is left open.
    """
    newest_first = self.stack_data[::-1]            # reversed snapshot, because pop() mutates stack_data
    for open_node in newest_first:
        self.pop(open_node)
    return self
def map_full_name(self, frame, module, func_name)
Expand source code
def map_full_name(self, frame, module, func_name):
    """Build the dotted name used for a traced call.

    Returns ``module.Class.func_name`` when the frame has a ``self`` local,
    ``module.func_name`` when it does not, and None when ``frame``, ``module``
    or ``func_name`` is falsy. If the class name cannot be read the class part
    is ``<unavailable>``.
    """
    if not (frame and module and func_name):
        return None

    instance   = frame.f_locals.get("self", None)           # Get instance if available
    class_name = ""
    if instance is not None:                                # identity check: a truthiness test would call __bool__/__len__
        try:                                                # on the traced object and drop the class of "empty" instances
            class_name = instance.__class__.__name__
        except Exception:                                   # attribute access on the traced object can itself raise
            class_name = "<unavailable>"

    if class_name:
        return f"{module}.{class_name}.{func_name}"
    return f"{module}.{func_name}"
def map_source_code(self, frame)
Expand source code
def map_source_code(self, frame):
    """Return the source line, caller source line and location for ``frame``.

    The result is a dict that always has the keys ``source_code``,
    ``source_code_caller`` and ``source_code_location``. All three are empty
    strings when ``config.trace_capture_source_code`` is off. For a frame
    without a caller (``f_back`` is None) the caller line is an empty string
    instead of raising AttributeError.
    """
    source_code          = ''
    source_code_caller   = ''
    source_code_location = ''

    if self.config.trace_capture_source_code:
        filename             = frame.f_code.co_filename
        lineno               = frame.f_lineno
        source_code          = linecache.getline(filename, lineno).strip()
        source_code_location = f'{filename}:{lineno}'

        caller_frame = frame.f_back
        if caller_frame is not None:                        # the outermost frame has no caller
            caller_filename    = caller_frame.f_code.co_filename
            caller_lineno      = caller_frame.f_lineno
            source_code_caller = linecache.getline(caller_filename, caller_lineno).strip()

    return dict(source_code          = source_code          ,
                source_code_caller   = source_code_caller   ,
                source_code_location = source_code_location )
def new_stack_node(self, name)
Expand source code
def new_stack_node(self, name):
    """Create (without pushing) a node called ``name`` that carries the current ``call_index``."""
    node = Trace_Call__Stack_Node(call_index=self.call_index, name=name)
    return node
def nodes(self)
Expand source code
def nodes(self):
    """Return the list that backs the stack (the same object, not a copy)."""
    open_nodes = self.stack_data
    return open_nodes
def pop(self, target, extra_data: dict = None)
Expand source code
def pop(self, target, extra_data: dict = None):
    """Pop the top node when ``target`` identifies it.

    ``target`` is either a Trace_Call__Stack_Node (compared with ``==`` against
    the top node) or, otherwise, assumed to be a frame (compared by identity
    with the top node's ``frame``). Only the top is ever considered because
    just a subset of the real call stack is captured. Returns the result of
    remove_from_top on a match and False otherwise.
    """
    current_top = self.top()
    if not (target and current_top):
        return False

    if type(target) is Trace_Call__Stack_Node:
        is_match = target == current_top
    else:
        is_match = target is current_top.frame

    if is_match:
        return self.remove_from_top(current_top, extra_data)
    return False
def push(self, frame)
Expand source code
def push(self, frame):
    """Alias of add_frame: push ``frame`` and return whatever add_frame returns."""
    pushed_node = self.add_frame(frame)
    return pushed_node
def remove_from_top(self, top_node, extra_data: dict)
Expand source code
def remove_from_top(self, top_node, extra_data: dict):
    """Finalise ``top_node`` and drop the last entry of the stack.

    When ``config.capture_duration`` is on, the end time and the duration
    (end minus ``call_start``) are stored on the node. ``extra_data`` is merged
    into the node's ``extra_data`` only when it is exactly a dict.
    Always returns True.
    """
    timing_enabled = self.config.capture_duration
    if timing_enabled:
        top_node.call_end      = time.perf_counter()
        top_node.call_duration = top_node.call_end - top_node.call_start

    has_extra_data = type(extra_data) is dict
    if has_extra_data:
        top_node.extra_data.update(extra_data)

    self.stack_data.pop()
    return True
def size(self)
Expand source code
def size(self):
    """Return how many nodes are currently on the stack."""
    return len(self.stack_data)
def top(self)
Expand source code
def top(self):
    """Return the last (most recently pushed) entry of the stack, or None when it is empty."""
    return self.stack_data[-1] if self.stack_data else None

Inherited members