Skip to content
Snippets Groups Projects
Commit 62321c44 authored by Podosochnyy Maxim's avatar Podosochnyy Maxim
Browse files

Добавлен общий код PerxisDataProvider, GrpcChannel и моделей

parent 2f2045be
No related branches found
No related tags found
No related merge requests found
from typing import Optional
import grpc
from perxis.auth import APIKeyPlugin
from perxis.models import PerxisEnviron
class GrpcChannel:
call_credentials: grpc.CallCredentials
channel_credentials: grpc.ChannelCredentials
composite_credentials: grpc.ChannelCredentials
channel: Optional[grpc.Channel] = None
def __init__(
self,
target: str,
metadata_plugin: grpc.AuthMetadataPlugin,
project_name: str,
project_version: str,
call_credentials_name: str = "api-key",
) -> None:
self.target = target
self.metadata_plugin = metadata_plugin
self.call_credentials_name = call_credentials_name
self._init_credentials()
self.connect(project_name, project_version)
def _init_credentials(self) -> None:
self.call_credentials = grpc.metadata_call_credentials(
self.metadata_plugin, name=self.call_credentials_name
)
self.channel_credentials = grpc.ssl_channel_credentials()
self.composite_credentials = grpc.composite_channel_credentials(
self.channel_credentials, self.call_credentials
)
def connect(self, project_name: str, project_version: str) -> None:
if not self.channel:
self.channel = grpc.aio.secure_channel(
self.target, self.composite_credentials,
options=(
("grpc.primary_user_agent", f"{project_name}/{project_version}"),
)
)
async def close(self) -> None:
await self.channel.close()
def get_grpc_channel(perxis_environ: PerxisEnviron, project_name: str, project_version: str) -> GrpcChannel:
return GrpcChannel(
target=perxis_environ.target,
metadata_plugin=APIKeyPlugin(token=perxis_environ.api_key),
project_name=project_name,
project_version=project_version
)
from dataclasses import dataclass
@dataclass
class PerxisEnviron:
target: str
space_id: str
env_id: str
api_key: str
collection_id: str
@dataclass
class Reference:
id: str
collection_id: str
disabled: bool = False
from typing import Optional
from google.protobuf.struct_pb2 import Struct
from perxis.common import common_pb2
from perxis.items import items_pb2, items_pb2_grpc
from perxis.references import references_pb2
from perxis.references import references_pb2_grpc
from perxis.channel import GrpcChannel
from perxis.models import Reference
class PerxisDataProvider:
def __init__(
self,
channel: GrpcChannel,
space_id: str,
env_id: str,
collection_id: Optional[str] = None,
default_page_size: Optional[int] = 100,
) -> None:
self.channel = channel
self.space_id = space_id
self.env_id = env_id
self.collection_id = collection_id
self.items_stub = items_pb2_grpc.ItemsStub(self.channel.channel)
self.references_stub = references_pb2_grpc.ReferencesStub(self.channel.channel)
self.default_page_size = default_page_size
async def create(
self, data: Struct, collection_id: Optional[str] = None
) -> items_pb2.CreateResponse:
"""Сохранение данных формы в системе Perxis."""
result = await self.items_stub.Create(
items_pb2.CreateRequest(
item=items_pb2.Item(
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
data=data,
)
)
)
return result
async def update(
self, item_id: str, data: Struct, collection_id: Optional[str] = None
):
"""Метод обновления записи в коллекции."""
result = await self.items_stub.Update(
items_pb2.UpdateRequest(
item=items_pb2.Item(
id=item_id,
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
data=data,
)
)
)
return result
async def find(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
page_num: Optional[int] = None,
page_size: Optional[int] = None,
) -> items_pb2.FindResponse:
"""Метод поиска записей в коллекции."""
result = await self.items_stub.Find(
items_pb2.FindRequest(
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
filter=items_pb2.Filter(q=filters or []),
options=items_pb2.FindOptions(
options=common_pb2.FindOptions(
sort=sort_by, page_num=page_num, page_size=page_size
)
),
)
)
return result
async def find_published(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
fields: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
page_num: Optional[int] = None,
page_size: Optional[int] = None,
) -> items_pb2.FindResponse:
result = await self.items_stub.FindPublished(
items_pb2.FindPublishedRequest(
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
filter=items_pb2.Filter(q=filters or []),
options=items_pb2.FindPublishedOptions(
options=common_pb2.FindOptions(
sort=sort_by,
page_size=page_size,
fields=fields or [],
page_num=page_num,
)
),
)
)
return result
async def fetch_all_published(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
fields: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
) -> items_pb2.FindResponse:
"""Метод поиска всех опубликованных записей в коллекции."""
kwargs = {
"collection_id": collection_id,
"filters": filters,
"sort_by": sort_by,
"fields": fields,
"page_size": self.default_page_size,
}
message = await self.find_published(**kwargs)
yield message
if pages := message.total // self.default_page_size:
if message.total % self.default_page_size:
pages += 1
else:
pages = 1
for page_num in range(1, pages + 1):
yield await self.find_published(page_num=page_num, **kwargs)
async def unpublish(self, item_id: str, collection_id: Optional[str] = None):
"""Метод снятия с публикации записи в коллекции."""
result = await self.items_stub.Unpublish(
items_pb2.UnpublishRequest(
item=items_pb2.Item(
id=item_id,
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
)
)
)
return result
async def publish(self, item_id: str, collection_id: Optional[str] = None):
"""Метод публикации записи в коллекции."""
result = await self.items_stub.Publish(
items_pb2.PublishRequest(
item=items_pb2.Item(
id=item_id,
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
)
)
)
return result
async def fetch_all(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
) -> items_pb2.FindResponse:
"""Метод получения всех записей коллекции."""
items = []
storage_data = await self.find(
collection_id=collection_id or self.collection_id,
page_size=self.default_page_size,
filters=filters,
)
items.extend(storage_data.items)
if pages := storage_data.total // self.default_page_size:
if storage_data.total % self.default_page_size:
pages += 1
else:
pages = 1
for page_num in range(1, pages + 1):
storage_data = await self.find(
collection_id=collection_id or self.collection_id,
filters=filters,
sort_by=sort_by,
page_num=page_num,
page_size=self.default_page_size,
)
items.extend(storage_data.items)
return items_pb2.FindResponse(items=items)
async def get_references(self, references: list[Reference]):
"""Метод получения связей."""
result = await self.references_stub.Get(
references_pb2.GetRequest(
space_id=self.space_id,
env_id=self.env_id,
references=[
references_pb2.Reference(id=ref.id, collection_id=ref.collection_id)
for ref in references
],
)
)
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment