perxis.extensions.extension_service
Модуль содержит базовый класс сервисов расширений - ExtensionService
1""" 2 Модуль содержит базовый класс сервисов расширений - ExtensionService 3""" 4 5 6import grpc 7import uuid 8import typing 9import asyncio 10import logging 11import aiocron 12import datetime 13import dataclasses 14 15from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2 16 17from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2, manager_service_pb2_grpc 18from perxis.collaborators import collaborators_pb2_grpc 19from perxis.invitations import invitations_pb2_grpc 20from perxis.locales import locales_pb2_grpc 21from perxis.organizations import organizations_pb2_grpc 22from perxis.members import members_pb2_grpc 23from perxis.users import users_pb2_grpc 24from perxis.references import references_pb2_grpc 25from perxis.images import images_pb2_grpc 26from perxis.files import files_pb2_grpc 27from perxis.roles import roles_pb2_grpc, roles_pb2 28from perxis.items import items_pb2_grpc 29from perxis.clients import clients_pb2_grpc, clients_pb2 30from perxis.spaces import spaces_pb2_grpc 31from perxis.common import operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2 32from perxis.collections import collections_pb2_grpc, collections_pb2 33from perxis.environments import environments_pb2_grpc 34from perxis.extensions.extension_setup import ExtensionSetup 35from perxis.extensions.item_models import AbstractItem 36 37 38def generate_operation_id() -> str: 39 """ 40 Функция для генерации идентификатора операции 41 42 Returns: 43 str 44 """ 45 46 return str(uuid.uuid4()) 47 48 49def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp: 50 """ 51 Функция для преобразования объекта datetime в объект timestamp_pb2.Timestamp 52 53 Arguments: 54 dt (datetime) 55 Returns: 56 timestamp_pb2.Timestamp 57 """ 58 59 timestamp = dt.timestamp() 60 seconds = int(timestamp) 61 nanos = int(timestamp % 1 * 1e9) 62 63 return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) 64 65 66@dataclasses.dataclass 67class OperationMeta: 68 """ 69 Класс для хранения метаданных операции 70 71 Attributes: 72 task (asyncio.Task): ссылка на task операции 73 operation_id (str): идентификатор операции 74 description (str): описание операции 75 created_at (datetime.datetime): дата создания операции 76 modified_at (datetime.datetime): дата изменения метаданных операции 77 response (str | None): результат операции 78 errors (list[str]): возникшие во время выполнения операции ошибки 79 was_finished (bool): флаг завершения операции 80 metadata (dict): любые прочие метаданные операции 81 """ 82 83 task: asyncio.Task 84 85 operation_id: str 86 description: str 87 88 created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) 89 modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) 90 91 response: typing.Optional[str] = None 92 errors: typing.Optional[list[str]] = None 93 94 was_finished: bool = False 95 metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict) 96 97 def mark_cancelled(self): 98 """ 99 Функция для отмены операции 100 """ 101 102 self.was_finished = True 103 self.response = "Отменено" 104 105 def mark_finished(self, errors: typing.Optional[list[str]] = None): 106 """ 107 Функция для завершения операции 108 109 Arguments: 110 errors (list[str] | None): список ошибок 111 """ 112 113 self.modified_at = datetime.datetime.now() 114 self.was_finished = True 115 self.errors = errors 116 self.response = None if errors else "OK" 117 118 def to_operation(self): 119 """ 120 Функция для преобразования объекта операции в формат perxis 121 122 Returns: 123 operation_pb2.Operation 124 """ 125 126 packed_response = any_pb2.Any() 127 packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING")) 128 129 return operation_pb2.Operation( 130 id=self.operation_id, 131 description=self.description, 132 created_at=datetime_to_timestamp(self.created_at), 133 modified_at=datetime_to_timestamp(self.modified_at), 134 done=self.was_finished, 135 metadata=self.metadata, 136 response=packed_response, 137 error=error_pb2.Error( 138 message="; ".join(self.errors) 139 ) if self.errors else None 140 ) 141 142 143class ExtensionService( 144 extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer 145): 146 """ 147 Базовый класс для сервисов расширений 148 149 Attributes: 150 extension_id (str): идентификатор расширения 151 collections (list[collections_pb2.Collection]): список коллекций расширения 152 roles (list[roles_pb2.Role]): список ролей расширения 153 clients (list[clients_pb2.Client]): список клиентов расширения 154 actions (list[dict]): список действий расширения 155 items (list[AbstractItem]): список управляемых расширением item'ов 156 __operations (dict[str, OperationMeta]): маппинг с данными операций 157 logger (logging.Logger): логгер расширения 158 ext_manager_service (manager_service_pb2_grpc.ExtensionManagerServiceStub): ссылка на сервис менеджера расширений 159 collections_service (collections_pb2_grpc.CollectionsStub): ссылка на сервис коллекций 160 environments_service (environments_pb2_grpc.EnvironmentsStub): ссылка на сервис окружений 161 roles_service (roles_pb2_grpc.RolesStub): ссылка на сервис ролей 162 clients_service (clients_pb2_grpc.ClientsStub): ссылка на сервис клиентов 163 items_service (items_pb2_grpc.ItemsStub): ссылка на сервис item'ов 164 spaces_service (spaces_pb2_grpc.SpacesStub): ссылка на сервис пространств 165 files_service (files_pb2_grpc.FilesStub): ссылка на сервис файлов 166 images_service (images_pb2_grpc.ImagesStub): ссылка на сервис изображений 167 references_service (references_pb2_grpc.ReferencesStub): ссылка на сервис для работы с объектами Reference 168 users_service (users_pb2_grpc.UsersStub): ссылка на сервис пользователей 169 organizations_service (organizations_pb2_grpc.OrganizationsStub): ссылка на сервис организаций 170 members_service (members_pb2_grpc.MembersStub): ссылка на сервис работы с пользователями организаций 171 locales_service (locales_pb2_grpc.LocalesStub): ссылка на сервис локалей и переводов 172 invitations_service (invitations_pb2_grpc.InvitationsStub): ссылка на сервис приглашений 173 collaborators_service (collaborators_pb2_grpc.CollaboratorsStub): ссылка на сервис коллабораторов 174 channel (grpc.Channel): ссылка на grpc канал 175 extension_setup (ExtensionSetup): объект класса ExtensionSetup для управления данными расширения 176 """ 177 178 extension_id: str 179 collections: list[collections_pb2.Collection] = [] 180 roles: list[roles_pb2.Role] = [] 181 clients: list[clients_pb2.Client] = [] 182 actions: list[dict] = [] 183 items: list[AbstractItem] = [] 184 185 __operations: dict[str, OperationMeta] 186 187 def __init__(self, 188 ext_manager_service: manager_service_pb2_grpc.ExtensionManagerServiceStub, 189 collections_service: collections_pb2_grpc.CollectionsStub, 190 environments_service: environments_pb2_grpc.EnvironmentsStub, 191 roles_service: roles_pb2_grpc.RolesStub, 192 clients_service: clients_pb2_grpc.ClientsStub, 193 items_service: items_pb2_grpc.ItemsStub, 194 spaces_service: spaces_pb2_grpc.SpacesStub, 195 files_service: files_pb2_grpc.FilesStub, 196 images_service: images_pb2_grpc.ImagesStub, 197 references_service: references_pb2_grpc.ReferencesStub, 198 users_service: users_pb2_grpc.UsersStub, 199 organizations_service: organizations_pb2_grpc.OrganizationsStub, 200 members_service: members_pb2_grpc.MembersStub, 201 locales_service: locales_pb2_grpc.LocalesStub, 202 invitations_service: invitations_pb2_grpc.InvitationsStub, 203 collaborators_service: collaborators_pb2_grpc.CollaboratorsStub, 204 channel: grpc.Channel, 205 ): 206 self.logger = logging.getLogger(__name__) 207 self.ext_manager_service = ext_manager_service 208 self.collections_service = collections_service 209 self.environments_service = environments_service 210 self.roles_service = roles_service 211 self.clients_service = clients_service 212 self.items_service = items_service 213 self.spaces_service = spaces_service 214 self.files_service = files_service 215 self.images_service = images_service 216 self.references_service = references_service 217 self.users_service = users_service 218 self.organizations_service = organizations_service 219 self.members_service = members_service 220 self.locales_service = locales_service 221 self.invitations_service = invitations_service 222 self.collaborators_service = collaborators_service 223 self.channel = channel 224 225 self.extension_setup = ExtensionSetup( 226 self.collections_service, self.environments_service, 227 self.roles_service, self.clients_service, self.items_service, 228 ) 229 230 self.__operations = {} 231 232 for collection in self.collections or []: 233 self.extension_setup.add_collection(collection) 234 235 for role in self.roles or []: 236 self.extension_setup.add_role(role) 237 238 for client in self.clients or []: 239 self.extension_setup.add_client(client) 240 241 for action in self.actions or []: 242 self.extension_setup.add_action(action) 243 244 services_list = [ 245 (service_name, getattr(self, service_name)) 246 for service_name 247 in self.__dict__ 248 if service_name.endswith("_service") 249 ] 250 services_list.append(("channel", self.channel, )) 251 252 for item in self.items: 253 for rule in item.rules: 254 rule.bind_services(services_list) 255 256 self.extension_setup.set_items(self.items) 257 258 @aiocron.crontab('0 * * * *', start=True) 259 async def remove_old_operations(): 260 self.remove_old_operations() 261 262 @property 263 def operations(self): 264 return self.__operations 265 266 def remove_old_operations(self): 267 """ 268 Функция для удаления stale операций. Запускается раз в час. Удаляет все операции 269 которые были завершены больше часа назад 270 """ 271 272 self.logger.info("Удаление старых операций") 273 274 ids = [] 275 now = datetime.datetime.now() 276 277 for operation_meta in self.__operations.values(): 278 task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled() 279 280 # Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае 281 # неотловленного исключения, например 282 if task_is_not_running: 283 operation_meta.was_finished = True 284 285 if not operation_meta.was_finished: 286 continue 287 288 time_delta = now - operation_meta.modified_at 289 290 # Для операций которые завершены больше часа назад 291 if time_delta.seconds < 60 * 60: 292 continue 293 294 # Если по какой-то причине операция была помечена как завершенная но task ещё активен 295 need_to_cancel_task = not any( 296 (operation_meta.task.done(), operation_meta.task.cancelled()) 297 ) 298 299 if need_to_cancel_task: 300 try: 301 operation_meta.task.cancel() 302 except Exception as e: 303 self.logger.error("Не удалось отменить task") 304 self.logger.exception(e) 305 306 continue 307 308 ids.append(operation_meta.operation_id) 309 310 self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление") 311 312 for operation_id in ids: 313 del self.__operations[operation_id] 314 315 self.logger.info("Удаление старых операций завершено") 316 317 def result_log(self, operation, operation_id: str, request, errors: list[str]): 318 """ 319 Вспомогательная функция для логгирования резултата выполнения операции 320 321 Arguments: 322 operation (str): название операции 323 operation_id (str): идентификатор операции 324 request: объект запроса 325 errors (list[str]): ошибки выполнения операции 326 """ 327 328 log_func = self.logger.error if errors else self.logger.info 329 330 log_func( 331 "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s" 332 % ( 333 operation, 334 operation_id, 335 self.extension_id, 336 request.env_id, 337 request.space_id, 338 "с ошибками: " + "; ".join(errors) if errors else "успешно" 339 ) 340 ) 341 342 def ext_request_results_from_exception(self, e: Exception): 343 """ 344 Вспомогательная функция для преобразования объектов исключений в объект результата запроса 345 346 Arguments: 347 e (Exception): любое исключение 348 Returns: 349 extension_service_pb2.ExtensionRequestResult 350 """ 351 352 return [ 353 extension_service_pb2.ExtensionRequestResult( 354 extension=self.extension_id, 355 state=extension_service_pb2.ExtensionRequestResult.State.ERROR, 356 error=str(e), 357 msg=None 358 ) 359 ] 360 361 def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]: 362 """ 363 Получение объекта операции по идентификатору 364 365 Arguments: 366 operation_id (str): идентификатор операции 367 368 Returns: 369 OperationMeta | None 370 """ 371 372 return self.__operations.get(operation_id) 373 374 def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task): 375 """ 376 Функция для сохранения данных об операции 377 Arguments: 378 operation_id (str): идентификатор операции 379 description (str): описание операции 380 task (asyncio.Task): объект task операции 381 """ 382 383 self.__operations[operation_id] = OperationMeta( 384 operation_id=operation_id, 385 task=task, 386 description=description 387 ) 388 389 def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None): 390 """ 391 Функция для завершения операции 392 393 Arguments: 394 operation_id (str): идентификатор операции 395 errors (list[str] | None): ошибки выполнения операции 396 """ 397 398 if operation_id in self.operations: 399 self.operations[operation_id].mark_finished(errors) 400 else: 401 self.logger.error(f"Операция {operation_id} не найдена!") 402 403 async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context): 404 """ 405 Действия метода Install сервиса расширений. Вызывается после всех необходимых действий с 406 формированием данных об операции. В случае необходимости можно переопределить 407 408 Arguments: 409 operation_id (str): идентификатор операции 410 request (extension_service_pb2.InstallRequest): объект запроса 411 context: контекст 412 """ 413 414 errors_list = await self.extension_setup.install( 415 request.space_id, request.env_id, request.force 416 ) 417 418 errors_list += await self.additional_install_operations(operation_id, request, context) 419 420 self.result_log("установки", operation_id, request, errors_list) 421 422 self.mark_operation_as_finished(operation_id, errors_list) 423 424 async def additional_install_operations(self, operation_id: str, request: extension_service_pb2.InstallRequest, context) -> list[str]: 425 """ 426 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 427 процедур установки. Можно переопределять. Возвращает массив с ошибками. 428 429 Arguments: 430 operation_id (str): идентификатор операции 431 request (extension_service_pb2.InstallRequest): объект запроса 432 context: контекст 433 434 Returns: 435 list[str] 436 """ 437 438 return [] 439 440 async def Install(self, request: extension_service_pb2.InstallRequest, context): 441 """ 442 Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 443 в себе формирование данных об операции. Переопределять не следует, для изменения логики Install 444 лучше работать с методами _Install если нужно всё радикально изменить или с методом 445 additional_install_operations для добавления чего-то дополнительного 446 """ 447 448 operation_id = generate_operation_id() 449 operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % ( 450 self.extension_id, 451 request.env_id, 452 request.space_id, 453 "С" if request.force else "Без" 454 ) 455 456 self.logger.info(operation_description) 457 458 install_task = asyncio.create_task(self._Install(operation_id, request, context)) 459 460 self.set_operation_meta(operation_id, operation_description, install_task) 461 462 return self.get_operation_meta(operation_id).to_operation() 463 464 async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context): 465 """ 466 Действия метода _Uninstall сервиса расширений. Вызывается после всех необходимых действий с 467 формированием данных об операции. В случае необходимости можно переопределить 468 469 Arguments: 470 operation_id (str): идентификатор операции 471 request (extension_service_pb2.UninstallRequest): объект запроса 472 context: контекст 473 """ 474 475 errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) 476 errors_list += await self.additional_uninstall_operations(operation_id, request, context) 477 478 self.result_log("удаления", operation_id, request, errors_list) 479 480 self.mark_operation_as_finished(operation_id, errors_list) 481 482 async def additional_uninstall_operations(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context) -> list[str]: 483 """ 484 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 485 процедур удаления. Можно переопределять. Возвращает массив с ошибками. 486 487 Arguments: 488 operation_id (str): идентификатор операции 489 request (extension_service_pb2.UninstallRequest): объект запроса 490 context: контекст 491 492 Returns: 493 list[str] 494 """ 495 496 return [] 497 498 async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context): 499 """ 500 Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 501 в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall 502 лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом 503 additional_uninstall_operations для добавления чего-то дополнительного 504 """ 505 506 operation_id = generate_operation_id() 507 operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % ( 508 self.extension_id, 509 request.env_id, 510 request.space_id, 511 "С" if request.remove else "Без" 512 ) 513 514 self.logger.info(operation_description) 515 516 uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context)) 517 518 self.set_operation_meta(operation_id, operation_description, uninstall_task) 519 520 return self.get_operation_meta(operation_id).to_operation() 521 522 async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context): 523 """ 524 Действия метода _Check сервиса расширений. Вызывается после всех необходимых действий с 525 формированием данных об операции. В случае необходимости можно переопределить 526 527 Arguments: 528 operation_id (str): идентификатор операции 529 request (extension_service_pb2.CheckRequest): объект запроса 530 context: контекст 531 """ 532 533 errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id) 534 535 self.result_log("проверки", operation_id, request, errors_list) 536 537 self.__operations[operation_id].mark_finished(errors_list) 538 539 async def additional_check_operations(self, operation_id: str, request: extension_service_pb2.CheckRequest, context) -> list[str]: 540 """ 541 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 542 процедур проверки. Можно переопределять. Возвращает массив с ошибками. 543 544 Arguments: 545 operation_id (str): идентификатор операции 546 request (extension_service_pb2.CheckRequest): объект запроса 547 context: контекст 548 549 Returns: 550 list[str] 551 """ 552 553 return [] 554 555 async def Check(self, request: extension_service_pb2.CheckRequest, context): 556 """ 557 Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 558 в себе формирование данных об операции. Переопределять не следует, для изменения логики Check 559 лучше работать с методами _Check если нужно всё радикально изменить или с методом 560 additional_check_operations для добавления чего-то дополнительного 561 """ 562 563 operation_id = generate_operation_id() 564 operation_description = "Проверка расширения %s для окружения %s пространства %s" % ( 565 self.extension_id, 566 request.env_id, 567 request.space_id, 568 ) 569 570 check_task = asyncio.create_task(self._Check(operation_id, request, context)) 571 572 self.set_operation_meta(operation_id, operation_description, check_task) 573 574 return self.get_operation_meta(operation_id).to_operation() 575 576 async def _dispatch_action( 577 self, request: extension_pb2.ActionRequest, context 578 ) -> extension_service_pb2.ActionResponse: 579 """ 580 Метод для получение нужной ф-ции для действия и её вызова 581 582 Arguments: 583 request (extension_pb2.ActionRequest): объект запроса 584 context: контекст 585 586 Returns: 587 extension_service_pb2.ActionResponse 588 """ 589 590 action_id = request.action.split("/")[-1] 591 592 func_name = f"action_{action_id}" 593 594 if not hasattr(self, func_name): 595 response = extension_service_pb2.ActionResponse( 596 state=extension_service_pb2.ActionResponse.State.ERROR, 597 title="Невозможно выполнить действие", 598 error=f"В расширении отсутсвует функция {action_id}" 599 ) 600 else: 601 func = getattr(self, func_name) 602 603 response = await func(request, context) 604 605 return response 606 607 async def Action(self, request: extension_pb2.ActionRequest, context): 608 """ 609 Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer 610 611 Arguments: 612 request (extension_pb2.ActionRequest): объект запроса 613 context: контекст 614 """ 615 616 operation_description = "Действие %s для окружения %s пространства %s" % ( 617 request.action, 618 request.env_id, 619 request.space_id, 620 ) 621 622 self.logger.info(operation_description) 623 624 response = await self._dispatch_action(request, context) 625 626 if response.state == extension_service_pb2.ActionResponse.State.ERROR: 627 error_description = "Ошибка действия %s в расширении %s: %s" % ( 628 request.action, 629 self.extension_id, 630 response.error 631 ) 632 633 self.logger.error(error_description) 634 635 return response 636 637 async def Get(self, request: operation_service_pb2.GetOperationRequest, context): 638 """ 639 Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется 640 для получения данных о выполняемой операции 641 642 Arguments: 643 request (operation_service_pb2.GetOperationRequest): объект запроса 644 context: контекст 645 """ 646 647 operations_meta = self.get_operation_meta(request.operation_id) 648 649 if not operations_meta: 650 error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % ( 651 request.operation_id, 652 self.extension_id, 653 ) 654 655 context.set_code(grpc.StatusCode.UNKNOWN) 656 context.set_details(error_description) 657 658 self.logger.error(error_description) 659 660 return None 661 662 self.logger.info( 663 f"Статус операции {operations_meta.operation_id}:\n" 664 f"{operations_meta.description}\n" 665 f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n" 666 f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}" 667 ) 668 669 return operations_meta.to_operation() 670 671 def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context): 672 """ 673 Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется 674 для отмены выполнения операции 675 676 Arguments: 677 request (operation_service_pb2.CancelOperationRequest): объект запроса 678 context: контекст 679 """ 680 681 operations_meta = self.get_operation_meta(request.operation_id) 682 683 if not operations_meta: 684 error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % ( 685 request.operation_id, 686 self.extension_id, 687 ) 688 689 context.set_code(grpc.StatusCode.UNKNOWN) 690 context.set_details(error_description) 691 692 self.logger.error( 693 error_description 694 ) 695 696 return None 697 698 try: 699 operations_meta.task.cancel() 700 operations_meta.mark_cancelled() 701 except Exception as e: 702 error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % ( 703 request.operation_id, 704 self.extension_id, 705 str(e) 706 ) 707 708 self.logger.error(error_description) 709 710 context.set_code(grpc.StatusCode.UNKNOWN) 711 context.set_details(error_description) 712 713 self.logger.exception(e) 714 715 return None 716 717 self.logger.info( 718 "Операция %s в расширении %s отменена" % ( 719 request.operation_id, 720 self.extension_id, 721 ) 722 ) 723 724 return operations_meta.to_operation()
39def generate_operation_id() -> str: 40 """ 41 Функция для генерации идентификатора операции 42 43 Returns: 44 str 45 """ 46 47 return str(uuid.uuid4())
Функция для генерации идентификатора операции
Returns:
str
50def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp: 51 """ 52 Функция для преобразования объекта datetime в объект timestamp_pb2.Timestamp 53 54 Arguments: 55 dt (datetime) 56 Returns: 57 timestamp_pb2.Timestamp 58 """ 59 60 timestamp = dt.timestamp() 61 seconds = int(timestamp) 62 nanos = int(timestamp % 1 * 1e9) 63 64 return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
Функция для преобразования объекта datetime в объект timestamp_pb2.Timestamp
Arguments:
- dt (datetime)
Returns:
timestamp_pb2.Timestamp
67@dataclasses.dataclass 68class OperationMeta: 69 """ 70 Класс для хранения метаданных операции 71 72 Attributes: 73 task (asyncio.Task): ссылка на task операции 74 operation_id (str): идентификатор операции 75 description (str): описание операции 76 created_at (datetime.datetime): дата создания операции 77 modified_at (datetime.datetime): дата изменения метаданных операции 78 response (str | None): результат операции 79 errors (list[str]): возникшие во время выполнения операции ошибки 80 was_finished (bool): флаг завершения операции 81 metadata (dict): любые прочие метаданные операции 82 """ 83 84 task: asyncio.Task 85 86 operation_id: str 87 description: str 88 89 created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) 90 modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) 91 92 response: typing.Optional[str] = None 93 errors: typing.Optional[list[str]] = None 94 95 was_finished: bool = False 96 metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict) 97 98 def mark_cancelled(self): 99 """ 100 Функция для отмены операции 101 """ 102 103 self.was_finished = True 104 self.response = "Отменено" 105 106 def mark_finished(self, errors: typing.Optional[list[str]] = None): 107 """ 108 Функция для завершения операции 109 110 Arguments: 111 errors (list[str] | None): список ошибок 112 """ 113 114 self.modified_at = datetime.datetime.now() 115 self.was_finished = True 116 self.errors = errors 117 self.response = None if errors else "OK" 118 119 def to_operation(self): 120 """ 121 Функция для преобразования объекта операции в формат perxis 122 123 Returns: 124 operation_pb2.Operation 125 """ 126 127 packed_response = any_pb2.Any() 128 packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING")) 129 130 return operation_pb2.Operation( 131 id=self.operation_id, 132 description=self.description, 133 created_at=datetime_to_timestamp(self.created_at), 134 modified_at=datetime_to_timestamp(self.modified_at), 135 done=self.was_finished, 136 metadata=self.metadata, 137 response=packed_response, 138 error=error_pb2.Error( 139 message="; ".join(self.errors) 140 ) if self.errors else None 141 )
Класс для хранения метаданных операции
Attributes:
- task (asyncio.Task): ссылка на task операции
- operation_id (str): идентификатор операции
- description (str): описание операции
- created_at (datetime.datetime): дата создания операции
- modified_at (datetime.datetime): дата изменения метаданных операции
- response (str | None): результат операции
- errors (list[str]): возникшие во время выполнения операции ошибки
- was_finished (bool): флаг завершения операции
- metadata (dict): любые прочие метаданные операции
98 def mark_cancelled(self): 99 """ 100 Функция для отмены операции 101 """ 102 103 self.was_finished = True 104 self.response = "Отменено"
Функция для отмены операции
106 def mark_finished(self, errors: typing.Optional[list[str]] = None): 107 """ 108 Функция для завершения операции 109 110 Arguments: 111 errors (list[str] | None): список ошибок 112 """ 113 114 self.modified_at = datetime.datetime.now() 115 self.was_finished = True 116 self.errors = errors 117 self.response = None if errors else "OK"
Функция для завершения операции
Arguments:
- errors (list[str] | None): список ошибок
119 def to_operation(self): 120 """ 121 Функция для преобразования объекта операции в формат perxis 122 123 Returns: 124 operation_pb2.Operation 125 """ 126 127 packed_response = any_pb2.Any() 128 packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING")) 129 130 return operation_pb2.Operation( 131 id=self.operation_id, 132 description=self.description, 133 created_at=datetime_to_timestamp(self.created_at), 134 modified_at=datetime_to_timestamp(self.modified_at), 135 done=self.was_finished, 136 metadata=self.metadata, 137 response=packed_response, 138 error=error_pb2.Error( 139 message="; ".join(self.errors) 140 ) if self.errors else None 141 )
Функция для преобразования объекта операции в формат perxis
Returns:
operation_pb2.Operation
144class ExtensionService( 145 extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer 146): 147 """ 148 Базовый класс для сервисов расширений 149 150 Attributes: 151 extension_id (str): идентификатор расширения 152 collections (list[collections_pb2.Collection]): список коллекций расширения 153 roles (list[roles_pb2.Role]): список ролей расширения 154 clients (list[clients_pb2.Client]): список клиентов расширения 155 actions (list[dict]): список действий расширения 156 items (list[AbstractItem]): список управляемых расширением item'ов 157 __operations (dict[str, OperationMeta]): маппинг с данными операций 158 logger (logging.Logger): логгер расширения 159 ext_manager_service (manager_service_pb2_grpc.ExtensionManagerServiceStub): ссылка на сервис менеджера расширений 160 collections_service (collections_pb2_grpc.CollectionsStub): ссылка на сервис коллекций 161 environments_service (environments_pb2_grpc.EnvironmentsStub): ссылка на сервис окружений 162 roles_service (roles_pb2_grpc.RolesStub): ссылка на сервис ролей 163 clients_service (clients_pb2_grpc.ClientsStub): ссылка на сервис клиентов 164 items_service (items_pb2_grpc.ItemsStub): ссылка на сервис item'ов 165 spaces_service (spaces_pb2_grpc.SpacesStub): ссылка на сервис пространств 166 files_service (files_pb2_grpc.FilesStub): ссылка на сервис файлов 167 images_service (images_pb2_grpc.ImagesStub): ссылка на сервис изображений 168 references_service (references_pb2_grpc.ReferencesStub): ссылка на сервис для работы с объектами Reference 169 users_service (users_pb2_grpc.UsersStub): ссылка на сервис пользователей 170 organizations_service (organizations_pb2_grpc.OrganizationsStub): ссылка на сервис организаций 171 members_service (members_pb2_grpc.MembersStub): ссылка на сервис работы с пользователями организаций 172 locales_service (locales_pb2_grpc.LocalesStub): ссылка на сервис локалей и переводов 173 invitations_service (invitations_pb2_grpc.InvitationsStub): ссылка на сервис приглашений 174 collaborators_service (collaborators_pb2_grpc.CollaboratorsStub): ссылка на сервис коллабораторов 175 channel (grpc.Channel): ссылка на grpc канал 176 extension_setup (ExtensionSetup): объект класса ExtensionSetup для управления данными расширения 177 """ 178 179 extension_id: str 180 collections: list[collections_pb2.Collection] = [] 181 roles: list[roles_pb2.Role] = [] 182 clients: list[clients_pb2.Client] = [] 183 actions: list[dict] = [] 184 items: list[AbstractItem] = [] 185 186 __operations: dict[str, OperationMeta] 187 188 def __init__(self, 189 ext_manager_service: manager_service_pb2_grpc.ExtensionManagerServiceStub, 190 collections_service: collections_pb2_grpc.CollectionsStub, 191 environments_service: environments_pb2_grpc.EnvironmentsStub, 192 roles_service: roles_pb2_grpc.RolesStub, 193 clients_service: clients_pb2_grpc.ClientsStub, 194 items_service: items_pb2_grpc.ItemsStub, 195 spaces_service: spaces_pb2_grpc.SpacesStub, 196 files_service: files_pb2_grpc.FilesStub, 197 images_service: images_pb2_grpc.ImagesStub, 198 references_service: references_pb2_grpc.ReferencesStub, 199 users_service: users_pb2_grpc.UsersStub, 200 organizations_service: organizations_pb2_grpc.OrganizationsStub, 201 members_service: members_pb2_grpc.MembersStub, 202 locales_service: locales_pb2_grpc.LocalesStub, 203 invitations_service: invitations_pb2_grpc.InvitationsStub, 204 collaborators_service: collaborators_pb2_grpc.CollaboratorsStub, 205 channel: grpc.Channel, 206 ): 207 self.logger = logging.getLogger(__name__) 208 self.ext_manager_service = ext_manager_service 209 self.collections_service = collections_service 210 self.environments_service = environments_service 211 self.roles_service = roles_service 212 self.clients_service = clients_service 213 self.items_service = items_service 214 self.spaces_service = spaces_service 215 self.files_service = files_service 216 self.images_service = images_service 217 self.references_service = references_service 218 self.users_service = users_service 219 self.organizations_service = organizations_service 220 self.members_service = members_service 221 self.locales_service = locales_service 222 self.invitations_service = invitations_service 223 self.collaborators_service = collaborators_service 224 self.channel = channel 225 226 self.extension_setup = ExtensionSetup( 227 self.collections_service, self.environments_service, 228 self.roles_service, self.clients_service, self.items_service, 229 ) 230 231 self.__operations = {} 232 233 for collection in self.collections or []: 234 self.extension_setup.add_collection(collection) 235 236 for role in self.roles or []: 237 self.extension_setup.add_role(role) 238 239 for client in self.clients or []: 240 self.extension_setup.add_client(client) 241 242 for action in self.actions or []: 243 self.extension_setup.add_action(action) 244 245 services_list = [ 246 (service_name, getattr(self, service_name)) 247 for service_name 248 in self.__dict__ 249 if service_name.endswith("_service") 250 ] 251 services_list.append(("channel", self.channel, )) 252 253 for item in self.items: 254 for rule in item.rules: 255 rule.bind_services(services_list) 256 257 self.extension_setup.set_items(self.items) 258 259 @aiocron.crontab('0 * * * *', start=True) 260 async def remove_old_operations(): 261 self.remove_old_operations() 262 263 @property 264 def operations(self): 265 return self.__operations 266 267 def remove_old_operations(self): 268 """ 269 Функция для удаления stale операций. Запускается раз в час. Удаляет все операции 270 которые были завершены больше часа назад 271 """ 272 273 self.logger.info("Удаление старых операций") 274 275 ids = [] 276 now = datetime.datetime.now() 277 278 for operation_meta in self.__operations.values(): 279 task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled() 280 281 # Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае 282 # неотловленного исключения, например 283 if task_is_not_running: 284 operation_meta.was_finished = True 285 286 if not operation_meta.was_finished: 287 continue 288 289 time_delta = now - operation_meta.modified_at 290 291 # Для операций которые завершены больше часа назад 292 if time_delta.seconds < 60 * 60: 293 continue 294 295 # Если по какой-то причине операция была помечена как завершенная но task ещё активен 296 need_to_cancel_task = not any( 297 (operation_meta.task.done(), operation_meta.task.cancelled()) 298 ) 299 300 if need_to_cancel_task: 301 try: 302 operation_meta.task.cancel() 303 except Exception as e: 304 self.logger.error("Не удалось отменить task") 305 self.logger.exception(e) 306 307 continue 308 309 ids.append(operation_meta.operation_id) 310 311 self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление") 312 313 for operation_id in ids: 314 del self.__operations[operation_id] 315 316 self.logger.info("Удаление старых операций завершено") 317 318 def result_log(self, operation, operation_id: str, request, errors: list[str]): 319 """ 320 Вспомогательная функция для логгирования резултата выполнения операции 321 322 Arguments: 323 operation (str): название операции 324 operation_id (str): идентификатор операции 325 request: объект запроса 326 errors (list[str]): ошибки выполнения операции 327 """ 328 329 log_func = self.logger.error if errors else self.logger.info 330 331 log_func( 332 "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s" 333 % ( 334 operation, 335 operation_id, 336 self.extension_id, 337 request.env_id, 338 request.space_id, 339 "с ошибками: " + "; ".join(errors) if errors else "успешно" 340 ) 341 ) 342 343 def ext_request_results_from_exception(self, e: Exception): 344 """ 345 Вспомогательная функция для преобразования объектов исключений в объект результата запроса 346 347 Arguments: 348 e (Exception): любое исключение 349 Returns: 350 extension_service_pb2.ExtensionRequestResult 351 """ 352 353 return [ 354 extension_service_pb2.ExtensionRequestResult( 355 extension=self.extension_id, 356 state=extension_service_pb2.ExtensionRequestResult.State.ERROR, 357 error=str(e), 358 msg=None 359 ) 360 ] 361 362 def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]: 363 """ 364 Получение объекта операции по идентификатору 365 366 Arguments: 367 operation_id (str): идентификатор операции 368 369 Returns: 370 OperationMeta | None 371 """ 372 373 return self.__operations.get(operation_id) 374 375 def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task): 376 """ 377 Функция для сохранения данных об операции 378 Arguments: 379 operation_id (str): идентификатор операции 380 description (str): описание операции 381 task (asyncio.Task): объект task операции 382 """ 383 384 self.__operations[operation_id] = OperationMeta( 385 operation_id=operation_id, 386 task=task, 387 description=description 388 ) 389 390 def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None): 391 """ 392 Функция для завершения операции 393 394 Arguments: 395 operation_id (str): идентификатор операции 396 errors (list[str] | None): ошибки выполнения операции 397 """ 398 399 if operation_id in self.operations: 400 self.operations[operation_id].mark_finished(errors) 401 else: 402 self.logger.error(f"Операция {operation_id} не найдена!") 403 404 async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context): 405 """ 406 Действия метода Install сервиса расширений. Вызывается после всех необходимых действий с 407 формированием данных об операции. В случае необходимости можно переопределить 408 409 Arguments: 410 operation_id (str): идентификатор операции 411 request (extension_service_pb2.InstallRequest): объект запроса 412 context: контекст 413 """ 414 415 errors_list = await self.extension_setup.install( 416 request.space_id, request.env_id, request.force 417 ) 418 419 errors_list += await self.additional_install_operations(operation_id, request, context) 420 421 self.result_log("установки", operation_id, request, errors_list) 422 423 self.mark_operation_as_finished(operation_id, errors_list) 424 425 async def additional_install_operations(self, operation_id: str, request: extension_service_pb2.InstallRequest, context) -> list[str]: 426 """ 427 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 428 процедур установки. Можно переопределять. Возвращает массив с ошибками. 429 430 Arguments: 431 operation_id (str): идентификатор операции 432 request (extension_service_pb2.InstallRequest): объект запроса 433 context: контекст 434 435 Returns: 436 list[str] 437 """ 438 439 return [] 440 441 async def Install(self, request: extension_service_pb2.InstallRequest, context): 442 """ 443 Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 444 в себе формирование данных об операции. Переопределять не следует, для изменения логики Install 445 лучше работать с методами _Install если нужно всё радикально изменить или с методом 446 additional_install_operations для добавления чего-то дополнительного 447 """ 448 449 operation_id = generate_operation_id() 450 operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % ( 451 self.extension_id, 452 request.env_id, 453 request.space_id, 454 "С" if request.force else "Без" 455 ) 456 457 self.logger.info(operation_description) 458 459 install_task = asyncio.create_task(self._Install(operation_id, request, context)) 460 461 self.set_operation_meta(operation_id, operation_description, install_task) 462 463 return self.get_operation_meta(operation_id).to_operation() 464 465 async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context): 466 """ 467 Действия метода _Uninstall сервиса расширений. Вызывается после всех необходимых действий с 468 формированием данных об операции. В случае необходимости можно переопределить 469 470 Arguments: 471 operation_id (str): идентификатор операции 472 request (extension_service_pb2.UninstallRequest): объект запроса 473 context: контекст 474 """ 475 476 errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) 477 errors_list += await self.additional_uninstall_operations(operation_id, request, context) 478 479 self.result_log("удаления", operation_id, request, errors_list) 480 481 self.mark_operation_as_finished(operation_id, errors_list) 482 483 async def additional_uninstall_operations(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context) -> list[str]: 484 """ 485 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 486 процедур удаления. Можно переопределять. Возвращает массив с ошибками. 487 488 Arguments: 489 operation_id (str): идентификатор операции 490 request (extension_service_pb2.UninstallRequest): объект запроса 491 context: контекст 492 493 Returns: 494 list[str] 495 """ 496 497 return [] 498 499 async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context): 500 """ 501 Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 502 в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall 503 лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом 504 additional_uninstall_operations для добавления чего-то дополнительного 505 """ 506 507 operation_id = generate_operation_id() 508 operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % ( 509 self.extension_id, 510 request.env_id, 511 request.space_id, 512 "С" if request.remove else "Без" 513 ) 514 515 self.logger.info(operation_description) 516 517 uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context)) 518 519 self.set_operation_meta(operation_id, operation_description, uninstall_task) 520 521 return self.get_operation_meta(operation_id).to_operation() 522 523 async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context): 524 """ 525 Действия метода _Check сервиса расширений. Вызывается после всех необходимых действий с 526 формированием данных об операции. В случае необходимости можно переопределить 527 528 Arguments: 529 operation_id (str): идентификатор операции 530 request (extension_service_pb2.CheckRequest): объект запроса 531 context: контекст 532 """ 533 534 errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id) 535 536 self.result_log("проверки", operation_id, request, errors_list) 537 538 self.__operations[operation_id].mark_finished(errors_list) 539 540 async def additional_check_operations(self, operation_id: str, request: extension_service_pb2.CheckRequest, context) -> list[str]: 541 """ 542 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 543 процедур проверки. Можно переопределять. Возвращает массив с ошибками. 544 545 Arguments: 546 operation_id (str): идентификатор операции 547 request (extension_service_pb2.CheckRequest): объект запроса 548 context: контекст 549 550 Returns: 551 list[str] 552 """ 553 554 return [] 555 556 async def Check(self, request: extension_service_pb2.CheckRequest, context): 557 """ 558 Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 559 в себе формирование данных об операции. Переопределять не следует, для изменения логики Check 560 лучше работать с методами _Check если нужно всё радикально изменить или с методом 561 additional_check_operations для добавления чего-то дополнительного 562 """ 563 564 operation_id = generate_operation_id() 565 operation_description = "Проверка расширения %s для окружения %s пространства %s" % ( 566 self.extension_id, 567 request.env_id, 568 request.space_id, 569 ) 570 571 check_task = asyncio.create_task(self._Check(operation_id, request, context)) 572 573 self.set_operation_meta(operation_id, operation_description, check_task) 574 575 return self.get_operation_meta(operation_id).to_operation() 576 577 async def _dispatch_action( 578 self, request: extension_pb2.ActionRequest, context 579 ) -> extension_service_pb2.ActionResponse: 580 """ 581 Метод для получение нужной ф-ции для действия и её вызова 582 583 Arguments: 584 request (extension_pb2.ActionRequest): объект запроса 585 context: контекст 586 587 Returns: 588 extension_service_pb2.ActionResponse 589 """ 590 591 action_id = request.action.split("/")[-1] 592 593 func_name = f"action_{action_id}" 594 595 if not hasattr(self, func_name): 596 response = extension_service_pb2.ActionResponse( 597 state=extension_service_pb2.ActionResponse.State.ERROR, 598 title="Невозможно выполнить действие", 599 error=f"В расширении отсутсвует функция {action_id}" 600 ) 601 else: 602 func = getattr(self, func_name) 603 604 response = await func(request, context) 605 606 return response 607 608 async def Action(self, request: extension_pb2.ActionRequest, context): 609 """ 610 Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer 611 612 Arguments: 613 request (extension_pb2.ActionRequest): объект запроса 614 context: контекст 615 """ 616 617 operation_description = "Действие %s для окружения %s пространства %s" % ( 618 request.action, 619 request.env_id, 620 request.space_id, 621 ) 622 623 self.logger.info(operation_description) 624 625 response = await self._dispatch_action(request, context) 626 627 if response.state == extension_service_pb2.ActionResponse.State.ERROR: 628 error_description = "Ошибка действия %s в расширении %s: %s" % ( 629 request.action, 630 self.extension_id, 631 response.error 632 ) 633 634 self.logger.error(error_description) 635 636 return response 637 638 async def Get(self, request: operation_service_pb2.GetOperationRequest, context): 639 """ 640 Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется 641 для получения данных о выполняемой операции 642 643 Arguments: 644 request (operation_service_pb2.GetOperationRequest): объект запроса 645 context: контекст 646 """ 647 648 operations_meta = self.get_operation_meta(request.operation_id) 649 650 if not operations_meta: 651 error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % ( 652 request.operation_id, 653 self.extension_id, 654 ) 655 656 context.set_code(grpc.StatusCode.UNKNOWN) 657 context.set_details(error_description) 658 659 self.logger.error(error_description) 660 661 return None 662 663 self.logger.info( 664 f"Статус операции {operations_meta.operation_id}:\n" 665 f"{operations_meta.description}\n" 666 f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n" 667 f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}" 668 ) 669 670 return operations_meta.to_operation() 671 672 def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context): 673 """ 674 Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется 675 для отмены выполнения операции 676 677 Arguments: 678 request (operation_service_pb2.CancelOperationRequest): объект запроса 679 context: контекст 680 """ 681 682 operations_meta = self.get_operation_meta(request.operation_id) 683 684 if not operations_meta: 685 error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % ( 686 request.operation_id, 687 self.extension_id, 688 ) 689 690 context.set_code(grpc.StatusCode.UNKNOWN) 691 context.set_details(error_description) 692 693 self.logger.error( 694 error_description 695 ) 696 697 return None 698 699 try: 700 operations_meta.task.cancel() 701 operations_meta.mark_cancelled() 702 except Exception as e: 703 error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % ( 704 request.operation_id, 705 self.extension_id, 706 str(e) 707 ) 708 709 self.logger.error(error_description) 710 711 context.set_code(grpc.StatusCode.UNKNOWN) 712 context.set_details(error_description) 713 714 self.logger.exception(e) 715 716 return None 717 718 self.logger.info( 719 "Операция %s в расширении %s отменена" % ( 720 request.operation_id, 721 self.extension_id, 722 ) 723 ) 724 725 return operations_meta.to_operation()
Базовый класс для сервисов расширений
Attributes:
- extension_id (str): идентификатор расширения
- collections (list[collections_pb2.Collection]): список коллекций расширения
- roles (list[roles_pb2.Role]): список ролей расширения
- clients (list[clients_pb2.Client]): список клиентов расширения
- actions (list[dict]): список действий расширения
- items (list[AbstractItem]): список управляемых расширением item'ов
- __operations (dict[str, OperationMeta]): маппинг с данными операций
- logger (logging.Logger): логгер расширения
- ext_manager_service (manager_service_pb2_grpc.ExtensionManagerServiceStub): ссылка на сервис менеджера расширений
- collections_service (collections_pb2_grpc.CollectionsStub): ссылка на сервис коллекций
- environments_service (environments_pb2_grpc.EnvironmentsStub): ссылка на сервис окружений
- roles_service (roles_pb2_grpc.RolesStub): ссылка на сервис ролей
- clients_service (clients_pb2_grpc.ClientsStub): ссылка на сервис клиентов
- items_service (items_pb2_grpc.ItemsStub): ссылка на сервис item'ов
- spaces_service (spaces_pb2_grpc.SpacesStub): ссылка на сервис пространств
- files_service (files_pb2_grpc.FilesStub): ссылка на сервис файлов
- images_service (images_pb2_grpc.ImagesStub): ссылка на сервис изображений
- references_service (references_pb2_grpc.ReferencesStub): ссылка на сервис для работы с объектами Reference
- users_service (users_pb2_grpc.UsersStub): ссылка на сервис пользователей
- organizations_service (organizations_pb2_grpc.OrganizationsStub): ссылка на сервис организаций
- members_service (members_pb2_grpc.MembersStub): ссылка на сервис работы с пользователями организаций
- locales_service (locales_pb2_grpc.LocalesStub): ссылка на сервис локалей и переводов
- invitations_service (invitations_pb2_grpc.InvitationsStub): ссылка на сервис приглашений
- collaborators_service (collaborators_pb2_grpc.CollaboratorsStub): ссылка на сервис коллабораторов
- channel (grpc.Channel): ссылка на grpc канал
- extension_setup (ExtensionSetup): объект класса ExtensionSetup для управления данными расширения
188 def __init__(self, 189 ext_manager_service: manager_service_pb2_grpc.ExtensionManagerServiceStub, 190 collections_service: collections_pb2_grpc.CollectionsStub, 191 environments_service: environments_pb2_grpc.EnvironmentsStub, 192 roles_service: roles_pb2_grpc.RolesStub, 193 clients_service: clients_pb2_grpc.ClientsStub, 194 items_service: items_pb2_grpc.ItemsStub, 195 spaces_service: spaces_pb2_grpc.SpacesStub, 196 files_service: files_pb2_grpc.FilesStub, 197 images_service: images_pb2_grpc.ImagesStub, 198 references_service: references_pb2_grpc.ReferencesStub, 199 users_service: users_pb2_grpc.UsersStub, 200 organizations_service: organizations_pb2_grpc.OrganizationsStub, 201 members_service: members_pb2_grpc.MembersStub, 202 locales_service: locales_pb2_grpc.LocalesStub, 203 invitations_service: invitations_pb2_grpc.InvitationsStub, 204 collaborators_service: collaborators_pb2_grpc.CollaboratorsStub, 205 channel: grpc.Channel, 206 ): 207 self.logger = logging.getLogger(__name__) 208 self.ext_manager_service = ext_manager_service 209 self.collections_service = collections_service 210 self.environments_service = environments_service 211 self.roles_service = roles_service 212 self.clients_service = clients_service 213 self.items_service = items_service 214 self.spaces_service = spaces_service 215 self.files_service = files_service 216 self.images_service = images_service 217 self.references_service = references_service 218 self.users_service = users_service 219 self.organizations_service = organizations_service 220 self.members_service = members_service 221 self.locales_service = locales_service 222 self.invitations_service = invitations_service 223 self.collaborators_service = collaborators_service 224 self.channel = channel 225 226 self.extension_setup = ExtensionSetup( 227 self.collections_service, self.environments_service, 228 self.roles_service, self.clients_service, self.items_service, 229 ) 230 231 self.__operations = {} 232 233 for collection in self.collections or []: 234 self.extension_setup.add_collection(collection) 235 236 for role in self.roles or []: 237 self.extension_setup.add_role(role) 238 239 for client in self.clients or []: 240 self.extension_setup.add_client(client) 241 242 for action in self.actions or []: 243 self.extension_setup.add_action(action) 244 245 services_list = [ 246 (service_name, getattr(self, service_name)) 247 for service_name 248 in self.__dict__ 249 if service_name.endswith("_service") 250 ] 251 services_list.append(("channel", self.channel, )) 252 253 for item in self.items: 254 for rule in item.rules: 255 rule.bind_services(services_list) 256 257 self.extension_setup.set_items(self.items) 258 259 @aiocron.crontab('0 * * * *', start=True) 260 async def remove_old_operations(): 261 self.remove_old_operations()
267 def remove_old_operations(self): 268 """ 269 Функция для удаления stale операций. Запускается раз в час. Удаляет все операции 270 которые были завершены больше часа назад 271 """ 272 273 self.logger.info("Удаление старых операций") 274 275 ids = [] 276 now = datetime.datetime.now() 277 278 for operation_meta in self.__operations.values(): 279 task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled() 280 281 # Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае 282 # неотловленного исключения, например 283 if task_is_not_running: 284 operation_meta.was_finished = True 285 286 if not operation_meta.was_finished: 287 continue 288 289 time_delta = now - operation_meta.modified_at 290 291 # Для операций которые завершены больше часа назад 292 if time_delta.seconds < 60 * 60: 293 continue 294 295 # Если по какой-то причине операция была помечена как завершенная но task ещё активен 296 need_to_cancel_task = not any( 297 (operation_meta.task.done(), operation_meta.task.cancelled()) 298 ) 299 300 if need_to_cancel_task: 301 try: 302 operation_meta.task.cancel() 303 except Exception as e: 304 self.logger.error("Не удалось отменить task") 305 self.logger.exception(e) 306 307 continue 308 309 ids.append(operation_meta.operation_id) 310 311 self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление") 312 313 for operation_id in ids: 314 del self.__operations[operation_id] 315 316 self.logger.info("Удаление старых операций завершено")
Функция для удаления stale операций. Запускается раз в час. Удаляет все операции которые были завершены больше часа назад
318 def result_log(self, operation, operation_id: str, request, errors: list[str]): 319 """ 320 Вспомогательная функция для логгирования резултата выполнения операции 321 322 Arguments: 323 operation (str): название операции 324 operation_id (str): идентификатор операции 325 request: объект запроса 326 errors (list[str]): ошибки выполнения операции 327 """ 328 329 log_func = self.logger.error if errors else self.logger.info 330 331 log_func( 332 "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s" 333 % ( 334 operation, 335 operation_id, 336 self.extension_id, 337 request.env_id, 338 request.space_id, 339 "с ошибками: " + "; ".join(errors) if errors else "успешно" 340 ) 341 )
Вспомогательная функция для логгирования резултата выполнения операции
Arguments:
- operation (str): название операции
- operation_id (str): идентификатор операции
- request: объект запроса
- errors (list[str]): ошибки выполнения операции
343 def ext_request_results_from_exception(self, e: Exception): 344 """ 345 Вспомогательная функция для преобразования объектов исключений в объект результата запроса 346 347 Arguments: 348 e (Exception): любое исключение 349 Returns: 350 extension_service_pb2.ExtensionRequestResult 351 """ 352 353 return [ 354 extension_service_pb2.ExtensionRequestResult( 355 extension=self.extension_id, 356 state=extension_service_pb2.ExtensionRequestResult.State.ERROR, 357 error=str(e), 358 msg=None 359 ) 360 ]
Вспомогательная функция для преобразования объектов исключений в объект результата запроса
Arguments:
- e (Exception): любое исключение
Returns:
extension_service_pb2.ExtensionRequestResult
362 def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]: 363 """ 364 Получение объекта операции по идентификатору 365 366 Arguments: 367 operation_id (str): идентификатор операции 368 369 Returns: 370 OperationMeta | None 371 """ 372 373 return self.__operations.get(operation_id)
Получение объекта операции по идентификатору
Arguments:
- operation_id (str): идентификатор операции
Returns:
OperationMeta | None
375 def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task): 376 """ 377 Функция для сохранения данных об операции 378 Arguments: 379 operation_id (str): идентификатор операции 380 description (str): описание операции 381 task (asyncio.Task): объект task операции 382 """ 383 384 self.__operations[operation_id] = OperationMeta( 385 operation_id=operation_id, 386 task=task, 387 description=description 388 )
Функция для сохранения данных об операции
Arguments:
- operation_id (str): идентификатор операции
- description (str): описание операции
- task (asyncio.Task): объект task операции
390 def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None): 391 """ 392 Функция для завершения операции 393 394 Arguments: 395 operation_id (str): идентификатор операции 396 errors (list[str] | None): ошибки выполнения операции 397 """ 398 399 if operation_id in self.operations: 400 self.operations[operation_id].mark_finished(errors) 401 else: 402 self.logger.error(f"Операция {operation_id} не найдена!")
Функция для завершения операции
Arguments:
- operation_id (str): идентификатор операции
- errors (list[str] | None): ошибки выполнения операции
425 async def additional_install_operations(self, operation_id: str, request: extension_service_pb2.InstallRequest, context) -> list[str]: 426 """ 427 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 428 процедур установки. Можно переопределять. Возвращает массив с ошибками. 429 430 Arguments: 431 operation_id (str): идентификатор операции 432 request (extension_service_pb2.InstallRequest): объект запроса 433 context: контекст 434 435 Returns: 436 list[str] 437 """ 438 439 return []
Функция для возможности добавления любой дополнительной логики после завершения всех стандартных процедур установки. Можно переопределять. Возвращает массив с ошибками.
Arguments:
- operation_id (str): идентификатор операции
- request (extension_service_pb2.InstallRequest): объект запроса
- context: контекст
Returns:
list[str]
441 async def Install(self, request: extension_service_pb2.InstallRequest, context): 442 """ 443 Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 444 в себе формирование данных об операции. Переопределять не следует, для изменения логики Install 445 лучше работать с методами _Install если нужно всё радикально изменить или с методом 446 additional_install_operations для добавления чего-то дополнительного 447 """ 448 449 operation_id = generate_operation_id() 450 operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % ( 451 self.extension_id, 452 request.env_id, 453 request.space_id, 454 "С" if request.force else "Без" 455 ) 456 457 self.logger.info(operation_description) 458 459 install_task = asyncio.create_task(self._Install(operation_id, request, context)) 460 461 self.set_operation_meta(operation_id, operation_description, install_task) 462 463 return self.get_operation_meta(operation_id).to_operation()
Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит в себе формирование данных об операции. Переопределять не следует, для изменения логики Install лучше работать с методами _Install если нужно всё радикально изменить или с методом additional_install_operations для добавления чего-то дополнительного
483 async def additional_uninstall_operations(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context) -> list[str]: 484 """ 485 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 486 процедур удаления. Можно переопределять. Возвращает массив с ошибками. 487 488 Arguments: 489 operation_id (str): идентификатор операции 490 request (extension_service_pb2.UninstallRequest): объект запроса 491 context: контекст 492 493 Returns: 494 list[str] 495 """ 496 497 return []
Функция для возможности добавления любой дополнительной логики после завершения всех стандартных процедур удаления. Можно переопределять. Возвращает массив с ошибками.
Arguments:
- operation_id (str): идентификатор операции
- request (extension_service_pb2.UninstallRequest): объект запроса
- context: контекст
Returns:
list[str]
499 async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context): 500 """ 501 Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 502 в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall 503 лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом 504 additional_uninstall_operations для добавления чего-то дополнительного 505 """ 506 507 operation_id = generate_operation_id() 508 operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % ( 509 self.extension_id, 510 request.env_id, 511 request.space_id, 512 "С" if request.remove else "Без" 513 ) 514 515 self.logger.info(operation_description) 516 517 uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context)) 518 519 self.set_operation_meta(operation_id, operation_description, uninstall_task) 520 521 return self.get_operation_meta(operation_id).to_operation()
Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом additional_uninstall_operations для добавления чего-то дополнительного
540 async def additional_check_operations(self, operation_id: str, request: extension_service_pb2.CheckRequest, context) -> list[str]: 541 """ 542 Функция для возможности добавления любой дополнительной логики после завершения всех стандартных 543 процедур проверки. Можно переопределять. Возвращает массив с ошибками. 544 545 Arguments: 546 operation_id (str): идентификатор операции 547 request (extension_service_pb2.CheckRequest): объект запроса 548 context: контекст 549 550 Returns: 551 list[str] 552 """ 553 554 return []
Функция для возможности добавления любой дополнительной логики после завершения всех стандартных процедур проверки. Можно переопределять. Возвращает массив с ошибками.
Arguments:
- operation_id (str): идентификатор операции
- request (extension_service_pb2.CheckRequest): объект запроса
- context: контекст
Returns:
list[str]
556 async def Check(self, request: extension_service_pb2.CheckRequest, context): 557 """ 558 Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит 559 в себе формирование данных об операции. Переопределять не следует, для изменения логики Check 560 лучше работать с методами _Check если нужно всё радикально изменить или с методом 561 additional_check_operations для добавления чего-то дополнительного 562 """ 563 564 operation_id = generate_operation_id() 565 operation_description = "Проверка расширения %s для окружения %s пространства %s" % ( 566 self.extension_id, 567 request.env_id, 568 request.space_id, 569 ) 570 571 check_task = asyncio.create_task(self._Check(operation_id, request, context)) 572 573 self.set_operation_meta(operation_id, operation_description, check_task) 574 575 return self.get_operation_meta(operation_id).to_operation()
Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит в себе формирование данных об операции. Переопределять не следует, для изменения логики Check лучше работать с методами _Check если нужно всё радикально изменить или с методом additional_check_operations для добавления чего-то дополнительного
608 async def Action(self, request: extension_pb2.ActionRequest, context): 609 """ 610 Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer 611 612 Arguments: 613 request (extension_pb2.ActionRequest): объект запроса 614 context: контекст 615 """ 616 617 operation_description = "Действие %s для окружения %s пространства %s" % ( 618 request.action, 619 request.env_id, 620 request.space_id, 621 ) 622 623 self.logger.info(operation_description) 624 625 response = await self._dispatch_action(request, context) 626 627 if response.state == extension_service_pb2.ActionResponse.State.ERROR: 628 error_description = "Ошибка действия %s в расширении %s: %s" % ( 629 request.action, 630 self.extension_id, 631 response.error 632 ) 633 634 self.logger.error(error_description) 635 636 return response
Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer
Arguments:
- request (extension_pb2.ActionRequest): объект запроса
- context: контекст
638 async def Get(self, request: operation_service_pb2.GetOperationRequest, context): 639 """ 640 Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется 641 для получения данных о выполняемой операции 642 643 Arguments: 644 request (operation_service_pb2.GetOperationRequest): объект запроса 645 context: контекст 646 """ 647 648 operations_meta = self.get_operation_meta(request.operation_id) 649 650 if not operations_meta: 651 error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % ( 652 request.operation_id, 653 self.extension_id, 654 ) 655 656 context.set_code(grpc.StatusCode.UNKNOWN) 657 context.set_details(error_description) 658 659 self.logger.error(error_description) 660 661 return None 662 663 self.logger.info( 664 f"Статус операции {operations_meta.operation_id}:\n" 665 f"{operations_meta.description}\n" 666 f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n" 667 f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}" 668 ) 669 670 return operations_meta.to_operation()
Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется для получения данных о выполняемой операции
Arguments:
- request (operation_service_pb2.GetOperationRequest): объект запроса
- context: контекст
672 def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context): 673 """ 674 Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется 675 для отмены выполнения операции 676 677 Arguments: 678 request (operation_service_pb2.CancelOperationRequest): объект запроса 679 context: контекст 680 """ 681 682 operations_meta = self.get_operation_meta(request.operation_id) 683 684 if not operations_meta: 685 error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % ( 686 request.operation_id, 687 self.extension_id, 688 ) 689 690 context.set_code(grpc.StatusCode.UNKNOWN) 691 context.set_details(error_description) 692 693 self.logger.error( 694 error_description 695 ) 696 697 return None 698 699 try: 700 operations_meta.task.cancel() 701 operations_meta.mark_cancelled() 702 except Exception as e: 703 error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % ( 704 request.operation_id, 705 self.extension_id, 706 str(e) 707 ) 708 709 self.logger.error(error_description) 710 711 context.set_code(grpc.StatusCode.UNKNOWN) 712 context.set_details(error_description) 713 714 self.logger.exception(e) 715 716 return None 717 718 self.logger.info( 719 "Операция %s в расширении %s отменена" % ( 720 request.operation_id, 721 self.extension_id, 722 ) 723 ) 724 725 return operations_meta.to_operation()
Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется для отмены выполнения операции
Arguments:
- request (operation_service_pb2.CancelOperationRequest): объект запроса
- context: контекст