diff --git a/perxis/extensions/bootstrap.py b/perxis/extensions/bootstrap.py index d13764b305ee63ff4c7c337b8a710c0c706a7ba9..8f532f15797cc51a74c9f01923a425dcb11088ff 100644 --- a/perxis/extensions/bootstrap.py +++ b/perxis/extensions/bootstrap.py @@ -21,23 +21,22 @@ async def _main( ext_manager_host: str, content_host: str ): - metadata = grpc.aio.Metadata( - ('x-perxis-access', 'system'), + interceptor = header_adder_interceptor( + 'x-perxis-access', 'system' ) @aiocron.crontab('* * * * *', start=False) async def register_extension(): await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest( extensions=[ext_descriptor] - ), metadata=metadata) + )) logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано РІ perxis") - async with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel: - + async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel: ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(extensions_manager_channel) registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions( - manager_pb2.ListExtensionsRequest(), metadata=metadata + manager_pb2.ListExtensionsRequest() ) # todo enable after fix - https://tracker.yandex.ru/PRXS-1507 @@ -56,7 +55,7 @@ async def _main( register_extension.start() - async with grpc.aio.insecure_channel(content_host) as content_channel: + async with grpc.aio.insecure_channel(content_host, interceptors=[interceptor]) as content_channel: collections_stub = collections_pb2_grpc.CollectionsStub(content_channel) roles_stub = roles_pb2_grpc.RolesStub(content_channel) diff --git a/perxis/extensions/extension_service.py b/perxis/extensions/extension_service.py index 198c7f454f7d126864baf6751217b2e5749934c3..4bb8ed984ea5fe4cd3a9b2c6624a95612b0e96af 100644 --- a/perxis/extensions/extension_service.py +++ b/perxis/extensions/extension_service.py @@ -75,8 +75,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): ) ] - def _Install(self, request: extension_pb2.InstallRequest, context): - errors_list = self.extension_setup.install( + async def _Install(self, request: extension_pb2.InstallRequest, context): + errors_list = await self.extension_setup.install( request.space_id, request.env_id, request.force ) @@ -93,7 +93,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): )] ) - def Install(self, request: extension_pb2.InstallRequest, context): + async def Install(self, request: extension_pb2.InstallRequest, context): self.logger.info( "Установка расширения %s для окружения %s пространства %s. %s force" % ( self.extension_id, @@ -104,7 +104,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): ) try: - response = self._Install(request, context) + response = await self._Install(request, context) except Exception as e: response = extension_pb2.InstallResponse( results=self.ext_request_results_from_exception(e) @@ -118,8 +118,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): return response - def _Update(self, request: extension_pb2.UpdateRequest, context): - errors_list = self.extension_setup.update( + async def _Update(self, request: extension_pb2.UpdateRequest, context): + errors_list = await self.extension_setup.update( request.space_id, request.env_id, request.force ) @@ -136,7 +136,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): )] ) - def Update(self, request: extension_pb2.UpdateRequest, context): + async def Update(self, request: extension_pb2.UpdateRequest, context): self.logger.info( "Обновление расширения %s для окружения %s пространства %s. %s force" % ( self.extension_id, @@ -147,7 +147,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): ) try: - response = self._Update(request, context) + response = await self._Update(request, context) except Exception as e: response = extension_pb2.UpdateResponse( results=self.ext_request_results_from_exception(e) @@ -161,8 +161,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): return response - def _Uninstall(self, request: extension_pb2.UninstallRequest, context): - errors_list: list[str] = self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) + async def _Uninstall(self, request: extension_pb2.UninstallRequest, context): + errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove) response_state = extension_pb2.ExtensionRequestResult.State.OK \ if not errors_list \ @@ -177,7 +177,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): )] ) - def Uninstall(self, request: extension_pb2.UninstallRequest, context): + async def Uninstall(self, request: extension_pb2.UninstallRequest, context): self.logger.info( "Удаление расширения %s для окружения %s пространства %s. %s remove" % ( self.extension_id, @@ -188,7 +188,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): ) try: - response = self._Uninstall(request, context) + response = await self._Uninstall(request, context) except Exception as e: response = extension_pb2.UninstallResponse( results=self.ext_request_results_from_exception(e) @@ -202,8 +202,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): return response - def _Check(self, request: extension_pb2.CheckRequest, context): - errors_list = self.extension_setup.check(request.space_id, request.env_id) + async def _Check(self, request: extension_pb2.CheckRequest, context): + errors_list = await self.extension_setup.check(request.space_id, request.env_id) response_state = extension_pb2.ExtensionRequestResult.State.OK \ if not errors_list \ @@ -218,7 +218,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): )] ) - def Check(self, request: extension_pb2.CheckRequest, context): + async def Check(self, request: extension_pb2.CheckRequest, context): self.logger.info( "Проверка расширения %s для окружения %s пространства %s" % ( self.extension_id, @@ -228,7 +228,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): ) try: - response = self._Check(request, context) + response = await self._Check(request, context) except Exception as e: response = extension_pb2.CheckResponse( results=self.ext_request_results_from_exception(e) @@ -242,7 +242,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer): return response - def Action(self, request: extension_pb2.ActionRequest, context): + async def Action(self, request: extension_pb2.ActionRequest, context): context.set_code(grpc.StatusCode.UNKNOWN) context.set_details("Unknown action") diff --git a/perxis/extensions/extension_setup.py b/perxis/extensions/extension_setup.py index 2b8d96d5d6910e53012ef32935421b69dded1bf0..e353c2303f191180166ee90bf5e36b2811b20b0e 100644 --- a/perxis/extensions/extension_setup.py +++ b/perxis/extensions/extension_setup.py @@ -1,3 +1,5 @@ +import logging + import grpc import json import time @@ -14,6 +16,9 @@ from perxis.clients import clients_pb2_grpc, clients_pb2 from perxis.environments import environments_pb2_grpc, environments_pb2 +logger = logging.getLogger(__name__) + + class ExtensionSetup: def __init__( self, @@ -53,12 +58,12 @@ class ExtensionSetup: self.clients = clients # Работа СЃ ролями - def __remove_roles(self, space_id: str) -> list[str]: + async def __remove_roles(self, space_id: str) -> list[str]: errors_list: list[str] = [] for role in self.roles: try: - self.roles_service.Delete.with_call( + await self.roles_service.Delete( roles_pb2.DeleteRequest( space_id=space_id, role_id=role.id ) @@ -70,12 +75,12 @@ class ExtensionSetup: return errors_list - def __check_roles(self, space_id: str) -> list[str]: + async def __check_roles(self, space_id: str) -> list[str]: errors_list = [] for role in self.roles: try: - self.roles_service.Get.with_call( + await self.roles_service.Get( roles_pb2.GetRequest(space_id=space_id, role_id=role.id) ) except grpc.RpcError as e: @@ -83,12 +88,12 @@ class ExtensionSetup: return errors_list - def __update_roles(self, space_id: str, env_id: str) -> list[str]: + async def __update_roles(self, space_id: str, env_id: str) -> list[str]: errors_list = [] for local_role in self.roles: try: - get_response, state = self.roles_service.Get.with_call( + get_response = await self.roles_service.Get( roles_pb2.GetRequest( space_id=space_id, role_id=local_role.id @@ -135,7 +140,7 @@ class ExtensionSetup: )) try: - self.roles_service.Update.with_call( + await self.roles_service.Update( roles_pb2.UpdateRequest( role=cloned_role ) @@ -150,7 +155,7 @@ class ExtensionSetup: cloned_role = copy.deepcopy(local_role) cloned_role.space_id = space_id - response, _ = self.roles_service.Create.with_call( + response = await self.roles_service.Create( roles_pb2.CreateRequest( role=cloned_role ), @@ -163,12 +168,12 @@ class ExtensionSetup: return errors_list # Работа СЃ клиентами - def __check_clients(self, space_id: str) -> list[str]: + async def __check_clients(self, space_id: str) -> list[str]: errors_list = [] for client in self.clients: try: - self.clients_service.Get.with_call( + await self.clients_service.Get( clients_pb2.GetRequest(space_id=space_id, id=client.id) ) except grpc.RpcError as e: @@ -176,12 +181,12 @@ class ExtensionSetup: return errors_list - def __remove_clients(self, space_id: str) -> list[str]: + async def __remove_clients(self, space_id: str) -> list[str]: errors_list: list[str] = [] for client in self.clients: try: - self.clients_service.Delete.with_call( + await self.clients_service.Delete( clients_pb2.DeleteRequest( space_id=space_id, id=client.id ) @@ -193,7 +198,7 @@ class ExtensionSetup: return errors_list - def __update_clients(self, space_id: str) -> list[str]: + async def __update_clients(self, space_id: str) -> list[str]: errors_list = [] for local_client in self.clients: @@ -201,7 +206,7 @@ class ExtensionSetup: # Перед обновлением клиента предварительно РЅСѓР¶РЅРѕ получить текущую запись чтобы скопировать оттуда # токены. Рначе после обновления расширения перестанут работать РІСЃРµ приложения которые использовали # токен клиента - get_response, state = self.clients_service.Get.with_call( + get_response = await self.clients_service.Get( clients_pb2.GetRequest( space_id=space_id, id=local_client.id @@ -232,7 +237,7 @@ class ExtensionSetup: tls=client.tls ) - self.clients_service.Update.with_call( + await self.clients_service.Update( clients_pb2.UpdateRequest( client=new_client ) @@ -244,7 +249,7 @@ class ExtensionSetup: cloned_client = copy.deepcopy(local_client) cloned_client.space_id = space_id - response, _ = self.clients_service.Create.with_call( + await self.clients_service.Create( clients_pb2.CreateRequest( client=cloned_client ), @@ -255,12 +260,12 @@ class ExtensionSetup: return errors_list # Работа СЃ коллекциями - def __remove_collections(self, space_id: str, env_id: str) -> list[str]: + async def __remove_collections(self, space_id: str, env_id: str) -> list[str]: errors_list: list[str] = [] for collection in self.collections: try: - self.collections_service.Delete.with_call( + await self.collections_service.Delete( collections_pb2.DeleteRequest( space_id=space_id, env_id=env_id, collection_id=collection.id ) @@ -272,12 +277,12 @@ class ExtensionSetup: return errors_list - def __check_collections(self, space_id: str, env_id: str) -> list[str]: + async def __check_collections(self, space_id: str, env_id: str) -> list[str]: errors_list = [] for collection in self.collections: try: - self.collections_service.Get.with_call( + await self.collections_service.Get( collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=collection.id) ) except grpc.RpcError as e: @@ -285,7 +290,7 @@ class ExtensionSetup: return errors_list - def __update_collections(self, space_id: str, env_id: str) -> list[str]: + async def __update_collections(self, space_id: str, env_id: str) -> list[str]: """ Метод __обновления__ коллекций. Миграция окружения требуется только РІ случае если РѕРґРЅР° или несколько схем коллекций изменялись. Алгоритм работы: @@ -302,7 +307,7 @@ class ExtensionSetup: for local_collection in self.collections: try: # Необходимо получить текущую версию коллекции для того чтобы сравнить схемы - get_response, state = self.collections_service.Get.with_call( + get_response = await self.collections_service.Get( collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=local_collection.id) ) @@ -317,7 +322,7 @@ class ExtensionSetup: # Коллекция может быть РЅРµ найдена РІ случае если РѕРЅР° добавлена РІ РЅРѕРІРѕР№ версии if not collection: try: - create_response, state = self.collections_service.Create.with_call( + create_response = await self.collections_service.Create( collections_pb2.CreateRequest(collection=cloned_collection) ) @@ -335,7 +340,7 @@ class ExtensionSetup: cloned_collection.space_id = space_id cloned_collection.env_id = env_id - self.collections_service.Update.with_call( + await self.collections_service.Update( collections_pb2.UpdateRequest(collection=cloned_collection) ) except grpc.RpcError as e: @@ -352,20 +357,20 @@ class ExtensionSetup: if diff: need_to_migrate_environment = True - set_schema_error_message = self.__set_collection_schema( + set_schema_error_message = await self.__set_collection_schema( space_id, env_id, local_collection.id, local_collection.schema ) if set_schema_error_message: errors_list.append(set_schema_error_message) if need_to_migrate_environment: - migrate_environment_error_message = self.__migrate_environment(space_id, env_id) + migrate_environment_error_message = await self.__migrate_environment(space_id, env_id) if migrate_environment_error_message: errors_list.append(migrate_environment_error_message) return errors_list - def __migrate_environment(self, space_id: str, env_id: str) -> typing.Optional[str]: + async def __migrate_environment(self, space_id: str, env_id: str) -> typing.Optional[str]: # Так как perxis может РЅРµ сразу выставить коллекции / окружению статус ready операцию необходимо выполнять # СЃ попытками @@ -377,7 +382,7 @@ class ExtensionSetup: time.sleep(self.__sleep_time) try: - self.environments_service.Migrate(environments_pb2.MigrateRequest( + await self.environments_service.Migrate(environments_pb2.MigrateRequest( space_id=space_id, env_id=env_id, options=environments_pb2.MigrateOptions( @@ -398,7 +403,7 @@ class ExtensionSetup: return error_message - def __set_collection_schema(self, space_id: str, env_id: str, collection_id: str, schema: str) -> typing.Optional[str]: + async def __set_collection_schema(self, space_id: str, env_id: str, collection_id: str, schema: str) -> typing.Optional[str]: # Так как perxis может РЅРµ сразу выставить коллекции / окружению статус ready операцию необходимо выполнять # СЃ попытками @@ -410,7 +415,7 @@ class ExtensionSetup: time.sleep(self.__sleep_time) try: - self.collections_service.SetSchema.with_call( + await self.collections_service.SetSchema( collections_pb2.SetSchemaRequest( space_id=space_id, env_id=env_id, @@ -432,48 +437,48 @@ class ExtensionSetup: return error_message - def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]: + async def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]: errors = [] if use_force: - errors += self.__remove_collections(space_id, env_id) - errors += self.__remove_clients(space_id) - errors += self.__remove_roles(space_id) + errors += await self.__remove_collections(space_id, env_id) + errors += await self.__remove_clients(space_id) + errors += await self.__remove_roles(space_id) - errors += self.__update_collections(space_id, env_id) - errors += self.__update_roles(space_id, env_id) - errors += self.__update_clients(space_id) + errors += await self.__update_collections(space_id, env_id) + errors += await self.__update_roles(space_id, env_id) + errors += await self.__update_clients(space_id) return errors - def update(self, space_id: str, env_id: str, use_force: bool) -> list[str]: + async def update(self, space_id: str, env_id: str, use_force: bool) -> list[str]: errors = [] # Р’ случае обновление расширения СЃ флагом force РЅСѓР¶РЅРѕ предварительно удалить РІСЃРµ сущности. if use_force: - errors += self.__remove_clients(space_id) - errors += self.__remove_roles(space_id) - errors += self.__remove_collections(space_id, env_id) + errors += await self.__remove_clients(space_id) + errors += await self.__remove_roles(space_id) + errors += await self.__remove_collections(space_id, env_id) - errors += self.__update_collections(space_id, env_id) - errors += self.__update_roles(space_id, env_id) - errors += self.__update_clients(space_id) + errors += await self.__update_collections(space_id, env_id) + errors += await self.__update_roles(space_id, env_id) + errors += await self.__update_clients(space_id) return errors - def check(self, space_id: str, env_id: str) -> list[str]: - errors = self.__check_collections(space_id, env_id) - errors += self.__check_roles(space_id) - errors += self.__check_clients(space_id) + async def check(self, space_id: str, env_id: str) -> list[str]: + errors = await self.__check_collections(space_id, env_id) + errors += await self.__check_roles(space_id) + errors += await self.__check_clients(space_id) return errors - def uninstall(self, space_id: str, env_id: str, use_remove: bool) -> list[str]: + async def uninstall(self, space_id: str, env_id: str, use_remove: bool) -> list[str]: errors = [] if use_remove: - errors += self.__remove_collections(space_id, env_id) - errors += self.__remove_clients(space_id) - errors += self.__remove_roles(space_id) + errors += await self.__remove_collections(space_id, env_id) + errors += await self.__remove_clients(space_id) + errors += await self.__remove_roles(space_id) return errors diff --git a/perxis/interceptors.py b/perxis/interceptors.py index cca3b8e817bb49781dcb9b76a1f1fcea5fe258e2..17241306bc8056ffc7304d24f29cc2933e43ea3a 100644 --- a/perxis/interceptors.py +++ b/perxis/interceptors.py @@ -3,38 +3,38 @@ import collections class _GenericClientInterceptor( - grpc.UnaryUnaryClientInterceptor, - grpc.UnaryStreamClientInterceptor, - grpc.StreamUnaryClientInterceptor, - grpc.StreamStreamClientInterceptor, + grpc.aio.UnaryUnaryClientInterceptor, + grpc.aio.UnaryStreamClientInterceptor, + grpc.aio.StreamUnaryClientInterceptor, + grpc.aio.StreamStreamClientInterceptor, ): def __init__(self, interceptor_function): self._fn = interceptor_function - def intercept_unary_unary(self, continuation, client_call_details, request): + async def intercept_unary_unary(self, continuation, client_call_details, request): new_details, new_request_iterator, postprocess = self._fn( client_call_details, iter((request,)), False, False ) - response = continuation(new_details, next(new_request_iterator)) + response = await continuation(new_details, next(new_request_iterator)) return postprocess(response) if postprocess else response - def intercept_unary_stream(self, continuation, client_call_details, request): + async def intercept_unary_stream(self, continuation, client_call_details, request): new_details, new_request_iterator, postprocess = self._fn( client_call_details, iter((request,)), False, True ) response_it = continuation(new_details, next(new_request_iterator)) return postprocess(response_it) if postprocess else response_it - def intercept_stream_unary( + async def intercept_stream_unary( self, continuation, client_call_details, request_iterator ): new_details, new_request_iterator, postprocess = self._fn( client_call_details, request_iterator, True, False ) - response = continuation(new_details, new_request_iterator) + response = await continuation(new_details, new_request_iterator) return postprocess(response) if postprocess else response - def intercept_stream_stream( + async def intercept_stream_stream( self, continuation, client_call_details, request_iterator ): new_details, new_request_iterator, postprocess = self._fn( @@ -51,7 +51,7 @@ def create(intercept_call): class _ClientCallDetails( collections.namedtuple( typename="_ClientCallDetails", - field_names=("method", "timeout", "metadata", "credentials"), + field_names=("method", "timeout", "metadata", "credentials", "wait_for_ready"), ), grpc.ClientCallDetails, ): @@ -77,6 +77,7 @@ def header_adder_interceptor(header, value): client_call_details.timeout, metadata, client_call_details.credentials, + client_call_details.wait_for_ready, ) return client_call_details, request_iterator, None diff --git a/setup.py b/setup.py index 68e2c33c9640ac39660b8498ab8a1becae71279f..37aafb0d6f42240f5ccd5a91fc661f6f3e07e3bc 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ def load_requirements(): setup( name='perxis', - version='1.1.0', + version='1.2.0', description='Perxis python client', long_description=long_description, long_description_content_type='text/markdown',