Skip to content
Snippets Groups Projects
Commit 548a5a91 authored by Eterevskiy Georgiy's avatar Eterevskiy Georgiy
Browse files

Merge branch 'feature/AUTO-2219' into 'master'

Добавлена поддержка действий

See merge request perxis/perxis-python!62
parents 3b7bd04c 26ddeedb
No related branches found
No related tags found
No related merge requests found
import random
from constants import extension from constants import extension
from constants import collections from constants import collections as extension_collections
from perxis.extensions.actions import make_action_dict
from perxis.extensions import extension_service_pb2
from perxis.extensions.extension_service import ExtensionService from perxis.extensions.extension_service import ExtensionService
from perxis.extensions import extension_pb2
from perxis.collections import helpers as collections_helpers from perxis.collections import helpers as collections_helpers
from perxis.roles import roles_pb2 from perxis.roles import roles_pb2
from perxis.common import common_pb2 from perxis.common import common_pb2
...@@ -11,7 +16,30 @@ from perxis.clients import clients_pb2 ...@@ -11,7 +16,30 @@ from perxis.clients import clients_pb2
class Servicer(ExtensionService): class Servicer(ExtensionService):
extension_id = extension.ID extension_id = extension.ID
collections = collections_helpers.make_collection_instances("./schemes", collections.schemes_mapping) collections = collections_helpers.make_collection_instances("./schemes", extension_collections.schemes_mapping)
actions = [
make_action_dict(
extension_id=extension.ID,
action_id="demo-action-no-callback",
name="Действие с несуществующим каллбэком",
kind=extension_pb2.Action.Kind.ITEMS,
classes=[extension_collections.TEST_COLLECTION_ID]
),
make_action_dict(
extension_id=extension.ID,
action_id="demo-action-items",
name="Действие над элементами",
kind=extension_pb2.Action.Kind.ITEMS,
classes=[extension_collections.TEST_COLLECTION_ID]
),
make_action_dict(
extension_id=extension.ID,
action_id="demo-action-item",
name="Действие над одним элементом",
kind=extension_pb2.Action.Kind.ITEM,
classes=[extension_collections.TEST_COLLECTION_ID]
)
]
roles = [ roles = [
roles_pb2.Role( roles_pb2.Role(
id="demo-role", id="demo-role",
...@@ -37,3 +65,53 @@ class Servicer(ExtensionService): ...@@ -37,3 +65,53 @@ class Servicer(ExtensionService):
} }
) )
] ]
async def action_demo_action_items(
self,
request: extension_pb2.ActionRequest,
context
) -> extension_service_pb2.ActionResponse:
self.logger.info(
"Действие над несколькими элементами %s %s %s %s" % (
request.space_id, request.env_id, request.item_ids, request.collection_id
)
)
with_error = bool(random.randint(0, 1))
if with_error:
state = extension_service_pb2.ActionResponse.State.ERROR
else:
state = extension_service_pb2.ActionResponse.State.DONE
return extension_service_pb2.ActionResponse(
state=state,
title="Действие над одним элементом",
error=", ".join(["Ошибка"] if with_error else []),
msg=f"{'Ошибка' if with_error else 'ОК'} ({request.item_ids}, {request.collection_id})"
)
async def action_demo_action_item(
self,
request: extension_pb2.ActionRequest,
context
) -> extension_service_pb2.ActionResponse:
self.logger.info(
"Действие над одним элементом %s %s %s %s" % (
request.space_id, request.env_id, request.item_id, request.collection_id
)
)
with_error = bool(random.randint(0, 1))
if with_error:
state = extension_service_pb2.ActionResponse.State.ERROR
else:
state = extension_service_pb2.ActionResponse.State.DONE
return extension_service_pb2.ActionResponse(
state=state,
title="Действие над одним элементом",
error=", ".join(["Ошибка"] if with_error else []),
msg=f"{'Ошибка' if with_error else 'ОК'} ({request.item_id}, {request.collection_id})"
)
import copy
from typing import Optional
from google.protobuf.struct_pb2 import Struct
from perxis.items import items_pb2
ACTIONS_COLLECTION_ID = "space_actions"
def process_action_id(action_id: str) -> str:
action_id = action_id.lower().replace("-", "_").replace(" ", "_")
return action_id
def make_action_dict(
extension_id: str,
action_id: str,
name: str,
kind: int,
**kwargs,
) -> dict:
action_id = process_action_id(action_id)
return {
"id": action_id,
"action": f"grpc:///{extension_id}/{action_id}",
"name": name,
"kind": kind,
**kwargs,
}
def make_action_struct(data: dict) -> Struct:
struct = Struct()
struct.update(data)
return struct
def make_action_item(space_id: str, env_id: str, data: dict) -> items_pb2.Item:
copied_data = copy.deepcopy(data)
action_id = copied_data.pop("id")
struct = make_action_struct(data)
return items_pb2.Item(
id=action_id,
space_id=space_id,
env_id=env_id,
collection_id=ACTIONS_COLLECTION_ID,
data=struct
)
...@@ -8,6 +8,7 @@ from concurrent import futures ...@@ -8,6 +8,7 @@ from concurrent import futures
from perxis.collections import collections_pb2_grpc from perxis.collections import collections_pb2_grpc
from perxis.environments import environments_pb2_grpc from perxis.environments import environments_pb2_grpc
from perxis.roles import roles_pb2_grpc from perxis.roles import roles_pb2_grpc
from perxis.items import items_pb2_grpc
from perxis.clients import clients_pb2_grpc from perxis.clients import clients_pb2_grpc
from perxis.common import operation_service_pb2_grpc from perxis.common import operation_service_pb2_grpc
from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc
...@@ -67,11 +68,12 @@ async def _main( ...@@ -67,11 +68,12 @@ async def _main(
roles_stub = roles_pb2_grpc.RolesStub(content_channel) roles_stub = roles_pb2_grpc.RolesStub(content_channel)
clients_stub = clients_pb2_grpc.ClientsStub(content_channel) clients_stub = clients_pb2_grpc.ClientsStub(content_channel)
environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel) environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel)
items_stub = items_pb2_grpc.ItemsStub(content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10)) server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
servicer = servicer_cls( servicer = servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub collections_stub, environments_stub, roles_stub, clients_stub, items_stub
) )
extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server) extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server)
......
...@@ -11,8 +11,9 @@ from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2 ...@@ -11,8 +11,9 @@ from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2
from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2 from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2 from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.items import items_pb2_grpc
from perxis.clients import clients_pb2_grpc, clients_pb2 from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.common import common_pb2, operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2 from perxis.common import operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2
from perxis.collections import collections_pb2_grpc, collections_pb2 from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.environments import environments_pb2_grpc from perxis.environments import environments_pb2_grpc
from perxis.extensions.extension_setup import ExtensionSetup from perxis.extensions.extension_setup import ExtensionSetup
...@@ -81,6 +82,7 @@ class ExtensionService( ...@@ -81,6 +82,7 @@ class ExtensionService(
collections: list[collections_pb2.Collection] = [] collections: list[collections_pb2.Collection] = []
roles: list[roles_pb2.Role] = [] roles: list[roles_pb2.Role] = []
clients: list[clients_pb2.Client] = [] clients: list[clients_pb2.Client] = []
actions: list[dict] = []
__operations: dict[str, OperationMeta] = {} __operations: dict[str, OperationMeta] = {}
...@@ -89,16 +91,18 @@ class ExtensionService( ...@@ -89,16 +91,18 @@ class ExtensionService(
environments_service: environments_pb2_grpc.EnvironmentsStub, environments_service: environments_pb2_grpc.EnvironmentsStub,
roles_service: roles_pb2_grpc.RolesStub, roles_service: roles_pb2_grpc.RolesStub,
clients_service: clients_pb2_grpc.ClientsStub, clients_service: clients_pb2_grpc.ClientsStub,
items_service: items_pb2_grpc.ItemsStub,
): ):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.collections_service = collections_service self.collections_service = collections_service
self.environments_service = environments_service self.environments_service = environments_service
self.roles_service = roles_service self.roles_service = roles_service
self.clients_service = clients_service self.clients_service = clients_service
self.items_service = items_service
self.extension_setup = ExtensionSetup( self.extension_setup = ExtensionSetup(
self.collections_service, self.environments_service, self.collections_service, self.environments_service,
self.roles_service, self.clients_service self.roles_service, self.clients_service, self.items_service,
) )
for collection in self.collections or []: for collection in self.collections or []:
...@@ -110,6 +114,9 @@ class ExtensionService( ...@@ -110,6 +114,9 @@ class ExtensionService(
for client in self.clients or []: for client in self.clients or []:
self.extension_setup.add_client(client) self.extension_setup.add_client(client)
for action in self.actions or []:
self.extension_setup.add_action(action)
@aiocron.crontab('0 * * * *', start=True) @aiocron.crontab('0 * * * *', start=True)
async def remove_old_operations(): async def remove_old_operations():
self.remove_old_operations() self.remove_old_operations()
...@@ -266,11 +273,47 @@ class ExtensionService( ...@@ -266,11 +273,47 @@ class ExtensionService(
return self.get_operation_meta(operation_id).to_operation() return self.get_operation_meta(operation_id).to_operation()
async def _dispatch_action(
self, request: extension_pb2.ActionRequest, context
) -> extension_service_pb2.ActionResponse:
action_id = request.action.split("/")[-1]
func_name = f"action_{action_id}"
if not hasattr(self, func_name):
response = extension_service_pb2.ActionResponse(
state=extension_service_pb2.ActionResponse.State.ERROR,
title="Невозможно выполнить действие",
error=f"В расширении отсутсвует функция {action_id}"
)
else:
func = getattr(self, func_name)
response = await func(request, context)
return response
async def Action(self, request: extension_pb2.ActionRequest, context): async def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN) operation_description = "Действие %s для окружения %s пространства %s" % (
context.set_details("Unknown action") request.action,
request.env_id,
request.space_id,
)
return None self.logger.info(operation_description)
response = await self._dispatch_action(request, context)
if response.state == extension_service_pb2.ActionResponse.State.ERROR:
error_description = "Ошибка действия %s в расширении %s: %s" % (
request.action,
self.extension_id,
response.error
)
self.logger.error(error_description)
return response
async def Get(self, request: operation_service_pb2.GetOperationRequest, context): async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
operations_meta = self.get_operation_meta(request.operation_id) operations_meta = self.get_operation_meta(request.operation_id)
......
...@@ -11,9 +11,11 @@ from deepdiff import DeepDiff ...@@ -11,9 +11,11 @@ from deepdiff import DeepDiff
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from perxis.collections import collections_pb2_grpc, collections_pb2 from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2 from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.items import items_pb2_grpc, items_pb2
from perxis.common import common_pb2 from perxis.common import common_pb2
from perxis.clients import clients_pb2_grpc, clients_pb2 from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.environments import environments_pb2_grpc, environments_pb2 from perxis.environments import environments_pb2_grpc, environments_pb2
from perxis.extensions.actions import make_action_item, make_action_struct, ACTIONS_COLLECTION_ID
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -26,19 +28,28 @@ class ExtensionSetup: ...@@ -26,19 +28,28 @@ class ExtensionSetup:
environments_service: environments_pb2_grpc.EnvironmentsStub, environments_service: environments_pb2_grpc.EnvironmentsStub,
roles_service: roles_pb2_grpc.RolesStub, roles_service: roles_pb2_grpc.RolesStub,
clients_service: clients_pb2_grpc.ClientsStub, clients_service: clients_pb2_grpc.ClientsStub,
items_service: items_pb2_grpc.ItemsStub,
): ):
self.collections = [] self.collections = []
self.clients = [] self.clients = []
self.roles = [] self.roles = []
self.actions = []
self.roles_service = roles_service self.roles_service = roles_service
self.clients_service = clients_service self.clients_service = clients_service
self.collections_service = collections_service self.collections_service = collections_service
self.environments_service = environments_service self.environments_service = environments_service
self.items_service = items_service
self.__max_attempts_count = 5 self.__max_attempts_count = 5
self.__sleep_time = 1 self.__sleep_time = 1
def add_action(self, action: dict):
self.actions.append(action)
def set_actions(self, actions: list[dict]):
self.actions = actions
def add_collection(self, collection: collections_pb2.Collection): def add_collection(self, collection: collections_pb2.Collection):
self.collections.append(collection) self.collections.append(collection)
...@@ -437,12 +448,140 @@ class ExtensionSetup: ...@@ -437,12 +448,140 @@ class ExtensionSetup:
return error_message return error_message
# Работа с действиями
async def __check_actions(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
ids = [data["id"] for data in self.actions if data.get("id")]
items = []
try:
result = await self.items_service.Find(
items_pb2.FindRequest(
space_id=space_id,
env_id=env_id,
collection_id=ACTIONS_COLLECTION_ID,
filter=items_pb2.Filter(q=[f"id in {ids}"]),
options=items_pb2.FindOptions(
options=common_pb2.FindOptions(
page_num=0, page_size=len(ids)
)
),
)
)
items = result.items
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить данные о действиях, ошибка {e.details()}")
finally:
found_ids = [item.id for item in items]
for action_id in found_ids:
if action_id not in ids:
errors_list.append(f"Действие {action_id} не найдено")
return errors_list
async def __update_actions(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
not_found = False
for action in self.actions:
action_item = make_action_item(space_id, env_id, action)
try:
r = await self.items_service.Update(
items_pb2.UpdateRequest(
item=items_pb2.Item(
id=action_item.id,
space_id=space_id,
env_id=env_id,
collection_id=ACTIONS_COLLECTION_ID,
data=action_item.data,
)
)
)
except grpc.RpcError as e:
if "not found" not in e.details():
errors_list.append(f"Не удалось обновить действие {action_item.id}, {e.details()}")
continue
not_found = True
if not_found:
try:
await self.items_service.Create(
items_pb2.CreateRequest(
item=action_item
)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось создать действие {action_item.id}, {e.details()}")
try:
await self.items_service.Publish(
items_pb2.PublishRequest(
item=action_item
)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось опубликовать действие {action_item.id}, {e.details()}")
return errors_list
async def __remove_actions(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
for action in self.actions:
action_item = make_action_item(space_id, env_id, action)
try:
await self.items_service.Unpublish(
items_pb2.UnpublishRequest(
item=items_pb2.Item(
id=action_item.id,
space_id=action_item.space_id,
env_id=action_item.env_id,
collection_id=action_item.collection_id,
)
)
)
except grpc.RpcError as e:
if "not found" not in e.details():
errors_list.append(f"Не удалось снять с публикации действие {action.get('id', 'n/a')}, {e.details()}")
try:
await self.items_service.Delete(
items_pb2.DeleteRequest(
space_id=space_id,
env_id=env_id,
item=items_pb2.Item(
id=action_item.id,
space_id=action_item.space_id,
env_id=action_item.env_id,
collection_id=action_item.collection_id,
),
options=items_pb2.DeleteOptions(
update_attrs=True,
erase=True
)
)
)
except grpc.RpcError as e:
# Отсутствие действия это не ошибка
if "not found" not in e.details():
errors_list.append(f"Не удалось удалить действие {action.get('id', 'n/a')}, {e.details()}")
return errors_list
async def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]: async def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
errors = [] errors = []
errors += await self.__update_collections(space_id, env_id) errors += await self.__update_collections(space_id, env_id)
errors += await self.__update_roles(space_id, env_id) errors += await self.__update_roles(space_id, env_id)
errors += await self.__update_clients(space_id) errors += await self.__update_clients(space_id)
errors += await self.__update_actions(space_id, env_id)
return errors return errors
...@@ -452,6 +591,7 @@ class ExtensionSetup: ...@@ -452,6 +591,7 @@ class ExtensionSetup:
errors += await self.__update_collections(space_id, env_id) errors += await self.__update_collections(space_id, env_id)
errors += await self.__update_roles(space_id, env_id) errors += await self.__update_roles(space_id, env_id)
errors += await self.__update_clients(space_id) errors += await self.__update_clients(space_id)
errors += await self.__update_actions(space_id, env_id)
return errors return errors
...@@ -459,6 +599,7 @@ class ExtensionSetup: ...@@ -459,6 +599,7 @@ class ExtensionSetup:
errors = await self.__check_collections(space_id, env_id) errors = await self.__check_collections(space_id, env_id)
errors += await self.__check_roles(space_id) errors += await self.__check_roles(space_id)
errors += await self.__check_clients(space_id) errors += await self.__check_clients(space_id)
errors += await self.__check_actions(space_id, env_id)
return errors return errors
...@@ -469,5 +610,6 @@ class ExtensionSetup: ...@@ -469,5 +610,6 @@ class ExtensionSetup:
errors += await self.__remove_collections(space_id, env_id) errors += await self.__remove_collections(space_id, env_id)
errors += await self.__remove_clients(space_id) errors += await self.__remove_clients(space_id)
errors += await self.__remove_roles(space_id) errors += await self.__remove_roles(space_id)
errors += await self.__remove_actions(space_id, env_id)
return errors return errors
...@@ -14,7 +14,7 @@ def load_requirements(): ...@@ -14,7 +14,7 @@ def load_requirements():
setup( setup(
name='perxis', name='perxis',
version='1.3.0', version='1.4.0',
description='Perxis python client', description='Perxis python client',
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment