Skip to content
Snippets Groups Projects
Commit 05526f4d authored by Podosochnyy Maxim's avatar Podosochnyy Maxim
Browse files

Добавлен пример с сервисом расширения, исправлен метод update_collections в extension_setup

parent 2ad97b07
No related branches found
No related tags found
No related merge requests found
Showing
with 442 additions and 88 deletions
# perxis-python # perxis-python
## Расширения ## Расширения
### Пример подключения расширения к Perxis ### Локальная разработка
... Для работы расширения требуется указание системного контекста при выполнении запросов. Это возможно только в случае прямой
работы с сервисами perxis минуя envoy, для этого они должны быть в одной сети. Поэтому нужен локально запущенный экземпляр
perxis. Для его запуска нужно в каталоге perxis выполнить команду make run-local. После этого в контейнере в той же сети
можно запустить сервис с расширением
### Пример написания сервиса расширений ### Пример написания сервиса расширений
```python Готовый пример с реализацией простого сервиса можно посмотреть в каталоге examples/extension_service
from perxis.extensions.extension_service import ExtensionService
from perxis.roles import roles_pb2
from perxis.common import common_pb2
from perxis.clients import clients_pb2
from perxis.collections import collections_pb2
def make_collection_instances(schemes_mapping: dict[str, str]) -> list[collections_pb2.Collection]:
collections = []
for collection_id, collection_name in schemes_mapping.items():
with open(f"./schemes/{collection_id}.json", "r") as file:
collection_schema = file.read()
collection = collections_pb2.Collection(
id=collection_id,
name=collection_name,
schema=collection_schema
)
collections.append(collection)
return collections
...
schemes_mapping = {
"dealers_cities": "Дилеры/Города",
"dealers_contacts": "Дилеры/Контакты",
"dealers_countries": "Дилеры/Страны",
"dealers_dealers": "Дилеры/Дилеры",
"dealers_dealerships": "Дилеры/Дилерские центры",
"dealers_department_types": "Дилеры/Типы отделов",
"dealers_departments": "Дилеры/Отделы",
"dealers_events": "Дилеры/События",
"dealers_identifiers": "Дилеры/Идентификаторы",
"dealers_schedules": "Дилеры/Графики работы",
"dealers_services": "Дилеры/Услуги"
}
...
class Servicer(ExtensionService):
extension_id = "some_id"
collections = make_collection_instances(schemes_mapping)
roles = [
roles_pb2.Role(
id="my-role",
description="Описание к роли",
rules=[
common_pb2.Rule(
collection_id="dealers_cities",
actions=[common_pb2.CREATE, common_pb2.UPDATE, common_pb2.DELETE],
)
],
environments=["*"],
allow_management=False,
)
]
clients = [
clients_pb2.Client(
id="my-client",
name="Клиент, созданный расширением",
description="Описание созданного расширением клиента",
role_id="my-role",
api_key={
"rotate": True
}
)
]
```
## Аутентификация ## Аутентификация
......
.docker-compose.override.yml
\ No newline at end of file
FROM python:3.9-slim
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV USER=perx
RUN mkdir -p /home/${USER}/data /home/${USER}/app /home/${USER}/logs
WORKDIR /home/${USER}/app
ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}
ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL
COPY . /home/${USER}/app
RUN pip install perxis==0.0.14
ENV PYTHONPATH="/home/perx/app"
ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
CMD ["python", "/home/perx/app/server.py"]
schemes_mapping = {
"test_collection": "Тестовая коллекция",
}
from perxis.extensions import manager_pb2
ID = "demo-extension"
NAME = "Демонстрационное расширение"
VERSION = "v0.0.1"
DESCRIPTION = "Демонстрационное расширение"
VERSION_DESCRIPTION = "Описание расширения"
DEPENDENCIES = []
def get_extension_descriptor(host: str) -> manager_pb2.ExtensionDescriptor:
return manager_pb2.ExtensionDescriptor(
extension=ID,
title=NAME,
description=DESCRIPTION,
version=VERSION,
version_description=VERSION_DESCRIPTION,
deps=DEPENDENCIES,
url=host
)
version: "3.9"
services:
demo-ext-backend:
ports:
- 50051:50051
build:
context: .
args:
- PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}
restart: unless-stopped
volumes:
- .:/home/perx/app
networks:
- storage
- default
networks:
default:
storage:
external:
name: storage
# TODO: убрать дублирование кода. На момент написания сервиса ещё не было версии пакета с perxis-python в которую входил
# TODO: бы написанный ниже код. Удалить после того как станет не нужно
import grpc
import collections
from typing import Optional
from perxis.collections import collections_pb2
from perxis.extensions import manager_pb2
def make_descriptor(
extension_id: str, name: str, description: str,
version: str, version_description: str, host: str,
dependencies: Optional[list[str]] = None
) -> manager_pb2.ExtensionDescriptor:
if dependencies is None:
dependencies = []
return manager_pb2.ExtensionDescriptor(
extension=extension_id,
title=name,
description=description,
version=version,
version_description=version_description,
deps=dependencies,
url=host
)
def make_collection_instances(schemes_dir: str, schemes_mapping: dict[str, str]) -> list[collections_pb2.Collection]:
collections = []
for collection_id, collection_name in schemes_mapping.items():
with open(f"{schemes_dir}/{collection_id}.json", "r") as file:
collection_schema = file.read()
collection = collections_pb2.Collection(
id=collection_id,
name=collection_name,
schema=collection_schema
)
collections.append(collection)
return collections
class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor):
def __init__(self, interceptor_function):
self._fn = interceptor_function
def intercept_unary_unary(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_unary_stream(self, continuation, client_call_details,
request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, True)
response_it = continuation(new_details, next(new_request_iterator))
return postprocess(response_it) if postprocess else response_it
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, False)
response = continuation(new_details, new_request_iterator)
return postprocess(response) if postprocess else response
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it
def create(intercept_call):
return _GenericClientInterceptor(intercept_call)
class _ClientCallDetails(
collections.namedtuple(
'_ClientCallDetails',
('method', 'timeout', 'metadata', 'credentials')),
grpc.ClientCallDetails):
pass
def header_adder_interceptor(header, value):
def intercept_call(client_call_details, request_iterator, request_streaming,
response_streaming):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append((
header,
value,
))
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
return client_call_details, request_iterator, None
return create(intercept_call)
\ No newline at end of file
Для запуска необходимо:
1. Создать файл .env
2. Указать переменную PIP_EXTRA_INDEX_URL
3. docker-compose up
{
"ui": {
"options": {
"fields": [
"aaa",
"bbb"
]
}
},
"type": "object",
"params": {
"inline": false,
"fields": {
"bbb": {
"title": "Ещё поле",
"ui": {
"widget": "StringInput"
},
"type": "string",
"params": {}
},
"aaa": {
"title": "Какое то поле",
"ui": {
"widget": "StringInput"
},
"type": "string",
"params": {}
}
}
},
"loaded": false
}
import grpc
from concurrent import futures
from perxis.collections import collections_pb2_grpc
from perxis.environments import environments_pb2_grpc
from perxis.roles import roles_pb2_grpc
from perxis.clients import clients_pb2_grpc
from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2
from servicer import Servicer
from helpers import header_adder_interceptor
from constants.extension import get_extension_descriptor
def main():
my_extension_descriptor = get_extension_descriptor(host="demo-ext-backend:50051")
interceptor = header_adder_interceptor(
'x-perxis-access', 'system'
)
with grpc.insecure_channel("extension-manager:9030") as extensions_manager_channel:
intercept_channel_extensions_manager_channel = grpc.intercept_channel(extensions_manager_channel, interceptor)
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(intercept_channel_extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = ext_manager_stub.ListExtensions(manager_pb2.ListExtensionsRequest())
for ext in registered_extensions.extensions:
if ext.extension == my_extension_descriptor.extension:
if ext.version != my_extension_descriptor.version:
ext_manager_stub.UnregisterExtensions(
manager_pb2.UnregisterExtensionsRequest(
extensions=[my_extension_descriptor]
)
)
ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
extensions=[my_extension_descriptor]
))
with grpc.insecure_channel("content:9020") as content_channel:
intercepted_content_channel = grpc.intercept_channel(content_channel, interceptor)
collections_stub = collections_pb2_grpc.CollectionsStub(intercepted_content_channel)
roles_stub = roles_pb2_grpc.RolesStub(intercepted_content_channel)
clients_stub = clients_pb2_grpc.ClientsStub(intercepted_content_channel)
environments_stub = environments_pb2_grpc.EnvironmentsStub(intercepted_content_channel)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
extension_pb2_grpc.add_ExtensionServicer_to_server(
Servicer(
collections_stub, environments_stub, roles_stub, clients_stub
), server
)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
main()
from constants import extension
from helpers import make_collection_instances
from constants import collections
from perxis.extensions.extension_service import ExtensionService
from perxis.roles import roles_pb2
from perxis.common import common_pb2
from perxis.clients import clients_pb2
class Servicer(ExtensionService):
extension_id = extension.ID
collections = make_collection_instances("./schemes", collections.schemes_mapping)
roles = [
roles_pb2.Role(
id="demo-role",
description="Описание к роли",
rules=[
common_pb2.Rule(
collection_id="dealers_cities",
actions=[common_pb2.CREATE, common_pb2.UPDATE, common_pb2.DELETE],
)
],
environments=["*"],
allow_management=False,
)
]
clients = [
clients_pb2.Client(
id="demo-client",
name="Демонстрационный клиент",
description="Описание созданного расширением клиента",
role_id="demo-role",
api_key={
"rotate": True
}
)
]
from perxis.collections import collections_pb2
def make_collection_instances(schemes_dir: str, schemes_mapping: dict[str, str]) -> list[collections_pb2.Collection]:
collections = []
for collection_id, collection_name in schemes_mapping.items():
with open(f"{schemes_dir}/{collection_id}.json", "r") as file:
collection_schema = file.read()
collection = collections_pb2.Collection(
id=collection_id,
name=collection_name,
schema=collection_schema
)
collections.append(collection)
return collections
\ No newline at end of file
...@@ -250,7 +250,7 @@ class ExtensionSetup: ...@@ -250,7 +250,7 @@ class ExtensionSetup:
def __update_collections(self, space_id: str, env_id: str) -> list[str]: def __update_collections(self, space_id: str, env_id: str) -> list[str]:
""" """
Метод __обновления__ коллекций подразумевает что сами коллекции уже созданы. Миграция окружения требуется Метод __обновления__ коллекций. Миграция окружения требуется
только в случае если одна или несколько схем коллекций изменялись. Алгоритм работы: только в случае если одна или несколько схем коллекций изменялись. Алгоритм работы:
1. Получить фактически существующую коллекцию из БД 1. Получить фактически существующую коллекцию из БД
2. Обновить её в perxis 2. Обновить её в perxis
...@@ -271,10 +271,25 @@ class ExtensionSetup: ...@@ -271,10 +271,25 @@ class ExtensionSetup:
collection = get_response.collection collection = get_response.collection
except grpc.RpcError as e: except grpc.RpcError as e:
errors_list.append(f"Не удалось получить коллекцию {local_collection.id}, {e.details()}") collection = None
continue cloned_collection = copy.deepcopy(local_collection)
cloned_collection.space_id = space_id
cloned_collection.env_id = env_id
# Коллекция может быть не найдена в случае если она добавлена в новой версии
if not collection:
try:
create_response, state = self.collections_service.Create.with_call(
collections_pb2.CreateRequest(collection=cloned_collection)
)
collection = create_response.collection
except grpc.RpcError as e:
errors_list.append(f"Не удалось создать коллекцию {local_collection.id}, {e.details()}")
continue
else:
try: try:
cloned_collection = copy.deepcopy(local_collection) cloned_collection = copy.deepcopy(local_collection)
cloned_collection.space_id = space_id cloned_collection.space_id = space_id
......
import grpc
import collections
class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor):
def __init__(self, interceptor_function):
self._fn = interceptor_function
def intercept_unary_unary(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_unary_stream(self, continuation, client_call_details,
request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, True)
response_it = continuation(new_details, next(new_request_iterator))
return postprocess(response_it) if postprocess else response_it
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, False)
response = continuation(new_details, new_request_iterator)
return postprocess(response) if postprocess else response
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it
def create(intercept_call):
return _GenericClientInterceptor(intercept_call)
class _ClientCallDetails(
collections.namedtuple(
'_ClientCallDetails',
('method', 'timeout', 'metadata', 'credentials')),
grpc.ClientCallDetails):
pass
def header_adder_interceptor(header, value):
def intercept_call(client_call_details, request_iterator, request_streaming,
response_streaming):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append((
header,
value,
))
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
return client_call_details, request_iterator, None
return create(intercept_call)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment