perxis.extensions.extension_service

Модуль содержит базовый класс сервисов расширений - ExtensionService

  1"""
  2    Модуль содержит базовый класс сервисов расширений - ExtensionService
  3"""
  4
  5
  6import grpc
  7import uuid
  8import typing
  9import asyncio
 10import logging
 11import aiocron
 12import datetime
 13import dataclasses
 14
 15from google.protobuf import timestamp_pb2, any_pb2, wrappers_pb2
 16
 17from perxis.extensions import extension_service_pb2, extension_service_pb2_grpc, extension_pb2, manager_service_pb2_grpc
 18from perxis.collaborators import collaborators_pb2_grpc
 19from perxis.invitations import invitations_pb2_grpc
 20from perxis.locales import locales_pb2_grpc
 21from perxis.organizations import organizations_pb2_grpc
 22from perxis.members import members_pb2_grpc
 23from perxis.users import users_pb2_grpc
 24from perxis.references import references_pb2_grpc
 25from perxis.images import images_pb2_grpc
 26from perxis.files import files_pb2_grpc
 27from perxis.roles import roles_pb2_grpc, roles_pb2
 28from perxis.items import items_pb2_grpc
 29from perxis.clients import clients_pb2_grpc, clients_pb2
 30from perxis.spaces import spaces_pb2_grpc
 31from perxis.common import operation_pb2, operation_service_pb2_grpc, operation_service_pb2, error_pb2
 32from perxis.collections import collections_pb2_grpc, collections_pb2
 33from perxis.environments import environments_pb2_grpc
 34from perxis.extensions.extension_setup import ExtensionSetup
 35from perxis.extensions.item_models import AbstractItem
 36
 37
 38def generate_operation_id() -> str:
 39    """
 40        Функция для генерации идентификатора операции
 41
 42        Returns:
 43            str
 44    """
 45
 46    return str(uuid.uuid4())
 47
 48
 49def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
 50    """
 51        Функция для преобразования объекта datetime в объект timestamp_pb2.Timestamp
 52
 53        Arguments:
 54            dt (datetime)
 55        Returns:
 56            timestamp_pb2.Timestamp
 57    """
 58
 59    timestamp = dt.timestamp()
 60    seconds = int(timestamp)
 61    nanos = int(timestamp % 1 * 1e9)
 62
 63    return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
 64
 65
 66@dataclasses.dataclass
 67class OperationMeta:
 68    """
 69        Класс для хранения метаданных операции
 70
 71        Attributes:
 72            task (asyncio.Task): ссылка на task операции
 73            operation_id (str): идентификатор операции
 74            description (str): описание операции
 75            created_at (datetime.datetime): дата создания операции
 76            modified_at (datetime.datetime): дата изменения метаданных операции
 77            response (str | None): результат операции
 78            errors (list[str]): возникшие во время выполнения операции ошибки
 79            was_finished (bool): флаг завершения операции
 80            metadata (dict): любые прочие метаданные операции
 81    """
 82
 83    task: asyncio.Task
 84
 85    operation_id: str
 86    description: str
 87
 88    created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
 89    modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
 90
 91    response: typing.Optional[str] = None
 92    errors: typing.Optional[list[str]] = None
 93
 94    was_finished: bool = False
 95    metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict)
 96
 97    def mark_cancelled(self):
 98        """
 99            Функция для отмены операции
100        """
101
102        self.was_finished = True
103        self.response = "Отменено"
104
105    def mark_finished(self, errors: typing.Optional[list[str]] = None):
106        """
107            Функция для завершения операции
108
109            Arguments:
110                errors (list[str] | None): список ошибок
111        """
112
113        self.modified_at = datetime.datetime.now()
114        self.was_finished = True
115        self.errors = errors
116        self.response = None if errors else "OK"
117
118    def to_operation(self):
119        """
120            Функция для преобразования объекта операции в формат perxis
121
122            Returns:
123                operation_pb2.Operation
124        """
125
126        packed_response = any_pb2.Any()
127        packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING"))
128
129        return operation_pb2.Operation(
130            id=self.operation_id,
131            description=self.description,
132            created_at=datetime_to_timestamp(self.created_at),
133            modified_at=datetime_to_timestamp(self.modified_at),
134            done=self.was_finished,
135            metadata=self.metadata,
136            response=packed_response,
137            error=error_pb2.Error(
138                message="; ".join(self.errors)
139            ) if self.errors else None
140        )
141
142
143class ExtensionService(
144    extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer
145):
146    """
147        Базовый класс для сервисов расширений
148
149        Attributes:
150            extension_id (str): идентификатор расширения
151            collections (list[collections_pb2.Collection]): список коллекций расширения
152            roles (list[roles_pb2.Role]): список ролей расширения
153            clients (list[clients_pb2.Client]): список клиентов расширения
154            actions (list[dict]): список действий расширения
155            items (list[AbstractItem]): список управляемых расширением item'ов
156            __operations (dict[str, OperationMeta]): маппинг с данными операций
157            logger (logging.Logger): логгер расширения
158            ext_manager_service (manager_service_pb2_grpc.ExtensionManagerServiceStub): ссылка на сервис менеджера расширений
159            collections_service (collections_pb2_grpc.CollectionsStub): ссылка на сервис коллекций
160            environments_service (environments_pb2_grpc.EnvironmentsStub): ссылка на сервис окружений
161            roles_service (roles_pb2_grpc.RolesStub): ссылка на сервис ролей
162            clients_service (clients_pb2_grpc.ClientsStub): ссылка на сервис клиентов
163            items_service (items_pb2_grpc.ItemsStub): ссылка на сервис item'ов
164            spaces_service (spaces_pb2_grpc.SpacesStub): ссылка на сервис пространств
165            files_service (files_pb2_grpc.FilesStub): ссылка на сервис файлов
166            images_service (images_pb2_grpc.ImagesStub): ссылка на сервис изображений
167            references_service (references_pb2_grpc.ReferencesStub): ссылка на сервис для работы с объектами Reference
168            users_service (users_pb2_grpc.UsersStub): ссылка на сервис пользователей
169            organizations_service (organizations_pb2_grpc.OrganizationsStub): ссылка на сервис организаций
170            members_service (members_pb2_grpc.MembersStub): ссылка на сервис работы с пользователями организаций
171            locales_service (locales_pb2_grpc.LocalesStub): ссылка на сервис локалей и переводов
172            invitations_service (invitations_pb2_grpc.InvitationsStub): ссылка на сервис приглашений
173            collaborators_service (collaborators_pb2_grpc.CollaboratorsStub): ссылка на сервис коллабораторов
174            channel (grpc.Channel): ссылка на grpc канал
175            extension_setup (ExtensionSetup): объект класса ExtensionSetup для управления данными расширения
176    """
177
178    extension_id: str
179    collections: list[collections_pb2.Collection] = []
180    roles: list[roles_pb2.Role] = []
181    clients: list[clients_pb2.Client] = []
182    actions: list[dict] = []
183    items: list[AbstractItem] = []
184
185    __operations: dict[str, OperationMeta]
186
187    def __init__(self,
188                 ext_manager_service: manager_service_pb2_grpc.ExtensionManagerServiceStub,
189                 collections_service: collections_pb2_grpc.CollectionsStub,
190                 environments_service: environments_pb2_grpc.EnvironmentsStub,
191                 roles_service: roles_pb2_grpc.RolesStub,
192                 clients_service: clients_pb2_grpc.ClientsStub,
193                 items_service: items_pb2_grpc.ItemsStub,
194                 spaces_service: spaces_pb2_grpc.SpacesStub,
195                 files_service: files_pb2_grpc.FilesStub,
196                 images_service: images_pb2_grpc.ImagesStub,
197                 references_service: references_pb2_grpc.ReferencesStub,
198                 users_service: users_pb2_grpc.UsersStub,
199                 organizations_service: organizations_pb2_grpc.OrganizationsStub,
200                 members_service: members_pb2_grpc.MembersStub,
201                 locales_service: locales_pb2_grpc.LocalesStub,
202                 invitations_service: invitations_pb2_grpc.InvitationsStub,
203                 collaborators_service: collaborators_pb2_grpc.CollaboratorsStub,
204                 channel: grpc.Channel,
205    ):
206        self.logger = logging.getLogger(__name__)
207        self.ext_manager_service = ext_manager_service
208        self.collections_service = collections_service
209        self.environments_service = environments_service
210        self.roles_service = roles_service
211        self.clients_service = clients_service
212        self.items_service = items_service
213        self.spaces_service = spaces_service
214        self.files_service = files_service
215        self.images_service = images_service
216        self.references_service = references_service
217        self.users_service = users_service
218        self.organizations_service = organizations_service
219        self.members_service = members_service
220        self.locales_service = locales_service
221        self.invitations_service = invitations_service
222        self.collaborators_service = collaborators_service
223        self.channel = channel
224
225        self.extension_setup = ExtensionSetup(
226            self.collections_service, self.environments_service,
227            self.roles_service, self.clients_service, self.items_service,
228        )
229
230        self.__operations = {}
231
232        for collection in self.collections or []:
233            self.extension_setup.add_collection(collection)
234
235        for role in self.roles or []:
236            self.extension_setup.add_role(role)
237
238        for client in self.clients or []:
239            self.extension_setup.add_client(client)
240
241        for action in self.actions or []:
242            self.extension_setup.add_action(action)
243
244        services_list = [
245            (service_name, getattr(self, service_name))
246            for service_name
247            in self.__dict__
248            if service_name.endswith("_service")
249        ]
250        services_list.append(("channel", self.channel, ))
251
252        for item in self.items:
253            for rule in item.rules:
254                rule.bind_services(services_list)
255
256        self.extension_setup.set_items(self.items)
257
258        @aiocron.crontab('0 * * * *', start=True)
259        async def remove_old_operations():
260            self.remove_old_operations()
261
262    @property
263    def operations(self):
264        return self.__operations
265
266    def remove_old_operations(self):
267        """
268            Функция для удаления stale операций. Запускается раз в час. Удаляет все операции
269            которые были завершены больше часа назад
270        """
271
272        self.logger.info("Удаление старых операций")
273
274        ids = []
275        now = datetime.datetime.now()
276
277        for operation_meta in self.__operations.values():
278            task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled()
279
280            # Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае
281            # неотловленного исключения, например
282            if task_is_not_running:
283                operation_meta.was_finished = True
284
285            if not operation_meta.was_finished:
286                continue
287
288            time_delta = now - operation_meta.modified_at
289
290            # Для операций которые завершены больше часа назад
291            if time_delta.seconds < 60 * 60:
292                continue
293
294            # Если по какой-то причине операция была помечена как завершенная но task ещё активен
295            need_to_cancel_task = not any(
296                (operation_meta.task.done(), operation_meta.task.cancelled())
297            )
298
299            if need_to_cancel_task:
300                try:
301                    operation_meta.task.cancel()
302                except Exception as e:
303                    self.logger.error("Не удалось отменить task")
304                    self.logger.exception(e)
305
306                    continue
307
308            ids.append(operation_meta.operation_id)
309
310            self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление")
311
312        for operation_id in ids:
313            del self.__operations[operation_id]
314
315        self.logger.info("Удаление старых операций завершено")
316
317    def result_log(self, operation, operation_id: str, request, errors: list[str]):
318        """
319            Вспомогательная функция для логгирования резултата выполнения операции
320
321            Arguments:
322                  operation (str): название операции
323                  operation_id (str): идентификатор операции
324                  request: объект запроса
325                  errors (list[str]): ошибки выполнения операции
326        """
327
328        log_func = self.logger.error if errors else self.logger.info
329
330        log_func(
331            "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s"
332            % (
333                operation,
334                operation_id,
335                self.extension_id,
336                request.env_id,
337                request.space_id,
338                "с ошибками: " + "; ".join(errors) if errors else "успешно"
339            )
340        )
341
342    def ext_request_results_from_exception(self, e: Exception):
343        """
344            Вспомогательная функция для преобразования объектов исключений в объект результата запроса
345
346            Arguments:
347                e (Exception): любое исключение
348            Returns:
349                extension_service_pb2.ExtensionRequestResult
350        """
351
352        return [
353            extension_service_pb2.ExtensionRequestResult(
354                extension=self.extension_id,
355                state=extension_service_pb2.ExtensionRequestResult.State.ERROR,
356                error=str(e),
357                msg=None
358            )
359        ]
360
361    def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]:
362        """
363            Получение объекта операции по идентификатору
364
365            Arguments:
366                    operation_id (str): идентификатор операции
367
368            Returns:
369                   OperationMeta | None
370        """
371
372        return self.__operations.get(operation_id)
373
374    def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task):
375        """
376            Функция для сохранения данных об операции
377            Arguments:
378                  operation_id (str): идентификатор операции
379                  description (str): описание операции
380                  task (asyncio.Task): объект task операции
381        """
382
383        self.__operations[operation_id] = OperationMeta(
384            operation_id=operation_id,
385            task=task,
386            description=description
387        )
388
389    def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None):
390        """
391            Функция для завершения операции
392
393            Arguments:
394                operation_id (str): идентификатор операции
395                errors (list[str] | None): ошибки выполнения операции
396        """
397
398        if operation_id in self.operations:
399            self.operations[operation_id].mark_finished(errors)
400        else:
401            self.logger.error(f"Операция {operation_id} не найдена!")
402
403    async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context):
404        """
405            Действия метода Install сервиса расширений. Вызывается после всех необходимых действий с
406            формированием данных об операции. В случае необходимости можно переопределить
407
408            Arguments:
409                   operation_id (str): идентификатор операции
410                   request (extension_service_pb2.InstallRequest): объект запроса
411                   context: контекст
412        """
413
414        errors_list = await self.extension_setup.install(
415            request.space_id, request.env_id, request.force
416        )
417
418        errors_list += await self.additional_install_operations(operation_id, request, context)
419
420        self.result_log("установки", operation_id, request, errors_list)
421
422        self.mark_operation_as_finished(operation_id, errors_list)
423
424    async def additional_install_operations(self, operation_id: str, request: extension_service_pb2.InstallRequest, context) -> list[str]:
425        """
426            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
427            процедур установки. Можно переопределять. Возвращает массив с ошибками.
428
429            Arguments:
430                   operation_id (str): идентификатор операции
431                   request (extension_service_pb2.InstallRequest): объект запроса
432                   context: контекст
433
434            Returns:
435                list[str]
436        """
437
438        return []
439
440    async def Install(self, request: extension_service_pb2.InstallRequest, context):
441        """
442            Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
443            в себе формирование данных об операции. Переопределять не следует, для изменения логики Install
444            лучше работать с методами _Install если нужно всё радикально изменить или с методом
445            additional_install_operations для добавления чего-то дополнительного
446        """
447
448        operation_id = generate_operation_id()
449        operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % (
450            self.extension_id,
451            request.env_id,
452            request.space_id,
453            "С" if request.force else "Без"
454        )
455
456        self.logger.info(operation_description)
457
458        install_task = asyncio.create_task(self._Install(operation_id, request, context))
459
460        self.set_operation_meta(operation_id, operation_description, install_task)
461
462        return self.get_operation_meta(operation_id).to_operation()
463
464    async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context):
465        """
466            Действия метода _Uninstall сервиса расширений. Вызывается после всех необходимых действий с
467            формированием данных об операции. В случае необходимости можно переопределить
468
469            Arguments:
470                   operation_id (str): идентификатор операции
471                   request (extension_service_pb2.UninstallRequest): объект запроса
472                   context: контекст
473        """
474
475        errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
476        errors_list += await self.additional_uninstall_operations(operation_id, request, context)
477
478        self.result_log("удаления", operation_id, request, errors_list)
479
480        self.mark_operation_as_finished(operation_id, errors_list)
481
482    async def additional_uninstall_operations(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context) -> list[str]:
483        """
484            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
485            процедур удаления. Можно переопределять. Возвращает массив с ошибками.
486
487            Arguments:
488                   operation_id (str): идентификатор операции
489                   request (extension_service_pb2.UninstallRequest): объект запроса
490                   context: контекст
491
492            Returns:
493                list[str]
494        """
495
496        return []
497
498    async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context):
499        """
500            Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
501            в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall
502            лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом
503            additional_uninstall_operations для добавления чего-то дополнительного
504        """
505
506        operation_id = generate_operation_id()
507        operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % (
508            self.extension_id,
509            request.env_id,
510            request.space_id,
511            "С" if request.remove else "Без"
512        )
513
514        self.logger.info(operation_description)
515
516        uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context))
517
518        self.set_operation_meta(operation_id, operation_description, uninstall_task)
519
520        return self.get_operation_meta(operation_id).to_operation()
521
522    async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context):
523        """
524            Действия метода _Check сервиса расширений. Вызывается после всех необходимых действий с
525            формированием данных об операции. В случае необходимости можно переопределить
526
527            Arguments:
528                   operation_id (str): идентификатор операции
529                   request (extension_service_pb2.CheckRequest): объект запроса
530                   context: контекст
531        """
532
533        errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id)
534
535        self.result_log("проверки", operation_id, request, errors_list)
536
537        self.__operations[operation_id].mark_finished(errors_list)
538
539    async def additional_check_operations(self, operation_id: str, request: extension_service_pb2.CheckRequest, context) -> list[str]:
540        """
541            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
542            процедур проверки. Можно переопределять. Возвращает массив с ошибками.
543
544            Arguments:
545                   operation_id (str): идентификатор операции
546                   request (extension_service_pb2.CheckRequest): объект запроса
547                   context: контекст
548
549            Returns:
550                list[str]
551        """
552
553        return []
554
555    async def Check(self, request: extension_service_pb2.CheckRequest, context):
556        """
557            Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
558            в себе формирование данных об операции. Переопределять не следует, для изменения логики Check
559            лучше работать с методами _Check если нужно всё радикально изменить или с методом
560            additional_check_operations для добавления чего-то дополнительного
561        """
562
563        operation_id = generate_operation_id()
564        operation_description = "Проверка расширения %s для окружения %s пространства %s" % (
565            self.extension_id,
566            request.env_id,
567            request.space_id,
568        )
569
570        check_task = asyncio.create_task(self._Check(operation_id, request, context))
571
572        self.set_operation_meta(operation_id, operation_description, check_task)
573
574        return self.get_operation_meta(operation_id).to_operation()
575
576    async def _dispatch_action(
577            self, request: extension_pb2.ActionRequest, context
578    ) -> extension_service_pb2.ActionResponse:
579        """
580            Метод для получение нужной ф-ции для действия и её вызова
581
582            Arguments:
583               request (extension_pb2.ActionRequest): объект запроса
584               context: контекст
585
586            Returns:
587               extension_service_pb2.ActionResponse
588        """
589
590        action_id = request.action.split("/")[-1]
591
592        func_name = f"action_{action_id}"
593
594        if not hasattr(self, func_name):
595            response = extension_service_pb2.ActionResponse(
596                state=extension_service_pb2.ActionResponse.State.ERROR,
597                title="Невозможно выполнить действие",
598                error=f"В расширении отсутсвует функция {action_id}"
599            )
600        else:
601            func = getattr(self, func_name)
602
603            response = await func(request, context)
604
605        return response
606
607    async def Action(self, request: extension_pb2.ActionRequest, context):
608        """
609            Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer
610
611            Arguments:
612                request (extension_pb2.ActionRequest): объект запроса
613                context: контекст
614        """
615
616        operation_description = "Действие %s для окружения %s пространства %s" % (
617            request.action,
618            request.env_id,
619            request.space_id,
620        )
621
622        self.logger.info(operation_description)
623
624        response = await self._dispatch_action(request, context)
625
626        if response.state == extension_service_pb2.ActionResponse.State.ERROR:
627            error_description = "Ошибка действия %s в расширении %s: %s" % (
628                request.action,
629                self.extension_id,
630                response.error
631            )
632
633            self.logger.error(error_description)
634
635        return response
636
637    async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
638        """
639            Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется
640            для получения данных о выполняемой операции
641
642            Arguments:
643                request (operation_service_pb2.GetOperationRequest): объект запроса
644                context: контекст
645        """
646
647        operations_meta = self.get_operation_meta(request.operation_id)
648
649        if not operations_meta:
650            error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % (
651                request.operation_id,
652                self.extension_id,
653            )
654
655            context.set_code(grpc.StatusCode.UNKNOWN)
656            context.set_details(error_description)
657
658            self.logger.error(error_description)
659
660            return None
661
662        self.logger.info(
663            f"Статус операции {operations_meta.operation_id}:\n"
664            f"{operations_meta.description}\n"
665            f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n"
666            f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}"
667        )
668
669        return operations_meta.to_operation()
670
671    def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context):
672        """
673            Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется
674            для отмены выполнения операции
675
676            Arguments:
677                request (operation_service_pb2.CancelOperationRequest): объект запроса
678                context: контекст
679        """
680
681        operations_meta = self.get_operation_meta(request.operation_id)
682
683        if not operations_meta:
684            error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % (
685                request.operation_id,
686                self.extension_id,
687            )
688
689            context.set_code(grpc.StatusCode.UNKNOWN)
690            context.set_details(error_description)
691
692            self.logger.error(
693                error_description
694            )
695
696            return None
697
698        try:
699            operations_meta.task.cancel()
700            operations_meta.mark_cancelled()
701        except Exception as e:
702            error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % (
703                request.operation_id,
704                self.extension_id,
705                str(e)
706            )
707
708            self.logger.error(error_description)
709
710            context.set_code(grpc.StatusCode.UNKNOWN)
711            context.set_details(error_description)
712
713            self.logger.exception(e)
714
715            return None
716
717        self.logger.info(
718            "Операция %s в расширении %s отменена" % (
719                request.operation_id,
720                self.extension_id,
721            )
722        )
723
724        return operations_meta.to_operation()
def generate_operation_id() -> str:
39def generate_operation_id() -> str:
40    """
41        Функция для генерации идентификатора операции
42
43        Returns:
44            str
45    """
46
47    return str(uuid.uuid4())

Функция для генерации идентификатора операции

Returns:

str

def datetime_to_timestamp( dt: <module 'datetime' from '/usr/local/lib/python3.11/datetime.py'>) -> google.protobuf.timestamp_pb2.Timestamp:
50def datetime_to_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
51    """
52        Функция для преобразования объекта datetime в объект timestamp_pb2.Timestamp
53
54        Arguments:
55            dt (datetime)
56        Returns:
57            timestamp_pb2.Timestamp
58    """
59
60    timestamp = dt.timestamp()
61    seconds = int(timestamp)
62    nanos = int(timestamp % 1 * 1e9)
63
64    return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)

Функция для преобразования объекта datetime в объект timestamp_pb2.Timestamp

Arguments:
  • dt (datetime)
Returns:

timestamp_pb2.Timestamp

@dataclasses.dataclass
class OperationMeta:
 67@dataclasses.dataclass
 68class OperationMeta:
 69    """
 70        Класс для хранения метаданных операции
 71
 72        Attributes:
 73            task (asyncio.Task): ссылка на task операции
 74            operation_id (str): идентификатор операции
 75            description (str): описание операции
 76            created_at (datetime.datetime): дата создания операции
 77            modified_at (datetime.datetime): дата изменения метаданных операции
 78            response (str | None): результат операции
 79            errors (list[str]): возникшие во время выполнения операции ошибки
 80            was_finished (bool): флаг завершения операции
 81            metadata (dict): любые прочие метаданные операции
 82    """
 83
 84    task: asyncio.Task
 85
 86    operation_id: str
 87    description: str
 88
 89    created_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
 90    modified_at: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
 91
 92    response: typing.Optional[str] = None
 93    errors: typing.Optional[list[str]] = None
 94
 95    was_finished: bool = False
 96    metadata: dict[str, typing.Any] = dataclasses.field(default_factory=dict)
 97
 98    def mark_cancelled(self):
 99        """
100            Функция для отмены операции
101        """
102
103        self.was_finished = True
104        self.response = "Отменено"
105
106    def mark_finished(self, errors: typing.Optional[list[str]] = None):
107        """
108            Функция для завершения операции
109
110            Arguments:
111                errors (list[str] | None): список ошибок
112        """
113
114        self.modified_at = datetime.datetime.now()
115        self.was_finished = True
116        self.errors = errors
117        self.response = None if errors else "OK"
118
119    def to_operation(self):
120        """
121            Функция для преобразования объекта операции в формат perxis
122
123            Returns:
124                operation_pb2.Operation
125        """
126
127        packed_response = any_pb2.Any()
128        packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING"))
129
130        return operation_pb2.Operation(
131            id=self.operation_id,
132            description=self.description,
133            created_at=datetime_to_timestamp(self.created_at),
134            modified_at=datetime_to_timestamp(self.modified_at),
135            done=self.was_finished,
136            metadata=self.metadata,
137            response=packed_response,
138            error=error_pb2.Error(
139                message="; ".join(self.errors)
140            ) if self.errors else None
141        )

Класс для хранения метаданных операции

Attributes:
  • task (asyncio.Task): ссылка на task операции
  • operation_id (str): идентификатор операции
  • description (str): описание операции
  • created_at (datetime.datetime): дата создания операции
  • modified_at (datetime.datetime): дата изменения метаданных операции
  • response (str | None): результат операции
  • errors (list[str]): возникшие во время выполнения операции ошибки
  • was_finished (bool): флаг завершения операции
  • metadata (dict): любые прочие метаданные операции
OperationMeta( task: _asyncio.Task, operation_id: str, description: str, created_at: datetime.datetime = <factory>, modified_at: datetime.datetime = <factory>, response: Optional[str] = None, errors: Optional[list[str]] = None, was_finished: bool = False, metadata: dict[str, typing.Any] = <factory>)
task: _asyncio.Task
operation_id: str
description: str
created_at: datetime.datetime
modified_at: datetime.datetime
response: Optional[str] = None
errors: Optional[list[str]] = None
was_finished: bool = False
metadata: dict[str, typing.Any]
def mark_cancelled(self):
 98    def mark_cancelled(self):
 99        """
100            Функция для отмены операции
101        """
102
103        self.was_finished = True
104        self.response = "Отменено"

Функция для отмены операции

def mark_finished(self, errors: Optional[list[str]] = None):
106    def mark_finished(self, errors: typing.Optional[list[str]] = None):
107        """
108            Функция для завершения операции
109
110            Arguments:
111                errors (list[str] | None): список ошибок
112        """
113
114        self.modified_at = datetime.datetime.now()
115        self.was_finished = True
116        self.errors = errors
117        self.response = None if errors else "OK"

Функция для завершения операции

Arguments:
  • errors (list[str] | None): список ошибок
def to_operation(self):
119    def to_operation(self):
120        """
121            Функция для преобразования объекта операции в формат perxis
122
123            Returns:
124                operation_pb2.Operation
125        """
126
127        packed_response = any_pb2.Any()
128        packed_response.Pack(wrappers_pb2.StringValue(value=self.response or "PENDING"))
129
130        return operation_pb2.Operation(
131            id=self.operation_id,
132            description=self.description,
133            created_at=datetime_to_timestamp(self.created_at),
134            modified_at=datetime_to_timestamp(self.modified_at),
135            done=self.was_finished,
136            metadata=self.metadata,
137            response=packed_response,
138            error=error_pb2.Error(
139                message="; ".join(self.errors)
140            ) if self.errors else None
141        )

Функция для преобразования объекта операции в формат perxis

Returns:

operation_pb2.Operation

144class ExtensionService(
145    extension_service_pb2_grpc.ExtensionServiceServicer, operation_service_pb2_grpc.OperationServiceServicer
146):
147    """
148        Базовый класс для сервисов расширений
149
150        Attributes:
151            extension_id (str): идентификатор расширения
152            collections (list[collections_pb2.Collection]): список коллекций расширения
153            roles (list[roles_pb2.Role]): список ролей расширения
154            clients (list[clients_pb2.Client]): список клиентов расширения
155            actions (list[dict]): список действий расширения
156            items (list[AbstractItem]): список управляемых расширением item'ов
157            __operations (dict[str, OperationMeta]): маппинг с данными операций
158            logger (logging.Logger): логгер расширения
159            ext_manager_service (manager_service_pb2_grpc.ExtensionManagerServiceStub): ссылка на сервис менеджера расширений
160            collections_service (collections_pb2_grpc.CollectionsStub): ссылка на сервис коллекций
161            environments_service (environments_pb2_grpc.EnvironmentsStub): ссылка на сервис окружений
162            roles_service (roles_pb2_grpc.RolesStub): ссылка на сервис ролей
163            clients_service (clients_pb2_grpc.ClientsStub): ссылка на сервис клиентов
164            items_service (items_pb2_grpc.ItemsStub): ссылка на сервис item'ов
165            spaces_service (spaces_pb2_grpc.SpacesStub): ссылка на сервис пространств
166            files_service (files_pb2_grpc.FilesStub): ссылка на сервис файлов
167            images_service (images_pb2_grpc.ImagesStub): ссылка на сервис изображений
168            references_service (references_pb2_grpc.ReferencesStub): ссылка на сервис для работы с объектами Reference
169            users_service (users_pb2_grpc.UsersStub): ссылка на сервис пользователей
170            organizations_service (organizations_pb2_grpc.OrganizationsStub): ссылка на сервис организаций
171            members_service (members_pb2_grpc.MembersStub): ссылка на сервис работы с пользователями организаций
172            locales_service (locales_pb2_grpc.LocalesStub): ссылка на сервис локалей и переводов
173            invitations_service (invitations_pb2_grpc.InvitationsStub): ссылка на сервис приглашений
174            collaborators_service (collaborators_pb2_grpc.CollaboratorsStub): ссылка на сервис коллабораторов
175            channel (grpc.Channel): ссылка на grpc канал
176            extension_setup (ExtensionSetup): объект класса ExtensionSetup для управления данными расширения
177    """
178
179    extension_id: str
180    collections: list[collections_pb2.Collection] = []
181    roles: list[roles_pb2.Role] = []
182    clients: list[clients_pb2.Client] = []
183    actions: list[dict] = []
184    items: list[AbstractItem] = []
185
186    __operations: dict[str, OperationMeta]
187
188    def __init__(self,
189                 ext_manager_service: manager_service_pb2_grpc.ExtensionManagerServiceStub,
190                 collections_service: collections_pb2_grpc.CollectionsStub,
191                 environments_service: environments_pb2_grpc.EnvironmentsStub,
192                 roles_service: roles_pb2_grpc.RolesStub,
193                 clients_service: clients_pb2_grpc.ClientsStub,
194                 items_service: items_pb2_grpc.ItemsStub,
195                 spaces_service: spaces_pb2_grpc.SpacesStub,
196                 files_service: files_pb2_grpc.FilesStub,
197                 images_service: images_pb2_grpc.ImagesStub,
198                 references_service: references_pb2_grpc.ReferencesStub,
199                 users_service: users_pb2_grpc.UsersStub,
200                 organizations_service: organizations_pb2_grpc.OrganizationsStub,
201                 members_service: members_pb2_grpc.MembersStub,
202                 locales_service: locales_pb2_grpc.LocalesStub,
203                 invitations_service: invitations_pb2_grpc.InvitationsStub,
204                 collaborators_service: collaborators_pb2_grpc.CollaboratorsStub,
205                 channel: grpc.Channel,
206    ):
207        self.logger = logging.getLogger(__name__)
208        self.ext_manager_service = ext_manager_service
209        self.collections_service = collections_service
210        self.environments_service = environments_service
211        self.roles_service = roles_service
212        self.clients_service = clients_service
213        self.items_service = items_service
214        self.spaces_service = spaces_service
215        self.files_service = files_service
216        self.images_service = images_service
217        self.references_service = references_service
218        self.users_service = users_service
219        self.organizations_service = organizations_service
220        self.members_service = members_service
221        self.locales_service = locales_service
222        self.invitations_service = invitations_service
223        self.collaborators_service = collaborators_service
224        self.channel = channel
225
226        self.extension_setup = ExtensionSetup(
227            self.collections_service, self.environments_service,
228            self.roles_service, self.clients_service, self.items_service,
229        )
230
231        self.__operations = {}
232
233        for collection in self.collections or []:
234            self.extension_setup.add_collection(collection)
235
236        for role in self.roles or []:
237            self.extension_setup.add_role(role)
238
239        for client in self.clients or []:
240            self.extension_setup.add_client(client)
241
242        for action in self.actions or []:
243            self.extension_setup.add_action(action)
244
245        services_list = [
246            (service_name, getattr(self, service_name))
247            for service_name
248            in self.__dict__
249            if service_name.endswith("_service")
250        ]
251        services_list.append(("channel", self.channel, ))
252
253        for item in self.items:
254            for rule in item.rules:
255                rule.bind_services(services_list)
256
257        self.extension_setup.set_items(self.items)
258
259        @aiocron.crontab('0 * * * *', start=True)
260        async def remove_old_operations():
261            self.remove_old_operations()
262
263    @property
264    def operations(self):
265        return self.__operations
266
267    def remove_old_operations(self):
268        """
269            Функция для удаления stale операций. Запускается раз в час. Удаляет все операции
270            которые были завершены больше часа назад
271        """
272
273        self.logger.info("Удаление старых операций")
274
275        ids = []
276        now = datetime.datetime.now()
277
278        for operation_meta in self.__operations.values():
279            task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled()
280
281            # Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае
282            # неотловленного исключения, например
283            if task_is_not_running:
284                operation_meta.was_finished = True
285
286            if not operation_meta.was_finished:
287                continue
288
289            time_delta = now - operation_meta.modified_at
290
291            # Для операций которые завершены больше часа назад
292            if time_delta.seconds < 60 * 60:
293                continue
294
295            # Если по какой-то причине операция была помечена как завершенная но task ещё активен
296            need_to_cancel_task = not any(
297                (operation_meta.task.done(), operation_meta.task.cancelled())
298            )
299
300            if need_to_cancel_task:
301                try:
302                    operation_meta.task.cancel()
303                except Exception as e:
304                    self.logger.error("Не удалось отменить task")
305                    self.logger.exception(e)
306
307                    continue
308
309            ids.append(operation_meta.operation_id)
310
311            self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление")
312
313        for operation_id in ids:
314            del self.__operations[operation_id]
315
316        self.logger.info("Удаление старых операций завершено")
317
318    def result_log(self, operation, operation_id: str, request, errors: list[str]):
319        """
320            Вспомогательная функция для логгирования резултата выполнения операции
321
322            Arguments:
323                  operation (str): название операции
324                  operation_id (str): идентификатор операции
325                  request: объект запроса
326                  errors (list[str]): ошибки выполнения операции
327        """
328
329        log_func = self.logger.error if errors else self.logger.info
330
331        log_func(
332            "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s"
333            % (
334                operation,
335                operation_id,
336                self.extension_id,
337                request.env_id,
338                request.space_id,
339                "с ошибками: " + "; ".join(errors) if errors else "успешно"
340            )
341        )
342
343    def ext_request_results_from_exception(self, e: Exception):
344        """
345            Вспомогательная функция для преобразования объектов исключений в объект результата запроса
346
347            Arguments:
348                e (Exception): любое исключение
349            Returns:
350                extension_service_pb2.ExtensionRequestResult
351        """
352
353        return [
354            extension_service_pb2.ExtensionRequestResult(
355                extension=self.extension_id,
356                state=extension_service_pb2.ExtensionRequestResult.State.ERROR,
357                error=str(e),
358                msg=None
359            )
360        ]
361
362    def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]:
363        """
364            Получение объекта операции по идентификатору
365
366            Arguments:
367                    operation_id (str): идентификатор операции
368
369            Returns:
370                   OperationMeta | None
371        """
372
373        return self.__operations.get(operation_id)
374
375    def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task):
376        """
377            Функция для сохранения данных об операции
378            Arguments:
379                  operation_id (str): идентификатор операции
380                  description (str): описание операции
381                  task (asyncio.Task): объект task операции
382        """
383
384        self.__operations[operation_id] = OperationMeta(
385            operation_id=operation_id,
386            task=task,
387            description=description
388        )
389
390    def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None):
391        """
392            Функция для завершения операции
393
394            Arguments:
395                operation_id (str): идентификатор операции
396                errors (list[str] | None): ошибки выполнения операции
397        """
398
399        if operation_id in self.operations:
400            self.operations[operation_id].mark_finished(errors)
401        else:
402            self.logger.error(f"Операция {operation_id} не найдена!")
403
404    async def _Install(self, operation_id: str, request: extension_service_pb2.InstallRequest, context):
405        """
406            Действия метода Install сервиса расширений. Вызывается после всех необходимых действий с
407            формированием данных об операции. В случае необходимости можно переопределить
408
409            Arguments:
410                   operation_id (str): идентификатор операции
411                   request (extension_service_pb2.InstallRequest): объект запроса
412                   context: контекст
413        """
414
415        errors_list = await self.extension_setup.install(
416            request.space_id, request.env_id, request.force
417        )
418
419        errors_list += await self.additional_install_operations(operation_id, request, context)
420
421        self.result_log("установки", operation_id, request, errors_list)
422
423        self.mark_operation_as_finished(operation_id, errors_list)
424
425    async def additional_install_operations(self, operation_id: str, request: extension_service_pb2.InstallRequest, context) -> list[str]:
426        """
427            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
428            процедур установки. Можно переопределять. Возвращает массив с ошибками.
429
430            Arguments:
431                   operation_id (str): идентификатор операции
432                   request (extension_service_pb2.InstallRequest): объект запроса
433                   context: контекст
434
435            Returns:
436                list[str]
437        """
438
439        return []
440
441    async def Install(self, request: extension_service_pb2.InstallRequest, context):
442        """
443            Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
444            в себе формирование данных об операции. Переопределять не следует, для изменения логики Install
445            лучше работать с методами _Install если нужно всё радикально изменить или с методом
446            additional_install_operations для добавления чего-то дополнительного
447        """
448
449        operation_id = generate_operation_id()
450        operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % (
451            self.extension_id,
452            request.env_id,
453            request.space_id,
454            "С" if request.force else "Без"
455        )
456
457        self.logger.info(operation_description)
458
459        install_task = asyncio.create_task(self._Install(operation_id, request, context))
460
461        self.set_operation_meta(operation_id, operation_description, install_task)
462
463        return self.get_operation_meta(operation_id).to_operation()
464
465    async def _Uninstall(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context):
466        """
467            Действия метода _Uninstall сервиса расширений. Вызывается после всех необходимых действий с
468            формированием данных об операции. В случае необходимости можно переопределить
469
470            Arguments:
471                   operation_id (str): идентификатор операции
472                   request (extension_service_pb2.UninstallRequest): объект запроса
473                   context: контекст
474        """
475
476        errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
477        errors_list += await self.additional_uninstall_operations(operation_id, request, context)
478
479        self.result_log("удаления", operation_id, request, errors_list)
480
481        self.mark_operation_as_finished(operation_id, errors_list)
482
483    async def additional_uninstall_operations(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context) -> list[str]:
484        """
485            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
486            процедур удаления. Можно переопределять. Возвращает массив с ошибками.
487
488            Arguments:
489                   operation_id (str): идентификатор операции
490                   request (extension_service_pb2.UninstallRequest): объект запроса
491                   context: контекст
492
493            Returns:
494                list[str]
495        """
496
497        return []
498
499    async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context):
500        """
501            Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
502            в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall
503            лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом
504            additional_uninstall_operations для добавления чего-то дополнительного
505        """
506
507        operation_id = generate_operation_id()
508        operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % (
509            self.extension_id,
510            request.env_id,
511            request.space_id,
512            "С" if request.remove else "Без"
513        )
514
515        self.logger.info(operation_description)
516
517        uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context))
518
519        self.set_operation_meta(operation_id, operation_description, uninstall_task)
520
521        return self.get_operation_meta(operation_id).to_operation()
522
523    async def _Check(self, operation_id: str, request: extension_service_pb2.CheckRequest, context):
524        """
525            Действия метода _Check сервиса расширений. Вызывается после всех необходимых действий с
526            формированием данных об операции. В случае необходимости можно переопределить
527
528            Arguments:
529                   operation_id (str): идентификатор операции
530                   request (extension_service_pb2.CheckRequest): объект запроса
531                   context: контекст
532        """
533
534        errors_list: list[str] = await self.extension_setup.check(request.space_id, request.env_id)
535
536        self.result_log("проверки", operation_id, request, errors_list)
537
538        self.__operations[operation_id].mark_finished(errors_list)
539
540    async def additional_check_operations(self, operation_id: str, request: extension_service_pb2.CheckRequest, context) -> list[str]:
541        """
542            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
543            процедур проверки. Можно переопределять. Возвращает массив с ошибками.
544
545            Arguments:
546                   operation_id (str): идентификатор операции
547                   request (extension_service_pb2.CheckRequest): объект запроса
548                   context: контекст
549
550            Returns:
551                list[str]
552        """
553
554        return []
555
556    async def Check(self, request: extension_service_pb2.CheckRequest, context):
557        """
558            Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
559            в себе формирование данных об операции. Переопределять не следует, для изменения логики Check
560            лучше работать с методами _Check если нужно всё радикально изменить или с методом
561            additional_check_operations для добавления чего-то дополнительного
562        """
563
564        operation_id = generate_operation_id()
565        operation_description = "Проверка расширения %s для окружения %s пространства %s" % (
566            self.extension_id,
567            request.env_id,
568            request.space_id,
569        )
570
571        check_task = asyncio.create_task(self._Check(operation_id, request, context))
572
573        self.set_operation_meta(operation_id, operation_description, check_task)
574
575        return self.get_operation_meta(operation_id).to_operation()
576
577    async def _dispatch_action(
578            self, request: extension_pb2.ActionRequest, context
579    ) -> extension_service_pb2.ActionResponse:
580        """
581            Метод для получение нужной ф-ции для действия и её вызова
582
583            Arguments:
584               request (extension_pb2.ActionRequest): объект запроса
585               context: контекст
586
587            Returns:
588               extension_service_pb2.ActionResponse
589        """
590
591        action_id = request.action.split("/")[-1]
592
593        func_name = f"action_{action_id}"
594
595        if not hasattr(self, func_name):
596            response = extension_service_pb2.ActionResponse(
597                state=extension_service_pb2.ActionResponse.State.ERROR,
598                title="Невозможно выполнить действие",
599                error=f"В расширении отсутсвует функция {action_id}"
600            )
601        else:
602            func = getattr(self, func_name)
603
604            response = await func(request, context)
605
606        return response
607
608    async def Action(self, request: extension_pb2.ActionRequest, context):
609        """
610            Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer
611
612            Arguments:
613                request (extension_pb2.ActionRequest): объект запроса
614                context: контекст
615        """
616
617        operation_description = "Действие %s для окружения %s пространства %s" % (
618            request.action,
619            request.env_id,
620            request.space_id,
621        )
622
623        self.logger.info(operation_description)
624
625        response = await self._dispatch_action(request, context)
626
627        if response.state == extension_service_pb2.ActionResponse.State.ERROR:
628            error_description = "Ошибка действия %s в расширении %s: %s" % (
629                request.action,
630                self.extension_id,
631                response.error
632            )
633
634            self.logger.error(error_description)
635
636        return response
637
638    async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
639        """
640            Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется
641            для получения данных о выполняемой операции
642
643            Arguments:
644                request (operation_service_pb2.GetOperationRequest): объект запроса
645                context: контекст
646        """
647
648        operations_meta = self.get_operation_meta(request.operation_id)
649
650        if not operations_meta:
651            error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % (
652                request.operation_id,
653                self.extension_id,
654            )
655
656            context.set_code(grpc.StatusCode.UNKNOWN)
657            context.set_details(error_description)
658
659            self.logger.error(error_description)
660
661            return None
662
663        self.logger.info(
664            f"Статус операции {operations_meta.operation_id}:\n"
665            f"{operations_meta.description}\n"
666            f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n"
667            f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}"
668        )
669
670        return operations_meta.to_operation()
671
672    def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context):
673        """
674            Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется
675            для отмены выполнения операции
676
677            Arguments:
678                request (operation_service_pb2.CancelOperationRequest): объект запроса
679                context: контекст
680        """
681
682        operations_meta = self.get_operation_meta(request.operation_id)
683
684        if not operations_meta:
685            error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % (
686                request.operation_id,
687                self.extension_id,
688            )
689
690            context.set_code(grpc.StatusCode.UNKNOWN)
691            context.set_details(error_description)
692
693            self.logger.error(
694                error_description
695            )
696
697            return None
698
699        try:
700            operations_meta.task.cancel()
701            operations_meta.mark_cancelled()
702        except Exception as e:
703            error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % (
704                request.operation_id,
705                self.extension_id,
706                str(e)
707            )
708
709            self.logger.error(error_description)
710
711            context.set_code(grpc.StatusCode.UNKNOWN)
712            context.set_details(error_description)
713
714            self.logger.exception(e)
715
716            return None
717
718        self.logger.info(
719            "Операция %s в расширении %s отменена" % (
720                request.operation_id,
721                self.extension_id,
722            )
723        )
724
725        return operations_meta.to_operation()

Базовый класс для сервисов расширений

Attributes:
  • extension_id (str): идентификатор расширения
  • collections (list[collections_pb2.Collection]): список коллекций расширения
  • roles (list[roles_pb2.Role]): список ролей расширения
  • clients (list[clients_pb2.Client]): список клиентов расширения
  • actions (list[dict]): список действий расширения
  • items (list[AbstractItem]): список управляемых расширением item'ов
  • __operations (dict[str, OperationMeta]): маппинг с данными операций
  • logger (logging.Logger): логгер расширения
  • ext_manager_service (manager_service_pb2_grpc.ExtensionManagerServiceStub): ссылка на сервис менеджера расширений
  • collections_service (collections_pb2_grpc.CollectionsStub): ссылка на сервис коллекций
  • environments_service (environments_pb2_grpc.EnvironmentsStub): ссылка на сервис окружений
  • roles_service (roles_pb2_grpc.RolesStub): ссылка на сервис ролей
  • clients_service (clients_pb2_grpc.ClientsStub): ссылка на сервис клиентов
  • items_service (items_pb2_grpc.ItemsStub): ссылка на сервис item'ов
  • spaces_service (spaces_pb2_grpc.SpacesStub): ссылка на сервис пространств
  • files_service (files_pb2_grpc.FilesStub): ссылка на сервис файлов
  • images_service (images_pb2_grpc.ImagesStub): ссылка на сервис изображений
  • references_service (references_pb2_grpc.ReferencesStub): ссылка на сервис для работы с объектами Reference
  • users_service (users_pb2_grpc.UsersStub): ссылка на сервис пользователей
  • organizations_service (organizations_pb2_grpc.OrganizationsStub): ссылка на сервис организаций
  • members_service (members_pb2_grpc.MembersStub): ссылка на сервис работы с пользователями организаций
  • locales_service (locales_pb2_grpc.LocalesStub): ссылка на сервис локалей и переводов
  • invitations_service (invitations_pb2_grpc.InvitationsStub): ссылка на сервис приглашений
  • collaborators_service (collaborators_pb2_grpc.CollaboratorsStub): ссылка на сервис коллабораторов
  • channel (grpc.Channel): ссылка на grpc канал
  • extension_setup (ExtensionSetup): объект класса ExtensionSetup для управления данными расширения
188    def __init__(self,
189                 ext_manager_service: manager_service_pb2_grpc.ExtensionManagerServiceStub,
190                 collections_service: collections_pb2_grpc.CollectionsStub,
191                 environments_service: environments_pb2_grpc.EnvironmentsStub,
192                 roles_service: roles_pb2_grpc.RolesStub,
193                 clients_service: clients_pb2_grpc.ClientsStub,
194                 items_service: items_pb2_grpc.ItemsStub,
195                 spaces_service: spaces_pb2_grpc.SpacesStub,
196                 files_service: files_pb2_grpc.FilesStub,
197                 images_service: images_pb2_grpc.ImagesStub,
198                 references_service: references_pb2_grpc.ReferencesStub,
199                 users_service: users_pb2_grpc.UsersStub,
200                 organizations_service: organizations_pb2_grpc.OrganizationsStub,
201                 members_service: members_pb2_grpc.MembersStub,
202                 locales_service: locales_pb2_grpc.LocalesStub,
203                 invitations_service: invitations_pb2_grpc.InvitationsStub,
204                 collaborators_service: collaborators_pb2_grpc.CollaboratorsStub,
205                 channel: grpc.Channel,
206    ):
207        self.logger = logging.getLogger(__name__)
208        self.ext_manager_service = ext_manager_service
209        self.collections_service = collections_service
210        self.environments_service = environments_service
211        self.roles_service = roles_service
212        self.clients_service = clients_service
213        self.items_service = items_service
214        self.spaces_service = spaces_service
215        self.files_service = files_service
216        self.images_service = images_service
217        self.references_service = references_service
218        self.users_service = users_service
219        self.organizations_service = organizations_service
220        self.members_service = members_service
221        self.locales_service = locales_service
222        self.invitations_service = invitations_service
223        self.collaborators_service = collaborators_service
224        self.channel = channel
225
226        self.extension_setup = ExtensionSetup(
227            self.collections_service, self.environments_service,
228            self.roles_service, self.clients_service, self.items_service,
229        )
230
231        self.__operations = {}
232
233        for collection in self.collections or []:
234            self.extension_setup.add_collection(collection)
235
236        for role in self.roles or []:
237            self.extension_setup.add_role(role)
238
239        for client in self.clients or []:
240            self.extension_setup.add_client(client)
241
242        for action in self.actions or []:
243            self.extension_setup.add_action(action)
244
245        services_list = [
246            (service_name, getattr(self, service_name))
247            for service_name
248            in self.__dict__
249            if service_name.endswith("_service")
250        ]
251        services_list.append(("channel", self.channel, ))
252
253        for item in self.items:
254            for rule in item.rules:
255                rule.bind_services(services_list)
256
257        self.extension_setup.set_items(self.items)
258
259        @aiocron.crontab('0 * * * *', start=True)
260        async def remove_old_operations():
261            self.remove_old_operations()
extension_id: str
collections: list[collections.collections_pb2.Collection] = []
roles: list[roles.roles_pb2.Role] = []
clients: list[clients.clients_pb2.Client] = []
actions: list[dict] = []
logger
ext_manager_service
collections_service
environments_service
roles_service
clients_service
items_service
spaces_service
files_service
images_service
references_service
users_service
organizations_service
members_service
locales_service
invitations_service
collaborators_service
channel
extension_setup
operations
263    @property
264    def operations(self):
265        return self.__operations
def remove_old_operations(self):
267    def remove_old_operations(self):
268        """
269            Функция для удаления stale операций. Запускается раз в час. Удаляет все операции
270            которые были завершены больше часа назад
271        """
272
273        self.logger.info("Удаление старых операций")
274
275        ids = []
276        now = datetime.datetime.now()
277
278        for operation_meta in self.__operations.values():
279            task_is_not_running = operation_meta.task.done() or operation_meta.task.cancelled()
280
281            # Если task фактически не работает то операцию нужно пометить как выполненную. Это может произойти в случае
282            # неотловленного исключения, например
283            if task_is_not_running:
284                operation_meta.was_finished = True
285
286            if not operation_meta.was_finished:
287                continue
288
289            time_delta = now - operation_meta.modified_at
290
291            # Для операций которые завершены больше часа назад
292            if time_delta.seconds < 60 * 60:
293                continue
294
295            # Если по какой-то причине операция была помечена как завершенная но task ещё активен
296            need_to_cancel_task = not any(
297                (operation_meta.task.done(), operation_meta.task.cancelled())
298            )
299
300            if need_to_cancel_task:
301                try:
302                    operation_meta.task.cancel()
303                except Exception as e:
304                    self.logger.error("Не удалось отменить task")
305                    self.logger.exception(e)
306
307                    continue
308
309            ids.append(operation_meta.operation_id)
310
311            self.logger.info(f"Операция {operation_meta.operation_id} помечена на удаление")
312
313        for operation_id in ids:
314            del self.__operations[operation_id]
315
316        self.logger.info("Удаление старых операций завершено")

Функция для удаления stale операций. Запускается раз в час. Удаляет все операции которые были завершены больше часа назад

def result_log(self, operation, operation_id: str, request, errors: list[str]):
318    def result_log(self, operation, operation_id: str, request, errors: list[str]):
319        """
320            Вспомогательная функция для логгирования резултата выполнения операции
321
322            Arguments:
323                  operation (str): название операции
324                  operation_id (str): идентификатор операции
325                  request: объект запроса
326                  errors (list[str]): ошибки выполнения операции
327        """
328
329        log_func = self.logger.error if errors else self.logger.info
330
331        log_func(
332            "Операция %s (%s) расширения %s для окружения %s пространства %s завершена %s"
333            % (
334                operation,
335                operation_id,
336                self.extension_id,
337                request.env_id,
338                request.space_id,
339                "с ошибками: " + "; ".join(errors) if errors else "успешно"
340            )
341        )

Вспомогательная функция для логгирования резултата выполнения операции

Arguments:
  • operation (str): название операции
  • operation_id (str): идентификатор операции
  • request: объект запроса
  • errors (list[str]): ошибки выполнения операции
def ext_request_results_from_exception(self, e: Exception):
343    def ext_request_results_from_exception(self, e: Exception):
344        """
345            Вспомогательная функция для преобразования объектов исключений в объект результата запроса
346
347            Arguments:
348                e (Exception): любое исключение
349            Returns:
350                extension_service_pb2.ExtensionRequestResult
351        """
352
353        return [
354            extension_service_pb2.ExtensionRequestResult(
355                extension=self.extension_id,
356                state=extension_service_pb2.ExtensionRequestResult.State.ERROR,
357                error=str(e),
358                msg=None
359            )
360        ]

Вспомогательная функция для преобразования объектов исключений в объект результата запроса

Arguments:
  • e (Exception): любое исключение
Returns:

extension_service_pb2.ExtensionRequestResult

def get_operation_meta( self, operation_id: str) -> Optional[OperationMeta]:
362    def get_operation_meta(self, operation_id: str) -> typing.Optional[OperationMeta]:
363        """
364            Получение объекта операции по идентификатору
365
366            Arguments:
367                    operation_id (str): идентификатор операции
368
369            Returns:
370                   OperationMeta | None
371        """
372
373        return self.__operations.get(operation_id)

Получение объекта операции по идентификатору

Arguments:
  • operation_id (str): идентификатор операции
Returns:

OperationMeta | None

def set_operation_meta(self, operation_id: str, description: str, task: _asyncio.Task):
375    def set_operation_meta(self, operation_id: str, description: str, task: asyncio.Task):
376        """
377            Функция для сохранения данных об операции
378            Arguments:
379                  operation_id (str): идентификатор операции
380                  description (str): описание операции
381                  task (asyncio.Task): объект task операции
382        """
383
384        self.__operations[operation_id] = OperationMeta(
385            operation_id=operation_id,
386            task=task,
387            description=description
388        )

Функция для сохранения данных об операции

Arguments:
  • operation_id (str): идентификатор операции
  • description (str): описание операции
  • task (asyncio.Task): объект task операции
def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None):
390    def mark_operation_as_finished(self, operation_id: str, errors: list[str] | None = None):
391        """
392            Функция для завершения операции
393
394            Arguments:
395                operation_id (str): идентификатор операции
396                errors (list[str] | None): ошибки выполнения операции
397        """
398
399        if operation_id in self.operations:
400            self.operations[operation_id].mark_finished(errors)
401        else:
402            self.logger.error(f"Операция {operation_id} не найдена!")

Функция для завершения операции

Arguments:
  • operation_id (str): идентификатор операции
  • errors (list[str] | None): ошибки выполнения операции
async def additional_install_operations( self, operation_id: str, request: extensions.extension_service_pb2.InstallRequest, context) -> list[str]:
425    async def additional_install_operations(self, operation_id: str, request: extension_service_pb2.InstallRequest, context) -> list[str]:
426        """
427            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
428            процедур установки. Можно переопределять. Возвращает массив с ошибками.
429
430            Arguments:
431                   operation_id (str): идентификатор операции
432                   request (extension_service_pb2.InstallRequest): объект запроса
433                   context: контекст
434
435            Returns:
436                list[str]
437        """
438
439        return []

Функция для возможности добавления любой дополнительной логики после завершения всех стандартных процедур установки. Можно переопределять. Возвращает массив с ошибками.

Arguments:
  • operation_id (str): идентификатор операции
  • request (extension_service_pb2.InstallRequest): объект запроса
  • context: контекст
Returns:

list[str]

async def Install( self, request: extensions.extension_service_pb2.InstallRequest, context):
441    async def Install(self, request: extension_service_pb2.InstallRequest, context):
442        """
443            Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
444            в себе формирование данных об операции. Переопределять не следует, для изменения логики Install
445            лучше работать с методами _Install если нужно всё радикально изменить или с методом
446            additional_install_operations для добавления чего-то дополнительного
447        """
448
449        operation_id = generate_operation_id()
450        operation_description = "Установка расширения %s для окружения %s пространства %s. %s force" % (
451            self.extension_id,
452            request.env_id,
453            request.space_id,
454            "С" if request.force else "Без"
455        )
456
457        self.logger.info(operation_description)
458
459        install_task = asyncio.create_task(self._Install(operation_id, request, context))
460
461        self.set_operation_meta(operation_id, operation_description, install_task)
462
463        return self.get_operation_meta(operation_id).to_operation()

Реализация метода Install класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит в себе формирование данных об операции. Переопределять не следует, для изменения логики Install лучше работать с методами _Install если нужно всё радикально изменить или с методом additional_install_operations для добавления чего-то дополнительного

async def additional_uninstall_operations( self, operation_id: str, request: extensions.extension_service_pb2.UninstallRequest, context) -> list[str]:
483    async def additional_uninstall_operations(self, operation_id: str, request: extension_service_pb2.UninstallRequest, context) -> list[str]:
484        """
485            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
486            процедур удаления. Можно переопределять. Возвращает массив с ошибками.
487
488            Arguments:
489                   operation_id (str): идентификатор операции
490                   request (extension_service_pb2.UninstallRequest): объект запроса
491                   context: контекст
492
493            Returns:
494                list[str]
495        """
496
497        return []

Функция для возможности добавления любой дополнительной логики после завершения всех стандартных процедур удаления. Можно переопределять. Возвращает массив с ошибками.

Arguments:
  • operation_id (str): идентификатор операции
  • request (extension_service_pb2.UninstallRequest): объект запроса
  • context: контекст
Returns:

list[str]

async def Uninstall( self, request: extensions.extension_service_pb2.UninstallRequest, context):
499    async def Uninstall(self, request: extension_service_pb2.UninstallRequest, context):
500        """
501            Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
502            в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall
503            лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом
504            additional_uninstall_operations для добавления чего-то дополнительного
505        """
506
507        operation_id = generate_operation_id()
508        operation_description = "Удаление расширения %s для окружения %s пространства %s. %s remove" % (
509            self.extension_id,
510            request.env_id,
511            request.space_id,
512            "С" if request.remove else "Без"
513        )
514
515        self.logger.info(operation_description)
516
517        uninstall_task = asyncio.create_task(self._Uninstall(operation_id, request, context))
518
519        self.set_operation_meta(operation_id, operation_description, uninstall_task)
520
521        return self.get_operation_meta(operation_id).to_operation()

Реализация метода Uninstall класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит в себе формирование данных об операции. Переопределять не следует, для изменения логики Uninstall лучше работать с методами _Uninstall если нужно всё радикально изменить или с методом additional_uninstall_operations для добавления чего-то дополнительного

async def additional_check_operations( self, operation_id: str, request: extensions.extension_service_pb2.CheckRequest, context) -> list[str]:
540    async def additional_check_operations(self, operation_id: str, request: extension_service_pb2.CheckRequest, context) -> list[str]:
541        """
542            Функция для возможности добавления любой дополнительной логики после завершения всех стандартных
543            процедур проверки. Можно переопределять. Возвращает массив с ошибками.
544
545            Arguments:
546                   operation_id (str): идентификатор операции
547                   request (extension_service_pb2.CheckRequest): объект запроса
548                   context: контекст
549
550            Returns:
551                list[str]
552        """
553
554        return []

Функция для возможности добавления любой дополнительной логики после завершения всех стандартных процедур проверки. Можно переопределять. Возвращает массив с ошибками.

Arguments:
  • operation_id (str): идентификатор операции
  • request (extension_service_pb2.CheckRequest): объект запроса
  • context: контекст
Returns:

list[str]

async def Check( self, request: extensions.extension_service_pb2.CheckRequest, context):
556    async def Check(self, request: extension_service_pb2.CheckRequest, context):
557        """
558            Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит
559            в себе формирование данных об операции. Переопределять не следует, для изменения логики Check
560            лучше работать с методами _Check если нужно всё радикально изменить или с методом
561            additional_check_operations для добавления чего-то дополнительного
562        """
563
564        operation_id = generate_operation_id()
565        operation_description = "Проверка расширения %s для окружения %s пространства %s" % (
566            self.extension_id,
567            request.env_id,
568            request.space_id,
569        )
570
571        check_task = asyncio.create_task(self._Check(operation_id, request, context))
572
573        self.set_operation_meta(operation_id, operation_description, check_task)
574
575        return self.get_operation_meta(operation_id).to_operation()

Реализация метода Check класса extension_service_pb2_grpc.ExtensionServiceServicer. Содержит в себе формирование данных об операции. Переопределять не следует, для изменения логики Check лучше работать с методами _Check если нужно всё радикально изменить или с методом additional_check_operations для добавления чего-то дополнительного

async def Action(self, request: extensions.extension_pb2.ActionRequest, context):
608    async def Action(self, request: extension_pb2.ActionRequest, context):
609        """
610            Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer
611
612            Arguments:
613                request (extension_pb2.ActionRequest): объект запроса
614                context: контекст
615        """
616
617        operation_description = "Действие %s для окружения %s пространства %s" % (
618            request.action,
619            request.env_id,
620            request.space_id,
621        )
622
623        self.logger.info(operation_description)
624
625        response = await self._dispatch_action(request, context)
626
627        if response.state == extension_service_pb2.ActionResponse.State.ERROR:
628            error_description = "Ошибка действия %s в расширении %s: %s" % (
629                request.action,
630                self.extension_id,
631                response.error
632            )
633
634            self.logger.error(error_description)
635
636        return response

Реализация метода Action класса extension_service_pb2_grpc.ExtensionServiceServicer

Arguments:
  • request (extension_pb2.ActionRequest): объект запроса
  • context: контекст
async def Get( self, request: common.operation_service_pb2.GetOperationRequest, context):
638    async def Get(self, request: operation_service_pb2.GetOperationRequest, context):
639        """
640            Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется
641            для получения данных о выполняемой операции
642
643            Arguments:
644                request (operation_service_pb2.GetOperationRequest): объект запроса
645                context: контекст
646        """
647
648        operations_meta = self.get_operation_meta(request.operation_id)
649
650        if not operations_meta:
651            error_description = "Ошибка проверки операции %s в расширении %s - не найдена" % (
652                request.operation_id,
653                self.extension_id,
654            )
655
656            context.set_code(grpc.StatusCode.UNKNOWN)
657            context.set_details(error_description)
658
659            self.logger.error(error_description)
660
661            return None
662
663        self.logger.info(
664            f"Статус операции {operations_meta.operation_id}:\n"
665            f"{operations_meta.description}\n"
666            f"{'выполняется' if not operations_meta.was_finished else 'завершена'}, \n"
667            f"{'без ошибок' if not operations_meta.errors else 'ошибки: ' + '; '.join(operations_meta.errors)}"
668        )
669
670        return operations_meta.to_operation()

Реализация метода Get класса operation_service_pb2_grpc.OperationServiceServicer. Используется для получения данных о выполняемой операции

Arguments:
  • request (operation_service_pb2.GetOperationRequest): объект запроса
  • context: контекст
def Cancel( self, request: common.operation_service_pb2.CancelOperationRequest, context):
672    def Cancel(self, request: operation_service_pb2.CancelOperationRequest, context):
673        """
674            Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется
675            для отмены выполнения операции
676
677            Arguments:
678                request (operation_service_pb2.CancelOperationRequest): объект запроса
679                context: контекст
680        """
681
682        operations_meta = self.get_operation_meta(request.operation_id)
683
684        if not operations_meta:
685            error_description = "Не удалось удалить операцию %s в расширении %s - не найдена" % (
686                request.operation_id,
687                self.extension_id,
688            )
689
690            context.set_code(grpc.StatusCode.UNKNOWN)
691            context.set_details(error_description)
692
693            self.logger.error(
694                error_description
695            )
696
697            return None
698
699        try:
700            operations_meta.task.cancel()
701            operations_meta.mark_cancelled()
702        except Exception as e:
703            error_description = "Во время отмены операции %s расширении %s произошла ошибка %s" % (
704                request.operation_id,
705                self.extension_id,
706                str(e)
707            )
708
709            self.logger.error(error_description)
710
711            context.set_code(grpc.StatusCode.UNKNOWN)
712            context.set_details(error_description)
713
714            self.logger.exception(e)
715
716            return None
717
718        self.logger.info(
719            "Операция %s в расширении %s отменена" % (
720                request.operation_id,
721                self.extension_id,
722            )
723        )
724
725        return operations_meta.to_operation()

Реализация метода Cancel класса operation_service_pb2_grpc.OperationServiceServicer. Используется для отмены выполнения операции

Arguments:
  • request (operation_service_pb2.CancelOperationRequest): объект запроса
  • context: контекст