Module osbot_utils.helpers.sqlite.Sqlite__Table

Expand source code
import re

from osbot_utils.base_classes.Kwargs_To_Self                import Kwargs_To_Self
from osbot_utils.decorators.lists.filter_list               import filter_list
from osbot_utils.decorators.lists.index_by                  import index_by
from osbot_utils.decorators.methods.cache_on_self           import cache_on_self
from osbot_utils.helpers.Print_Table                        import Print_Table
from osbot_utils.helpers.sqlite.Sqlite__Database            import Sqlite__Database
from osbot_utils.helpers.sqlite.Sqlite__Globals             import DEFAULT_FIELD_NAME__ID, ROW_BASE_CLASS, SQL_TABLE__MODULE_NAME__ROW_SCHEMA
from osbot_utils.helpers.sqlite.models.Sqlite__Field__Type  import Sqlite__Field__Type
from osbot_utils.utils.Dev import pprint
from osbot_utils.utils.Json                                 import json_load
from osbot_utils.utils.Misc                                 import list_set
from osbot_utils.utils.Objects import base_types, default_value, bytes_to_obj, obj_to_bytes
from osbot_utils.utils.Str                                  import str_cap_snake_case

class Sqlite__Table(Kwargs_To_Self):
    database        : Sqlite__Database
    table_name      : str
    row_schema      : type
    auto_pickle_blob: bool = False

    def __setattr__(self, key, value):
        if key =='table_name':                                                              # SQL injection protection
            if re.search(r'[^a-zA-Z0-9_-]',value):                                  # make sure table name cannot be used to inject SQL, str_safe uses r'[^a-zA-Z0-9_-]' regex, i.e. only allows letter, numbers and the chars - _
                raise ValueError( "Invalid table name. Table names can only contain alphanumeric characters, numbers, underscores, and hyphens.")

        super().__setattr__(key, value)

    def _table_create(self):                                                                         # todo: Sqlite__Table__Create needs to be refactored (since that was created before we had support for table_class )
        from osbot_utils.helpers.sqlite.Sqlite__Table__Create import Sqlite__Table__Create
        table_create       = Sqlite__Table__Create(self.table_name)                                  # todo: fix this workflow
        table_create.table = self
        return table_create                                                                            #       since it is weird to have to overwrite the table vale of Sqlite__Table__Create

    def add_row(self, **row_data):
        new_row  = self.new_row_obj(row_data)
        return self.row_add(new_row)

    def add_row_and_commit(self, **row_data):
        new_row  = self.new_row_obj(row_data)
        return self.row_add_and_commit(new_row)

    def clear(self):
        sql_query = self.sql_builder().command__delete_table()
        return self.cursor().execute_and_commit(sql_query)

    def create(self):
        table_create = self._table_create()
        return table_create.create_table__from_row_schema(self.row_schema)

    def commit(self):
        return self.cursor().commit()

    def connection(self):
        return self.database.connection()

    def cursor(self):
        return self.database.cursor()

    def delete(self):
        if self.exists() is False:                                  # if table doesn't exist
            return False                                            # return False
        self.cursor().table_delete(self.table_name)                 # delete table
        return self.exists() is False                               # confirm table does not exist

    def exists(self):
        return self.cursor().table_exists(self.table_name)

    def fields(self):
        return self.schema(index_by='name')

    @cache_on_self
    def fields__cached(self):
        return self.fields()

    def fields_data__from_raw_json(self, target_field):                     # todo: see if this should be refactored into a Select/Data filtering class
        fields_data = []
        for raw_field_data in self.select_field_values(target_field):
            fields_data.append(json_load(raw_field_data))
        return fields_data

    def fields_types__cached(self, exclude_id=False):
        fields_types = {}
        for field_name, field_data in self.fields__cached().items():
            if exclude_id and field_name == DEFAULT_FIELD_NAME__ID:
                continue
            sqlite_field_type = field_data['type']
            field_type = Sqlite__Field__Type.enum_map().get(sqlite_field_type)
            fields_types[field_name] = field_type
        return fields_types

    def fields_names__cached(self, exclude_id=False, include_star_field=False):
        field_names = list_set(self.fields__cached())
        if exclude_id:
            field_names.remove(DEFAULT_FIELD_NAME__ID)
        if include_star_field:
            field_names.append('*')
        return field_names

    def index_create(self, index_field):
        if index_field not in self.fields_names__cached():
            raise ValueError(f"in index_create, invalid target_field: {index_field}")

        index_name = self.index_name(index_field)
        sql_query = f'CREATE INDEX IF NOT EXISTS {index_name} ON {self.table_name}({index_field});'
        return self.cursor().execute_and_commit(sql_query)

    def index_delete(self, index_name):
        sql_query = f'DROP INDEX IF EXISTS {index_name};'
        return self.cursor().execute_and_commit(sql_query)

    def index_exists(self, index_field):
        index_name = self.index_name(index_field)
        return index_name in self.indexes()

    def index_name(self, index_field):
        return f'idx__{self.table_name}__{index_field}'

    def list_of_field_name_from_rows(self, rows, field_name):
        return [row[field_name] for row in rows]

    def indexes(self):
        field_name        = 'name'
        return_fields     = [field_name]
        table_sqlite_master = self.database.table__sqlite_master()
        table_type        = 'index'
        query_conditions  = {'type': table_type, 'tbl_name': self.table_name}
        sql_query, params = table_sqlite_master.sql_builder().query_select_fields_with_conditions(return_fields, query_conditions)
        rows              = table_sqlite_master.cursor().execute__fetch_all(sql_query, params)
        return table_sqlite_master.list_of_field_name_from_rows(rows, field_name)


    def new_row_obj(self, row_data=None):
        if self.row_schema:
            new_obj = self.row_schema()
            if row_data and ROW_BASE_CLASS in base_types(new_obj):
                row_data = self.parse_new_row_data(row_data)
                new_obj.update_from_kwargs(**row_data)
            return new_obj

    def not_exists(self):
        return self.exists() is False

    def parse_new_row_data(self, row_data):
        if row_data:
            if self.auto_pickle_blob:
                fields = self.fields__cached()
                picked_row_data = {}
                for field_name, field_value in row_data.items():
                    field_type = fields.get(field_name, {}).get('type')
                    if field_type == 'BLOB':
                        picked_row_data[field_name] = obj_to_bytes(field_value)
                    else:
                        picked_row_data[field_name] = field_value
                return picked_row_data
        return row_data

    def parse_row(self, row):
        if row and self.auto_pickle_blob:
            fields = self.fields__cached()
            for field_name, field_value in row.items():
                field_type = fields.get(field_name, {}).get('type')
                if field_type == 'BLOB':
                    row[field_name] = bytes_to_obj(field_value)
        return row

    def parse_rows(self, rows):
        return [self.parse_row(row) for row in rows]

    def print(self, **kwargs):
        return Print_Table(**kwargs).print(self.rows())

    def row(self, where, fields=None):
        if fields is None:
            return self.select_row_where(**where)

        sql_query, params = self.sql_builder(limit=1).query_select_fields_with_conditions(fields, where)
        row               = self.cursor().execute__fetch_one(sql_query, params)
        return self.parse_row(row)

    def row_add(self, row_obj=None):
        invalid_reason = self.sql_builder().validate_row_obj(row_obj)
        if invalid_reason:
            raise Exception(f"in row_add the provided row_obj is not valid: {invalid_reason}")
        return self.row_add_record(row_obj.__dict__)

    def row_add_and_commit(self, row_obj=None):
        if self.row_add(row_obj).get('status') == 'ok':
            self.commit()
            return row_obj                                      # this allows the original callers to see the actual object that was added to the table

    def row_add_record(self, record):
        validation_result = self.validate_record_with_schema(record)
        if validation_result:
            raise ValueError(f"row_add_record, validation_result for provided record failed with {validation_result}")

        sql_command,params = self.sql_builder().command_for_insert(record)
        return self.cursor().execute(sql_command, params)                    # Execute the SQL statement with the filtered data values

    def row_schema__create_from_current_field_types(self):
        exclude_field_id                 = True                                                                 # don't include the id field since in most cases the row_schema doesn't include it
        field_types                      = self.fields_types__cached(exclude_id=exclude_field_id)               # mapping with field name to field type (in python)
        caps_table_name                  = str_cap_snake_case(self.table_name)
        dynamic_class_name               = f'Row_Schema__{caps_table_name}'                                    # name that we will give to the dynamic class generated
        dynamic_class_dict               = { k: default_value(v) for k, v in field_types.items()}              # assign the field values its default value (for that type)
        dynamic_class_dict['__module__'] = SQL_TABLE__MODULE_NAME__ROW_SCHEMA                                            # set the module name
        Dynamic_Class                    = type(dynamic_class_name, (ROW_BASE_CLASS,), dynamic_class_dict)     # Create the dynamic class
        Dynamic_Class.__annotations__    = field_types                                                         # Set annotations of the new Dynamic_Class to be the mappings we have from field_types
        return Dynamic_Class                                                                                   # return the Dynamic class (whose fields should match the field_types)

    def row_schema__set_from_field_types(self):
        self.row_schema = self.row_schema__create_from_current_field_types()
        return self

    def row_update(self, update_fields, query_conditions ):
        sql_query, params = self.sql_builder().sql_query_update_with_conditions(update_fields, query_conditions)
        return self.cursor().execute_and_commit(sql_query, params)

    def rows(self, fields_names=None, limit=None):
        sql_query = self.sql_builder(limit=limit).query_for_fields(fields_names)
        rows = self.cursor().execute__fetch_all(sql_query)
        return self.parse_rows(rows)

    def rows_add(self, records, commit=True):           # todo: refactor to use row_add
        for record in records:
            if type(record) is dict:
                self.row_add_record(record)
            else:
                self.row_add(row_obj=record)
        if commit:
            self.cursor().commit()
        return self

    def rows_delete_where(self, **query_conditions):
        sql_query,params = self.sql_builder().command__delete_where(query_conditions)
        return self.cursor().execute_and_commit(sql_query,params)

    def select_row_where(self, **kwargs):
        rows = self.select_rows_where(**kwargs)                     # execute the query
        if len(rows) == 1:                                          # only return a result if there is one row
            return rows[0]
        return None                                                 # return None if there no match or more than one match

    def select_rows_where(self, **kwargs):
        sql_query, params = self.sql_builder().query_for_select_rows_where(**kwargs)
        rows = self.cursor().execute__fetch_all(sql_query, params)                      # Execute the query and return the results
        return self.parse_rows(rows)

    def select_rows_where_one(self, **kwargs):
        sql_query, params = self.sql_builder().query_for_select_rows_where(**kwargs)
        row = self.cursor().execute__fetch_one(sql_query, params)                      # Execute the query and return the results
        return self.parse_row(row)

    def select_field_values(self, field_name):
        if field_name not in self.fields__cached():
            raise ValueError(f'in select_all_vales_from_field, the provide field_name "{field_name}" does not exist in the current table "{self.table_name}"')

        sql_query  = self.sql_builder().query_for_fields([field_name])
        rows       = self.cursor().execute__fetch_all(sql_query)        # Execute the SQL query and get all rows
        all_rows   = self.parse_rows(rows)
        all_values = [row[field_name] for row in all_rows]              # Extract the desired field from each row in the result set
        return all_values

    @index_by
    def schema(self):
        return self.cursor().table_schema(self.table_name)

    @filter_list
    def schema__by_name_type(self):
        return {item.get('name'): item.get('type') for item in self.schema()}

    def size(self):
        sql_query = self.sql_builder().query_for_size()
        result = self.cursor().execute__fetch_one(sql_query)
        return result.get('size')

    @cache_on_self
    def sql_builder(self, limit=None):
        from osbot_utils.helpers.sqlite.sql_builder.SQL_Builder import SQL_Builder
        return SQL_Builder(table=self, limit=limit)

    def validate_record_with_schema(self, record):                                          # todo: refactor out to a validator class
        schema = self.fields__cached()

        extra_keys = [key for key in record if key not in schema]                           # Check for keys in record that are not in the schema
        if extra_keys:
            return f'Validation error: Unrecognized keys {extra_keys} in record.'
        return ''                                                                           # If we reach here, the record is valid

    # query helpers
    def contains(self, **kwargs):
        result = self.where_one(**kwargs)
        return result is not None

    def not_contains(self, **kwargs):
        return self.contains(**kwargs) is False

    def where(self, **kwargs):
        return self.select_rows_where(**kwargs)

    def where_one(self, **kwargs):
        return self.select_rows_where_one(**kwargs)

