Skip to content
Snippets Groups Projects
Commit dbd33962 authored by Podosochnyy Maxim's avatar Podosochnyy Maxim
Browse files

Реализация ExtensionService с учётом long polling операций

parent e18a0a70
No related branches found
No related tags found
No related merge requests found
...@@ -50,7 +50,7 @@ lint: install-build-requirements ...@@ -50,7 +50,7 @@ lint: install-build-requirements
flake8 perxis/ flake8 perxis/
generate: clean-proto install-build-requirements generate: clean-proto install-build-requirements
find $(PROTO_FILES_DIR) -name '*.proto' -exec python3 -m grpc_tools.protoc -I${PROTO_FILES_DIR} --python_out=$(OUTPUT_FILES_DIR) --grpc_python_out=$(OUTPUT_FILES_DIR) {} + find $(PROTO_FILES_DIR) -name '*.proto' -exec python -m grpc_tools.protoc -I ${PROTO_FILES_DIR} --python_out=$(OUTPUT_FILES_DIR) --grpc_python_out=$(OUTPUT_FILES_DIR) {} +
python fix_imports.py python fix_imports.py
dist: clean generate dist: clean generate
......
...@@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL} ...@@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}
ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL
COPY . /home/${USER}/app COPY . /home/${USER}/app
RUN pip install perxis==0.0.20 RUN pip install perxis==1.2.3
ENV PYTHONPATH="/home/perx/app" ENV PYTHONPATH="/home/perx/app"
ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
......
...@@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL} ...@@ -11,7 +11,7 @@ ARG PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}
ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL ENV PIP_EXTRA_INDEX_URL=$PIP_EXTRA_INDEX_URL
COPY . /home/${USER}/app COPY . /home/${USER}/app
RUN pip install perxis==0.0.20 RUN pip install perxis==1.2.3
RUN pip install 'watchdog[watchmedo]' RUN pip install 'watchdog[watchmedo]'
ENV PYTHONPATH="/home/perx/app" ENV PYTHONPATH="/home/perx/app"
......
...@@ -11,4 +11,4 @@ services: ...@@ -11,4 +11,4 @@ services:
- "50051:50051" - "50051:50051"
volumes: volumes:
- .:/home/perx/app - .:/home/perx/app
- <Путь к интерпретатору питона>/site-packages/perxis:/usr/local/lib/python3.9/site-packages/perxis - <Путь к интерпретатору питона>/site-packages/perxis:/usr/local/lib/python3.11/site-packages/perxis
\ No newline at end of file \ No newline at end of file
Subproject commit f000812a1eef24093c0d0abf1318e3179b679773 Subproject commit 78539871cf2d9f6b187865c4855450143dec4e79
...@@ -9,15 +9,16 @@ from perxis.collections import collections_pb2_grpc ...@@ -9,15 +9,16 @@ from perxis.collections import collections_pb2_grpc
from perxis.environments import environments_pb2_grpc from perxis.environments import environments_pb2_grpc
from perxis.roles import roles_pb2_grpc from perxis.roles import roles_pb2_grpc
from perxis.clients import clients_pb2_grpc from perxis.clients import clients_pb2_grpc
from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2 from perxis.common import operation_service_pb2_grpc
from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc
from perxis.interceptors import header_adder_interceptor from perxis.interceptors import header_adder_interceptor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def _main( async def _main(
ext_descriptor: manager_pb2.ExtensionDescriptor, ext_descriptor: manager_service_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer, servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer,
ext_manager_host: str, ext_manager_host: str,
content_host: str content_host: str
): ):
...@@ -27,17 +28,17 @@ async def _main( ...@@ -27,17 +28,17 @@ async def _main(
@aiocron.crontab('* * * * *', start=False) @aiocron.crontab('* * * * *', start=False)
async def register_extension(): async def register_extension():
await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest( await ext_manager_stub.RegisterExtensions(manager_service_pb2.RegisterExtensionsRequest(
extensions=[ext_descriptor] extensions=[ext_descriptor]
)) ))
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis") logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel: async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel:
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(extensions_manager_channel) ext_manager_stub = manager_service_pb2_grpc.ExtensionManagerServiceStub(extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions( #registered_extensions: manager_service_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
manager_pb2.ListExtensionsRequest() # manager_service_pb2.ListExtensionsRequest()
) #)
# todo enable after fix - https://tracker.yandex.ru/PRXS-1507 # todo enable after fix - https://tracker.yandex.ru/PRXS-1507
# for ext in registered_extensions.extensions: # for ext in registered_extensions.extensions:
...@@ -63,19 +64,22 @@ async def _main( ...@@ -63,19 +64,22 @@ async def _main(
environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel) environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10)) server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
extension_pb2_grpc.add_ExtensionServicer_to_server(
servicer_cls( servicer = servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub collections_stub, environments_stub, roles_stub, clients_stub
), server
) )
extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server)
operation_service_pb2_grpc.add_OperationServiceServicer_to_server(servicer, server)
server.add_insecure_port("[::]:50051") server.add_insecure_port("[::]:50051")
await server.start() await server.start()
await server.wait_for_termination() await server.wait_for_termination()
def bootstrap( def bootstrap(
ext_descriptor: manager_pb2.ExtensionDescriptor, ext_descriptor: manager_service_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer, servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer,
ext_manager_host: str, ext_manager_host: str,
content_host: str, content_host: str,
): ):
......
import grpc import grpc
import uuid
import typing
import asyncio
import logging import logging
import aiocron
import datetime
import dataclasses
from perxis.extensions import extension_pb2, extension_pb2_grpc from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2
from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2 from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.clients import clients_pb2_grpc, clients_pb2 from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.common import common_pb2, operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2
from perxis.collections import collections_pb2_grpc, collections_pb2 from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.environments import environments_pb2_grpc from perxis.environments import environments_pb2_grpc
from perxis.extensions.extension_setup import ExtensionSetup from perxis.extensions.extension_setup import ExtensionSetup
class ExtensionService(extension_pb2_grpc.ExtensionServicer): def generate_operation_id() -> str:
return str(uuid.uuid4())
def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
timestamp = dt.timestamp()
seconds = int(timestamp)
nanos = int(timestamp % 1 * 1e9)
return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
@dataclasses.dataclass
class OperationMeta:
task: asyncio.Task
operation_id: str
description: str
created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
response: typing.Optional[str] = None
errors: typing.Optional[list[str]] = None
was_finished: bool = False
metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict)
def finish(self, errors: typing.Optional[list[str]] = None):
self.modified_at = datetime.datetime.now()
self.was_finished = True
self.errors = errors
self.response = None if errors else "OK"
def to_operation(self):
packed_response = any_pb2.Any()
packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING"))
return operation_pb2.Operation(
id=self.operation_id,
description=self.description,
created_at=datetime_to_timestamp(self.created_at),
modified_at=datetime_to_timestamp(self.modified_at),
done=self.was_finished,
metadata=self.metadata,
response=packed_response,
error=error_pb2.Error(
message="; ".join(self.errors)
) if self.errors else None
)
class ExtensionService(
extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer
):
extension_id: str extension_id: str
collections: list[collections_pb2.Collection] = [] collections: list[collections_pb2.Collection] = []
roles: list[roles_pb2.Role] = [] roles: list[roles_pb2.Role] = []
clients: list[clients_pb2.Client] = [] clients: list[clients_pb2.Client] = []
__operations: dict[str, OperationMeta] = {}
def __init__(self, def __init__(self,
collections_service: collections_pb2_grpc.CollectionsStub, collections_service: collections_pb2_grpc.CollectionsStub,
environments_service: environments_pb2_grpc.EnvironmentsStub, environments_service: environments_pb2_grpc.EnvironmentsStub,
...@@ -41,209 +106,229 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): ...@@ -41,209 +106,229 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
for client in self.clients or []: for client in self.clients or []:
self.extension_setup.add_client(client) self.extension_setup.add_client(client)
def write_result_log(self, operation, request, response): @aiocron.crontab('0 * * * *', start=True)
log_func = self.get_log_func_by_state(response.results[0].state) async def remove_old_operations():
self.remove_old_operations()
def remove_old_operations(self):
self.logger.info("Удаление старых операций")
ids = []
now = datetime.datetime.now()
for operation_meta in self.__operations.values():
if not operation_meta.was_finished:
continue
time_delta = now - operation_meta.modified_at
# Для операций которые завершены больше часа назад
if time_delta.seconds < 60 * 60:
continue
# Если по какой-то причине операция была помечена как завершенная но task ещё активен
need_to_cancel_task = not any(
(operation_meta.task.done(), operation_meta.task.cancelled())
)
if need_to_cancel_task:
try:
operation_meta.task.cancel()
except Exception as e:
self.logger.error("Не удалось отменить task")
self.logger.exception(e)
continue
ids.append(operation_meta.operation_id)
self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление")
for operation_id in ids:
del self.__operations[operation_id]
self.logger.info("Удаление старых операций завершено")
def result_log(self, operation, operation_id: str, request, errors: list[str]):
log_func = self.logger.error if errors else self.logger.info
log_func( log_func(
"Результат %s расширения %s для окружения %s пространства %s: \r\n" "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s"
"State: %s\r\n" % (
"Msg: %s\r\n"
"Error: %s" % (
operation, operation,
operation_id,
self.extension_id, self.extension_id,
request.env_id, request.env_id,
request.space_id, request.space_id,
response.results[0].state, "с ошибками: " + "; ".join(errors) if errors else "успешно"
response.results[0].msg,
response.results[0].error
) )
) )
def get_log_func_by_state(self, state): def ext_request_results_from_exception(self, e: Exception):
if state == extension_pb2.ExtensionRequestResult.State.OK:
return self.logger.info
else:
return self.logger.error
def ext_request_results_from_exception(self, e: Exception) -> extension_pb2.ExtensionRequestResult:
return [ return [
extension_pb2.ExtensionRequestResult( extension_service_pb2.ExtensionRequestResult(
extension=self.extension_id, extension=self.extension_id,
state=extension_pb2.ExtensionRequestResult.State.ERROR, state=extension_service_pb2.ExtensionRequestResult.State.ERROR,
error=str(e), error=str(e),
msg=None msg=None
) )
] ]
async def _Install(self, request: extension_pb2.InstallRequest, context): def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]:
return self.__operations.get(operation_id)
def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task):
self.__operations[operation_id] = OperationMeta(
operation_id=operation_id,
task=task,
description=description
)
async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context):
errors_list = await self.extension_setup.install( errors_list = await self.extension_setup.install(
request.space_id, request.env_id, request.force request.space_id, request.env_id, request.force
) )
response_state = extension_pb2.ExtensionRequestResult.State.OK \ self.result_log("установки", operation_id, request, errors_list)
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.InstallResponse( self.__operations[operation_id].finish(errors_list)
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
async def Install(self, request: extension_pb2.InstallRequest, context): async def Install(self, request: extension_service_pb2.InstallRequest, context):
self.logger.info( operation_id = generate_operation_id()
"Установка расширения %s для окружения %s пространства %s. %s force" % ( operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % (
self.extension_id, self.extension_id,
request.env_id, request.env_id,
request.space_id, request.space_id,
"С" if request.force else "Без" "С" if request.force else "Без"
) )
)
try: self.logger.info(operation_description)
response = await self._Install(request, context)
except Exception as e:
response = extension_pb2.InstallResponse(
results=self.ext_request_results_from_exception(e)
)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: install_task = asyncio.create_task(self._Install(operation_id, request, context))
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
self.write_result_log("установки", request, response) self.set_operation_meta(operation_id, operation_description, install_task)
return response return self.get_operation_meta(operation_id).to_operation()
async def _Update(self, request: extension_pb2.UpdateRequest, context): async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context):
errors_list = await self.extension_setup.update( errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
request.space_id, request.env_id, request.force
)
response_state = extension_pb2.ExtensionRequestResult.State.OK \ self.result_log("удаления", operation_id, request, errors_list)
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.UpdateResponse( self.__operations[operation_id].finish(errors_list)
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
async def Update(self, request: extension_pb2.UpdateRequest, context): async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context):
self.logger.info( operation_id = generate_operation_id()
"Обновление расширения %s для окружения %s пространства %s. %s force" % ( operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % (
self.extension_id, self.extension_id,
request.env_id, request.env_id,
request.space_id, request.space_id,
"С" if request.force else "Без" "С" if request.remove else "Без"
)
) )
try: self.logger.info(operation_description)
response = await self._Update(request, context)
except Exception as e:
response = extension_pb2.UpdateResponse(
results=self.ext_request_results_from_exception(e)
)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context))
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
self.write_result_log("обновления", request, response) self.set_operation_meta(operation_id, operation_description, uninstall_task)
return response return self.get_operation_meta(operation_id).to_operation()
async def _Uninstall(self, request: extension_pb2.UninstallRequest, context): async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context):
errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id)
response_state = extension_pb2.ExtensionRequestResult.State.OK \ self.result_log("проверки", operation_id, request, errors_list)
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.UninstallResponse( self.__operations[operation_id].finish(errors_list)
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
async def Uninstall(self, request: extension_pb2.UninstallRequest, context): async def Check(self, request: extension_service_pb2.CheckRequest, context):
self.logger.info( operation_id = generate_operation_id()
"Удаление расширения %s для окружения %s пространства %s. %s remove" % ( operation_description = "Проверка расширения %s для окружения %s пространства %s" % (
self.extension_id, self.extension_id,
request.env_id, request.env_id,
request.space_id, request.space_id,
"С" if request.remove else "Без"
)
) )
try: check_task = asyncio.create_task(self._Check(operation_id, request, context))
response = await self._Uninstall(request, context)
except Exception as e:
response = extension_pb2.UninstallResponse(
results=self.ext_request_results_from_exception(e)
)
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: self.set_operation_meta(operation_id, operation_description, check_task)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error)
self.write_result_log("удаления", request, response) return self.get_operation_meta(operation_id).to_operation()
return response async def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("Unknown action")
async def _Check(self, request: extension_pb2.CheckRequest, context): return None
errors_list = await self.extension_setup.check(request.space_id, request.env_id)
response_state = extension_pb2.ExtensionRequestResult.State.OK \ async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
if not errors_list \ operations_meta = self.__operations.get(request.operation_id)
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.CheckResponse( if not operations_meta:
results=[extension_pb2.ExtensionRequestResult( error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % (
extension=self.extension_id, request.operation_id,
state=response_state, self.extension_id,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
) )
async def Check(self, request: extension_pb2.CheckRequest, context): context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(error_description)
self.logger.error(error_description)
return None
self.logger.info( self.logger.info(
"Проверка расширения %s для окружения %s пространства %s" % ( f"Статус операции {operations_meta.operation_id}:\n"
f"{operations_meta.description}\n"
f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n"
f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}"
)
return operations_meta.to_operation()
def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context):
operations_meta = self.__operations.get(request.operation_id)
if not operations_meta:
error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % (
request.operation_id,
self.extension_id, self.extension_id,
request.env_id,
request.space_id,
) )
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(error_description)
self.logger.error(
error_description
) )
return None
try: try:
response = await self._Check(request, context) operations_meta.task.cancel()
operations_meta.response = "Отменено"
except Exception as e: except Exception as e:
response = extension_pb2.CheckResponse( error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % (
results=self.ext_request_results_from_exception(e) request.operation_id,
self.extension_id,
str(e)
) )
if response.results[0].state != extension_pb2.ExtensionRequestResult.State.OK: self.logger.error(error_description)
context.set_code(grpc.StatusCode.UNKNOWN) context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(response.results[0].error) context.set_details(error_description)
self.write_result_log("проверки", request, response) self.logger.exception(e)
return response return None
async def Action(self, request: extension_pb2.ActionRequest, context): self.logger.info(
context.set_code(grpc.StatusCode.UNKNOWN) "Операция %s в расширении %s отменена" % (
context.set_details("Unknown action") request.operation_id,
self.extension_id,
)
)
return None return operations_meta.to_operation()
from typing import Optional from typing import Optional
from perxis.extensions import manager_pb2 from perxis.extensions import manager_service_pb2
def get_extension_descriptor( def get_extension_descriptor(
ext_host: str, ext_id: str, ext_name: str, ext_description: str, ext_host: str, ext_id: str, ext_name: str, ext_description: str,
ext_version: str, ext_version_description: str, ext_version: str, ext_version_description: str,
ext_deps: Optional[list[str]] = None ext_deps: Optional[list[str]] = None
) -> manager_pb2.ExtensionDescriptor: ) -> manager_service_pb2.ExtensionDescriptor:
if not ext_deps: if not ext_deps:
ext_deps = [] ext_deps = []
return manager_pb2.ExtensionDescriptor( return manager_service_pb2.ExtensionDescriptor(
extension=ext_id, extension=ext_id,
title=ext_name, title=ext_name,
description=ext_description, description=ext_description,
......
...@@ -14,7 +14,7 @@ def load_requirements(): ...@@ -14,7 +14,7 @@ def load_requirements():
setup( setup(
name='perxis', name='perxis',
version='1.2.2', version='1.2.3',
description='Perxis python client', description='Perxis python client',
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment