perxis.files.uploader

Модуль для загрузки файлов в систему Perxis.

Этот модуль предоставляет класс FileUploader, который поддерживает:

  • Чтение файлов чанками
  • Загрузку файлов в Perxis по частям
  • Автоматическую публикацию загруженных файлов

Используется для работы с API Perxis, обеспечивая передачу файлов.

Пример использования:

import aiofiles
from aiofiles.os import path as aiopath
from perxis.channel import get_grpc_channel
from perxis.provider import PerxisFileProvider, PerxisDataProvider
from perxis.files.uploader import FileUploader


channel = get_grpc_channel(
    target=..., api_key=..., project_name=..., project_version=...
)
uploader = FileUploader(
    file_provider=PerxisFileProvider(channel=channel),
    data_provider=PerxisDataProvider(
        channel=channel,
        space_id=...,
        env_id=...,
    ),
)


async def main():
    filepath = "path/to/file.txt"

    async with aiofiles.open(file=filepath, mode="rb") as file:
        size = await aiopath.getsize(filepath)

        response = await uploader.upload_and_publish(
            file_object=file,
            file_name="file.txt",
            file_size=size,
            collection_id="files",
        )

  1"""
  2Модуль для загрузки файлов в систему Perxis.
  3
  4Этот модуль предоставляет класс [`FileUploader`](#FileUploader), который поддерживает:
  5- Чтение файлов чанками
  6- Загрузку файлов в Perxis по частям
  7- Автоматическую публикацию загруженных файлов
  8
  9Используется для работы с API Perxis, обеспечивая передачу файлов.
 10
 11## Пример использования:
 12```python
 13import aiofiles
 14from aiofiles.os import path as aiopath
 15from perxis.channel import get_grpc_channel
 16from perxis.provider import PerxisFileProvider, PerxisDataProvider
 17from perxis.files.uploader import FileUploader
 18
 19
 20channel = get_grpc_channel(
 21    target=..., api_key=..., project_name=..., project_version=...
 22)
 23uploader = FileUploader(
 24    file_provider=PerxisFileProvider(channel=channel),
 25    data_provider=PerxisDataProvider(
 26        channel=channel,
 27        space_id=...,
 28        env_id=...,
 29    ),
 30)
 31
 32
 33async def main():
 34    filepath = "path/to/file.txt"
 35
 36    async with aiofiles.open(file=filepath, mode="rb") as file:
 37        size = await aiopath.getsize(filepath)
 38
 39        response = await uploader.upload_and_publish(
 40            file_object=file,
 41            file_name="file.txt",
 42            file_size=size,
 43            collection_id="files",
 44        )
 45
 46```
 47---
 48"""
 49
 50import aiohttp
 51import asyncstdlib as a
 52from typing import AsyncGenerator
 53
 54from aiofile import FileIOWrapperBase, TextFileWrapper
 55from google.protobuf.struct_pb2 import Struct
 56
 57from perxis.items import items_pb2
 58from perxis.provider import PerxisFileProvider, PerxisDataProvider
 59
 60
 61__all__ = ("FileUploader",)
 62DEFAULT_CHUNK_SIZE: int = 5 * 2**20
 63
 64
 65async def read_chunks(
 66    file_object: FileIOWrapperBase | TextFileWrapper, chunk_size: int
 67) -> AsyncGenerator[bytes | str, None]:
 68    """Читает файл чанками заданного размера.
 69
 70    Args:
 71        file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
 72        chunk_size (int): Размер одного чанка в байтах.
 73    Yields:
 74        AsyncGenerator[bytes | str]: Чанки данных из файла.
 75    """
 76    while True:
 77        data = await file_object.read(chunk_size)
 78        if not data:
 79            break
 80        yield data
 81
 82
 83class FileUploader:
 84    """Класс для загрузки файлов в Perxis.
 85
 86    Attributes:
 87        file_provider (PerxisFileProvider): Провайдер файлов.
 88        data_provider (PerxisDataProvider): Провайдер данных.
 89        chunk_size (int): Размер чанка для загрузки.
 90    """
 91
 92    def __init__(
 93        self,
 94        file_provider: PerxisFileProvider,
 95        data_provider: PerxisDataProvider,
 96        chunk_size: int = DEFAULT_CHUNK_SIZE,
 97    ) -> None:
 98        """Инициализирует FileUploader.
 99
100        Args:
101            file_provider (PerxisFileProvider): Провайдер файлов.
102            data_provider (PerxisDataProvider): Провайдер данных.
103            chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ.
104        """
105        self.file_provider = file_provider
106        self.data_provider = data_provider
107        self.chunk_size = chunk_size
108
109    async def __put_chunks(
110        self,
111        file_object: FileIOWrapperBase | TextFileWrapper,
112        part_urls: list[str],
113    ) -> list[str]:
114        """Загружает файл по частям в указанные URL.
115
116        Args:
117            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
118            part_urls (list[str]): URL-адреса частей для загрузки.
119        Returns:
120            list[str]: Список `ETag` загруженных частей.
121        """
122        async with aiohttp.ClientSession(
123            connector=aiohttp.TCPConnector(ssl=False)
124        ) as session:
125            parts = []
126            async for ix, chunk in a.enumerate(
127                read_chunks(file_object=file_object, chunk_size=self.chunk_size)
128            ):
129                async with session.put(url=part_urls[ix], data=chunk) as response:
130                    response.raise_for_status()
131                    parts.append(response.headers["etag"].strip('"'))
132        return parts
133
134    async def upload(
135        self,
136        file_object: FileIOWrapperBase | TextFileWrapper,
137        file_name: str,
138        file_size: int,
139        collection_id: str,
140    ) -> items_pb2.CreateResponse:
141        """Загружает файл в Perxis.
142
143        Args:
144            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
145            file_name (str): Имя файла.
146            file_size (int): Размер файла в байтах.
147            collection_id (str): ID коллекции.
148        Returns:
149            items_pb2.CreateResponse: Ответ API с информацией о созданном объекте.
150        """
151        message = await self.file_provider.start_upload(
152            file_name=file_name,
153            file_size=file_size,
154        )
155        parts = await self.__put_chunks(
156            file_object=file_object,
157            part_urls=message.upload.part_urls,
158        )
159        await self.file_provider.complete_upload(
160            file_id=message.upload.file.id,
161            upload_id=message.upload.upload_id,
162            parts=parts,
163        )
164
165        file, data = Struct(), Struct()
166        file.update({"id": message.upload.file.id, "name": file_name})
167        data.update({"name": file_name, "file": file})
168
169        message = await self.data_provider.create(
170            data=data,
171            collection_id=collection_id,
172        )
173        return message
174
175    async def upload_and_publish(
176        self,
177        file_object: FileIOWrapperBase | TextFileWrapper,
178        file_name: str,
179        file_size: int,
180        collection_id: str,
181    ) -> items_pb2.CreateResponse:
182        """Загружает файл и публикует его в системе Perxis.
183
184        Args:
185            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
186            file_name (str): Имя файла.
187            file_size (int): Размер файла в байтах.
188            collection_id (str): ID коллекции.
189        Returns:
190            items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте.
191        """
192        message = await self.upload(
193            file_object=file_object,
194            file_name=file_name,
195            file_size=file_size,
196            collection_id=collection_id,
197        )
198        await self.data_provider.publish(
199            item_id=message.created.id,
200            collection_id=collection_id,
201        )
202        return message
class FileUploader:
 84class FileUploader:
 85    """Класс для загрузки файлов в Perxis.
 86
 87    Attributes:
 88        file_provider (PerxisFileProvider): Провайдер файлов.
 89        data_provider (PerxisDataProvider): Провайдер данных.
 90        chunk_size (int): Размер чанка для загрузки.
 91    """
 92
 93    def __init__(
 94        self,
 95        file_provider: PerxisFileProvider,
 96        data_provider: PerxisDataProvider,
 97        chunk_size: int = DEFAULT_CHUNK_SIZE,
 98    ) -> None:
 99        """Инициализирует FileUploader.
100
101        Args:
102            file_provider (PerxisFileProvider): Провайдер файлов.
103            data_provider (PerxisDataProvider): Провайдер данных.
104            chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ.
105        """
106        self.file_provider = file_provider
107        self.data_provider = data_provider
108        self.chunk_size = chunk_size
109
110    async def __put_chunks(
111        self,
112        file_object: FileIOWrapperBase | TextFileWrapper,
113        part_urls: list[str],
114    ) -> list[str]:
115        """Загружает файл по частям в указанные URL.
116
117        Args:
118            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
119            part_urls (list[str]): URL-адреса частей для загрузки.
120        Returns:
121            list[str]: Список `ETag` загруженных частей.
122        """
123        async with aiohttp.ClientSession(
124            connector=aiohttp.TCPConnector(ssl=False)
125        ) as session:
126            parts = []
127            async for ix, chunk in a.enumerate(
128                read_chunks(file_object=file_object, chunk_size=self.chunk_size)
129            ):
130                async with session.put(url=part_urls[ix], data=chunk) as response:
131                    response.raise_for_status()
132                    parts.append(response.headers["etag"].strip('"'))
133        return parts
134
135    async def upload(
136        self,
137        file_object: FileIOWrapperBase | TextFileWrapper,
138        file_name: str,
139        file_size: int,
140        collection_id: str,
141    ) -> items_pb2.CreateResponse:
142        """Загружает файл в Perxis.
143
144        Args:
145            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
146            file_name (str): Имя файла.
147            file_size (int): Размер файла в байтах.
148            collection_id (str): ID коллекции.
149        Returns:
150            items_pb2.CreateResponse: Ответ API с информацией о созданном объекте.
151        """
152        message = await self.file_provider.start_upload(
153            file_name=file_name,
154            file_size=file_size,
155        )
156        parts = await self.__put_chunks(
157            file_object=file_object,
158            part_urls=message.upload.part_urls,
159        )
160        await self.file_provider.complete_upload(
161            file_id=message.upload.file.id,
162            upload_id=message.upload.upload_id,
163            parts=parts,
164        )
165
166        file, data = Struct(), Struct()
167        file.update({"id": message.upload.file.id, "name": file_name})
168        data.update({"name": file_name, "file": file})
169
170        message = await self.data_provider.create(
171            data=data,
172            collection_id=collection_id,
173        )
174        return message
175
176    async def upload_and_publish(
177        self,
178        file_object: FileIOWrapperBase | TextFileWrapper,
179        file_name: str,
180        file_size: int,
181        collection_id: str,
182    ) -> items_pb2.CreateResponse:
183        """Загружает файл и публикует его в системе Perxis.
184
185        Args:
186            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
187            file_name (str): Имя файла.
188            file_size (int): Размер файла в байтах.
189            collection_id (str): ID коллекции.
190        Returns:
191            items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте.
192        """
193        message = await self.upload(
194            file_object=file_object,
195            file_name=file_name,
196            file_size=file_size,
197            collection_id=collection_id,
198        )
199        await self.data_provider.publish(
200            item_id=message.created.id,
201            collection_id=collection_id,
202        )
203        return message

Класс для загрузки файлов в Perxis.

Attributes:
  • file_provider (PerxisFileProvider): Провайдер файлов.
  • data_provider (PerxisDataProvider): Провайдер данных.
  • chunk_size (int): Размер чанка для загрузки.
FileUploader( file_provider: perxis.provider.PerxisFileProvider, data_provider: perxis.provider.PerxisDataProvider, chunk_size: int = 5242880)
 93    def __init__(
 94        self,
 95        file_provider: PerxisFileProvider,
 96        data_provider: PerxisDataProvider,
 97        chunk_size: int = DEFAULT_CHUNK_SIZE,
 98    ) -> None:
 99        """Инициализирует FileUploader.
100
101        Args:
102            file_provider (PerxisFileProvider): Провайдер файлов.
103            data_provider (PerxisDataProvider): Провайдер данных.
104            chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ.
105        """
106        self.file_provider = file_provider
107        self.data_provider = data_provider
108        self.chunk_size = chunk_size

Инициализирует FileUploader.

Arguments:
  • file_provider (PerxisFileProvider): Провайдер файлов.
  • data_provider (PerxisDataProvider): Провайдер данных.
  • chunk_size (int, optional): Размер чанка. По умолчанию 5 МБ.
file_provider
data_provider
chunk_size
async def upload( self, file_object: aiofile.utils.FileIOWrapperBase | aiofile.utils.TextFileWrapper, file_name: str, file_size: int, collection_id: str) -> items.items_pb2.CreateResponse:
135    async def upload(
136        self,
137        file_object: FileIOWrapperBase | TextFileWrapper,
138        file_name: str,
139        file_size: int,
140        collection_id: str,
141    ) -> items_pb2.CreateResponse:
142        """Загружает файл в Perxis.
143
144        Args:
145            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
146            file_name (str): Имя файла.
147            file_size (int): Размер файла в байтах.
148            collection_id (str): ID коллекции.
149        Returns:
150            items_pb2.CreateResponse: Ответ API с информацией о созданном объекте.
151        """
152        message = await self.file_provider.start_upload(
153            file_name=file_name,
154            file_size=file_size,
155        )
156        parts = await self.__put_chunks(
157            file_object=file_object,
158            part_urls=message.upload.part_urls,
159        )
160        await self.file_provider.complete_upload(
161            file_id=message.upload.file.id,
162            upload_id=message.upload.upload_id,
163            parts=parts,
164        )
165
166        file, data = Struct(), Struct()
167        file.update({"id": message.upload.file.id, "name": file_name})
168        data.update({"name": file_name, "file": file})
169
170        message = await self.data_provider.create(
171            data=data,
172            collection_id=collection_id,
173        )
174        return message

Загружает файл в Perxis.

Arguments:
  • file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
  • file_name (str): Имя файла.
  • file_size (int): Размер файла в байтах.
  • collection_id (str): ID коллекции.
Returns:

items_pb2.CreateResponse: Ответ API с информацией о созданном объекте.

async def upload_and_publish( self, file_object: aiofile.utils.FileIOWrapperBase | aiofile.utils.TextFileWrapper, file_name: str, file_size: int, collection_id: str) -> items.items_pb2.CreateResponse:
176    async def upload_and_publish(
177        self,
178        file_object: FileIOWrapperBase | TextFileWrapper,
179        file_name: str,
180        file_size: int,
181        collection_id: str,
182    ) -> items_pb2.CreateResponse:
183        """Загружает файл и публикует его в системе Perxis.
184
185        Args:
186            file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
187            file_name (str): Имя файла.
188            file_size (int): Размер файла в байтах.
189            collection_id (str): ID коллекции.
190        Returns:
191            items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте.
192        """
193        message = await self.upload(
194            file_object=file_object,
195            file_name=file_name,
196            file_size=file_size,
197            collection_id=collection_id,
198        )
199        await self.data_provider.publish(
200            item_id=message.created.id,
201            collection_id=collection_id,
202        )
203        return message

Загружает файл и публикует его в системе Perxis.

Arguments:
  • file_object (FileIOWrapperBase | TextFileWrapper): Файловый объект.
  • file_name (str): Имя файла.
  • file_size (int): Размер файла в байтах.
  • collection_id (str): ID коллекции.
Returns:

items_pb2.CreateResponse: Ответ API с информацией о созданном и опубликованном объекте.