From a9c2551be2ba516c739bc5ac2efabfbd5373e4f0 Mon Sep 17 00:00:00 2001
From: Georgiy Eterevskiy <goshik_e@mail.ru>
Date: Wed, 13 Sep 2023 13:19:24 +0300
Subject: [PATCH] Fix using channel context manager

---
 perxis/extensions/bootstrap.py | 83 ++++++++++++++++++++--------------
 setup.py                       |  2 +-
 2 files changed, 51 insertions(+), 34 deletions(-)

diff --git a/perxis/extensions/bootstrap.py b/perxis/extensions/bootstrap.py
index 9ff0e0b..413b559 100644
--- a/perxis/extensions/bootstrap.py
+++ b/perxis/extensions/bootstrap.py
@@ -12,34 +12,34 @@ from perxis.clients import clients_pb2_grpc
 from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2
 from perxis.interceptors import header_adder_interceptor
 
+logger = logging.getLogger(__name__)
 
-def bootstrap(
-        ext_descriptor: manager_pb2.ExtensionDescriptor, servicer_cls: extension_pb2_grpc.ExtensionServicer,
-        ext_manager_host: str, content_host: str,
-):
-    logger = logging.getLogger(__name__)
-
-    logger.info(f"Инициализация сервиса расширения {ext_descriptor.extension}")
-
-    interceptor = header_adder_interceptor(
-        'x-perxis-access', 'system'
-    )
-
-    loop = asyncio.get_event_loop()
 
+async def _main(
+        ext_descriptor: manager_pb2.ExtensionDescriptor,
+        servicer_cls: extension_pb2_grpc.ExtensionServicer,
+        ext_manager_host: str,
+        content_host: str
+):
     @aiocron.crontab('* * * * *', start=False)
-    def register_extension():
-        ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
+    async def register_extension():
+        await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
             extensions=[ext_descriptor]
         ))
         logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
 
-    with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel:
+    interceptor = header_adder_interceptor(
+        'x-perxis-access', 'system'
+    )
+
+    async with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel:
         intercept_channel_extensions_manager_channel = grpc.intercept_channel(extensions_manager_channel, interceptor)
 
         ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(intercept_channel_extensions_manager_channel)
 
-        registered_extensions: manager_pb2.ListExtensionsResponse = ext_manager_stub.ListExtensions(manager_pb2.ListExtensionsRequest())
+        registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
+            manager_pb2.ListExtensionsRequest()
+        )
 
         # todo enable after fix - https://tracker.yandex.ru/PRXS-1507
         # for ext in registered_extensions.extensions:
@@ -57,7 +57,7 @@ def bootstrap(
 
         register_extension.start()
 
-        with grpc.aio.insecure_channel(content_host) as content_channel:
+        async with grpc.aio.insecure_channel(content_host) as content_channel:
             intercepted_content_channel = grpc.intercept_channel(content_channel, interceptor)
 
             collections_stub = collections_pb2_grpc.CollectionsStub(intercepted_content_channel)
@@ -66,20 +66,37 @@ def bootstrap(
             environments_stub = environments_pb2_grpc.EnvironmentsStub(intercepted_content_channel)
 
             server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
-            loop.create_task(
-                extension_pb2_grpc.add_ExtensionServicer_to_server(
-                    servicer_cls(
-                        collections_stub, environments_stub, roles_stub, clients_stub
-                    ), server
-                )
+            await extension_pb2_grpc.add_ExtensionServicer_to_server(
+                servicer_cls(
+                    collections_stub, environments_stub, roles_stub, clients_stub
+                ), server
             )
             server.add_insecure_port("[::]:50051")
-            loop.create_task(server.start())
-            loop.create_task(server.wait_for_termination())
-
-            try:
-                loop.run_forever()
-            finally:
-                loop.close()
-                loop.stop()
-                logging.info('Successfully shutdown service')
+            await server.start()
+            await server.wait_for_termination()
+
+
+def bootstrap(
+        ext_descriptor: manager_pb2.ExtensionDescriptor,
+        servicer_cls: extension_pb2_grpc.ExtensionServicer,
+        ext_manager_host: str,
+        content_host: str,
+):
+
+    logger.info(f"Инициализация сервиса расширения {ext_descriptor.extension}")
+
+    loop = asyncio.get_event_loop()
+
+    try:
+        loop.run_until_complete(
+            _main(
+                ext_descriptor,
+                servicer_cls,
+                ext_manager_host,
+                content_host
+            )
+        )
+    finally:
+        loop.close()
+        loop.stop()
+        logging.info('Successfully shutdown service')
diff --git a/setup.py b/setup.py
index b168857..0d17b14 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@ def load_requirements():
 
 setup(
     name='perxis',
-    version='1.0.6',
+    version='1.0.7',
     description='Perxis python client',
     long_description=long_description,
     long_description_content_type='text/markdown',
-- 
GitLab