Skip to content
Snippets Groups Projects
Commit f5dad13a authored by Eterevskiy Georgiy's avatar Eterevskiy Georgiy
Browse files

Merge branch 'feature/add-shared-code-of-data-providers' into 'master'

Добавлен общий код PerxisDataProvider, GrpcChannel и моделей

See merge request !42
parents 2f2045be 138794e7
No related branches found
No related tags found
1 merge request!42Добавлен общий код PerxisDataProvider, GrpcChannel и моделей
Pipeline #32976 passed with stage
in 1 minute and 47 seconds
from typing import Optional
import grpc
from perxis.auth import APIKeyPlugin
from perxis.models import PerxisEnviron
class GrpcChannel:
call_credentials: grpc.CallCredentials
channel_credentials: grpc.ChannelCredentials
composite_credentials: grpc.ChannelCredentials
channel: Optional[grpc.Channel] = None
def __init__(
self,
target: str,
metadata_plugin: grpc.AuthMetadataPlugin,
project_name: str,
project_version: str,
call_credentials_name: str = "api-key",
) -> None:
self.target = target
self.metadata_plugin = metadata_plugin
self.call_credentials_name = call_credentials_name
self._init_credentials()
self.connect(project_name, project_version)
def _init_credentials(self) -> None:
self.call_credentials = grpc.metadata_call_credentials(
self.metadata_plugin, name=self.call_credentials_name
)
self.channel_credentials = grpc.ssl_channel_credentials()
self.composite_credentials = grpc.composite_channel_credentials(
self.channel_credentials, self.call_credentials
)
def connect(self, project_name: str, project_version: str) -> None:
if not self.channel:
self.channel = grpc.aio.secure_channel(
self.target, self.composite_credentials,
options=(
("grpc.primary_user_agent", f"{project_name}/{project_version}"),
)
)
async def close(self) -> None:
await self.channel.close()
def get_grpc_channel(perxis_environ: PerxisEnviron, project_name: str, project_version: str) -> GrpcChannel:
return GrpcChannel(
target=perxis_environ.target,
metadata_plugin=APIKeyPlugin(token=perxis_environ.api_key),
project_name=project_name,
project_version=project_version
)
from dataclasses import dataclass
@dataclass
class PerxisEnviron:
target: str
space_id: str
env_id: str
api_key: str
collection_id: str
@dataclass
class Reference:
id: str
collection_id: str
disabled: bool = False
from typing import Optional
from google.protobuf.struct_pb2 import Struct
from perxis.common import common_pb2
from perxis.items import items_pb2, items_pb2_grpc
from perxis.references import references_pb2
from perxis.references import references_pb2_grpc
from perxis.channel import GrpcChannel
from perxis.models import Reference
class PerxisDataProvider:
def __init__(
self,
channel: GrpcChannel,
space_id: str,
env_id: str,
collection_id: Optional[str] = None,
default_page_size: Optional[int] = 100,
) -> None:
self.channel = channel
self.space_id = space_id
self.env_id = env_id
self.collection_id = collection_id
self.items_stub = items_pb2_grpc.ItemsStub(self.channel.channel)
self.references_stub = references_pb2_grpc.ReferencesStub(self.channel.channel)
self.default_page_size = default_page_size
async def create(
self, data: Struct, collection_id: Optional[str] = None
) -> items_pb2.CreateResponse:
"""Сохранение данных формы в системе Perxis."""
result = await self.items_stub.Create(
items_pb2.CreateRequest(
item=items_pb2.Item(
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
data=data,
)
)
)
return result
async def update(
self, item_id: str, data: Struct, collection_id: Optional[str] = None
):
"""Метод обновления записи в коллекции."""
result = await self.items_stub.Update(
items_pb2.UpdateRequest(
item=items_pb2.Item(
id=item_id,
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
data=data,
)
)
)
return result
async def find(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
page_num: Optional[int] = None,
page_size: Optional[int] = None,
) -> items_pb2.FindResponse:
"""Метод поиска записей в коллекции."""
result = await self.items_stub.Find(
items_pb2.FindRequest(
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
filter=items_pb2.Filter(q=filters or []),
options=items_pb2.FindOptions(
options=common_pb2.FindOptions(
sort=sort_by, page_num=page_num, page_size=page_size
)
),
)
)
return result
async def find_published(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
fields: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
page_num: Optional[int] = None,
page_size: Optional[int] = None,
) -> items_pb2.FindResponse:
result = await self.items_stub.FindPublished(
items_pb2.FindPublishedRequest(
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
filter=items_pb2.Filter(q=filters or []),
options=items_pb2.FindPublishedOptions(
options=common_pb2.FindOptions(
sort=sort_by,
page_size=page_size,
fields=fields or [],
page_num=page_num,
)
),
)
)
return result
async def fetch_all_published(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
fields: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
) -> items_pb2.FindResponse:
"""Метод поиска всех опубликованных записей в коллекции."""
kwargs = {
"collection_id": collection_id,
"filters": filters,
"sort_by": sort_by,
"fields": fields,
"page_size": self.default_page_size,
}
message = await self.find_published(**kwargs)
yield message
if pages := message.total // self.default_page_size:
if message.total % self.default_page_size:
pages += 1
else:
pages = 1
for page_num in range(1, pages + 1):
yield await self.find_published(page_num=page_num, **kwargs)
async def unpublish(self, item_id: str, collection_id: Optional[str] = None):
"""Метод снятия с публикации записи в коллекции."""
result = await self.items_stub.Unpublish(
items_pb2.UnpublishRequest(
item=items_pb2.Item(
id=item_id,
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
)
)
)
return result
async def publish(self, item_id: str, collection_id: Optional[str] = None):
"""Метод публикации записи в коллекции."""
result = await self.items_stub.Publish(
items_pb2.PublishRequest(
item=items_pb2.Item(
id=item_id,
space_id=self.space_id,
env_id=self.env_id,
collection_id=collection_id or self.collection_id,
)
)
)
return result
async def fetch_all(
self,
collection_id: Optional[str] = None,
filters: Optional[list[str]] = None,
sort_by: Optional[list[str]] = None,
) -> items_pb2.FindResponse:
"""Метод получения всех записей коллекции."""
items = []
storage_data = await self.find(
collection_id=collection_id or self.collection_id,
page_size=self.default_page_size,
filters=filters,
)
items.extend(storage_data.items)
if pages := storage_data.total // self.default_page_size:
if storage_data.total % self.default_page_size:
pages += 1
else:
pages = 1
for page_num in range(1, pages + 1):
storage_data = await self.find(
collection_id=collection_id or self.collection_id,
filters=filters,
sort_by=sort_by,
page_num=page_num,
page_size=self.default_page_size,
)
items.extend(storage_data.items)
return items_pb2.FindResponse(items=items)
async def get_references(self, references: list[Reference]):
"""Метод получения связей."""
result = await self.references_stub.Get(
references_pb2.GetRequest(
space_id=self.space_id,
env_id=self.env_id,
references=[
references_pb2.Reference(id=ref.id, collection_id=ref.collection_id)
for ref in references
],
)
)
return result
......@@ -14,7 +14,7 @@ def load_requirements():
setup(
name='perxis',
version='0.0.20',
version='0.0.21',
description='Perxis python client',
long_description=long_description,
long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment