perxis.files.uploader
Модуль для загрузки файлов в систему Perxis.
Этот модуль предоставляет класс FileUploader
, который поддерживает:
- Чтение файлов чанками
- Загрузку файлов в Perxis по частям
- Автоматическую публикацию загруженных файлов
Используется для работы с API Perxis, обеспечивая передачу файлов.
Пример использования:
import aiofiles
from aiofiles.os import path as aiopath
from perxis.channel import get_grpc_channel
from perxis.provider import PerxisFileProvider, PerxisDataProvider
from perxis.files.uploader import FileUploader
channel = get_grpc_channel(
target=..., api_key=..., project_name=..., project_version=...
)
uploader = FileUploader(
file_provider=PerxisFileProvider(channel=channel),
data_provider=PerxisDataProvider(
channel=channel,
space_id=...,
env_id=...,
),
)
async def main():
filepath = "path/to/file.txt"
async with aiofiles.open(file=filepath, mode="rb") as file:
size = await aiopath.getsize(filepath)
response = await uploader.upload_and_publish(
file_object=file,
file_name="file.txt",
file_size=size,
collection_id="files",
)
1""" 2Модуль для загрузки файлов в систему Perxis. 3 4Этот модуль предоставляет класс [`FileUploader`](#FileUploader), который поддерживает: 5- Чтение файлов чанками 6- Загрузку файлов в Perxis по частям 7- Автоматическую публикацию загруженных файлов 8 9Используется для работы с API Perxis, обеспечивая передачу файлов. 10 11## Пример использования: 12```python 13import aiofiles 14from aiofiles.os import path as aiopath 15from perxis.channel import get_grpc_channel 16from perxis.provider import PerxisFileProvider, PerxisDataProvider 17from perxis.files.uploader import FileUploader 18 19 20channel = get_grpc_channel( 21 target=..., api_key=..., project_name=..., project_version=... 22) 23uploader = FileUploader( 24 file_provider=PerxisFileProvider(channel=channel), 25 data_provider=PerxisDataProvider( 26 channel=channel, 27 space_id=..., 28 env_id=..., 29 ), 30) 31 32 33async def main(): 34 filepath = "path/to/file.txt" 35 36 async with aiofiles.open(file=filepath, mode="rb") as file: 37 size = await aiopath.getsize(filepath) 38 39 response = await uploader.upload_and_publish( 40 file_object=file, 41 file_name="file.txt", 42 file_size=size, 43 collection_id="files", 44 ) 45 46``` 47--- 48""" 49 50import aiohttp 51import asyncstdlib as a 52from typing import AsyncGenerator 53 54from aiofile import FileIOWrapperBase, TextFileWrapper 55from google.protobuf.struct_pb2 import Struct 56 57from perxis.items import items_pb2 58from perxis.provider import PerxisFileProvider, PerxisDataProvider 59 60 61__all__ = ("FileUploader",) 62DEFAULT_CHUNK_SIZE: int = 5 * 2**20 63 64 65async def read_chunks( 66 file_object: FileIOWrapperBase | TextFileWrapper, chunk_size: int 67) -> AsyncGenerator[bytes | str, None]: 68 """Читает файл чанками заданного размера. 69 70 Args: 71 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 72 chunk_size (int): Размер одного чанка в байтах. 73 Yields: 74 AsyncGenerator[bytes | str]: Чанки данных из файла. 75 """ 76 while True: 77 data = await file_object.read(chunk_size) 78 if not data: 79 break 80 yield data 81 82 83class FileUploader: 84 """Класс для загрузки файлов в Perxis. 85 86 Attributes: 87 file_provider (PerxisFileProvider): Провайдер файлов. 88 data_provider (PerxisDataProvider): Провайдер данных. 89 chunk_size (int): Размер чанка для загрузки. 90 """ 91 92 def __init__( 93 self, 94 file_provider: PerxisFileProvider, 95 data_provider: PerxisDataProvider, 96 chunk_size: int = DEFAULT_CHUNK_SIZE, 97 ) -> None: 98 """Инициализирует FileUploader. 99 100 Args: 101 file_provider (PerxisFileProvider): Провайдер файлов. 102 data_provider (PerxisDataProvider): Провайдер данных. 103 chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ. 104 """ 105 self.file_provider = file_provider 106 self.data_provider = data_provider 107 self.chunk_size = chunk_size 108 109 async def __put_chunks( 110 self, 111 file_object: FileIOWrapperBase | TextFileWrapper, 112 part_urls: list[str], 113 ) -> list[str]: 114 """Загружает файл по частям в указанные URL. 115 116 Args: 117 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 118 part_urls (list[str]): URL-адреса частей для загрузки. 119 Returns: 120 list[str]: Список `ETag` загруженных частей. 121 """ 122 async with aiohttp.ClientSession( 123 connector=aiohttp.TCPConnector(ssl=False) 124 ) as session: 125 parts = [] 126 async for ix, chunk in a.enumerate( 127 read_chunks(file_object=file_object, chunk_size=self.chunk_size) 128 ): 129 async with session.put(url=part_urls[ix], data=chunk) as response: 130 response.raise_for_status() 131 parts.append(response.headers["etag"].strip('"')) 132 return parts 133 134 async def upload( 135 self, 136 file_object: FileIOWrapperBase | TextFileWrapper, 137 file_name: str, 138 file_size: int, 139 collection_id: str, 140 ) -> items_pb2.CreateResponse: 141 """Загружает файл в Perxis. 142 143 Args: 144 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 145 file_name (str): Имя файла. 146 file_size (int): Размер файла в байтах. 147 collection_id (str): ID коллекции. 148 Returns: 149 items_pb2.CreateResponse: Ответ API с информацией о созданном объекте. 150 """ 151 message = await self.file_provider.start_upload( 152 file_name=file_name, 153 file_size=file_size, 154 ) 155 parts = await self.__put_chunks( 156 file_object=file_object, 157 part_urls=message.upload.part_urls, 158 ) 159 await self.file_provider.complete_upload( 160 file_id=message.upload.file.id, 161 upload_id=message.upload.upload_id, 162 parts=parts, 163 ) 164 165 file, data = Struct(), Struct() 166 file.update({"id": message.upload.file.id, "name": file_name}) 167 data.update({"name": file_name, "file": file}) 168 169 message = await self.data_provider.create( 170 data=data, 171 collection_id=collection_id, 172 ) 173 return message 174 175 async def upload_and_publish( 176 self, 177 file_object: FileIOWrapperBase | TextFileWrapper, 178 file_name: str, 179 file_size: int, 180 collection_id: str, 181 ) -> items_pb2.CreateResponse: 182 """Загружает файл и публикует его в системе Perxis. 183 184 Args: 185 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 186 file_name (str): Имя файла. 187 file_size (int): Размер файла в байтах. 188 collection_id (str): ID коллекции. 189 Returns: 190 items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте. 191 """ 192 message = await self.upload( 193 file_object=file_object, 194 file_name=file_name, 195 file_size=file_size, 196 collection_id=collection_id, 197 ) 198 await self.data_provider.publish( 199 item_id=message.created.id, 200 collection_id=collection_id, 201 ) 202 return message
class
FileUploader:
84class FileUploader: 85 """Класс для загрузки файлов в Perxis. 86 87 Attributes: 88 file_provider (PerxisFileProvider): Провайдер файлов. 89 data_provider (PerxisDataProvider): Провайдер данных. 90 chunk_size (int): Размер чанка для загрузки. 91 """ 92 93 def __init__( 94 self, 95 file_provider: PerxisFileProvider, 96 data_provider: PerxisDataProvider, 97 chunk_size: int = DEFAULT_CHUNK_SIZE, 98 ) -> None: 99 """Инициализирует FileUploader. 100 101 Args: 102 file_provider (PerxisFileProvider): Провайдер файлов. 103 data_provider (PerxisDataProvider): Провайдер данных. 104 chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ. 105 """ 106 self.file_provider = file_provider 107 self.data_provider = data_provider 108 self.chunk_size = chunk_size 109 110 async def __put_chunks( 111 self, 112 file_object: FileIOWrapperBase | TextFileWrapper, 113 part_urls: list[str], 114 ) -> list[str]: 115 """Загружает файл по частям в указанные URL. 116 117 Args: 118 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 119 part_urls (list[str]): URL-адреса частей для загрузки. 120 Returns: 121 list[str]: Список `ETag` загруженных частей. 122 """ 123 async with aiohttp.ClientSession( 124 connector=aiohttp.TCPConnector(ssl=False) 125 ) as session: 126 parts = [] 127 async for ix, chunk in a.enumerate( 128 read_chunks(file_object=file_object, chunk_size=self.chunk_size) 129 ): 130 async with session.put(url=part_urls[ix], data=chunk) as response: 131 response.raise_for_status() 132 parts.append(response.headers["etag"].strip('"')) 133 return parts 134 135 async def upload( 136 self, 137 file_object: FileIOWrapperBase | TextFileWrapper, 138 file_name: str, 139 file_size: int, 140 collection_id: str, 141 ) -> items_pb2.CreateResponse: 142 """Загружает файл в Perxis. 143 144 Args: 145 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 146 file_name (str): Имя файла. 147 file_size (int): Размер файла в байтах. 148 collection_id (str): ID коллекции. 149 Returns: 150 items_pb2.CreateResponse: Ответ API с информацией о созданном объекте. 151 """ 152 message = await self.file_provider.start_upload( 153 file_name=file_name, 154 file_size=file_size, 155 ) 156 parts = await self.__put_chunks( 157 file_object=file_object, 158 part_urls=message.upload.part_urls, 159 ) 160 await self.file_provider.complete_upload( 161 file_id=message.upload.file.id, 162 upload_id=message.upload.upload_id, 163 parts=parts, 164 ) 165 166 file, data = Struct(), Struct() 167 file.update({"id": message.upload.file.id, "name": file_name}) 168 data.update({"name": file_name, "file": file}) 169 170 message = await self.data_provider.create( 171 data=data, 172 collection_id=collection_id, 173 ) 174 return message 175 176 async def upload_and_publish( 177 self, 178 file_object: FileIOWrapperBase | TextFileWrapper, 179 file_name: str, 180 file_size: int, 181 collection_id: str, 182 ) -> items_pb2.CreateResponse: 183 """Загружает файл и публикует его в системе Perxis. 184 185 Args: 186 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 187 file_name (str): Имя файла. 188 file_size (int): Размер файла в байтах. 189 collection_id (str): ID коллекции. 190 Returns: 191 items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте. 192 """ 193 message = await self.upload( 194 file_object=file_object, 195 file_name=file_name, 196 file_size=file_size, 197 collection_id=collection_id, 198 ) 199 await self.data_provider.publish( 200 item_id=message.created.id, 201 collection_id=collection_id, 202 ) 203 return message
Класс для загрузки файлов в Perxis.
Attributes:
- file_provider (PerxisFileProvider): Провайдер файлов.
- data_provider (PerxisDataProvider): Провайдер данных.
- chunk_size (int): Размер чанка для загрузки.
FileUploader( file_provider: perxis.provider.PerxisFileProvider, data_provider: perxis.provider.PerxisDataProvider, chunk_size: int = 5242880)
93 def __init__( 94 self, 95 file_provider: PerxisFileProvider, 96 data_provider: PerxisDataProvider, 97 chunk_size: int = DEFAULT_CHUNK_SIZE, 98 ) -> None: 99 """Инициализирует FileUploader. 100 101 Args: 102 file_provider (PerxisFileProvider): Провайдер файлов. 103 data_provider (PerxisDataProvider): Провайдер данных. 104 chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ. 105 """ 106 self.file_provider = file_provider 107 self.data_provider = data_provider 108 self.chunk_size = chunk_size
Инициализирует FileUploader.
Arguments:
- file_provider (PerxisFileProvider): Провайдер файлов.
- data_provider (PerxisDataProvider): Провайдер данных.
- chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ.
async def
upload( self, file_object: aiofile.utils.FileIOWrapperBase | aiofile.utils.TextFileWrapper, file_name: str, file_size: int, collection_id: str) -> items.items_pb2.CreateResponse:
135 async def upload( 136 self, 137 file_object: FileIOWrapperBase | TextFileWrapper, 138 file_name: str, 139 file_size: int, 140 collection_id: str, 141 ) -> items_pb2.CreateResponse: 142 """Загружает файл в Perxis. 143 144 Args: 145 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 146 file_name (str): Имя файла. 147 file_size (int): Размер файла в байтах. 148 collection_id (str): ID коллекции. 149 Returns: 150 items_pb2.CreateResponse: Ответ API с информацией о созданном объекте. 151 """ 152 message = await self.file_provider.start_upload( 153 file_name=file_name, 154 file_size=file_size, 155 ) 156 parts = await self.__put_chunks( 157 file_object=file_object, 158 part_urls=message.upload.part_urls, 159 ) 160 await self.file_provider.complete_upload( 161 file_id=message.upload.file.id, 162 upload_id=message.upload.upload_id, 163 parts=parts, 164 ) 165 166 file, data = Struct(), Struct() 167 file.update({"id": message.upload.file.id, "name": file_name}) 168 data.update({"name": file_name, "file": file}) 169 170 message = await self.data_provider.create( 171 data=data, 172 collection_id=collection_id, 173 ) 174 return message
Загружает файл в Perxis.
Arguments:
- file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
- file_name (str): Имя файла.
- file_size (int): Размер файла в байтах.
- collection_id (str): ID коллекции.
Returns:
items_pb2.CreateResponse: Ответ API с информацией о созданном объекте.
async def
upload_and_publish( self, file_object: aiofile.utils.FileIOWrapperBase | aiofile.utils.TextFileWrapper, file_name: str, file_size: int, collection_id: str) -> items.items_pb2.CreateResponse:
176 async def upload_and_publish( 177 self, 178 file_object: FileIOWrapperBase | TextFileWrapper, 179 file_name: str, 180 file_size: int, 181 collection_id: str, 182 ) -> items_pb2.CreateResponse: 183 """Загружает файл и публикует его в системе Perxis. 184 185 Args: 186 file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект. 187 file_name (str): Имя файла. 188 file_size (int): Размер файла в байтах. 189 collection_id (str): ID коллекции. 190 Returns: 191 items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте. 192 """ 193 message = await self.upload( 194 file_object=file_object, 195 file_name=file_name, 196 file_size=file_size, 197 collection_id=collection_id, 198 ) 199 await self.data_provider.publish( 200 item_id=message.created.id, 201 collection_id=collection_id, 202 ) 203 return message
Загружает файл и публикует его в системе Perxis.
Arguments:
- file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
- file_name (str): Имя файла.
- file_size (int): Размер файла в байтах.
- collection_id (str): ID коллекции.
Returns:
items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте.