Module osbot_utils.helpers.sqlite.sql_builder.SQL_Builder

Expand source code
from osbot_utils.base_classes.Kwargs_To_Self import Kwargs_To_Self
from osbot_utils.helpers.sqlite.Sqlite__Globals import ROW_BASE_CLASS
from osbot_utils.helpers.sqlite.Sqlite__Table import Sqlite__Table


class SQL_Builder(Kwargs_To_Self):
    table      : Sqlite__Table
    limit      : int            = None                  # set it to None to make it explict that the limit is not set

    def validate_query_data(self):
        if self.table.row_schema is None:
            raise ValueError("in SQL_Builder, there was no row_schema defined in the mapped table")

    def select_for_fields(self,  field_names: list = None):
        valid_fields = self.table.fields_names__cached()
        if field_names is None:
            field_names = valid_fields
        elif isinstance(field_names, list) is False:
            raise ValueError(f"in sql_query_for_fields, field_names must be a list, it was :{field_names}")

        invalid_field_names = [name for name in field_names if name not in valid_fields]    # If no valid field names are provided, raise an error or return a default query
        if invalid_field_names:                                                             # If there are any invalid field names, raise an exception listing them
            message = f"Invalid field names provided: {', '.join(invalid_field_names)}"
            raise ValueError(message)

        fields_str = ', '.join(field_names)                         # Construct the SQL query string
        sql_query = f"SELECT {fields_str} FROM {self.table.table_name}{self.sql_limit()};"  # Join the valid field names with commas
        return sql_query

    def command__delete_table(self):
        return f'DELETE FROM {self.table.table_name}'

    def command__delete_where(self, query_conditions):
        self.validator().validate_query_fields(self.table, [],query_conditions)      # todo: add method to validate_query_fields to handle just the query section (so that we don't need to provide a empty list for the return values)
        target_table = self.table.table_name
        where_fields = list(query_conditions.keys())
        params       = list(query_conditions.values())
        where_clause = ' AND '.join([f"{field}=?" for field in where_fields])                   # todo: refactor into method that just handle the WHERE statements
        sql_query    = f"DELETE FROM {target_table} WHERE {where_clause}"                       # todo: refactor into a method that creates the final statement from a number of other objects (or strings)
        return sql_query, params

    def command_for_insert(self, record):
        valid_field_names = self.table.fields_names__cached()
        if type(record) is dict:
            if record:
                field_names   = record.keys()
                params        =  list(record.values())
                for field_name in field_names:
                    if field_name not in valid_field_names:
                        raise ValueError(f'in sql_command_for_insert, there was a field_name "{field_name}" that did exist in the current table')

                columns       = ', '.join(field_names)                                                         # Construct column names and placeholders
                placeholders  = ', '.join(['?' for _ in field_names])
                sql_command   = f'INSERT INTO {self.table.table_name} ({columns}) VALUES ({placeholders})'    # Construct the SQL statement
                return sql_command, params

    def query_for_fields(self, field_names: list = None):
        return self.select_for_fields(field_names)

    def query_for_size(self):
        return f'SELECT COUNT(*) as size FROM {self.table.table_name}'

    def sql_limit(self):
        if self.limit is not None:
            return f" LIMIT {int(self.limit)}"
        return ""

    def query_select_fields_with_conditions(self, return_fields, query_conditions):
        self.validator().validate_query_fields(self.table , return_fields, query_conditions)
        target_table = self.table.table_name
        if target_table and return_fields and query_conditions:
            where_fields     = list(query_conditions.keys())
            params           = list(query_conditions.values())
            fields_to_return = ', '.join(return_fields)                                               # Join the select_fields list into a comma-separated string
            where_clause     = ' AND '.join([f"{field}=?" for field in where_fields])                 # Dynamically construct the WHERE clause based on condition_fields
            sql_query        = f"SELECT {fields_to_return} FROM {target_table} WHERE {where_clause}"  # Construct the full SQL query
            return sql_query, params

    def query_for_select_rows_where(self, **kwargs):
        valid_fields  = self.table.fields__cached()                                                               # Get a list of valid field names from the cached schema
        params        = []                                                                                  # Initialize an empty list to hold query parameters
        where_clauses = []                                                                                  # Initialize an empty list to hold parts of the WHERE clause
        for field_name, query_value in kwargs.items():                                                      # Iterate over each keyword argument and its value
            if field_name not in valid_fields:                                                              # Check if the provided field name is valid
                raise ValueError(f'in select_rows_where, the provided field is not valid: {field_name}')
            params.append(query_value)                                                                      # Append the query value to the parameters list
            where_clauses.append(f"{field_name} = ?")                                                       # Append the corresponding WHERE clause part, using a placeholder for the value
        where_clause = ' AND '.join(where_clauses)                                                          # Join the individual parts of the WHERE clause with 'AND'

        sql_query = f"SELECT * FROM {self.table.table_name} WHERE {where_clause}" # Construct the full SQL query
        return sql_query, params

    def sql_query_update_with_conditions(self, update_fields, query_conditions):
        update_keys     = list(update_fields.keys())            # todo: refactor self.validate_query_fields to use a more generic value for these fields
        condition_keys  = list(query_conditions.keys())
        self.validator().validate_query_fields(self.table, update_keys, query_conditions)
        target_table = self.table.table_name
        if target_table and update_fields and query_conditions:
            update_clause   = ', '.join([f"{key}=?" for key in update_keys])
            where_clause    = ' AND '.join([f"{field}=?" for field in condition_keys])
            sql_query       = f"UPDATE {target_table} SET {update_clause} WHERE {where_clause}"

            # The parameters for the SQL execution must include both the update values and the condition values, in the correct order.
            params = list(update_fields.values()) + list(query_conditions.values())
            return sql_query, params

    def validate_row_obj(self, row_obj):
        field_types = self.table.fields_types__cached()
        invalid_reason = ""
        if self.table.row_schema:
            if row_obj:
                if issubclass(type(row_obj), ROW_BASE_CLASS):
                    for field_name, field_type in row_obj.__annotations__.items():
                        if field_name not in field_types:
                            invalid_reason = f'provided row_obj has a field that is not part of the current table: {field_name}'
                            break

                        if field_type != field_types[field_name]:
                            invalid_reason = f'provided row_obj has a field {field_name} that has a field type {field_type} that does not match the current tables type of that field: {field_types[field_name]}'
                            break
                    if invalid_reason  == '':
                        for field_name, field_value in row_obj.__locals__().items():
                            if field_name not in field_types:
                                invalid_reason = f'provided row_obj has a field that is not part of the current table: {field_name}'
                                break
                            if type(field_value) != field_types.get(field_name):
                                invalid_reason = f'provided row_obj has a field {field_name} that has a field value {field_value} value that has a type {type(field_value)} that does not match the current tables type of that field: {field_types.get(field_name)}'
                                break
                else:
                    invalid_reason = f'provided row_obj ({type(row_obj)}) is not a subclass of {ROW_BASE_CLASS}'
            else:
                invalid_reason = f'provided row_obj was None'
        else:
            invalid_reason = f'there is no row_schema defined for this table {self.table.table_name}'
        return invalid_reason

    def validator(self):
        return SQL_Query__Validator()

class SQL_Query__Validator:

    # todo: refactor this method to use a more generic value for these return_fields since it is already being used in two use cases: return fields and update fields
    def validate_query_fields(self, table, return_fields, query_conditions):
        target_table = table.table_name
        valid_fields = table.fields_names__cached(include_star_field=True)
        if target_table not in table.database.tables_names(include_sqlite_master=True):
            raise ValueError(f'in validate_query_fields, invalid target_table name: "{target_table}"')
        if type(return_fields) is not list:
            raise ValueError(f'in validate_query_fields, return_fields value must be a list, and it was "{type(return_fields)}"')
        for return_field in return_fields:
            if return_field not in valid_fields:
                raise ValueError(f'in validate_query_fields, invalid, invalid return_field: "{return_field}"')
        if type(query_conditions) is not dict:
            raise ValueError(f'in validate_query_fields, query_conditions value must be a dict, and it was "{type(query_conditions)}"')
        for where_field in query_conditions.keys():
            if where_field not in valid_fields:
                raise ValueError(f'in validate_query_fields, invalid, invalid return_field: "{where_field}"')

        return self

Classes

class SQL_Builder (**kwargs)

A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.

This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.

This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.

Usage

class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42

# Other methods can be added here

Correctly override default values by passing keyword arguments

instance = MyConfigurableClass(attribute1='new_value', attribute2=False)

This will raise an exception as 'attribute3' is not predefined

instance = MyConfigurableClass(attribute3='invalid_attribute')

this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class

Note

It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.

Methods

init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.

Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.

Parameters

**kwargs: Variable length keyword arguments.

Raises

Exception
If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class SQL_Builder(Kwargs_To_Self):
    table      : Sqlite__Table
    limit      : int            = None                  # set it to None to make it explict that the limit is not set

    def validate_query_data(self):
        if self.table.row_schema is None:
            raise ValueError("in SQL_Builder, there was no row_schema defined in the mapped table")

    def select_for_fields(self,  field_names: list = None):
        valid_fields = self.table.fields_names__cached()
        if field_names is None:
            field_names = valid_fields
        elif isinstance(field_names, list) is False:
            raise ValueError(f"in sql_query_for_fields, field_names must be a list, it was :{field_names}")

        invalid_field_names = [name for name in field_names if name not in valid_fields]    # If no valid field names are provided, raise an error or return a default query
        if invalid_field_names:                                                             # If there are any invalid field names, raise an exception listing them
            message = f"Invalid field names provided: {', '.join(invalid_field_names)}"
            raise ValueError(message)

        fields_str = ', '.join(field_names)                         # Construct the SQL query string
        sql_query = f"SELECT {fields_str} FROM {self.table.table_name}{self.sql_limit()};"  # Join the valid field names with commas
        return sql_query

    def command__delete_table(self):
        return f'DELETE FROM {self.table.table_name}'

    def command__delete_where(self, query_conditions):
        self.validator().validate_query_fields(self.table, [],query_conditions)      # todo: add method to validate_query_fields to handle just the query section (so that we don't need to provide a empty list for the return values)
        target_table = self.table.table_name
        where_fields = list(query_conditions.keys())
        params       = list(query_conditions.values())
        where_clause = ' AND '.join([f"{field}=?" for field in where_fields])                   # todo: refactor into method that just handle the WHERE statements
        sql_query    = f"DELETE FROM {target_table} WHERE {where_clause}"                       # todo: refactor into a method that creates the final statement from a number of other objects (or strings)
        return sql_query, params

    def command_for_insert(self, record):
        valid_field_names = self.table.fields_names__cached()
        if type(record) is dict:
            if record:
                field_names   = record.keys()
                params        =  list(record.values())
                for field_name in field_names:
                    if field_name not in valid_field_names:
                        raise ValueError(f'in sql_command_for_insert, there was a field_name "{field_name}" that did exist in the current table')

                columns       = ', '.join(field_names)                                                         # Construct column names and placeholders
                placeholders  = ', '.join(['?' for _ in field_names])
                sql_command   = f'INSERT INTO {self.table.table_name} ({columns}) VALUES ({placeholders})'    # Construct the SQL statement
                return sql_command, params

    def query_for_fields(self, field_names: list = None):
        return self.select_for_fields(field_names)

    def query_for_size(self):
        return f'SELECT COUNT(*) as size FROM {self.table.table_name}'

    def sql_limit(self):
        if self.limit is not None:
            return f" LIMIT {int(self.limit)}"
        return ""

    def query_select_fields_with_conditions(self, return_fields, query_conditions):
        self.validator().validate_query_fields(self.table , return_fields, query_conditions)
        target_table = self.table.table_name
        if target_table and return_fields and query_conditions:
            where_fields     = list(query_conditions.keys())
            params           = list(query_conditions.values())
            fields_to_return = ', '.join(return_fields)                                               # Join the select_fields list into a comma-separated string
            where_clause     = ' AND '.join([f"{field}=?" for field in where_fields])                 # Dynamically construct the WHERE clause based on condition_fields
            sql_query        = f"SELECT {fields_to_return} FROM {target_table} WHERE {where_clause}"  # Construct the full SQL query
            return sql_query, params

    def query_for_select_rows_where(self, **kwargs):
        valid_fields  = self.table.fields__cached()                                                               # Get a list of valid field names from the cached schema
        params        = []                                                                                  # Initialize an empty list to hold query parameters
        where_clauses = []                                                                                  # Initialize an empty list to hold parts of the WHERE clause
        for field_name, query_value in kwargs.items():                                                      # Iterate over each keyword argument and its value
            if field_name not in valid_fields:                                                              # Check if the provided field name is valid
                raise ValueError(f'in select_rows_where, the provided field is not valid: {field_name}')
            params.append(query_value)                                                                      # Append the query value to the parameters list
            where_clauses.append(f"{field_name} = ?")                                                       # Append the corresponding WHERE clause part, using a placeholder for the value
        where_clause = ' AND '.join(where_clauses)                                                          # Join the individual parts of the WHERE clause with 'AND'

        sql_query = f"SELECT * FROM {self.table.table_name} WHERE {where_clause}" # Construct the full SQL query
        return sql_query, params

    def sql_query_update_with_conditions(self, update_fields, query_conditions):
        update_keys     = list(update_fields.keys())            # todo: refactor self.validate_query_fields to use a more generic value for these fields
        condition_keys  = list(query_conditions.keys())
        self.validator().validate_query_fields(self.table, update_keys, query_conditions)
        target_table = self.table.table_name
        if target_table and update_fields and query_conditions:
            update_clause   = ', '.join([f"{key}=?" for key in update_keys])
            where_clause    = ' AND '.join([f"{field}=?" for field in condition_keys])
            sql_query       = f"UPDATE {target_table} SET {update_clause} WHERE {where_clause}"

            # The parameters for the SQL execution must include both the update values and the condition values, in the correct order.
            params = list(update_fields.values()) + list(query_conditions.values())
            return sql_query, params

    def validate_row_obj(self, row_obj):
        field_types = self.table.fields_types__cached()
        invalid_reason = ""
        if self.table.row_schema:
            if row_obj:
                if issubclass(type(row_obj), ROW_BASE_CLASS):
                    for field_name, field_type in row_obj.__annotations__.items():
                        if field_name not in field_types:
                            invalid_reason = f'provided row_obj has a field that is not part of the current table: {field_name}'
                            break

                        if field_type != field_types[field_name]:
                            invalid_reason = f'provided row_obj has a field {field_name} that has a field type {field_type} that does not match the current tables type of that field: {field_types[field_name]}'
                            break
                    if invalid_reason  == '':
                        for field_name, field_value in row_obj.__locals__().items():
                            if field_name not in field_types:
                                invalid_reason = f'provided row_obj has a field that is not part of the current table: {field_name}'
                                break
                            if type(field_value) != field_types.get(field_name):
                                invalid_reason = f'provided row_obj has a field {field_name} that has a field value {field_value} value that has a type {type(field_value)} that does not match the current tables type of that field: {field_types.get(field_name)}'
                                break
                else:
                    invalid_reason = f'provided row_obj ({type(row_obj)}) is not a subclass of {ROW_BASE_CLASS}'
            else:
                invalid_reason = f'provided row_obj was None'
        else:
            invalid_reason = f'there is no row_schema defined for this table {self.table.table_name}'
        return invalid_reason

    def validator(self):
        return SQL_Query__Validator()

Ancestors

Subclasses

Class variables

var limit : int
var tableSqlite__Table

Methods

def command__delete_table(self)
Expand source code
def command__delete_table(self):
    return f'DELETE FROM {self.table.table_name}'
def command__delete_where(self, query_conditions)
Expand source code
def command__delete_where(self, query_conditions):
    self.validator().validate_query_fields(self.table, [],query_conditions)      # todo: add method to validate_query_fields to handle just the query section (so that we don't need to provide a empty list for the return values)
    target_table = self.table.table_name
    where_fields = list(query_conditions.keys())
    params       = list(query_conditions.values())
    where_clause = ' AND '.join([f"{field}=?" for field in where_fields])                   # todo: refactor into method that just handle the WHERE statements
    sql_query    = f"DELETE FROM {target_table} WHERE {where_clause}"                       # todo: refactor into a method that creates the final statement from a number of other objects (or strings)
    return sql_query, params
def command_for_insert(self, record)
Expand source code
def command_for_insert(self, record):
    valid_field_names = self.table.fields_names__cached()
    if type(record) is dict:
        if record:
            field_names   = record.keys()
            params        =  list(record.values())
            for field_name in field_names:
                if field_name not in valid_field_names:
                    raise ValueError(f'in sql_command_for_insert, there was a field_name "{field_name}" that did exist in the current table')

            columns       = ', '.join(field_names)                                                         # Construct column names and placeholders
            placeholders  = ', '.join(['?' for _ in field_names])
            sql_command   = f'INSERT INTO {self.table.table_name} ({columns}) VALUES ({placeholders})'    # Construct the SQL statement
            return sql_command, params
def query_for_fields(self, field_names: list = None)
Expand source code
def query_for_fields(self, field_names: list = None):
    return self.select_for_fields(field_names)
def query_for_select_rows_where(self, **kwargs)
Expand source code
def query_for_select_rows_where(self, **kwargs):
    valid_fields  = self.table.fields__cached()                                                               # Get a list of valid field names from the cached schema
    params        = []                                                                                  # Initialize an empty list to hold query parameters
    where_clauses = []                                                                                  # Initialize an empty list to hold parts of the WHERE clause
    for field_name, query_value in kwargs.items():                                                      # Iterate over each keyword argument and its value
        if field_name not in valid_fields:                                                              # Check if the provided field name is valid
            raise ValueError(f'in select_rows_where, the provided field is not valid: {field_name}')
        params.append(query_value)                                                                      # Append the query value to the parameters list
        where_clauses.append(f"{field_name} = ?")                                                       # Append the corresponding WHERE clause part, using a placeholder for the value
    where_clause = ' AND '.join(where_clauses)                                                          # Join the individual parts of the WHERE clause with 'AND'

    sql_query = f"SELECT * FROM {self.table.table_name} WHERE {where_clause}" # Construct the full SQL query
    return sql_query, params
def query_for_size(self)
Expand source code
def query_for_size(self):
    return f'SELECT COUNT(*) as size FROM {self.table.table_name}'
def query_select_fields_with_conditions(self, return_fields, query_conditions)
Expand source code
def query_select_fields_with_conditions(self, return_fields, query_conditions):
    self.validator().validate_query_fields(self.table , return_fields, query_conditions)
    target_table = self.table.table_name
    if target_table and return_fields and query_conditions:
        where_fields     = list(query_conditions.keys())
        params           = list(query_conditions.values())
        fields_to_return = ', '.join(return_fields)                                               # Join the select_fields list into a comma-separated string
        where_clause     = ' AND '.join([f"{field}=?" for field in where_fields])                 # Dynamically construct the WHERE clause based on condition_fields
        sql_query        = f"SELECT {fields_to_return} FROM {target_table} WHERE {where_clause}"  # Construct the full SQL query
        return sql_query, params
def select_for_fields(self, field_names: list = None)
Expand source code
def select_for_fields(self,  field_names: list = None):
    valid_fields = self.table.fields_names__cached()
    if field_names is None:
        field_names = valid_fields
    elif isinstance(field_names, list) is False:
        raise ValueError(f"in sql_query_for_fields, field_names must be a list, it was :{field_names}")

    invalid_field_names = [name for name in field_names if name not in valid_fields]    # If no valid field names are provided, raise an error or return a default query
    if invalid_field_names:                                                             # If there are any invalid field names, raise an exception listing them
        message = f"Invalid field names provided: {', '.join(invalid_field_names)}"
        raise ValueError(message)

    fields_str = ', '.join(field_names)                         # Construct the SQL query string
    sql_query = f"SELECT {fields_str} FROM {self.table.table_name}{self.sql_limit()};"  # Join the valid field names with commas
    return sql_query
def sql_limit(self)
Expand source code
def sql_limit(self):
    if self.limit is not None:
        return f" LIMIT {int(self.limit)}"
    return ""
def sql_query_update_with_conditions(self, update_fields, query_conditions)
Expand source code
def sql_query_update_with_conditions(self, update_fields, query_conditions):
    update_keys     = list(update_fields.keys())            # todo: refactor self.validate_query_fields to use a more generic value for these fields
    condition_keys  = list(query_conditions.keys())
    self.validator().validate_query_fields(self.table, update_keys, query_conditions)
    target_table = self.table.table_name
    if target_table and update_fields and query_conditions:
        update_clause   = ', '.join([f"{key}=?" for key in update_keys])
        where_clause    = ' AND '.join([f"{field}=?" for field in condition_keys])
        sql_query       = f"UPDATE {target_table} SET {update_clause} WHERE {where_clause}"

        # The parameters for the SQL execution must include both the update values and the condition values, in the correct order.
        params = list(update_fields.values()) + list(query_conditions.values())
        return sql_query, params
def validate_query_data(self)
Expand source code
def validate_query_data(self):
    if self.table.row_schema is None:
        raise ValueError("in SQL_Builder, there was no row_schema defined in the mapped table")
def validate_row_obj(self, row_obj)
Expand source code
def validate_row_obj(self, row_obj):
    field_types = self.table.fields_types__cached()
    invalid_reason = ""
    if self.table.row_schema:
        if row_obj:
            if issubclass(type(row_obj), ROW_BASE_CLASS):
                for field_name, field_type in row_obj.__annotations__.items():
                    if field_name not in field_types:
                        invalid_reason = f'provided row_obj has a field that is not part of the current table: {field_name}'
                        break

                    if field_type != field_types[field_name]:
                        invalid_reason = f'provided row_obj has a field {field_name} that has a field type {field_type} that does not match the current tables type of that field: {field_types[field_name]}'
                        break
                if invalid_reason  == '':
                    for field_name, field_value in row_obj.__locals__().items():
                        if field_name not in field_types:
                            invalid_reason = f'provided row_obj has a field that is not part of the current table: {field_name}'
                            break
                        if type(field_value) != field_types.get(field_name):
                            invalid_reason = f'provided row_obj has a field {field_name} that has a field value {field_value} value that has a type {type(field_value)} that does not match the current tables type of that field: {field_types.get(field_name)}'
                            break
            else:
                invalid_reason = f'provided row_obj ({type(row_obj)}) is not a subclass of {ROW_BASE_CLASS}'
        else:
            invalid_reason = f'provided row_obj was None'
    else:
        invalid_reason = f'there is no row_schema defined for this table {self.table.table_name}'
    return invalid_reason
def validator(self)
Expand source code
def validator(self):
    return SQL_Query__Validator()

Inherited members

class SQL_Query__Validator
Expand source code
class SQL_Query__Validator:

    # todo: refactor this method to use a more generic value for these return_fields since it is already being used in two use cases: return fields and update fields
    def validate_query_fields(self, table, return_fields, query_conditions):
        target_table = table.table_name
        valid_fields = table.fields_names__cached(include_star_field=True)
        if target_table not in table.database.tables_names(include_sqlite_master=True):
            raise ValueError(f'in validate_query_fields, invalid target_table name: "{target_table}"')
        if type(return_fields) is not list:
            raise ValueError(f'in validate_query_fields, return_fields value must be a list, and it was "{type(return_fields)}"')
        for return_field in return_fields:
            if return_field not in valid_fields:
                raise ValueError(f'in validate_query_fields, invalid, invalid return_field: "{return_field}"')
        if type(query_conditions) is not dict:
            raise ValueError(f'in validate_query_fields, query_conditions value must be a dict, and it was "{type(query_conditions)}"')
        for where_field in query_conditions.keys():
            if where_field not in valid_fields:
                raise ValueError(f'in validate_query_fields, invalid, invalid return_field: "{where_field}"')

        return self

Methods

def validate_query_fields(self, table, return_fields, query_conditions)
Expand source code
def validate_query_fields(self, table, return_fields, query_conditions):
    target_table = table.table_name
    valid_fields = table.fields_names__cached(include_star_field=True)
    if target_table not in table.database.tables_names(include_sqlite_master=True):
        raise ValueError(f'in validate_query_fields, invalid target_table name: "{target_table}"')
    if type(return_fields) is not list:
        raise ValueError(f'in validate_query_fields, return_fields value must be a list, and it was "{type(return_fields)}"')
    for return_field in return_fields:
        if return_field not in valid_fields:
            raise ValueError(f'in validate_query_fields, invalid, invalid return_field: "{return_field}"')
    if type(query_conditions) is not dict:
        raise ValueError(f'in validate_query_fields, query_conditions value must be a dict, and it was "{type(query_conditions)}"')
    for where_field in query_conditions.keys():
        if where_field not in valid_fields:
            raise ValueError(f'in validate_query_fields, invalid, invalid return_field: "{where_field}"')

    return self