diff --git a/examples/extension_service/Dockerfile b/examples/extension_service/Dockerfile index f74c9caebfbef28357ad1103d9f809c8d46fd457..8e11f941a29274ee56ef20e1ef74ced8f08d60c1 100644 --- a/examples/extension_service/Dockerfile +++ b/examples/extension_service/Dockerfile @@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL} ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL COPY . /home/${USER}/app -RUN pip install perxis==0.0.20 +RUN pip install perxis==1.3.0 ENV PYTHONPATH="/home/perx/app" ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python diff --git a/examples/extension_service/Dockerfile.local b/examples/extension_service/Dockerfile.local index 34fb9476e846e831631056ae78769785397b1b8d..1b2d30650f663a69309125051451daf89915b2cc 100644 --- a/examples/extension_service/Dockerfile.local +++ b/examples/extension_service/Dockerfile.local @@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL} ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL COPY . /home/${USER}/app -RUN pip install perxis==0.0.20 +RUN pip install perxis==1.3.0 RUN pip install 'watchdog[watchmedo]' ENV PYTHONPATH="/home/perx/app" diff --git a/examples/extension_service/docker-compose.override.example.yml b/examples/extension_service/docker-compose.override.example.yml index cba3614c2541f206709b693194c08fc9c37d1e81..6c0686f6f92c543d41e45a67f6514d7dfa70f484 100644 --- a/examples/extension_service/docker-compose.override.example.yml +++ b/examples/extension_service/docker-compose.override.example.yml @@ -11,4 +11,4 @@ services: - "50051:50051" volumes: - .:/home/perx/app - - <Путь Рє интерпретатору питона>/site-packages/perxis:/usr/local/lib/python3.9/site-packages/perxis \ No newline at end of file + - <Путь Рє интерпретатору питона>/site-packages/perxis:/usr/local/lib/python3.11/site-packages/perxis \ No newline at end of file diff --git a/perxis-proto b/perxis-proto index f000812a1eef24093c0d0abf1318e3179b679773..78539871cf2d9f6b187865c4855450143dec4e79 160000 --- a/perxis-proto +++ b/perxis-proto @@ -1 +1 @@ -Subproject commit f000812a1eef24093c0d0abf1318e3179b679773 +Subproject commit 78539871cf2d9f6b187865c4855450143dec4e79 diff --git a/perxis/extensions/bootstrap.py b/perxis/extensions/bootstrap.py index 8f532f15797cc51a74c9f01923a425dcb11088ff..bc465cf0294aac865b05e26933b8e835c01cee61 100644 --- a/perxis/extensions/bootstrap.py +++ b/perxis/extensions/bootstrap.py @@ -9,15 +9,16 @@ from perxis.collections import collections_pb2_grpc from perxis.environments import environments_pb2_grpc from perxis.roles import roles_pb2_grpc from perxis.clients import clients_pb2_grpc -from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2 +from perxis.common import operation_service_pb2_grpc +from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc from perxis.interceptors import header_adder_interceptor logger = logging.getLogger(__name__) async def _main( - ext_descriptor: manager_pb2.ExtensionDescriptor, - servicer_cls: extension_pb2_grpc.ExtensionServicer, + ext_descriptor: manager_service_pb2.ExtensionDescriptor, + servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer, ext_manager_host: str, content_host: str ): @@ -25,33 +26,38 @@ async def _main( 'x-perxis-access', 'system' ) - @aiocron.crontab('* * * * *', start=False) - async def register_extension(): - await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest( + async def _register_extension(): + await ext_manager_stub.RegisterExtensions(manager_service_pb2.RegisterExtensionsRequest( extensions=[ext_descriptor] )) logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано РІ perxis") + @aiocron.crontab('* * * * *', start=False) + async def register_extension(): + await _register_extension() + async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel: - ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(extensions_manager_channel) + ext_manager_stub = manager_service_pb2_grpc.ExtensionManagerServiceStub(extensions_manager_channel) - registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions( - manager_pb2.ListExtensionsRequest() + registered_extensions: manager_service_pb2.ListRegisteredExtensionsResponse = await ext_manager_stub.ListRegisteredExtensions( + manager_service_pb2.ListRegisteredExtensionsRequest() ) - # todo enable after fix - https://tracker.yandex.ru/PRXS-1507 - # for ext in registered_extensions.extensions: - # if ext.extension == ext_descriptor.extension: - # if ext.version != ext_descriptor.version: - # ext_manager_stub.UnregisterExtensions( - # manager_pb2.UnregisterExtensionsRequest( - # extensions=[ext_descriptor] - # ) - # ) - # - # logger.info( - # f"Рзменилась версия расширения {ext_descriptor.extension} ({ext.version} -> {ext_descriptor.version})" - # ) + for ext in registered_extensions.extensions: + if ext.extension == ext_descriptor.extension: + if ext.version != ext_descriptor.version: + await ext_manager_stub.UnregisterExtensions( + manager_service_pb2.UnregisterExtensionsRequest( + extensions=[ext_descriptor] + ) + ) + + await _register_extension() + + logger.info( + f"Рзменилась версия расширения {ext_descriptor.extension} ({ext.version} " + f"-> {ext_descriptor.version})" + ) register_extension.start() @@ -63,19 +69,22 @@ async def _main( environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel) server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10)) - extension_pb2_grpc.add_ExtensionServicer_to_server( - servicer_cls( - collections_stub, environments_stub, roles_stub, clients_stub - ), server + + servicer = servicer_cls( + collections_stub, environments_stub, roles_stub, clients_stub ) + + extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server) + operation_service_pb2_grpc.add_OperationServiceServicer_to_server(servicer, server) + server.add_insecure_port("[::]:50051") await server.start() await server.wait_for_termination() def bootstrap( - ext_descriptor: manager_pb2.ExtensionDescriptor, - servicer_cls: extension_pb2_grpc.ExtensionServicer, + ext_descriptor: manager_service_pb2.ExtensionDescriptor, + servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer, ext_manager_host: str, content_host: str, ): diff --git a/perxis/extensions/extension_service.py b/perxis/extensions/extension_service.py index 4bb8ed984ea5fe4cd3a9b2c6624a95612b0e96af..6da5bfb386ded0af25799f27c5b8d657a1ec6ec1 100644 --- a/perxis/extensions/extension_service.py +++ b/perxis/extensions/extension_service.py @@ -1,20 +1,89 @@ import grpc +import uuid +import typing +import asyncio import logging +import aiocron +import datetime +import dataclasses -from perxis.extensions import extension_pb2, extension_pb2_grpc +from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2 + +from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2 from perxis.roles import roles_pb2_grpc, roles_pb2 from perxis.clients import clients_pb2_grpc, clients_pb2 +from perxis.common import common_pb2, operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2 from perxis.collections import collections_pb2_grpc, collections_pb2 from perxis.environments import environments_pb2_grpc from perxis.extensions.extension_setup import ExtensionSetup -class ExtensionService(extension_pb2_grpc.ExtensionServicer): +def generate_operation_id() -> str: + return str(uuid.uuid4()) + + +def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp: + timestamp = dt.timestamp() + seconds = int(timestamp) + nanos = int(timestamp % 1 * 1e9) + + return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) + + +@dataclasses.dataclass +class OperationMeta: + task: asyncio.Task + + operation_id: str + description: str + + created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) + modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) + + response: typing.Optional[str] = None + errors: typing.Optional[list[str]] = None + + was_finished: bool = False + metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict) + + def mark_cancelled(self): + self.was_finished = True + self.response = "Отменено" + + def mark_finished(self, errors: typing.Optional[list[str]] = None): + self.modified_at = datetime.datetime.now() + self.was_finished = True + self.errors = errors + self.response = None if errors else "OK" + + def to_operation(self): + packed_response = any_pb2.Any() + packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING")) + + return operation_pb2.Operation( + id=self.operation_id, + description=self.description, + created_at=datetime_to_timestamp(self.created_at), + modified_at=datetime_to_timestamp(self.modified_at), + done=self.was_finished, + metadata=self.metadata, + response=packed_response, + error=error_pb2.Error( + message="; ".join(self.errors) + ) if self.errors else None + ) + + +class ExtensionService( + extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer +): extension_id: str collections: list[collections_pb2.Collection] = [] roles: list[roles_pb2.Role] = [] clients: list[clients_pb2.Client] = [] + __operations: dict[str, OperationMeta] = {} + def __init__(self, collections_service: collections_pb2_grpc.CollectionsStub, environments_service: environments_pb2_grpc.EnvironmentsStub, @@ -41,209 +110,235 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): for client in self.clients or []: self.extension_setup.add_client(client) - def write_result_log(self, operation, request, response): - log_func = self.get_log_func_by_state(response.results[0].state) + @aiocron.crontab('0 * * * *', start=True) + async def remove_old_operations(): + self.remove_old_operations() + + def remove_old_operations(self): + self.logger.info("Удаление старых операций") + + ids = [] + now = datetime.datetime.now() + + for operation_meta in self.__operations.values(): + task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled() + + # Если task фактически РЅРµ работает то операцию РЅСѓР¶РЅРѕ пометить как выполненную. Рто может произойти РІ случае + # неотловленного исключения, например + if task_is_not_running: + operation_meta.was_finished = True + + if not operation_meta.was_finished: + continue + + time_delta = now - operation_meta.modified_at + + # Для операций которые завершены больше часа назад + if time_delta.seconds < 60 * 60: + continue + + # Если РїРѕ какой-то причине операция была помечена как завершенная РЅРѕ task ещё активен + need_to_cancel_task = not any( + (operation_meta.task.done(), operation_meta.task.cancelled()) + ) + + if need_to_cancel_task: + try: + operation_meta.task.cancel() + except Exception as e: + self.logger.error("РќРµ удалось отменить task") + self.logger.exception(e) + + continue + + ids.append(operation_meta.operation_id) + + self.logger.info(f"Операция {operation_meta.operation_id} помечена РЅР° удаление") + + for operation_id in ids: + del self.__operations[operation_id] + + self.logger.info("Удаление старых операций завершено") + + def result_log(self, operation, operation_id: str, request, errors: list[str]): + log_func = self.logger.error if errors else self.logger.info log_func( - "Результат %s расширения %s для окружения %s пространства %s: \r\n" - "State: %s\r\n" - "Msg: %s\r\n" - "Error: %s" % ( + "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s" + % ( operation, + operation_id, self.extension_id, request.env_id, request.space_id, - response.results[0].state, - response.results[0].msg, - response.results[0].error + "СЃ ошибками: " + "; ".join(errors) if errors else "успешно" ) ) - def get_log_func_by_state(self, state): - if state == extension_pb2.ExtensionRequestResult.State.OK: - return self.logger.info - else: - return self.logger.error - - def ext_request_results_from_exception(self, e: Exception) -> extension_pb2.ExtensionRequestResult: + def ext_request_results_from_exception(self, e: Exception): return [ - extension_pb2.ExtensionRequestResult( + extension_service_pb2.ExtensionRequestResult( extension=self.extension_id, - state=extension_pb2.ExtensionRequestResult.State.ERROR, + state=extension_service_pb2.ExtensionRequestResult.State.ERROR, error=str(e), msg=None ) ] - async def _Install(self, request: extension_pb2.InstallRequest, context): + def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]: + return self.__operations.get(operation_id) + + def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task): + self.__operations[operation_id] = OperationMeta( + operation_id=operation_id, + task=task, + description=description + ) + + async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context): errors_list = await self.extension_setup.install( request.space_id, request.env_id, request.force ) - response_state = extension_pb2.ExtensionRequestResult.State.OK \ - if not errors_list \ - else extension_pb2.ExtensionRequestResult.State.ERROR + self.result_log("установки", operation_id, request, errors_list) - return extension_pb2.InstallResponse( - results=[extension_pb2.ExtensionRequestResult( - extension=self.extension_id, - state=response_state, - error="; ".join(errors_list) if errors_list else None, - msg="Ok" if not errors_list else None - )] - ) + self.__operations[operation_id].mark_finished(errors_list) - async def Install(self, request: extension_pb2.InstallRequest, context): - self.logger.info( - "Установка расширения %s для окружения %s пространства %s. %s force" % ( - self.extension_id, - request.env_id, - request.space_id, - "РЎ" if request.force else "Без" - ) + async def Install(self, request: extension_service_pb2.InstallRequest, context): + operation_id = generate_operation_id() + operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % ( + self.extension_id, + request.env_id, + request.space_id, + "РЎ" if request.force else "Без" ) - try: - response = await self._Install(request, context) - except Exception as e: - response = extension_pb2.InstallResponse( - results=self.ext_request_results_from_exception(e) - ) + self.logger.info(operation_description) - if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: - context.set_code(grpc.StatusCode.UNKNOWN) - context.set_details(response.results[0].error) + install_task = asyncio.create_task(self._Install(operation_id, request, context)) - self.write_result_log("установки", request, response) + self.set_operation_meta(operation_id, operation_description, install_task) - return response + return self.get_operation_meta(operation_id).to_operation() - async def _Update(self, request: extension_pb2.UpdateRequest, context): - errors_list = await self.extension_setup.update( - request.space_id, request.env_id, request.force - ) + async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context): + errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) - response_state = extension_pb2.ExtensionRequestResult.State.OK \ - if not errors_list \ - else extension_pb2.ExtensionRequestResult.State.ERROR + self.result_log("удаления", operation_id, request, errors_list) - return extension_pb2.UpdateResponse( - results=[extension_pb2.ExtensionRequestResult( - extension=self.extension_id, - state=response_state, - error="; ".join(errors_list) if errors_list else None, - msg="Ok" if not errors_list else None - )] - ) + self.__operations[operation_id].mark_finished(errors_list) - async def Update(self, request: extension_pb2.UpdateRequest, context): - self.logger.info( - "Обновление расширения %s для окружения %s пространства %s. %s force" % ( - self.extension_id, - request.env_id, - request.space_id, - "РЎ" if request.force else "Без" - ) + async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context): + operation_id = generate_operation_id() + operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % ( + self.extension_id, + request.env_id, + request.space_id, + "РЎ" if request.remove else "Без" ) - try: - response = await self._Update(request, context) - except Exception as e: - response = extension_pb2.UpdateResponse( - results=self.ext_request_results_from_exception(e) - ) + self.logger.info(operation_description) - if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: - context.set_code(grpc.StatusCode.UNKNOWN) - context.set_details(response.results[0].error) + uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context)) - self.write_result_log("обновления", request, response) + self.set_operation_meta(operation_id, operation_description, uninstall_task) - return response + return self.get_operation_meta(operation_id).to_operation() - async def _Uninstall(self, request: extension_pb2.UninstallRequest, context): - errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) + async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context): + errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id) - response_state = extension_pb2.ExtensionRequestResult.State.OK \ - if not errors_list \ - else extension_pb2.ExtensionRequestResult.State.ERROR + self.result_log("проверки", operation_id, request, errors_list) - return extension_pb2.UninstallResponse( - results=[extension_pb2.ExtensionRequestResult( - extension=self.extension_id, - state=response_state, - error="; ".join(errors_list) if errors_list else None, - msg="Ok" if not errors_list else None - )] - ) + self.__operations[operation_id].mark_finished(errors_list) - async def Uninstall(self, request: extension_pb2.UninstallRequest, context): - self.logger.info( - "Удаление расширения %s для окружения %s пространства %s. %s remove" % ( - self.extension_id, - request.env_id, - request.space_id, - "РЎ" if request.remove else "Без" - ) + async def Check(self, request: extension_service_pb2.CheckRequest, context): + operation_id = generate_operation_id() + operation_description = "Проверка расширения %s для окружения %s пространства %s" % ( + self.extension_id, + request.env_id, + request.space_id, ) - try: - response = await self._Uninstall(request, context) - except Exception as e: - response = extension_pb2.UninstallResponse( - results=self.ext_request_results_from_exception(e) + check_task = asyncio.create_task(self._Check(operation_id, request, context)) + + self.set_operation_meta(operation_id, operation_description, check_task) + + return self.get_operation_meta(operation_id).to_operation() + + async def Action(self, request: extension_pb2.ActionRequest, context): + context.set_code(grpc.StatusCode.UNKNOWN) + context.set_details("Unknown action") + + return None + + async def Get(self, request: operation_service_pb2.GetOperationRequest, context): + operations_meta = self.get_operation_meta(request.operation_id) + + if not operations_meta: + error_description = "Ошибка проверки операции %s РІ расширении %s - РЅРµ найдена" % ( + request.operation_id, + self.extension_id, ) - if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: context.set_code(grpc.StatusCode.UNKNOWN) - context.set_details(response.results[0].error) + context.set_details(error_description) - self.write_result_log("удаления", request, response) + self.logger.error(error_description) - return response + return None - async def _Check(self, request: extension_pb2.CheckRequest, context): - errors_list = await self.extension_setup.check(request.space_id, request.env_id) + self.logger.info( + f"Статус операции {operations_meta.operation_id}:\n" + f"{operations_meta.description}\n" + f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n" + f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}" + ) - response_state = extension_pb2.ExtensionRequestResult.State.OK \ - if not errors_list \ - else extension_pb2.ExtensionRequestResult.State.ERROR + return operations_meta.to_operation() - return extension_pb2.CheckResponse( - results=[extension_pb2.ExtensionRequestResult( - extension=self.extension_id, - state=response_state, - error="; ".join(errors_list) if errors_list else None, - msg="Ok" if not errors_list else None - )] - ) + def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context): + operations_meta = self.get_operation_meta(request.operation_id) - async def Check(self, request: extension_pb2.CheckRequest, context): - self.logger.info( - "Проверка расширения %s для окружения %s пространства %s" % ( + if not operations_meta: + error_description = "РќРµ удалось удалить операцию %s РІ расширении %s - РЅРµ найдена" % ( + request.operation_id, self.extension_id, - request.env_id, - request.space_id, ) - ) + + context.set_code(grpc.StatusCode.UNKNOWN) + context.set_details(error_description) + + self.logger.error( + error_description + ) + + return None try: - response = await self._Check(request, context) + operations_meta.task.cancel() + operations_meta.mark_cancelled() except Exception as e: - response = extension_pb2.CheckResponse( - results=self.ext_request_results_from_exception(e) + error_description = "Р’Рѕ время отмены операции %s расширении %s произошла ошибка %s" % ( + request.operation_id, + self.extension_id, + str(e) ) - if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: + self.logger.error(error_description) + context.set_code(grpc.StatusCode.UNKNOWN) - context.set_details(response.results[0].error) + context.set_details(error_description) - self.write_result_log("проверки", request, response) + self.logger.exception(e) - return response + return None - async def Action(self, request: extension_pb2.ActionRequest, context): - context.set_code(grpc.StatusCode.UNKNOWN) - context.set_details("Unknown action") + self.logger.info( + "Операция %s РІ расширении %s отменена" % ( + request.operation_id, + self.extension_id, + ) + ) - return None + return operations_meta.to_operation() diff --git a/perxis/extensions/utils.py b/perxis/extensions/utils.py index ccc5ad704bf2fa30209a44d7238f478ec142e103..b5bbc4140df4b248d9dfc627732f45d7b7c7f7fb 100644 --- a/perxis/extensions/utils.py +++ b/perxis/extensions/utils.py @@ -1,17 +1,17 @@ from typing import Optional -from perxis.extensions import manager_pb2 +from perxis.extensions import manager_service_pb2 def get_extension_descriptor( ext_host: str, ext_id: str, ext_name: str, ext_description: str, ext_version: str, ext_version_description: str, ext_deps: Optional[list[str]] = None -) -> manager_pb2.ExtensionDescriptor: +) -> manager_service_pb2.ExtensionDescriptor: if not ext_deps: ext_deps = [] - return manager_pb2.ExtensionDescriptor( + return manager_service_pb2.ExtensionDescriptor( extension=ext_id, title=ext_name, description=ext_description, diff --git a/setup.py b/setup.py index 3bb8d079e9996a0558b5877fdbb5f59cee35b30c..9c3f8fd2c215a9254f06a16260ca120879102f00 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ def load_requirements(): setup( name='perxis', - version='1.2.2', + version='1.3.0', description='Perxis python client', long_description=long_description, long_description_content_type='text/markdown',