Skip to content
Snippets Groups Projects
Commit 3b7bd04c authored by Eterevskiy Georgiy's avatar Eterevskiy Georgiy
Browse files

Merge branch 'feature/AUTO-1852-new-extension-manager-interface' into 'master'

Реализация ExtensionService с учётом long polling операций

See merge request !61
parents e18a0a70 b05d4b6b
No related branches found
No related tags found
1 merge request!61Реализация ExtensionService с учётом long polling операций
Pipeline #39257 passed with stage
in 1 minute and 37 seconds
......@@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}
ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL
COPY . /home/${USER}/app
RUN pip install perxis==0.0.20
RUN pip install perxis==1.3.0
ENV PYTHONPATH="/home/perx/app"
ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
......
......@@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}
ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL
COPY . /home/${USER}/app
RUN pip install perxis==0.0.20
RUN pip install perxis==1.3.0
RUN pip install 'watchdog[watchmedo]'
ENV PYTHONPATH="/home/perx/app"
......
......@@ -11,4 +11,4 @@ services:
- "50051:50051"
volumes:
- .:/home/perx/app
- <Путь к интерпретатору питона>/site-packages/perxis:/usr/local/lib/python3.9/site-packages/perxis
\ No newline at end of file
- <Путь к интерпретатору питона>/site-packages/perxis:/usr/local/lib/python3.11/site-packages/perxis
\ No newline at end of file
Subproject commit f000812a1eef24093c0d0abf1318e3179b679773
Subproject commit 78539871cf2d9f6b187865c4855450143dec4e79
......@@ -9,15 +9,16 @@ from perxis.collections import collections_pb2_grpc
from perxis.environments import environments_pb2_grpc
from perxis.roles import roles_pb2_grpc
from perxis.clients import clients_pb2_grpc
from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2
from perxis.common import operation_service_pb2_grpc
from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc
from perxis.interceptors import header_adder_interceptor
logger = logging.getLogger(__name__)
async def _main(
ext_descriptor: manager_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_descriptor: manager_service_pb2.ExtensionDescriptor,
servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer,
ext_manager_host: str,
content_host: str
):
......@@ -25,33 +26,38 @@ async def _main(
'x-perxis-access', 'system'
)
@aiocron.crontab('* * * * *', start=False)
async def register_extension():
await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
async def _register_extension():
await ext_manager_stub.RegisterExtensions(manager_service_pb2.RegisterExtensionsRequest(
extensions=[ext_descriptor]
))
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
@aiocron.crontab('* * * * *', start=False)
async def register_extension():
await _register_extension()
async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel:
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(extensions_manager_channel)
ext_manager_stub = manager_service_pb2_grpc.ExtensionManagerServiceStub(extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
manager_pb2.ListExtensionsRequest()
registered_extensions: manager_service_pb2.ListRegisteredExtensionsResponse = await ext_manager_stub.ListRegisteredExtensions(
manager_service_pb2.ListRegisteredExtensionsRequest()
)
# todo enable after fix - https://tracker.yandex.ru/PRXS-1507
# for ext in registered_extensions.extensions:
# if ext.extension == ext_descriptor.extension:
# if ext.version != ext_descriptor.version:
# ext_manager_stub.UnregisterExtensions(
# manager_pb2.UnregisterExtensionsRequest(
# extensions=[ext_descriptor]
# )
# )
#
# logger.info(
# f"Изменилась версия расширения {ext_descriptor.extension} ({ext.version} -> {ext_descriptor.version})"
# )
for ext in registered_extensions.extensions:
if ext.extension == ext_descriptor.extension:
if ext.version != ext_descriptor.version:
await ext_manager_stub.UnregisterExtensions(
manager_service_pb2.UnregisterExtensionsRequest(
extensions=[ext_descriptor]
)
)
await _register_extension()
logger.info(
f"Изменилась версия расширения {ext_descriptor.extension} ({ext.version} "
f"-> {ext_descriptor.version})"
)
register_extension.start()
......@@ -63,19 +69,22 @@ async def _main(
environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
extension_pb2_grpc.add_ExtensionServicer_to_server(
servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub
), server
servicer = servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub
)
extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server)
operation_service_pb2_grpc.add_OperationServiceServicer_to_server(servicer, server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
def bootstrap(
ext_descriptor: manager_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_descriptor: manager_service_pb2.ExtensionDescriptor,
servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer,
ext_manager_host: str,
content_host: str,
):
......
import grpc
import uuid
import typing
import asyncio
import logging
import aiocron
import datetime
import dataclasses
from perxis.extensions import extension_pb2, extension_pb2_grpc
from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2
from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.common import common_pb2, operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2
from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.environments import environments_pb2_grpc
from perxis.extensions.extension_setup import ExtensionSetup
class ExtensionService(extension_pb2_grpc.ExtensionServicer):
def generate_operation_id() -> str:
return str(uuid.uuid4())
def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
timestamp = dt.timestamp()
seconds = int(timestamp)
nanos = int(timestamp % 1 * 1e9)
return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
@dataclasses.dataclass
class OperationMeta:
task: asyncio.Task
operation_id: str
description: str
created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
response: typing.Optional[str] = None
errors: typing.Optional[list[str]] = None
was_finished: bool = False
metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict)
def mark_cancelled(self):
self.was_finished = True
self.response = "Отменено"
def mark_finished(self, errors: typing.Optional[list[str]] = None):
self.modified_at = datetime.datetime.now()
self.was_finished = True
self.errors = errors
self.response = None if errors else "OK"
def to_operation(self):
packed_response = any_pb2.Any()
packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING"))
return operation_pb2.Operation(
id=self.operation_id,
description=self.description,
created_at=datetime_to_timestamp(self.created_at),
modified_at=datetime_to_timestamp(self.modified_at),
done=self.was_finished,
metadata=self.metadata,
response=packed_response,
error=error_pb2.Error(
message="; ".join(self.errors)
) if self.errors else None
)
class ExtensionService(
extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer
):
extension_id: str
collections: list[collections_pb2.Collection] = []
roles: list[roles_pb2.Role] = []
clients: list[clients_pb2.Client] = []
__operations: dict[str, OperationMeta] = {}
def __init__(self,
collections_service: collections_pb2_grpc.CollectionsStub,
environments_service: environments_pb2_grpc.EnvironmentsStub,
......@@ -41,209 +110,235 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
for client in self.clients or []:
self.extension_setup.add_client(client)
def write_result_log(self, operation, request, response):
log_func = self.get_log_func_by_state(response.results[0].state)
@aiocron.crontab('0 * * * *', start=True)
async def remove_old_operations():
self.remove_old_operations()
def remove_old_operations(self):
self.logger.info("Удаление старых операций")
ids = []
now = datetime.datetime.now()
for operation_meta in self.__operations.values():
task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled()
# Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае
# неотловленного исключения, например
if task_is_not_running:
operation_meta.was_finished = True
if not operation_meta.was_finished:
continue
time_delta = now - operation_meta.modified_at
# Для операций которые завершены больше часа назад
if time_delta.seconds < 60 * 60:
continue
# Если по какой-то причине операция была помечена как завершенная но task ещё активен
need_to_cancel_task = not any(
(operation_meta.task.done(), operation_meta.task.cancelled())
)
if need_to_cancel_task:
try:
operation_meta.task.cancel()
except Exception as e:
self.logger.error("Не удалось отменить task")
self.logger.exception(e)
continue
ids.append(operation_meta.operation_id)
self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление")
for operation_id in ids:
del self.__operations[operation_id]
self.logger.info("Удаление старых операций завершено")
def result_log(self, operation, operation_id: str, request, errors: list[str]):
log_func = self.logger.error if errors else self.logger.info
log_func(
"Результат %s расширения %s для окружения %s пространства %s: \r\n"
"State: %s\r\n"
"Msg: %s\r\n"
"Error: %s" % (
"Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s"
% (
operation,
operation_id,
self.extension_id,
request.env_id,
request.space_id,
response.results[0].state,
response.results[0].msg,
response.results[0].error
"с ошибками: " + "; ".join(errors) if errors else "успешно"
)
)
def get_log_func_by_state(self, state):
if state == extension_pb2.ExtensionRequestResult.State.OK:
return self.logger.info
else:
return self.logger.error
def ext_request_results_from_exception(self, e: Exception) -> extension_pb2.ExtensionRequestResult:
def ext_request_results_from_exception(self, e: Exception):
return [
extension_pb2.ExtensionRequestResult(
extension_service_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=extension_pb2.ExtensionRequestResult.State.ERROR,
state=extension_service_pb2.ExtensionRequestResult.State.ERROR,
error=str(e),
msg=None
)
]
async def _Install(self, request: extension_pb2.InstallRequest, context):
def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]:
return self.__operations.get(operation_id)
def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task):
self.__operations[operation_id] = OperationMeta(
operation_id=operation_id,
task=task,
description=description
)
async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context):
errors_list = await self.extension_setup.install(
request.space_id, request.env_id, request.force
)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
self.result_log("установки", operation_id, request, errors_list)
return extension_pb2.InstallResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
self.__operations[operation_id].mark_finished(errors_list)
async def Install(self, request: extension_pb2.InstallRequest, context):
self.logger.info(
"Установка расширения %s для окружения %s пространства %s. %s force" % (
self.extension_id,
request.env_id,
request.space_id,
"С" if request.force else "Без"
)
async def Install(self, request: extension_service_pb2.InstallRequest, context):
operation_id = generate_operation_id()
operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % (
self.extension_id,
request.env_id,
request.space_id,
"С" if request.force else "Без"
)
try:
response = await self._Install(request, context)
except Exception as e:
response = extension_pb2.InstallResponse(
results=self.ext_request_results_from_exception(e)
)
self.logger.info(operation_description)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
install_task = asyncio.create_task(self._Install(operation_id, request, context))
self.write_result_log("установки", request, response)
self.set_operation_meta(operation_id, operation_description, install_task)
return response
return self.get_operation_meta(operation_id).to_operation()
async def _Update(self, request: extension_pb2.UpdateRequest, context):
errors_list = await self.extension_setup.update(
request.space_id, request.env_id, request.force
)
async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context):
errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
self.result_log("удаления", operation_id, request, errors_list)
return extension_pb2.UpdateResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
self.__operations[operation_id].mark_finished(errors_list)
async def Update(self, request: extension_pb2.UpdateRequest, context):
self.logger.info(
"Обновление расширения %s для окружения %s пространства %s. %s force" % (
self.extension_id,
request.env_id,
request.space_id,
"С" if request.force else "Без"
)
async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context):
operation_id = generate_operation_id()
operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % (
self.extension_id,
request.env_id,
request.space_id,
"С" if request.remove else "Без"
)
try:
response = await self._Update(request, context)
except Exception as e:
response = extension_pb2.UpdateResponse(
results=self.ext_request_results_from_exception(e)
)
self.logger.info(operation_description)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context))
self.write_result_log("обновления", request, response)
self.set_operation_meta(operation_id, operation_description, uninstall_task)
return response
return self.get_operation_meta(operation_id).to_operation()
async def _Uninstall(self, request: extension_pb2.UninstallRequest, context):
errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context):
errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
self.result_log("проверки", operation_id, request, errors_list)
return extension_pb2.UninstallResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
self.__operations[operation_id].mark_finished(errors_list)
async def Uninstall(self, request: extension_pb2.UninstallRequest, context):
self.logger.info(
"Удаление расширения %s для окружения %s пространства %s. %s remove" % (
self.extension_id,
request.env_id,
request.space_id,
"С" if request.remove else "Без"
)
async def Check(self, request: extension_service_pb2.CheckRequest, context):
operation_id = generate_operation_id()
operation_description = "Проверка расширения %s для окружения %s пространства %s" % (
self.extension_id,
request.env_id,
request.space_id,
)
try:
response = await self._Uninstall(request, context)
except Exception as e:
response = extension_pb2.UninstallResponse(
results=self.ext_request_results_from_exception(e)
check_task = asyncio.create_task(self._Check(operation_id, request, context))
self.set_operation_meta(operation_id, operation_description, check_task)
return self.get_operation_meta(operation_id).to_operation()
async def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("Unknown action")
return None
async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
operations_meta = self.get_operation_meta(request.operation_id)
if not operations_meta:
error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % (
request.operation_id,
self.extension_id,
)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
context.set_details(error_description)
self.write_result_log("удаления", request, response)
self.logger.error(error_description)
return response
return None
async def _Check(self, request: extension_pb2.CheckRequest, context):
errors_list = await self.extension_setup.check(request.space_id, request.env_id)
self.logger.info(
f"Статус операции {operations_meta.operation_id}:\n"
f"{operations_meta.description}\n"
f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n"
f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}"
)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return operations_meta.to_operation()
return extension_pb2.CheckResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context):
operations_meta = self.get_operation_meta(request.operation_id)
async def Check(self, request: extension_pb2.CheckRequest, context):
self.logger.info(
"Проверка расширения %s для окружения %s пространства %s" % (
if not operations_meta:
error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % (
request.operation_id,
self.extension_id,
request.env_id,
request.space_id,
)
)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(error_description)
self.logger.error(
error_description
)
return None
try:
response = await self._Check(request, context)
operations_meta.task.cancel()
operations_meta.mark_cancelled()
except Exception as e:
response = extension_pb2.CheckResponse(
results=self.ext_request_results_from_exception(e)
error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % (
request.operation_id,
self.extension_id,
str(e)
)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK:
self.logger.error(error_description)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
context.set_details(error_description)
self.write_result_log("проверки", request, response)
self.logger.exception(e)
return response
return None
async def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("Unknown action")
self.logger.info(
"Операция %s в расширении %s отменена" % (
request.operation_id,
self.extension_id,
)
)
return None
return operations_meta.to_operation()
from typing import Optional
from perxis.extensions import manager_pb2
from perxis.extensions import manager_service_pb2
def get_extension_descriptor(
ext_host: str, ext_id: str, ext_name: str, ext_description: str,
ext_version: str, ext_version_description: str,
ext_deps: Optional[list[str]] = None
) -> manager_pb2.ExtensionDescriptor:
) -> manager_service_pb2.ExtensionDescriptor:
if not ext_deps:
ext_deps = []
return manager_pb2.ExtensionDescriptor(
return manager_service_pb2.ExtensionDescriptor(
extension=ext_id,
title=ext_name,
description=ext_description,
......
......@@ -14,7 +14,7 @@ def load_requirements():
setup(
name='perxis',
version='1.2.2',
version='1.3.0',
description='Perxis python client',
long_description=long_description,
long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment