Skip to content
Snippets Groups Projects

add file uploader

Merged Anton Teplyakov requested to merge feature/AUTO-1532_file-loader into master
4 files
+ 161
2
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 112
0
import aiohttp
import asyncstdlib as a
from aiofile import FileIOWrapperBase, TextFileWrapper
from google.protobuf.struct_pb2 import Struct
from perxis.items import items_pb2
from perxis.provider import PerxisFileProvider, PerxisDataProvider
__all__ = ("FileUploader",)
DEFAULT_CHUNK_SIZE: int = 5 * 2 ** 20
async def read_chunks(
file_object: FileIOWrapperBase | TextFileWrapper,
chunk_size: int
) -> bytes | str:
while True:
data = await file_object.read(chunk_size)
if not data:
break
yield data
file_object.seek(0)
class FileUploader:
def __init__(
self,
file_provider: PerxisFileProvider,
data_provider: PerxisDataProvider,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
self._file_provider = file_provider
self._data_provider = data_provider
self.chunk_size = chunk_size
async def __put_chunks(
self,
file_object: FileIOWrapperBase | TextFileWrapper,
part_urls: list[str],
) -> list[str]:
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False)
) as session:
parts = []
async for ix, chunk in a.enumerate(
read_chunks(
file_object=file_object, chunk_size=self.chunk_size
)
):
async with session.put(url=part_urls[ix], data=chunk) as response:
response.raise_for_status()
parts.append(response.headers["etag"].strip('"'))
return parts
async def upload(
self,
file_object: FileIOWrapperBase | TextFileWrapper,
file_name: str,
file_size: int,
collection_id: str
) -> items_pb2.CreateResponse:
message = await self._file_provider.start_upload(
file_name=file_name,
file_size=file_size,
)
parts = await self.__put_chunks(
file_object=file_object,
part_urls=message.upload.part_urls,
)
await self._file_provider.complete_upload(
file_id=message.upload.file.id,
upload_id=message.upload.upload_id,
parts=parts,
)
file, data = Struct(), Struct()
file.update({
"id": message.upload.file.id,
"name": file_name,
})
data.update({
"name": file_name,
"file": file,
})
message = await self._data_provider.create(
data=data,
collection_id=collection_id,
)
return message
async def upload_and_publish(
self,
file_object,
file_name: str,
file_size: int,
collection_id: str
) -> items_pb2.CreateResponse:
message = await self.upload(
file_object=file_object,
file_name=file_name,
file_size=file_size,
collection_id=collection_id,
)
await self._data_provider.publish(
item_id=message.created.id,
collection_id=collection_id,
)
return message
Loading