Skip to content
Snippets Groups Projects
Commit e5844032 authored by antondmtvch's avatar antondmtvch
Browse files

add file uploader

parent 9d6a9d64
No related branches found
No related tags found
1 merge request!57add file uploader
import aiohttp
import asyncstdlib as a
from aiofile import FileIOWrapperBase, TextFileWrapper
from google.protobuf.struct_pb2 import Struct
from perxis.items import items_pb2
from perxis.provider import PerxisFileProvider, PerxisDataProvider
__all__ = ("FileUploader",)
DEFAULT_CHUNK_SIZE: int = 5 * 2 ** 20
async def read_chunks(
file_object: FileIOWrapperBase | TextFileWrapper,
chunk_size: int
) -> bytes | str:
while True:
data = await file_object.read(chunk_size)
if not data:
break
yield data
file_object.seek(0)
class FileUploader:
def __init__(
self,
file_provider: PerxisFileProvider,
data_provider: PerxisDataProvider,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
self._file_provider = file_provider
self._data_provider = data_provider
self.chunk_size = chunk_size
async def __put_chunks(
self,
file_object: FileIOWrapperBase | TextFileWrapper,
part_urls: list[str],
) -> list[str]:
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False)
) as session:
parts = []
async for ix, chunk in a.enumerate(
read_chunks(
file_object=file_object, chunk_size=self.chunk_size
)
):
async with session.put(url=part_urls[ix], data=chunk) as response:
response.raise_for_status()
parts.append(response.headers["etag"].strip('"'))
return parts
async def upload(
self,
file_object: FileIOWrapperBase | TextFileWrapper,
file_name: str,
file_size: int,
collection_id: str
) -> items_pb2.CreateResponse:
message = await self._file_provider.start_upload(
file_name=file_name,
file_size=file_size,
)
parts = await self.__put_chunks(
file_object=file_object,
part_urls=message.upload.part_urls,
)
await self._file_provider.complete_upload(
file_id=message.upload.file.id,
upload_id=message.upload.upload_id,
parts=parts,
)
file, data = Struct(), Struct()
file.update({
"id": message.upload.file.id,
"name": file_name,
})
data.update({
"name": file_name,
"file": file,
})
message = await self._data_provider.create(
data=data,
collection_id=collection_id,
)
return message
async def upload_and_publish(
self,
file_object,
file_name: str,
file_size: int,
collection_id: str
) -> items_pb2.CreateResponse:
message = await self.upload(
file_object=file_object,
file_name=file_name,
file_size=file_size,
collection_id=collection_id,
)
await self._data_provider.publish(
item_id=message.created.id,
collection_id=collection_id,
)
return message
......@@ -4,11 +4,13 @@ from perxis.common import common_pb2
from perxis.items import items_pb2, items_pb2_grpc
from perxis.references import references_pb2
from perxis.references import references_pb2_grpc
from perxis.channel import GrpcChannel
from perxis.models import Reference
from perxis.channel import GrpcChannel
from perxis.files import files_pb2, files_pb2_grpc
DEFAULT_PAGE_SIZE: int = 100
DEFAULT_PART_SIZE: int = 1024 * 5
class PerxisDataProvider:
......@@ -211,3 +213,45 @@ class PerxisDataProvider:
)
)
return result
class PerxisFileProvider:
def __init__(self, channel: GrpcChannel) -> None:
self._stub = files_pb2_grpc.FilesStub(channel.channel)
async def start_upload(
self, file_name: str, file_size: int
) -> files_pb2.StartUploadResponse:
message = await self._stub.StartUpload(
files_pb2.StartUploadRequest(
upload=files_pb2.MultipartUpload(
file=files_pb2.File(
name=file_name, size=file_size
)
)
)
)
return message
async def complete_upload(
self,
file_id: str,
upload_id: str,
parts: list[str],
part_size: int = DEFAULT_PART_SIZE
) -> files_pb2.CompleteUploadResponse:
message = await self._stub.CompleteUpload(
files_pb2.CompleteUploadRequest(
upload=files_pb2.MultipartUpload(
file=files_pb2.File(id=file_id),
upload_id=upload_id,
part_size=part_size,
parts=[
files_pb2.CompletedPart(id=value, number=index)
for index, value in enumerate(parts, 1)
],
)
)
)
return message
aiocron==1.8
aiohttp==3.8.5
aiofile==3.8.8
asyncstdlib==3.10.8
certifi==2023.7.22
chardet==5.2.0
charset-normalizer==3.2.0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment