Skip to content
Snippets Groups Projects
Commit a9c2551b authored by Georgiy Eterevskiy's avatar Georgiy Eterevskiy
Browse files

Fix using channel context manager

parent b9dae3d1
No related branches found
No related tags found
No related merge requests found
......@@ -12,34 +12,34 @@ from perxis.clients import clients_pb2_grpc
from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2
from perxis.interceptors import header_adder_interceptor
def bootstrap(
ext_descriptor: manager_pb2.ExtensionDescriptor, servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_manager_host: str, content_host: str,
):
logger = logging.getLogger(__name__)
logger.info(f"Инициализация сервиса расширения {ext_descriptor.extension}")
interceptor = header_adder_interceptor(
'x-perxis-access', 'system'
)
loop = asyncio.get_event_loop()
async def _main(
ext_descriptor: manager_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_manager_host: str,
content_host: str
):
@aiocron.crontab('* * * * *', start=False)
def register_extension():
ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
async def register_extension():
await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
extensions=[ext_descriptor]
))
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel:
interceptor = header_adder_interceptor(
'x-perxis-access', 'system'
)
async with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel:
intercept_channel_extensions_manager_channel = grpc.intercept_channel(extensions_manager_channel, interceptor)
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(intercept_channel_extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = ext_manager_stub.ListExtensions(manager_pb2.ListExtensionsRequest())
registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
manager_pb2.ListExtensionsRequest()
)
# todo enable after fix - https://tracker.yandex.ru/PRXS-1507
# for ext in registered_extensions.extensions:
......@@ -57,7 +57,7 @@ def bootstrap(
register_extension.start()
with grpc.aio.insecure_channel(content_host) as content_channel:
async with grpc.aio.insecure_channel(content_host) as content_channel:
intercepted_content_channel = grpc.intercept_channel(content_channel, interceptor)
collections_stub = collections_pb2_grpc.CollectionsStub(intercepted_content_channel)
......@@ -66,19 +66,36 @@ def bootstrap(
environments_stub = environments_pb2_grpc.EnvironmentsStub(intercepted_content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
loop.create_task(
extension_pb2_grpc.add_ExtensionServicer_to_server(
await extension_pb2_grpc.add_ExtensionServicer_to_server(
servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub
), server
)
)
server.add_insecure_port("[::]:50051")
loop.create_task(server.start())
loop.create_task(server.wait_for_termination())
await server.start()
await server.wait_for_termination()
def bootstrap(
ext_descriptor: manager_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_manager_host: str,
content_host: str,
):
logger.info(f"Инициализация сервиса расширения {ext_descriptor.extension}")
loop = asyncio.get_event_loop()
try:
loop.run_forever()
loop.run_until_complete(
_main(
ext_descriptor,
servicer_cls,
ext_manager_host,
content_host
)
)
finally:
loop.close()
loop.stop()
......
......@@ -14,7 +14,7 @@ def load_requirements():
setup(
name='perxis',
version='1.0.6',
version='1.0.7',
description='Perxis python client',
long_description=long_description,
long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment