Module osbot_utils.helpers.SSH
Expand source code
from decimal import Decimal
from osbot_utils.base_classes.Kwargs_To_Self import Kwargs_To_Self
from osbot_utils.context_managers.capture_duration import capture_duration
from osbot_utils.decorators.lists.group_by import group_by
from osbot_utils.decorators.lists.index_by import index_by
from osbot_utils.utils.Dev import pprint
from osbot_utils.utils.Functions import function_source_code
from osbot_utils.utils.Misc import timestamp_utc_now
from osbot_utils.utils.Process import start_process
from osbot_utils.utils.Status import status_error
class SSH(Kwargs_To_Self):
ssh_host : str
ssh_key_file : str
ssh_key_user : str
strict_host_check : bool = False
def exec(self, command):
return self.execute_command__return_stdout(command)
def execute_command(self, command):
if self.ssh_host and self.ssh_key_file and self.ssh_key_user and command: # todo: add check to see if ssh executable exists (this check can be cached)
ssh_args = self.execute_command_args(command)
with capture_duration() as duration:
result = start_process("ssh", ssh_args) # execute command using subprocess.run(...)
result['duration'] = duration.data()
return result
return status_error(error='in execute_command not all required vars were setup')
def execute_python__code(self, python_code, python_executable='python3'):
python_command = f"{python_executable} -c \"{python_code}\""
return self.execute_command(python_command)
def execute_python__code__return_stdout(self, *args, **kwargs):
return self.execute_python__code(*args, **kwargs).get('stdout').strip()
def execute_python__function(self, function, python_executable='python3'):
function_name = function.__name__
function_code = function_source_code(function)
exec_code = f"{function_code}\nresult= {function_name}(); print(result)"
python_command = f"{python_executable} -c \"{exec_code}\""
return self.execute_command(python_command)
def execute_python__function__return_stderr(self, *args, **kwargs):
return self.execute_python__function(*args, **kwargs).get('stderr').strip()
def execute_python__function__return_stdout(self, *args, **kwargs):
return self.execute_python__function(*args, **kwargs).get('stdout').strip()
def execute_ssh_args(self, command=None):
ssh_args = []
if self.strict_host_check is False:
ssh_args += ['-o',
'StrictHostKeyChecking=no'] # todo: add support for updating the local hosts file so that we dont need to do this that often
if self.ssh_key_file:
ssh_args += ['-i', self.ssh_key_file]
return ssh_args
def execute_command_args(self, command=None):
ssh_args = self.execute_ssh_args()
if self.ssh_host:
ssh_args += [self.execute_command_target_host()]
if command:
ssh_args += [command]
return ssh_args
def execute_command_target_host(self):
if self.ssh_key_user:
return f'{self.ssh_key_user}@{self.ssh_host}'
else:
return f'{self.ssh_host}'
def execute_command__return_stdout(self, command):
return self.execute_command(command).get('stdout').strip()
def execute_command__return_stderr(self, command):
return self.execute_command(command).get('stderr').strip()
@index_by
@group_by
def execute_command__return_dict(self, command):
stdout = self.execute_command(command).get('stdout').strip()
return self.parse_stdout_to_dict(stdout)
# helpers for common linux methods
def cat(self, path=''):
command = f'cat {path}'
return self.execute_command__return_stdout(command)
@index_by
def disk_space(self):
command = "df -h"
stdout = self.execute_command__return_stdout(command)
stdout_disk_space = stdout.replace('Mounted on', 'Mounted_on') # todo, find a better way to do this
disk_space = self.parse_stdout_to_dict(stdout_disk_space)
return disk_space
def find(self, path=''):
command = f'find {path}'
return self.execute_command__return_stdout(command)
def ls(self, path=''):
command = f'ls {path}'
ls_raw = self.execute_command__return_stdout(command)
return ls_raw.splitlines()
def mkdir(self, folder):
command = f'mkdir -p {folder}'
return self.execute_command__return_stdout(command)
def memory_usage(self):
command = "free -h"
memory_usage_raw = self.execute_command__return_stdout(command) # todo: add fix for data parsing issue
return memory_usage_raw.splitlines()
def rm(self, path=''):
command = f'rm {path}'
return self.execute_command__return_stderr(command)
def running_processes(self,**kwargs):
command = "ps aux"
return self.execute_command__return_dict(command, **kwargs)
def system_uptime(self):
command = "uptime"
uptime_raw = self.execute_command__return_stdout(command)
return uptime_raw.strip()
def uname(self):
return self.execute_command__return_stdout('uname')
def parse_stdout_to_dict(self, stdout):
lines = stdout.splitlines()
headers = lines[0].split()
result = []
for line in lines[1:]: # Split each line into parts based on whitespace
parts = line.split() # Combine the parts with headers to create a dictionary
entry = {headers[i]: parts[i] for i in range(len(headers))}
result.append(entry)
return result
def which(self, target):
command = f'which {target}' # todo: security-vuln: add protection against code injection
return self.execute_command__return_stdout(command)
def whoami(self):
command = f'whoami' # todo: security-vuln: add protection against code injection
return self.execute_command__return_stdout(command)
# print helpers
def print_ls(self, path=''):
pprint(self.ls(path))
return self
def print_exec(self, command=''):
pprint(self.exec(command))
return self
# def ifconfig(self):
# command = "export PATH=$PATH:/sbin && ifconfig" # todo add example with PATH modification
# return self.execute_command__return_stdout(command)
# def ifconfig(self): # todo add command to execute in separate bash (see when it is needed)
# command = "bash -l -c 'ifconfig'"
# return self.execute_command__return_stdout(command)
# if port_forward: # todo: add support for port forward (this will need async execution)
# local_port = port_forward.get('local_port' )
# remote_ip = port_forward.get('remote_ip' )
# remote_port = port_forward.get('remote_port')
Classes
class SSH (**kwargs)
-
A mixin class to strictly assign keyword arguments to pre-defined instance attributes during initialization.
This base class provides an init method that assigns values from keyword arguments to instance attributes. If an attribute with the same name as a key from the kwargs is defined in the class, it will be set to the value from kwargs. If the key does not match any predefined attribute names, an exception is raised.
This behavior enforces strict control over the attributes of instances, ensuring that only predefined attributes can be set at the time of instantiation and avoids silent attribute creation which can lead to bugs in the code.
Usage
class MyConfigurableClass(Kwargs_To_Self): attribute1 = 'default_value' attribute2 = True attribute3 : str attribute4 : list attribute4 : int = 42
# Other methods can be added here
Correctly override default values by passing keyword arguments
instance = MyConfigurableClass(attribute1='new_value', attribute2=False)
This will raise an exception as 'attribute3' is not predefined
instance = MyConfigurableClass(attribute3='invalid_attribute')
this will also assign the default value to any variable that has a type defined. In the example above the default values (mapped by default__kwargs and locals) will be: attribute1 = 'default_value' attribute2 = True attribute3 = '' # default value of str attribute4 = [] # default value of list attribute4 = 42 # defined value in the class
Note
It is important that all attributes which may be set at instantiation are predefined in the class. Failure to do so will result in an exception being raised.
Methods
init(**kwargs): The initializer that handles the assignment of keyword arguments to instance attributes. It enforces strict attribute assignment rules, only allowing attributes that are already defined in the class to be set.
Initialize an instance of the derived class, strictly assigning provided keyword arguments to corresponding instance attributes.
Parameters
**kwargs: Variable length keyword arguments.
Raises
Exception
- If a key from kwargs does not correspond to any attribute pre-defined in the class, an exception is raised to prevent setting an undefined attribute.
Expand source code
class SSH(Kwargs_To_Self): ssh_host : str ssh_key_file : str ssh_key_user : str strict_host_check : bool = False def exec(self, command): return self.execute_command__return_stdout(command) def execute_command(self, command): if self.ssh_host and self.ssh_key_file and self.ssh_key_user and command: # todo: add check to see if ssh executable exists (this check can be cached) ssh_args = self.execute_command_args(command) with capture_duration() as duration: result = start_process("ssh", ssh_args) # execute command using subprocess.run(...) result['duration'] = duration.data() return result return status_error(error='in execute_command not all required vars were setup') def execute_python__code(self, python_code, python_executable='python3'): python_command = f"{python_executable} -c \"{python_code}\"" return self.execute_command(python_command) def execute_python__code__return_stdout(self, *args, **kwargs): return self.execute_python__code(*args, **kwargs).get('stdout').strip() def execute_python__function(self, function, python_executable='python3'): function_name = function.__name__ function_code = function_source_code(function) exec_code = f"{function_code}\nresult= {function_name}(); print(result)" python_command = f"{python_executable} -c \"{exec_code}\"" return self.execute_command(python_command) def execute_python__function__return_stderr(self, *args, **kwargs): return self.execute_python__function(*args, **kwargs).get('stderr').strip() def execute_python__function__return_stdout(self, *args, **kwargs): return self.execute_python__function(*args, **kwargs).get('stdout').strip() def execute_ssh_args(self, command=None): ssh_args = [] if self.strict_host_check is False: ssh_args += ['-o', 'StrictHostKeyChecking=no'] # todo: add support for updating the local hosts file so that we dont need to do this that often if self.ssh_key_file: ssh_args += ['-i', self.ssh_key_file] return ssh_args def execute_command_args(self, command=None): ssh_args = self.execute_ssh_args() if self.ssh_host: ssh_args += [self.execute_command_target_host()] if command: ssh_args += [command] return ssh_args def execute_command_target_host(self): if self.ssh_key_user: return f'{self.ssh_key_user}@{self.ssh_host}' else: return f'{self.ssh_host}' def execute_command__return_stdout(self, command): return self.execute_command(command).get('stdout').strip() def execute_command__return_stderr(self, command): return self.execute_command(command).get('stderr').strip() @index_by @group_by def execute_command__return_dict(self, command): stdout = self.execute_command(command).get('stdout').strip() return self.parse_stdout_to_dict(stdout) # helpers for common linux methods def cat(self, path=''): command = f'cat {path}' return self.execute_command__return_stdout(command) @index_by def disk_space(self): command = "df -h" stdout = self.execute_command__return_stdout(command) stdout_disk_space = stdout.replace('Mounted on', 'Mounted_on') # todo, find a better way to do this disk_space = self.parse_stdout_to_dict(stdout_disk_space) return disk_space def find(self, path=''): command = f'find {path}' return self.execute_command__return_stdout(command) def ls(self, path=''): command = f'ls {path}' ls_raw = self.execute_command__return_stdout(command) return ls_raw.splitlines() def mkdir(self, folder): command = f'mkdir -p {folder}' return self.execute_command__return_stdout(command) def memory_usage(self): command = "free -h" memory_usage_raw = self.execute_command__return_stdout(command) # todo: add fix for data parsing issue return memory_usage_raw.splitlines() def rm(self, path=''): command = f'rm {path}' return self.execute_command__return_stderr(command) def running_processes(self,**kwargs): command = "ps aux" return self.execute_command__return_dict(command, **kwargs) def system_uptime(self): command = "uptime" uptime_raw = self.execute_command__return_stdout(command) return uptime_raw.strip() def uname(self): return self.execute_command__return_stdout('uname') def parse_stdout_to_dict(self, stdout): lines = stdout.splitlines() headers = lines[0].split() result = [] for line in lines[1:]: # Split each line into parts based on whitespace parts = line.split() # Combine the parts with headers to create a dictionary entry = {headers[i]: parts[i] for i in range(len(headers))} result.append(entry) return result def which(self, target): command = f'which {target}' # todo: security-vuln: add protection against code injection return self.execute_command__return_stdout(command) def whoami(self): command = f'whoami' # todo: security-vuln: add protection against code injection return self.execute_command__return_stdout(command) # print helpers def print_ls(self, path=''): pprint(self.ls(path)) return self def print_exec(self, command=''): pprint(self.exec(command)) return self # def ifconfig(self): # command = "export PATH=$PATH:/sbin && ifconfig" # todo add example with PATH modification # return self.execute_command__return_stdout(command) # def ifconfig(self): # todo add command to execute in separate bash (see when it is needed) # command = "bash -l -c 'ifconfig'" # return self.execute_command__return_stdout(command) # if port_forward: # todo: add support for port forward (this will need async execution) # local_port = port_forward.get('local_port' ) # remote_ip = port_forward.get('remote_ip' ) # remote_port = port_forward.get('remote_port')
Ancestors
Subclasses
Class variables
var ssh_host : str
var ssh_key_file : str
var ssh_key_user : str
var strict_host_check : bool
Methods
def cat(self, path='')
-
Expand source code
def cat(self, path=''): command = f'cat {path}' return self.execute_command__return_stdout(command)
def disk_space(self)
-
Expand source code
@index_by def disk_space(self): command = "df -h" stdout = self.execute_command__return_stdout(command) stdout_disk_space = stdout.replace('Mounted on', 'Mounted_on') # todo, find a better way to do this disk_space = self.parse_stdout_to_dict(stdout_disk_space) return disk_space
def exec(self, command)
-
Expand source code
def exec(self, command): return self.execute_command__return_stdout(command)
def execute_command(self, command)
-
Expand source code
def execute_command(self, command): if self.ssh_host and self.ssh_key_file and self.ssh_key_user and command: # todo: add check to see if ssh executable exists (this check can be cached) ssh_args = self.execute_command_args(command) with capture_duration() as duration: result = start_process("ssh", ssh_args) # execute command using subprocess.run(...) result['duration'] = duration.data() return result return status_error(error='in execute_command not all required vars were setup')
def execute_command__return_dict(self, command)
-
Expand source code
@index_by @group_by def execute_command__return_dict(self, command): stdout = self.execute_command(command).get('stdout').strip() return self.parse_stdout_to_dict(stdout)
def execute_command__return_stderr(self, command)
-
Expand source code
def execute_command__return_stderr(self, command): return self.execute_command(command).get('stderr').strip()
def execute_command__return_stdout(self, command)
-
Expand source code
def execute_command__return_stdout(self, command): return self.execute_command(command).get('stdout').strip()
def execute_command_args(self, command=None)
-
Expand source code
def execute_command_args(self, command=None): ssh_args = self.execute_ssh_args() if self.ssh_host: ssh_args += [self.execute_command_target_host()] if command: ssh_args += [command] return ssh_args
def execute_command_target_host(self)
-
Expand source code
def execute_command_target_host(self): if self.ssh_key_user: return f'{self.ssh_key_user}@{self.ssh_host}' else: return f'{self.ssh_host}'
def execute_python__code(self, python_code, python_executable='python3')
-
Expand source code
def execute_python__code(self, python_code, python_executable='python3'): python_command = f"{python_executable} -c \"{python_code}\"" return self.execute_command(python_command)
def execute_python__code__return_stdout(self, *args, **kwargs)
-
Expand source code
def execute_python__code__return_stdout(self, *args, **kwargs): return self.execute_python__code(*args, **kwargs).get('stdout').strip()
def execute_python__function(self, function, python_executable='python3')
-
Expand source code
def execute_python__function(self, function, python_executable='python3'): function_name = function.__name__ function_code = function_source_code(function) exec_code = f"{function_code}\nresult= {function_name}(); print(result)" python_command = f"{python_executable} -c \"{exec_code}\"" return self.execute_command(python_command)
def execute_python__function__return_stderr(self, *args, **kwargs)
-
Expand source code
def execute_python__function__return_stderr(self, *args, **kwargs): return self.execute_python__function(*args, **kwargs).get('stderr').strip()
def execute_python__function__return_stdout(self, *args, **kwargs)
-
Expand source code
def execute_python__function__return_stdout(self, *args, **kwargs): return self.execute_python__function(*args, **kwargs).get('stdout').strip()
def execute_ssh_args(self, command=None)
-
Expand source code
def execute_ssh_args(self, command=None): ssh_args = [] if self.strict_host_check is False: ssh_args += ['-o', 'StrictHostKeyChecking=no'] # todo: add support for updating the local hosts file so that we dont need to do this that often if self.ssh_key_file: ssh_args += ['-i', self.ssh_key_file] return ssh_args
def find(self, path='')
-
Expand source code
def find(self, path=''): command = f'find {path}' return self.execute_command__return_stdout(command)
def ls(self, path='')
-
Expand source code
def ls(self, path=''): command = f'ls {path}' ls_raw = self.execute_command__return_stdout(command) return ls_raw.splitlines()
def memory_usage(self)
-
Expand source code
def memory_usage(self): command = "free -h" memory_usage_raw = self.execute_command__return_stdout(command) # todo: add fix for data parsing issue return memory_usage_raw.splitlines()
def mkdir(self, folder)
-
Expand source code
def mkdir(self, folder): command = f'mkdir -p {folder}' return self.execute_command__return_stdout(command)
def parse_stdout_to_dict(self, stdout)
-
Expand source code
def parse_stdout_to_dict(self, stdout): lines = stdout.splitlines() headers = lines[0].split() result = [] for line in lines[1:]: # Split each line into parts based on whitespace parts = line.split() # Combine the parts with headers to create a dictionary entry = {headers[i]: parts[i] for i in range(len(headers))} result.append(entry) return result
def print_exec(self, command='')
-
Expand source code
def print_exec(self, command=''): pprint(self.exec(command)) return self
def print_ls(self, path='')
-
Expand source code
def print_ls(self, path=''): pprint(self.ls(path)) return self
def rm(self, path='')
-
Expand source code
def rm(self, path=''): command = f'rm {path}' return self.execute_command__return_stderr(command)
def running_processes(self, **kwargs)
-
Expand source code
def running_processes(self,**kwargs): command = "ps aux" return self.execute_command__return_dict(command, **kwargs)
def system_uptime(self)
-
Expand source code
def system_uptime(self): command = "uptime" uptime_raw = self.execute_command__return_stdout(command) return uptime_raw.strip()
def uname(self)
-
Expand source code
def uname(self): return self.execute_command__return_stdout('uname')
def which(self, target)
-
Expand source code
def which(self, target): command = f'which {target}' # todo: security-vuln: add protection against code injection return self.execute_command__return_stdout(command)
def whoami(self)
-
Expand source code
def whoami(self): command = f'whoami' # todo: security-vuln: add protection against code injection return self.execute_command__return_stdout(command)
Inherited members