Classes

class Sqlite__Table (**kwargs)

A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.

This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.

This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.

Usage

class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42

# Other methods can be added here

Correctly override default values by passing keyword arguments

instance = MyConfigurableClass(attribute1='new_value', attribute2=False)

This will raise an exception as 'attribute3' is not predefined

instance = MyConfigurableClass(attribute3='invalid_attribute')

this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class

Note

It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.

Methods

init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.

Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.

Parameters

**kwargs: Variable length keyword arguments.

Raises

Exception
If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class Sqlite__Table(Kwargs_To_Self):
    database        : Sqlite__Database
    table_name      : str
    row_schema      : type
    auto_pickle_blob: bool = False

    def __setattr__(self, key, value):
        if key =='table_name':                                                              # SQL injection protection
            if re.search(r'[^a-zA-Z0-9_-]',value):                                  # make sure table name cannot be used to inject SQL, str_safe uses r'[^a-zA-Z0-9_-]' regex, i.e. only allows letter, numbers and the chars - _
                raise ValueError( "Invalid table name. Table names can only contain alphanumeric characters, numbers, underscores, and hyphens.")

        super().__setattr__(key, value)

    def _table_create(self):                                                                         # todo: Sqlite__Table__Create needs to be refactored (since that was created before we had support for table_class )
        from osbot_utils.helpers.sqlite.Sqlite__Table__Create import Sqlite__Table__Create
        table_create       = Sqlite__Table__Create(self.table_name)                                  # todo: fix this workflow
        table_create.table = self
        return table_create                                                                            #       since it is weird to have to overwrite the table vale of Sqlite__Table__Create

    def add_row(self, **row_data):
        new_row  = self.new_row_obj(row_data)
        return self.row_add(new_row)

    def add_row_and_commit(self, **row_data):
        new_row  = self.new_row_obj(row_data)
        return self.row_add_and_commit(new_row)

    def clear(self):
        sql_query = self.sql_builder().command__delete_table()
        return self.cursor().execute_and_commit(sql_query)

    def create(self):
        table_create = self._table_create()
        return table_create.create_table__from_row_schema(self.row_schema)

    def commit(self):
        return self.cursor().commit()

    def connection(self):
        return self.database.connection()

    def cursor(self):
        return self.database.cursor()

    def delete(self):
        if self.exists() is False:                                  # if table doesn't exist
            return False                                            # return False
        self.cursor().table_delete(self.table_name)                 # delete table
        return self.exists() is False                               # confirm table does not exist

    def exists(self):
        return self.cursor().table_exists(self.table_name)

    def fields(self):
        return self.schema(index_by='name')

    @cache_on_self
    def fields__cached(self):
        return self.fields()

    def fields_data__from_raw_json(self, target_field):                     # todo: see if this should be refactored into a Select/Data filtering class
        fields_data = []
        for raw_field_data in self.select_field_values(target_field):
            fields_data.append(json_load(raw_field_data))
        return fields_data

    def fields_types__cached(self, exclude_id=False):
        fields_types = {}
        for field_name, field_data in self.fields__cached().items():
            if exclude_id and field_name == DEFAULT_FIELD_NAME__ID:
                continue
            sqlite_field_type = field_data['type']
            field_type = Sqlite__Field__Type.enum_map().get(sqlite_field_type)
            fields_types[field_name] = field_type
        return fields_types

    def fields_names__cached(self, exclude_id=False, include_star_field=False):
        field_names = list_set(self.fields__cached())
        if exclude_id:
            field_names.remove(DEFAULT_FIELD_NAME__ID)
        if include_star_field:
            field_names.append('*')
        return field_names

    def index_create(self, index_field):
        if index_field not in self.fields_names__cached():
            raise ValueError(f"in index_create, invalid target_field: {index_field}")

        index_name = self.index_name(index_field)
        sql_query = f'CREATE INDEX IF NOT EXISTS {index_name} ON {self.table_name}({index_field});'
        return self.cursor().execute_and_commit(sql_query)

    def index_delete(self, index_name):
        sql_query = f'DROP INDEX IF EXISTS {index_name};'
        return self.cursor().execute_and_commit(sql_query)

    def index_exists(self, index_field):
        index_name = self.index_name(index_field)
        return index_name in self.indexes()

    def index_name(self, index_field):
        return f'idx__{self.table_name}__{index_field}'

    def list_of_field_name_from_rows(self, rows, field_name):
        return [row[field_name] for row in rows]

    def indexes(self):
        field_name        = 'name'
        return_fields     = [field_name]
        table_sqlite_master = self.database.table__sqlite_master()
        table_type        = 'index'
        query_conditions  = {'type': table_type, 'tbl_name': self.table_name}
        sql_query, params = table_sqlite_master.sql_builder().query_select_fields_with_conditions(return_fields, query_conditions)
        rows              = table_sqlite_master.cursor().execute__fetch_all(sql_query, params)
        return table_sqlite_master.list_of_field_name_from_rows(rows, field_name)


    def new_row_obj(self, row_data=None):
        if self.row_schema:
            new_obj = self.row_schema()
            if row_data and ROW_BASE_CLASS in base_types(new_obj):
                row_data = self.parse_new_row_data(row_data)
                new_obj.update_from_kwargs(**row_data)
            return new_obj

    def not_exists(self):
        return self.exists() is False

    def parse_new_row_data(self, row_data):
        if row_data:
            if self.auto_pickle_blob:
                fields = self.fields__cached()
                picked_row_data = {}
                for field_name, field_value in row_data.items():
                    field_type = fields.get(field_name, {}).get('type')
                    if field_type == 'BLOB':
                        picked_row_data[field_name] = obj_to_bytes(field_value)
                    else:
                        picked_row_data[field_name] = field_value
                return picked_row_data
        return row_data

    def parse_row(self, row):
        if row and self.auto_pickle_blob:
            fields = self.fields__cached()
            for field_name, field_value in row.items():
                field_type = fields.get(field_name, {}).get('type')
                if field_type == 'BLOB':
                    row[field_name] = bytes_to_obj(field_value)
        return row

    def parse_rows(self, rows):
        return [self.parse_row(row) for row in rows]

    def print(self, **kwargs):
        return Print_Table(**kwargs).print(self.rows())

    def row(self, where, fields=None):
        if fields is None:
            return self.select_row_where(**where)

        sql_query, params = self.sql_builder(limit=1).query_select_fields_with_conditions(fields, where)
        row               = self.cursor().execute__fetch_one(sql_query, params)
        return self.parse_row(row)

    def row_add(self, row_obj=None):
        invalid_reason = self.sql_builder().validate_row_obj(row_obj)
        if invalid_reason:
            raise Exception(f"in row_add the provided row_obj is not valid: {invalid_reason}")
        return self.row_add_record(row_obj.__dict__)

    def row_add_and_commit(self, row_obj=None):
        if self.row_add(row_obj).get('status') == 'ok':
            self.commit()
            return row_obj                                      # this allows the original callers to see the actual object that was added to the table

    def row_add_record(self, record):
        validation_result = self.validate_record_with_schema(record)
        if validation_result:
            raise ValueError(f"row_add_record, validation_result for provided record failed with {validation_result}")

        sql_command,params = self.sql_builder().command_for_insert(record)
        return self.cursor().execute(sql_command, params)                    # Execute the SQL statement with the filtered data values

    def row_schema__create_from_current_field_types(self):
        exclude_field_id                 = True                                                                 # don't include the id field since in most cases the row_schema doesn't include it
        field_types                      = self.fields_types__cached(exclude_id=exclude_field_id)               # mapping with field name to field type (in python)
        caps_table_name                  = str_cap_snake_case(self.table_name)
        dynamic_class_name               = f'Row_Schema__{caps_table_name}'                                    # name that we will give to the dynamic class generated
        dynamic_class_dict               = { k: default_value(v) for k, v in field_types.items()}              # assign the field values its default value (for that type)
        dynamic_class_dict['__module__'] = SQL_TABLE__MODULE_NAME__ROW_SCHEMA                                            # set the module name
        Dynamic_Class                    = type(dynamic_class_name, (ROW_BASE_CLASS,), dynamic_class_dict)     # Create the dynamic class
        Dynamic_Class.__annotations__    = field_types                                                         # Set annotations of the new Dynamic_Class to be the mappings we have from field_types
        return Dynamic_Class                                                                                   # return the Dynamic class (whose fields should match the field_types)

    def row_schema__set_from_field_types(self):
        self.row_schema = self.row_schema__create_from_current_field_types()
        return self

    def row_update(self, update_fields, query_conditions ):
        sql_query, params = self.sql_builder().sql_query_update_with_conditions(update_fields, query_conditions)
        return self.cursor().execute_and_commit(sql_query, params)

    def rows(self, fields_names=None, limit=None):
        sql_query = self.sql_builder(limit=limit).query_for_fields(fields_names)
        rows = self.cursor().execute__fetch_all(sql_query)
        return self.parse_rows(rows)

    def rows_add(self, records, commit=True):           # todo: refactor to use row_add
        for record in records:
            if type(record) is dict:
                self.row_add_record(record)
            else:
                self.row_add(row_obj=record)
        if commit:
            self.cursor().commit()
        return self

    def rows_delete_where(self, **query_conditions):
        sql_query,params = self.sql_builder().command__delete_where(query_conditions)
        return self.cursor().execute_and_commit(sql_query,params)

    def select_row_where(self, **kwargs):
        rows = self.select_rows_where(**kwargs)                     # execute the query
        if len(rows) == 1:                                          # only return a result if there is one row
            return rows[0]
        return None                                                 # return None if there no match or more than one match

    def select_rows_where(self, **kwargs):
        sql_query, params = self.sql_builder().query_for_select_rows_where(**kwargs)
        rows = self.cursor().execute__fetch_all(sql_query, params)                      # Execute the query and return the results
        return self.parse_rows(rows)

    def select_rows_where_one(self, **kwargs):
        sql_query, params = self.sql_builder().query_for_select_rows_where(**kwargs)
        row = self.cursor().execute__fetch_one(sql_query, params)                      # Execute the query and return the results
        return self.parse_row(row)

    def select_field_values(self, field_name):
        if field_name not in self.fields__cached():
            raise ValueError(f'in select_all_vales_from_field, the provide field_name "{field_name}" does not exist in the current table "{self.table_name}"')

        sql_query  = self.sql_builder().query_for_fields([field_name])
        rows       = self.cursor().execute__fetch_all(sql_query)        # Execute the SQL query and get all rows
        all_rows   = self.parse_rows(rows)
        all_values = [row[field_name] for row in all_rows]              # Extract the desired field from each row in the result set
        return all_values

    @index_by
    def schema(self):
        return self.cursor().table_schema(self.table_name)

    @filter_list
    def schema__by_name_type(self):
        return {item.get('name'): item.get('type') for item in self.schema()}

    def size(self):
        sql_query = self.sql_builder().query_for_size()
        result = self.cursor().execute__fetch_one(sql_query)
        return result.get('size')

    @cache_on_self
    def sql_builder(self, limit=None):
        from osbot_utils.helpers.sqlite.sql_builder.SQL_Builder import SQL_Builder
        return SQL_Builder(table=self, limit=limit)

    def validate_record_with_schema(self, record):                                          # todo: refactor out to a validator class
        schema = self.fields__cached()

        extra_keys = [key for key in record if key not in schema]                           # Check for keys in record that are not in the schema
        if extra_keys:
            return f'Validation error: Unrecognized keys {extra_keys} in record.'
        return ''                                                                           # If we reach here, the record is valid

    # query helpers
    def contains(self, **kwargs):
        result = self.where_one(**kwargs)
        return result is not None

    def not_contains(self, **kwargs):
        return self.contains(**kwargs) is False

    def where(self, **kwargs):
        return self.select_rows_where(**kwargs)

    def where_one(self, **kwargs):
        return self.select_rows_where_one(**kwargs)

Ancestors

Subclasses

Class variables

var auto_pickle_blob : bool
var databaseSqlite__Database
var row_schema : type
var table_name : str

Methods

def add_row(self, **row_data)
Expand source code
def add_row(self, **row_data):
    new_row  = self.new_row_obj(row_data)
    return self.row_add(new_row)
def add_row_and_commit(self, **row_data)
Expand source code
def add_row_and_commit(self, **row_data):
    new_row  = self.new_row_obj(row_data)
    return self.row_add_and_commit(new_row)
def clear(self)
Expand source code
def clear(self):
    sql_query = self.sql_builder().command__delete_table()
    return self.cursor().execute_and_commit(sql_query)
def commit(self)
Expand source code
def commit(self):
    return self.cursor().commit()
def connection(self)
Expand source code
def connection(self):
    return self.database.connection()
def contains(self, **kwargs)
Expand source code
def contains(self, **kwargs):
    result = self.where_one(**kwargs)
    return result is not None
def create(self)
Expand source code
def create(self):
    table_create = self._table_create()
    return table_create.create_table__from_row_schema(self.row_schema)
def cursor(self)
Expand source code
def cursor(self):
    return self.database.cursor()
def delete(self)
Expand source code
def delete(self):
    if self.exists() is False:                                  # if table doesn't exist
        return False                                            # return False
    self.cursor().table_delete(self.table_name)                 # delete table
    return self.exists() is False                               # confirm table does not exist
def exists(self)
Expand source code
def exists(self):
    return self.cursor().table_exists(self.table_name)
def fields(self)
Expand source code
def fields(self):
    return self.schema(index_by='name')
def fields__cached(self)
Expand source code
@cache_on_self
def fields__cached(self):
    return self.fields()
def fields_data__from_raw_json(self, target_field)
Expand source code
def fields_data__from_raw_json(self, target_field):                     # todo: see if this should be refactored into a Select/Data filtering class
    fields_data = []
    for raw_field_data in self.select_field_values(target_field):
        fields_data.append(json_load(raw_field_data))
    return fields_data
def fields_names__cached(self, exclude_id=False, include_star_field=False)
Expand source code
def fields_names__cached(self, exclude_id=False, include_star_field=False):
    field_names = list_set(self.fields__cached())
    if exclude_id:
        field_names.remove(DEFAULT_FIELD_NAME__ID)
    if include_star_field:
        field_names.append('*')
    return field_names
def fields_types__cached(self, exclude_id=False)
Expand source code
def fields_types__cached(self, exclude_id=False):
    fields_types = {}
    for field_name, field_data in self.fields__cached().items():
        if exclude_id and field_name == DEFAULT_FIELD_NAME__ID:
            continue
        sqlite_field_type = field_data['type']
        field_type = Sqlite__Field__Type.enum_map().get(sqlite_field_type)
        fields_types[field_name] = field_type
    return fields_types
def index_create(self, index_field)
Expand source code
def index_create(self, index_field):
    if index_field not in self.fields_names__cached():
        raise ValueError(f"in index_create, invalid target_field: {index_field}")

    index_name = self.index_name(index_field)
    sql_query = f'CREATE INDEX IF NOT EXISTS {index_name} ON {self.table_name}({index_field});'
    return self.cursor().execute_and_commit(sql_query)
def index_delete(self, index_name)
Expand source code
def index_delete(self, index_name):
    sql_query = f'DROP INDEX IF EXISTS {index_name};'
    return self.cursor().execute_and_commit(sql_query)
def index_exists(self, index_field)
Expand source code
def index_exists(self, index_field):
    index_name = self.index_name(index_field)
    return index_name in self.indexes()
def index_name(self, index_field)
Expand source code
def index_name(self, index_field):
    return f'idx__{self.table_name}__{index_field}'
def indexes(self)
Expand source code
def indexes(self):
    field_name        = 'name'
    return_fields     = [field_name]
    table_sqlite_master = self.database.table__sqlite_master()
    table_type        = 'index'
    query_conditions  = {'type': table_type, 'tbl_name': self.table_name}
    sql_query, params = table_sqlite_master.sql_builder().query_select_fields_with_conditions(return_fields, query_conditions)
    rows              = table_sqlite_master.cursor().execute__fetch_all(sql_query, params)
    return table_sqlite_master.list_of_field_name_from_rows(rows, field_name)
def list_of_field_name_from_rows(self, rows, field_name)
Expand source code
def list_of_field_name_from_rows(self, rows, field_name):
    return [row[field_name] for row in rows]
def new_row_obj(self, row_data=None)
Expand source code
def new_row_obj(self, row_data=None):
    if self.row_schema:
        new_obj = self.row_schema()
        if row_data and ROW_BASE_CLASS in base_types(new_obj):
            row_data = self.parse_new_row_data(row_data)
            new_obj.update_from_kwargs(**row_data)
        return new_obj
def not_contains(self, **kwargs)
Expand source code
def not_contains(self, **kwargs):
    return self.contains(**kwargs) is False
def not_exists(self)
Expand source code
def not_exists(self):
    return self.exists() is False
def parse_new_row_data(self, row_data)
Expand source code
def parse_new_row_data(self, row_data):
    if row_data:
        if self.auto_pickle_blob:
            fields = self.fields__cached()
            picked_row_data = {}
            for field_name, field_value in row_data.items():
                field_type = fields.get(field_name, {}).get('type')
                if field_type == 'BLOB':
                    picked_row_data[field_name] = obj_to_bytes(field_value)
                else:
                    picked_row_data[field_name] = field_value
            return picked_row_data
    return row_data
def parse_row(self, row)
Expand source code
def parse_row(self, row):
    if row and self.auto_pickle_blob:
        fields = self.fields__cached()
        for field_name, field_value in row.items():
            field_type = fields.get(field_name, {}).get('type')
            if field_type == 'BLOB':
                row[field_name] = bytes_to_obj(field_value)
    return row
def parse_rows(self, rows)
Expand source code
def parse_rows(self, rows):
    return [self.parse_row(row) for row in rows]
def print(self, **kwargs)
Expand source code
def print(self, **kwargs):
    return Print_Table(**kwargs).print(self.rows())
def row(self, where, fields=None)
Expand source code
def row(self, where, fields=None):
    if fields is None:
        return self.select_row_where(**where)

    sql_query, params = self.sql_builder(limit=1).query_select_fields_with_conditions(fields, where)
    row               = self.cursor().execute__fetch_one(sql_query, params)
    return self.parse_row(row)
def row_add(self, row_obj=None)
Expand source code
def row_add(self, row_obj=None):
    invalid_reason = self.sql_builder().validate_row_obj(row_obj)
    if invalid_reason:
        raise Exception(f"in row_add the provided row_obj is not valid: {invalid_reason}")
    return self.row_add_record(row_obj.__dict__)
def row_add_and_commit(self, row_obj=None)
Expand source code
def row_add_and_commit(self, row_obj=None):
    if self.row_add(row_obj).get('status') == 'ok':
        self.commit()
        return row_obj                                      # this allows the original callers to see the actual object that was added to the table
def row_add_record(self, record)
Expand source code
def row_add_record(self, record):
    validation_result = self.validate_record_with_schema(record)
    if validation_result:
        raise ValueError(f"row_add_record, validation_result for provided record failed with {validation_result}")

    sql_command,params = self.sql_builder().command_for_insert(record)
    return self.cursor().execute(sql_command, params)                    # Execute the SQL statement with the filtered data values
def row_schema__create_from_current_field_types(self)
Expand source code
def row_schema__create_from_current_field_types(self):
    exclude_field_id                 = True                                                                 # don't include the id field since in most cases the row_schema doesn't include it
    field_types                      = self.fields_types__cached(exclude_id=exclude_field_id)               # mapping with field name to field type (in python)
    caps_table_name                  = str_cap_snake_case(self.table_name)
    dynamic_class_name               = f'Row_Schema__{caps_table_name}'                                    # name that we will give to the dynamic class generated
    dynamic_class_dict               = { k: default_value(v) for k, v in field_types.items()}              # assign the field values its default value (for that type)
    dynamic_class_dict['__module__'] = SQL_TABLE__MODULE_NAME__ROW_SCHEMA                                            # set the module name
    Dynamic_Class                    = type(dynamic_class_name, (ROW_BASE_CLASS,), dynamic_class_dict)     # Create the dynamic class
    Dynamic_Class.__annotations__    = field_types                                                         # Set annotations of the new Dynamic_Class to be the mappings we have from field_types
    return Dynamic_Class                                                                                   # return the Dynamic class (whose fields should match the field_types)
