diff --git a/perxis/files/uploader.py b/perxis/files/uploader.py index 3d7117eca3fb23751ee46927522d09f6d5d81542..8ec6af740bebe1e391f35782a2ed482f61ff4d7e 100644 --- a/perxis/files/uploader.py +++ b/perxis/files/uploader.py @@ -1,20 +1,78 @@ +""" +Модуль для загрузки файлов РІ систему Perxis. + +Ртот модуль предоставляет класс [`FileUploader`](#FileUploader), который поддерживает: +- Чтение файлов чанками +- Загрузку файлов РІ Perxis РїРѕ частям +- Автоматическую публикацию загруженных файлов + +Рспользуется для работы СЃ API Perxis, обеспечивая передачу файлов. + +## Пример использования: +```python +import aiofiles +from aiofiles.os import path as aiopath +from perxis.channel import get_grpc_channel +from perxis.provider import PerxisFileProvider, PerxisDataProvider +from perxis.files.uploader import FileUploader + + +channel = get_grpc_channel( + target=..., api_key=..., project_name=..., project_version=... +) +uploader = FileUploader( + file_provider=PerxisFileProvider(channel=channel), + data_provider=PerxisDataProvider( + channel=channel, + space_id=..., + env_id=..., + ), +) + + +async def main(): + filepath = "path/to/file.txt" + + async with aiofiles.open(file=filepath, mode="rb") as file: + size = await aiopath.getsize(filepath) + + response = await uploader.upload_and_publish( + file_object=file, + file_name="file.txt", + file_size=size, + collection_id="files", + ) + +``` +--- +""" + import aiohttp import asyncstdlib as a +from typing import AsyncGenerator from aiofile import FileIOWrapperBase, TextFileWrapper from google.protobuf.struct_pb2 import Struct + from perxis.items import items_pb2 from perxis.provider import PerxisFileProvider, PerxisDataProvider __all__ = ("FileUploader",) -DEFAULT_CHUNK_SIZE: int = 5 * 2 ** 20 +DEFAULT_CHUNK_SIZE: int = 5 * 2**20 async def read_chunks( - file_object: FileIOWrapperBase | TextFileWrapper, - chunk_size: int -) -> bytes | str: + file_object: FileIOWrapperBase | TextFileWrapper, chunk_size: int +) -> AsyncGenerator[bytes | str, None]: + """Читает файл чанками заданного размера. + + Args: + file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. + chunk_size (int): Размер РѕРґРЅРѕРіРѕ чанка РІ байтах. + Yields: + AsyncGenerator[bytes | str]: Чанки данных РёР· файла. + """ while True: data = await file_object.read(chunk_size) if not data: @@ -23,12 +81,27 @@ async def read_chunks( class FileUploader: + """Класс для загрузки файлов РІ Perxis. + + Attributes: + file_provider (PerxisFileProvider): Провайдер файлов. + data_provider (PerxisDataProvider): Провайдер данных. + chunk_size (int): Размер чанка для загрузки. + """ + def __init__( self, file_provider: PerxisFileProvider, data_provider: PerxisDataProvider, chunk_size: int = DEFAULT_CHUNK_SIZE, ) -> None: + """Рнициализирует FileUploader. + + Args: + file_provider (PerxisFileProvider): Провайдер файлов. + data_provider (PerxisDataProvider): Провайдер данных. + chunk_size (int, optional): Размер чанка. РџРѕ умолчанию 5 РњР‘. + """ self.file_provider = file_provider self.data_provider = data_provider self.chunk_size = chunk_size @@ -38,14 +111,20 @@ class FileUploader: file_object: FileIOWrapperBase | TextFileWrapper, part_urls: list[str], ) -> list[str]: + """Загружает файл РїРѕ частям РІ указанные URL. + + Args: + file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. + part_urls (list[str]): URL-адреса частей для загрузки. + Returns: + list[str]: РЎРїРёСЃРѕРє `ETag` загруженных частей. + """ async with aiohttp.ClientSession( connector=aiohttp.TCPConnector(ssl=False) ) as session: parts = [] async for ix, chunk in a.enumerate( - read_chunks( - file_object=file_object, chunk_size=self.chunk_size - ) + read_chunks(file_object=file_object, chunk_size=self.chunk_size) ): async with session.put(url=part_urls[ix], data=chunk) as response: response.raise_for_status() @@ -57,18 +136,26 @@ class FileUploader: file_object: FileIOWrapperBase | TextFileWrapper, file_name: str, file_size: int, - collection_id: str + collection_id: str, ) -> items_pb2.CreateResponse: + """Загружает файл РІ Perxis. + + Args: + file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. + file_name (str): РРјСЏ файла. + file_size (int): Размер файла РІ байтах. + collection_id (str): ID коллекции. + Returns: + items_pb2.CreateResponse: Ответ API СЃ информацией Рѕ созданном объекте. + """ message = await self.file_provider.start_upload( file_name=file_name, file_size=file_size, ) - parts = await self.__put_chunks( file_object=file_object, part_urls=message.upload.part_urls, ) - await self.file_provider.complete_upload( file_id=message.upload.file.id, upload_id=message.upload.upload_id, @@ -76,14 +163,8 @@ class FileUploader: ) file, data = Struct(), Struct() - file.update({ - "id": message.upload.file.id, - "name": file_name, - }) - data.update({ - "name": file_name, - "file": file, - }) + file.update({"id": message.upload.file.id, "name": file_name}) + data.update({"name": file_name, "file": file}) message = await self.data_provider.create( data=data, @@ -93,11 +174,21 @@ class FileUploader: async def upload_and_publish( self, - file_object, + file_object: FileIOWrapperBase | TextFileWrapper, file_name: str, file_size: int, - collection_id: str + collection_id: str, ) -> items_pb2.CreateResponse: + """Загружает файл Рё публикует его РІ системе Perxis. + + Args: + file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. + file_name (str): РРјСЏ файла. + file_size (int): Размер файла РІ байтах. + collection_id (str): ID коллекции. + Returns: + items_pb2.CreateResponse: Ответ API СЃ информацией Рѕ созданном Рё опубликованном объекте. + """ message = await self.upload( file_object=file_object, file_name=file_name,