Add multipart file upload support.

  * perxis/files/uploader.py (new): FileUploader reads a file in chunks,
    PUTs each chunk to the pre-signed part URL with the same index,
    completes the upload through PerxisFileProvider and creates (and
    optionally publishes) an item that references the uploaded file.
  * perxis/provider.py: add PerxisFileProvider (StartUpload /
    CompleteUpload wrappers) and DEFAULT_PART_SIZE.
  * requirements.txt: pin aiohttp, aiofile and asyncstdlib.

NOTE(review): line breaks and leading whitespace of this patch were
reconstructed from a whitespace-collapsed copy; hunk line counts match the
headers, but indentation (notably "return parts" and the context lines of
the second provider.py hunk) should be confirmed against the commit.
NOTE(review): read_chunks is an async generator but is annotated
"-> bytes | str"; an AsyncIterator annotation looks intended.
NOTE(review): TCPConnector(ssl=False) turns off TLS certificate
verification for the part uploads - confirm this is deliberate.
NOTE(review): upload() never passes part_size, so complete_upload sends
DEFAULT_PART_SIZE (1024 * 5) while chunks are DEFAULT_CHUNK_SIZE
(5 * 2 ** 20) - confirm which value the service expects.
NOTE(review): part_urls[ix] raises IndexError if the file yields more
chunks than the service returned URLs - confirm chunk_size always matches
the server-side part size.

diff --git a/perxis/files/uploader.py b/perxis/files/uploader.py
new file mode 100644
index 0000000000000000000000000000000000000000..d066401d62c0e6740b268796300fe8556d240b42
--- /dev/null
+++ b/perxis/files/uploader.py
@@ -0,0 +1,112 @@
+import aiohttp
+import asyncstdlib as a
+
+from aiofile import FileIOWrapperBase, TextFileWrapper
+from google.protobuf.struct_pb2 import Struct
+from perxis.items import items_pb2
+from perxis.provider import PerxisFileProvider, PerxisDataProvider
+
+
+__all__ = ("FileUploader",)
+DEFAULT_CHUNK_SIZE: int = 5 * 2 ** 20
+
+
+async def read_chunks(
+    file_object: FileIOWrapperBase | TextFileWrapper,
+    chunk_size: int
+) -> bytes | str:
+    while True:
+        data = await file_object.read(chunk_size)
+        if not data:
+            break
+        yield data
+    file_object.seek(0)
+
+
+class FileUploader:
+    def __init__(
+        self,
+        file_provider: PerxisFileProvider,
+        data_provider: PerxisDataProvider,
+        chunk_size: int = DEFAULT_CHUNK_SIZE,
+    ) -> None:
+        self._file_provider = file_provider
+        self._data_provider = data_provider
+        self.chunk_size = chunk_size
+
+    async def __put_chunks(
+        self,
+        file_object: FileIOWrapperBase | TextFileWrapper,
+        part_urls: list[str],
+    ) -> list[str]:
+        async with aiohttp.ClientSession(
+            connector=aiohttp.TCPConnector(ssl=False)
+        ) as session:
+            parts = []
+            async for ix, chunk in a.enumerate(
+                read_chunks(
+                    file_object=file_object, chunk_size=self.chunk_size
+                )
+            ):
+                async with session.put(url=part_urls[ix], data=chunk) as response:
+                    response.raise_for_status()
+                    parts.append(response.headers["etag"].strip('"'))
+        return parts
+
+    async def upload(
+        self,
+        file_object: FileIOWrapperBase | TextFileWrapper,
+        file_name: str,
+        file_size: int,
+        collection_id: str
+    ) -> items_pb2.CreateResponse:
+        message = await self._file_provider.start_upload(
+            file_name=file_name,
+            file_size=file_size,
+        )
+
+        parts = await self.__put_chunks(
+            file_object=file_object,
+            part_urls=message.upload.part_urls,
+        )
+
+        await self._file_provider.complete_upload(
+            file_id=message.upload.file.id,
+            upload_id=message.upload.upload_id,
+            parts=parts,
+        )
+
+        file, data = Struct(), Struct()
+        file.update({
+            "id": message.upload.file.id,
+            "name": file_name,
+        })
+        data.update({
+            "name": file_name,
+            "file": file,
+        })
+
+        message = await self._data_provider.create(
+            data=data,
+            collection_id=collection_id,
+        )
+        return message
+
+    async def upload_and_publish(
+        self,
+        file_object,
+        file_name: str,
+        file_size: int,
+        collection_id: str
+    ) -> items_pb2.CreateResponse:
+        message = await self.upload(
+            file_object=file_object,
+            file_name=file_name,
+            file_size=file_size,
+            collection_id=collection_id,
+        )
+        await self._data_provider.publish(
+            item_id=message.created.id,
+            collection_id=collection_id,
+        )
+        return message
diff --git a/perxis/provider.py b/perxis/provider.py
index eeadadfacfbef14df56a7f0a645426e190093ff0..213b8c45c5aa054c4e93eb71c8aac90812292135 100644
--- a/perxis/provider.py
+++ b/perxis/provider.py
@@ -4,11 +4,13 @@ from perxis.common import common_pb2
 from perxis.items import items_pb2, items_pb2_grpc
 from perxis.references import references_pb2
 from perxis.references import references_pb2_grpc
-from perxis.channel import GrpcChannel
 from perxis.models import Reference
+from perxis.channel import GrpcChannel
+from perxis.files import files_pb2, files_pb2_grpc
 
 
 DEFAULT_PAGE_SIZE: int = 100
+DEFAULT_PART_SIZE: int = 1024 * 5
 
 
 class PerxisDataProvider:
@@ -211,3 +213,45 @@ class PerxisDataProvider:
             )
         )
         return result
+
+
+class PerxisFileProvider:
+    def __init__(self, channel: GrpcChannel) -> None:
+        self._stub = files_pb2_grpc.FilesStub(channel.channel)
+
+    async def start_upload(
+        self, file_name: str, file_size: int
+    ) -> files_pb2.StartUploadResponse:
+        message = await self._stub.StartUpload(
+            files_pb2.StartUploadRequest(
+                upload=files_pb2.MultipartUpload(
+                    file=files_pb2.File(
+                        name=file_name, size=file_size
+                    )
+                )
+            )
+        )
+        return message
+
+    async def complete_upload(
+        self,
+        file_id: str,
+        upload_id: str,
+        parts: list[str],
+        part_size: int = DEFAULT_PART_SIZE
+    ) -> files_pb2.CompleteUploadResponse:
+        message = await self._stub.CompleteUpload(
+            files_pb2.CompleteUploadRequest(
+                upload=files_pb2.MultipartUpload(
+                    file=files_pb2.File(id=file_id),
+                    upload_id=upload_id,
+                    part_size=part_size,
+                    parts=[
+                        files_pb2.CompletedPart(id=value, number=index)
+                        for index, value in enumerate(parts, 1)
+                    ],
+
+                )
+            )
+        )
+        return message
diff --git a/requirements.txt b/requirements.txt
index 741861e832a5a65a427d28e7e194ac68ea169920..179640919e2a75b6eccdb789fc6a4f749b9b6d74 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,7 @@
 aiocron==1.8
+aiohttp==3.8.5
+aiofile==3.8.8
+asyncstdlib==3.10.8
 certifi==2023.7.22
 chardet==5.2.0
 charset-normalizer==3.2.0