def row_schema__set_from_field_types(self)
Expand source code
def row_schema__set_from_field_types(self):
    self.row_schema = self.row_schema__create_from_current_field_types()
    return self
def row_update(self, update_fields, query_conditions)
Expand source code
def row_update(self, update_fields, query_conditions ):
    sql_query, params = self.sql_builder().sql_query_update_with_conditions(update_fields, query_conditions)
    return self.cursor().execute_and_commit(sql_query, params)
def rows(self, fields_names=None, limit=None)
Expand source code
def rows(self, fields_names=None, limit=None):
    sql_query = self.sql_builder(limit=limit).query_for_fields(fields_names)
    rows = self.cursor().execute__fetch_all(sql_query)
    return self.parse_rows(rows)
def rows_add(self, records, commit=True)
Expand source code
def rows_add(self, records, commit=True):           # todo: refactor to use row_add
    for record in records:
        if type(record) is dict:
            self.row_add_record(record)
        else:
            self.row_add(row_obj=record)
    if commit:
        self.cursor().commit()
    return self
def rows_delete_where(self, **query_conditions)
Expand source code
def rows_delete_where(self, **query_conditions):
    sql_query,params = self.sql_builder().command__delete_where(query_conditions)
    return self.cursor().execute_and_commit(sql_query,params)
def schema(self)
Expand source code
@index_by
def schema(self):
    return self.cursor().table_schema(self.table_name)
def schema__by_name_type(self)
Expand source code
@filter_list
def schema__by_name_type(self):
    return {item.get('name'): item.get('type') for item in self.schema()}
def select_field_values(self, field_name)
Expand source code
def select_field_values(self, field_name):
    if field_name not in self.fields__cached():
        raise ValueError(f'in select_all_vales_from_field, the provide field_name "{field_name}" does not exist in the current table "{self.table_name}"')

    sql_query  = self.sql_builder().query_for_fields([field_name])
    rows       = self.cursor().execute__fetch_all(sql_query)        # Execute the SQL query and get all rows
    all_rows   = self.parse_rows(rows)
    all_values = [row[field_name] for row in all_rows]              # Extract the desired field from each row in the result set
    return all_values
def select_row_where(self, **kwargs)
Expand source code
def select_row_where(self, **kwargs):
    rows = self.select_rows_where(**kwargs)                     # execute the query
    if len(rows) == 1:                                          # only return a result if there is one row
        return rows[0]
    return None                                                 # return None if there no match or more than one match
def select_rows_where(self, **kwargs)
Expand source code
def select_rows_where(self, **kwargs):
    sql_query, params = self.sql_builder().query_for_select_rows_where(**kwargs)
    rows = self.cursor().execute__fetch_all(sql_query, params)                      # Execute the query and return the results
    return self.parse_rows(rows)
def select_rows_where_one(self, **kwargs)
Expand source code
def select_rows_where_one(self, **kwargs):
    sql_query, params = self.sql_builder().query_for_select_rows_where(**kwargs)
    row = self.cursor().execute__fetch_one(sql_query, params)                      # Execute the query and return the results
    return self.parse_row(row)
def size(self)
Expand source code
def size(self):
    sql_query = self.sql_builder().query_for_size()
    result = self.cursor().execute__fetch_one(sql_query)
    return result.get('size')
def sql_builder(self, limit=None)
Expand source code
@cache_on_self
def sql_builder(self, limit=None):
    from osbot_utils.helpers.sqlite.sql_builder.SQL_Builder import SQL_Builder
    return SQL_Builder(table=self, limit=limit)
def validate_record_with_schema(self, record)
Expand source code
def validate_record_with_schema(self, record):                                          # todo: refactor out to a validator class
    schema = self.fields__cached()

    extra_keys = [key for key in record if key not in schema]                           # Check for keys in record that are not in the schema
    if extra_keys:
        return f'Validation error: Unrecognized keys {extra_keys} in record.'
    return ''                                                                           # If we reach here, the record is valid
def where(self, **kwargs)
Expand source code
def where(self, **kwargs):
    return self.select_rows_where(**kwargs)
def where_one(self, **kwargs)
Expand source code
def where_one(self, **kwargs):
    return self.select_rows_where_one(**kwargs)

Inherited members