Module osbot_utils.utils.Zip

Expand source code
import gzip
import io
import os
import shutil
import tarfile
import zipfile
from os.path import abspath

from osbot_utils.utils.Files import temp_folder, folder_files, temp_file, is_file, file_copy, file_move


def gz_tar_bytes_file_list(gz_bytes):
    gz_buffer_from_bytes = io.BytesIO(gz_bytes)
    with gzip.GzipFile(fileobj=gz_buffer_from_bytes, mode='rb') as gz:
        decompressed_data = gz.read()
        tar_buffer_from_bytes = io.BytesIO(decompressed_data)                       # Assuming the decompressed data is a tag file, process it
        with tarfile.open(fileobj=tar_buffer_from_bytes, mode='r:') as tar:
            return sorted(tar.getnames())

def gz_tar_bytes_get_file(gz_bytes, tar_file_path):
    gz_buffer_from_bytes = io.BytesIO(gz_bytes)
    with gzip.GzipFile(fileobj=gz_buffer_from_bytes, mode='rb') as gz:
        decompressed_data = gz.read()
        tar_buffer_from_bytes = io.BytesIO(decompressed_data)
        with tarfile.open(fileobj=tar_buffer_from_bytes, mode='r:') as tar:
            extracted_file = tar.extractfile(tar_file_path)
            if extracted_file:
                return extracted_file.read()
            else:
                raise FileNotFoundError(f"The file {tar_file_path} was not found in the tar archive.")

def gz_zip_bytes_file_list(gz_bytes):
    gz_buffer_from_bytes = io.BytesIO(gz_bytes)
    with gzip.GzipFile(fileobj=gz_buffer_from_bytes, mode='rb') as gz:
        decompressed_data = gz.read()
        zip_buffer_from_bytes = io.BytesIO(decompressed_data)                   # Assuming the decompressed data is a zip file, process it
        with zipfile.ZipFile(zip_buffer_from_bytes, 'r') as zf:
            return sorted(zf.namelist())

def unzip_file(zip_file, target_folder=None, format='zip'):
    target_folder = target_folder or temp_folder()
    shutil.unpack_archive(zip_file, extract_dir=target_folder, format=format)
    return target_folder

def zip_bytes_add_file(zip_bytes, zip_file_path, file_contents):
    if type(file_contents) is str:
        file_contents = file_contents.encode('utf-8')
    elif type(file_contents) is not bytes:
        return None
    zip_buffer = io.BytesIO(zip_bytes)
    with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(zip_file_path, file_contents)

    return zip_buffer.getvalue()

def zip_bytes_get_file(zip_bytes, zip_file_path):
    zip_buffer = io.BytesIO(zip_bytes)
    with zipfile.ZipFile(zip_buffer, 'r') as zf:
        return zf.read(zip_file_path)

def zip_bytes_extract_to_folder(zip_bytes, target_folder=None):
    target_folder = target_folder or temp_folder()              # Use the provided target folder or create a temporary one
    zip_buffer = io.BytesIO(zip_bytes)                          # Create a BytesIO buffer from the zip bytes
    with zipfile.ZipFile(zip_buffer, 'r') as zf:          # Open the zip file from the buffer
        zf.extractall(target_folder)                            # Extract all files to the target folder
    return target_folder                                        # Return the path of the target folder


def zip_bytes_file_list(zip_bytes):
    zip_buffer_from_bytes = io.BytesIO(zip_bytes)
    with zipfile.ZipFile(zip_buffer_from_bytes, 'r') as zf:
        return sorted(zf.namelist())

def zip_bytes_to_file(zip_bytes, target_file=None):
    if target_file is None:
        target_file = temp_file(extension='.zip')
    with open(target_file, 'wb') as f:
        f.write(zip_bytes)
    return target_file

def zip_files_to_bytes(target_files, root_folder=None):
    zip_buffer = io.BytesIO()                                                   # Create a BytesIO buffer to hold the zipped file
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:          # Create a ZipFile object with the buffer as the target
        for entry in target_files:
            if type(entry) is str:                                              # if entry is a string, assume it's a file path
                file_path = entry
                file_root_folder = root_folder
            else:
                file_path = entry.get('file')
                file_root_folder = entry.get('root_folder') or root_folder
            if file_root_folder:
                arcname = file_path.replace(file_root_folder,'')                      # Define the arcname, which is the name inside the zip file
            else:
                arcname = file_path                                                 # if root_path is not provided, use the full file path
            zf.write(file_path, arcname)                                            # Add the file to the zip file
    zip_buffer.seek(0)
    return zip_buffer

def zip_folder(root_dir, format='zip'):
    return shutil.make_archive(base_name=root_dir, format=format, root_dir=root_dir)

def zip_folder_to_file (root_dir, target_file):
    zip_file = zip_folder(root_dir)
    return file_move(zip_file, target_file)


def zip_folder_to_bytes(root_dir):      # todo add unit test
    zip_buffer = io.BytesIO()                                                   # Create a BytesIO buffer to hold the zipped file
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:          # Create a ZipFile object with the buffer as the target
        for foldername, subfolders, filenames in os.walk(root_dir):             # Walk the root_dir and add all files and folders to the zip file
            for filename in filenames:
                absolute_path = os.path.join(foldername, filename)              # Create the complete filepath
                arcname = os.path.relpath(absolute_path, root_dir)              # Define the arcname, which is the name inside the zip file
                zf.write(absolute_path, arcname)                                # Add the file to the zip file
    zip_buffer.seek(0)                                                          # Reset buffer position
    return zip_buffer

def zip_file_list(path):
    if is_file(path):
        with zipfile.ZipFile(path) as zip_file:
            return sorted(zip_file.namelist())
    return []

def zip_files(base_folder, file_pattern="*.*", target_file=None):
    base_folder = abspath(base_folder)
    file_list   = folder_files(base_folder, file_pattern)

    if len(file_list):                                                  # if there were files found
        target_file = target_file or temp_file(extension='zip')
        with zipfile.ZipFile(target_file,'w') as zip:
            for file_name in file_list:
                zip_file_path = file_name.replace(base_folder,'')
                zip.write(file_name, zip_file_path)

        return target_file


# extra function's mappings
file_unzip  = unzip_file
folder_zip  = zip_folder

Functions

def file_unzip(zip_file, target_folder=None, format='zip')
Expand source code
def unzip_file(zip_file, target_folder=None, format='zip'):
    target_folder = target_folder or temp_folder()
    shutil.unpack_archive(zip_file, extract_dir=target_folder, format=format)
    return target_folder
def folder_zip(root_dir, format='zip')
Expand source code
def zip_folder(root_dir, format='zip'):
    return shutil.make_archive(base_name=root_dir, format=format, root_dir=root_dir)
def gz_tar_bytes_file_list(gz_bytes)
Expand source code
def gz_tar_bytes_file_list(gz_bytes):
    gz_buffer_from_bytes = io.BytesIO(gz_bytes)
    with gzip.GzipFile(fileobj=gz_buffer_from_bytes, mode='rb') as gz:
        decompressed_data = gz.read()
        tar_buffer_from_bytes = io.BytesIO(decompressed_data)                       # Assuming the decompressed data is a tag file, process it
        with tarfile.open(fileobj=tar_buffer_from_bytes, mode='r:') as tar:
            return sorted(tar.getnames())
def gz_tar_bytes_get_file(gz_bytes, tar_file_path)
Expand source code
def gz_tar_bytes_get_file(gz_bytes, tar_file_path):
    gz_buffer_from_bytes = io.BytesIO(gz_bytes)
    with gzip.GzipFile(fileobj=gz_buffer_from_bytes, mode='rb') as gz:
        decompressed_data = gz.read()
        tar_buffer_from_bytes = io.BytesIO(decompressed_data)
        with tarfile.open(fileobj=tar_buffer_from_bytes, mode='r:') as tar:
            extracted_file = tar.extractfile(tar_file_path)
            if extracted_file:
                return extracted_file.read()
            else:
                raise FileNotFoundError(f"The file {tar_file_path} was not found in the tar archive.")
def gz_zip_bytes_file_list(gz_bytes)
Expand source code
def gz_zip_bytes_file_list(gz_bytes):
    gz_buffer_from_bytes = io.BytesIO(gz_bytes)
    with gzip.GzipFile(fileobj=gz_buffer_from_bytes, mode='rb') as gz:
        decompressed_data = gz.read()
        zip_buffer_from_bytes = io.BytesIO(decompressed_data)                   # Assuming the decompressed data is a zip file, process it
        with zipfile.ZipFile(zip_buffer_from_bytes, 'r') as zf:
            return sorted(zf.namelist())
def unzip_file(zip_file, target_folder=None, format='zip')
Expand source code
def unzip_file(zip_file, target_folder=None, format='zip'):
    target_folder = target_folder or temp_folder()
    shutil.unpack_archive(zip_file, extract_dir=target_folder, format=format)
    return target_folder
def zip_bytes_add_file(zip_bytes, zip_file_path, file_contents)
Expand source code
def zip_bytes_add_file(zip_bytes, zip_file_path, file_contents):
    if type(file_contents) is str:
        file_contents = file_contents.encode('utf-8')
    elif type(file_contents) is not bytes:
        return None
    zip_buffer = io.BytesIO(zip_bytes)
    with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(zip_file_path, file_contents)

    return zip_buffer.getvalue()
def zip_bytes_extract_to_folder(zip_bytes, target_folder=None)
Expand source code
def zip_bytes_extract_to_folder(zip_bytes, target_folder=None):
    target_folder = target_folder or temp_folder()              # Use the provided target folder or create a temporary one
    zip_buffer = io.BytesIO(zip_bytes)                          # Create a BytesIO buffer from the zip bytes
    with zipfile.ZipFile(zip_buffer, 'r') as zf:          # Open the zip file from the buffer
        zf.extractall(target_folder)                            # Extract all files to the target folder
    return target_folder                                        # Return the path of the target folder
def zip_bytes_file_list(zip_bytes)
Expand source code
def zip_bytes_file_list(zip_bytes):
    zip_buffer_from_bytes = io.BytesIO(zip_bytes)
    with zipfile.ZipFile(zip_buffer_from_bytes, 'r') as zf:
        return sorted(zf.namelist())
def zip_bytes_get_file(zip_bytes, zip_file_path)
Expand source code
def zip_bytes_get_file(zip_bytes, zip_file_path):
    zip_buffer = io.BytesIO(zip_bytes)
    with zipfile.ZipFile(zip_buffer, 'r') as zf:
        return zf.read(zip_file_path)
def zip_bytes_to_file(zip_bytes, target_file=None)
Expand source code
def zip_bytes_to_file(zip_bytes, target_file=None):
    if target_file is None:
        target_file = temp_file(extension='.zip')
    with open(target_file, 'wb') as f:
        f.write(zip_bytes)
    return target_file
def zip_file_list(path)
Expand source code
def zip_file_list(path):
    if is_file(path):
        with zipfile.ZipFile(path) as zip_file:
            return sorted(zip_file.namelist())
    return []
def zip_files(base_folder, file_pattern='*.*', target_file=None)
Expand source code
def zip_files(base_folder, file_pattern="*.*", target_file=None):
    base_folder = abspath(base_folder)
    file_list   = folder_files(base_folder, file_pattern)

    if len(file_list):                                                  # if there were files found
        target_file = target_file or temp_file(extension='zip')
        with zipfile.ZipFile(target_file,'w') as zip:
            for file_name in file_list:
                zip_file_path = file_name.replace(base_folder,'')
                zip.write(file_name, zip_file_path)

        return target_file
def zip_files_to_bytes(target_files, root_folder=None)
Expand source code
def zip_files_to_bytes(target_files, root_folder=None):
    zip_buffer = io.BytesIO()                                                   # Create a BytesIO buffer to hold the zipped file
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:          # Create a ZipFile object with the buffer as the target
        for entry in target_files:
            if type(entry) is str:                                              # if entry is a string, assume it's a file path
                file_path = entry
                file_root_folder = root_folder
            else:
                file_path = entry.get('file')
                file_root_folder = entry.get('root_folder') or root_folder
            if file_root_folder:
                arcname = file_path.replace(file_root_folder,'')                      # Define the arcname, which is the name inside the zip file
            else:
                arcname = file_path                                                 # if root_path is not provided, use the full file path
            zf.write(file_path, arcname)                                            # Add the file to the zip file
    zip_buffer.seek(0)
    return zip_buffer
def zip_folder(root_dir, format='zip')
Expand source code
def zip_folder(root_dir, format='zip'):
    return shutil.make_archive(base_name=root_dir, format=format, root_dir=root_dir)
def zip_folder_to_bytes(root_dir)
Expand source code
def zip_folder_to_bytes(root_dir):      # todo add unit test
    zip_buffer = io.BytesIO()                                                   # Create a BytesIO buffer to hold the zipped file
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:          # Create a ZipFile object with the buffer as the target
        for foldername, subfolders, filenames in os.walk(root_dir):             # Walk the root_dir and add all files and folders to the zip file
            for filename in filenames:
                absolute_path = os.path.join(foldername, filename)              # Create the complete filepath
                arcname = os.path.relpath(absolute_path, root_dir)              # Define the arcname, which is the name inside the zip file
                zf.write(absolute_path, arcname)                                # Add the file to the zip file
    zip_buffer.seek(0)                                                          # Reset buffer position
    return zip_buffer
def zip_folder_to_file(root_dir, target_file)
Expand source code
def zip_folder_to_file (root_dir, target_file):
    zip_file = zip_folder(root_dir)
    return file_move(zip_file, target_file)