Skip to content
Snippets Groups Projects
Commit d8b33674 authored by Podosochnyy Maxim's avatar Podosochnyy Maxim
Browse files

Добавлена поддержка действий

parent 3b7bd04c
Branches
Tags
No related merge requests found
import random
from constants import extension
from constants import collections
from constants import collections as extension_collections
from perxis.extensions.actions import make_action_dict
from perxis.extensions import extension_service_pb2
from perxis.extensions.extension_service import ExtensionService
from perxis.extensions import extension_pb2
from perxis.collections import helpers as collections_helpers
from perxis.roles import roles_pb2
from perxis.common import common_pb2
......@@ -11,7 +16,30 @@ from perxis.clients import clients_pb2
class Servicer(ExtensionService):
extension_id = extension.ID
collections = collections_helpers.make_collection_instances("./schemes", collections.schemes_mapping)
collections = collections_helpers.make_collection_instances("./schemes", extension_collections.schemes_mapping)
actions = [
make_action_dict(
extension_id=extension.ID,
action_id="demo-action-no-callback",
name="Действие с несуществующим каллбэком",
kind=extension_pb2.Action.Kind.ITEMS,
classes=[extension_collections.TEST_COLLECTION_ID]
),
make_action_dict(
extension_id=extension.ID,
action_id="demo-action-items",
name="Действие над элементами",
kind=extension_pb2.Action.Kind.ITEMS,
classes=[extension_collections.TEST_COLLECTION_ID]
),
make_action_dict(
extension_id=extension.ID,
action_id="demo-action-item",
name="Действие над одним элементом",
kind=extension_pb2.Action.Kind.ITEM,
classes=[extension_collections.TEST_COLLECTION_ID]
)
]
roles = [
roles_pb2.Role(
id="demo-role",
......@@ -37,3 +65,53 @@ class Servicer(ExtensionService):
}
)
]
async def demo_action_items(
self,
request: extension_pb2.ActionRequest,
context
) -> extension_service_pb2.ActionResponse:
self.logger.info(
"Действие над несколькими элементами %s %s %s %s" % (
request.space_id, request.env_id, request.item_ids, request.collection_id
)
)
with_error = bool(random.randint(0, 1))
if with_error:
state = extension_service_pb2.ActionResponse.State.ERROR
else:
state = extension_service_pb2.ActionResponse.State.DONE
return extension_service_pb2.ActionResponse(
state=state,
title="Действие над одним элементом",
error=", ".join(["Ошибка"] if with_error else []),
msg=f"{'Ошибка' if with_error else 'ОК'} ({request.item_ids}, {request.collection_id})"
)
async def demo_action_item(
self,
request: extension_pb2.ActionRequest,
context
) -> extension_service_pb2.ActionResponse:
self.logger.info(
"Действие над одним элементом %s %s %s %s" % (
request.space_id, request.env_id, request.item_id, request.collection_id
)
)
with_error = bool(random.randint(0, 1))
if with_error:
state = extension_service_pb2.ActionResponse.State.ERROR
else:
state = extension_service_pb2.ActionResponse.State.DONE
return extension_service_pb2.ActionResponse(
state=state,
title="Действие над одним элементом",
error=", ".join(["Ошибка"] if with_error else []),
msg=f"{'Ошибка' if with_error else 'ОК'} ({request.item_id}, {request.collection_id})"
)
import copy
from typing import Optional
from google.protobuf.struct_pb2 import Struct
from perxis.items import items_pb2
ACTIONS_COLLECTION_ID = "space_actions"
def process_action_id(action_id: str) -> str:
action_id = action_id.lower().replace("-", "_").replace(" ", "_")
return action_id
def make_action_dict(
extension_id: str,
action_id: str,
name: str,
kind: int,
classes: Optional[list[str]] = None,
target: Optional[int] = 0,
description: Optional[str] = "",
icon: Optional[str] = "",
confirm: bool = True,
view: Optional[int] = 0,
) -> dict:
if not classes:
classes = ["*"]
action_id = process_action_id(action_id)
return {
"id": action_id,
"action": f"grpc:///{extension_id}/{action_id}",
"name": name,
"target": target,
"classes": classes,
"description": description,
"icon": icon,
"kind": kind,
"confirm": confirm,
"view": view,
}
def make_action_struct(data: dict) -> Struct:
struct = Struct()
struct.update(data)
return struct
def make_action_item(space_id: str, env_id: str, data: dict) -> items_pb2.Item:
copied_data = copy.deepcopy(data)
action_id = copied_data.pop("id")
struct = make_action_struct(data)
return items_pb2.Item(
id=action_id,
space_id=space_id,
env_id=env_id,
collection_id=ACTIONS_COLLECTION_ID,
data=struct
)
......@@ -8,6 +8,7 @@ from concurrent import futures
from perxis.collections import collections_pb2_grpc
from perxis.environments import environments_pb2_grpc
from perxis.roles import roles_pb2_grpc
from perxis.items import items_pb2_grpc
from perxis.clients import clients_pb2_grpc
from perxis.common import operation_service_pb2_grpc
from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc
......@@ -67,11 +68,12 @@ async def _main(
roles_stub = roles_pb2_grpc.RolesStub(content_channel)
clients_stub = clients_pb2_grpc.ClientsStub(content_channel)
environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel)
items_stub = items_pb2_grpc.ItemsStub(content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
servicer = servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub
collections_stub, environments_stub, roles_stub, clients_stub, items_stub
)
extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server)
......
......@@ -11,8 +11,9 @@ from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2
from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.items import items_pb2_grpc
from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.common import common_pb2, operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2
from perxis.common import operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2
from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.environments import environments_pb2_grpc
from perxis.extensions.extension_setup import ExtensionSetup
......@@ -81,6 +82,7 @@ class ExtensionService(
collections: list[collections_pb2.Collection] = []
roles: list[roles_pb2.Role] = []
clients: list[clients_pb2.Client] = []
actions: list[dict] = []
__operations: dict[str, OperationMeta] = {}
......@@ -89,16 +91,18 @@ class ExtensionService(
environments_service: environments_pb2_grpc.EnvironmentsStub,
roles_service: roles_pb2_grpc.RolesStub,
clients_service: clients_pb2_grpc.ClientsStub,
items_service: items_pb2_grpc.ItemsStub,
):
self.logger = logging.getLogger(__name__)
self.collections_service = collections_service
self.environments_service = environments_service
self.roles_service = roles_service
self.clients_service = clients_service
self.items_service = items_service
self.extension_setup = ExtensionSetup(
self.collections_service, self.environments_service,
self.roles_service, self.clients_service
self.roles_service, self.clients_service, self.items_service,
)
for collection in self.collections or []:
......@@ -110,6 +114,9 @@ class ExtensionService(
for client in self.clients or []:
self.extension_setup.add_client(client)
for action in self.actions or []:
self.extension_setup.add_action(action)
@aiocron.crontab('0 * * * *', start=True)
async def remove_old_operations():
self.remove_old_operations()
......@@ -266,11 +273,48 @@ class ExtensionService(
return self.get_operation_meta(operation_id).to_operation()
async def _dispatch_action(
self, request: extension_pb2.ActionRequest, context
) -> extension_service_pb2.ActionResponse:
action_id = request.action.split("/")[-1]
if not hasattr(self, action_id):
response = extension_service_pb2.ActionResponse(
state=extension_service_pb2.ActionResponse.State.ERROR,
title="Невозможно выполнить действие",
error=f"В расширении отсутсвует функция {action_id}"
)
else:
func = getattr(self, action_id)
response = await func(request, context)
return response
async def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("Unknown action")
operation_description = "Действие %s для окружения %s пространства %s" % (
request.action,
request.env_id,
request.space_id,
)
return None
self.logger.info(operation_description)
response = await self._dispatch_action(request, context)
if response.state == extension_service_pb2.ActionResponse.State.ERROR:
error_description = "Ошибка действия %s в расширении %s: %s" % (
request.action,
self.extension_id,
response.error
)
#context.set_code(grpc.StatusCode.UNKNOWN)
#context.set_details(error_description)
self.logger.error(error_description)
return response
async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
operations_meta = self.get_operation_meta(request.operation_id)
......
......@@ -11,9 +11,11 @@ from deepdiff import DeepDiff
from google.protobuf.json_format import MessageToDict
from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.items import items_pb2_grpc, items_pb2
from perxis.common import common_pb2
from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.environments import environments_pb2_grpc, environments_pb2
from perxis.extensions.actions import make_action_item, make_action_struct, ACTIONS_COLLECTION_ID
logger = logging.getLogger(__name__)
......@@ -26,19 +28,28 @@ class ExtensionSetup:
environments_service: environments_pb2_grpc.EnvironmentsStub,
roles_service: roles_pb2_grpc.RolesStub,
clients_service: clients_pb2_grpc.ClientsStub,
items_service: items_pb2_grpc.ItemsStub,
):
self.collections = []
self.clients = []
self.roles = []
self.actions = []
self.roles_service = roles_service
self.clients_service = clients_service
self.collections_service = collections_service
self.environments_service = environments_service
self.items_service = items_service
self.__max_attempts_count = 5
self.__sleep_time = 1
def add_action(self, action: dict):
self.actions.append(action)
def set_actions(self, actions: list[dict]):
self.actions = actions
def add_collection(self, collection: collections_pb2.Collection):
self.collections.append(collection)
......@@ -437,12 +448,140 @@ class ExtensionSetup:
return error_message
# Работа с действиями
async def __check_actions(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
ids = [data["id"] for data in self.actions if data.get("id")]
items = []
try:
result = await self.items_service.Find(
items_pb2.FindRequest(
space_id=space_id,
env_id=env_id,
collection_id=ACTIONS_COLLECTION_ID,
filter=items_pb2.Filter(q=[f"id in {ids}"]),
options=items_pb2.FindOptions(
options=common_pb2.FindOptions(
page_num=0, page_size=len(ids)
)
),
)
)
items = result.items
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить данные о действиях, ошибка {e.details()}")
finally:
found_ids = [item.id for item in items]
for action_id in found_ids:
if action_id not in ids:
errors_list.append(f"Действие {action_id} не найдено")
return errors_list
async def __update_actions(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
not_found = False
for action in self.actions:
action_item = make_action_item(space_id, env_id, action)
try:
r = await self.items_service.Update(
items_pb2.UpdateRequest(
item=items_pb2.Item(
id=action_item.id,
space_id=space_id,
env_id=env_id,
collection_id=ACTIONS_COLLECTION_ID,
data=action_item.data,
)
)
)
except grpc.RpcError as e:
if "not found" not in e.details():
errors_list.append(f"Не удалось обновить действие {action_item.id}, {e.details()}")
continue
not_found = True
if not_found:
try:
await self.items_service.Create(
items_pb2.CreateRequest(
item=action_item
)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось создать действие {action_item.id}, {e.details()}")
try:
await self.items_service.Publish(
items_pb2.PublishRequest(
item=action_item
)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось опубликовать действие {action_item.id}, {e.details()}")
return errors_list
async def __remove_actions(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
for action in self.actions:
action_item = make_action_item(space_id, env_id, action)
try:
await self.items_service.Unpublish(
items_pb2.UnpublishRequest(
item=items_pb2.Item(
id=action_item.id,
space_id=action_item.space_id,
env_id=action_item.env_id,
collection_id=action_item.collection_id,
)
)
)
except grpc.RpcError as e:
if "not found" not in e.details():
errors_list.append(f"Не удалось снять с публикации действие {action.get('id', 'n/a')}, {e.details()}")
try:
await self.items_service.Delete(
items_pb2.DeleteRequest(
space_id=space_id,
env_id=env_id,
item=items_pb2.Item(
id=action_item.id,
space_id=action_item.space_id,
env_id=action_item.env_id,
collection_id=action_item.collection_id,
),
options=items_pb2.DeleteOptions(
update_attrs=True,
erase=True
)
)
)
except grpc.RpcError as e:
# Отсутствие действия это не ошибка
if "not found" not in e.details():
errors_list.append(f"Не удалось удалить действие {action.get('id', 'n/a')}, {e.details()}")
return errors_list
async def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
errors = []
errors += await self.__update_collections(space_id, env_id)
errors += await self.__update_roles(space_id, env_id)
errors += await self.__update_clients(space_id)
errors += await self.__update_actions(space_id, env_id)
return errors
......@@ -452,6 +591,7 @@ class ExtensionSetup:
errors += await self.__update_collections(space_id, env_id)
errors += await self.__update_roles(space_id, env_id)
errors += await self.__update_clients(space_id)
errors += await self.__update_actions(space_id, env_id)
return errors
......@@ -459,6 +599,7 @@ class ExtensionSetup:
errors = await self.__check_collections(space_id, env_id)
errors += await self.__check_roles(space_id)
errors += await self.__check_clients(space_id)
errors += await self.__check_actions(space_id, env_id)
return errors
......@@ -469,5 +610,6 @@ class ExtensionSetup:
errors += await self.__remove_collections(space_id, env_id)
errors += await self.__remove_clients(space_id)
errors += await self.__remove_roles(space_id)
errors += await self.__remove_actions(space_id, env_id)
return errors
......@@ -14,7 +14,7 @@ def load_requirements():
setup(
name='perxis',
version='1.3.0',
version='1.4.0',
description='Perxis python client',
long_description=long_description,
long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment