Skip to content
Snippets Groups Projects
Commit 4e8693d2 authored by Eterevskiy Georgiy's avatar Eterevskiy Georgiy
Browse files

Merge branch 'feature/add_periodic_extension_registration' into 'master'

Fix using channel context manager

See merge request !54
parents 531a765c a9c2551b
No related branches found
No related tags found
1 merge request!54Fix using channel context manager
Pipeline #35052 passed with stage
in 29 seconds
...@@ -12,34 +12,34 @@ from perxis.clients import clients_pb2_grpc ...@@ -12,34 +12,34 @@ from perxis.clients import clients_pb2_grpc
from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2 from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2
from perxis.interceptors import header_adder_interceptor from perxis.interceptors import header_adder_interceptor
logger = logging.getLogger(__name__)
def bootstrap(
ext_descriptor: manager_pb2.ExtensionDescriptor, servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_manager_host: str, content_host: str,
):
logger = logging.getLogger(__name__)
logger.info(f"Инициализация сервиса расширения {ext_descriptor.extension}")
interceptor = header_adder_interceptor(
'x-perxis-access', 'system'
)
loop = asyncio.get_event_loop()
async def _main(
ext_descriptor: manager_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
ext_manager_host: str,
content_host: str
):
@aiocron.crontab('* * * * *', start=False) @aiocron.crontab('* * * * *', start=False)
def register_extension(): async def register_extension():
ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest( await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
extensions=[ext_descriptor] extensions=[ext_descriptor]
)) ))
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis") logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel: interceptor = header_adder_interceptor(
'x-perxis-access', 'system'
)
async with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel:
intercept_channel_extensions_manager_channel = grpc.intercept_channel(extensions_manager_channel, interceptor) intercept_channel_extensions_manager_channel = grpc.intercept_channel(extensions_manager_channel, interceptor)
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(intercept_channel_extensions_manager_channel) ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(intercept_channel_extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = ext_manager_stub.ListExtensions(manager_pb2.ListExtensionsRequest()) registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
manager_pb2.ListExtensionsRequest()
)
# todo enable after fix - https://tracker.yandex.ru/PRXS-1507 # todo enable after fix - https://tracker.yandex.ru/PRXS-1507
# for ext in registered_extensions.extensions: # for ext in registered_extensions.extensions:
...@@ -57,7 +57,7 @@ def bootstrap( ...@@ -57,7 +57,7 @@ def bootstrap(
register_extension.start() register_extension.start()
with grpc.aio.insecure_channel(content_host) as content_channel: async with grpc.aio.insecure_channel(content_host) as content_channel:
intercepted_content_channel = grpc.intercept_channel(content_channel, interceptor) intercepted_content_channel = grpc.intercept_channel(content_channel, interceptor)
collections_stub = collections_pb2_grpc.CollectionsStub(intercepted_content_channel) collections_stub = collections_pb2_grpc.CollectionsStub(intercepted_content_channel)
...@@ -66,20 +66,37 @@ def bootstrap( ...@@ -66,20 +66,37 @@ def bootstrap(
environments_stub = environments_pb2_grpc.EnvironmentsStub(intercepted_content_channel) environments_stub = environments_pb2_grpc.EnvironmentsStub(intercepted_content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10)) server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
loop.create_task( await extension_pb2_grpc.add_ExtensionServicer_to_server(
extension_pb2_grpc.add_ExtensionServicer_to_server( servicer_cls(
servicer_cls( collections_stub, environments_stub, roles_stub, clients_stub
collections_stub, environments_stub, roles_stub, clients_stub ), server
), server
)
) )
server.add_insecure_port("[::]:50051") server.add_insecure_port("[::]:50051")
loop.create_task(server.start()) await server.start()
loop.create_task(server.wait_for_termination()) await server.wait_for_termination()
try:
loop.run_forever() def bootstrap(
finally: ext_descriptor: manager_pb2.ExtensionDescriptor,
loop.close() servicer_cls: extension_pb2_grpc.ExtensionServicer,
loop.stop() ext_manager_host: str,
logging.info('Successfully shutdown service') content_host: str,
):
logger.info(f"Инициализация сервиса расширения {ext_descriptor.extension}")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
_main(
ext_descriptor,
servicer_cls,
ext_manager_host,
content_host
)
)
finally:
loop.close()
loop.stop()
logging.info('Successfully shutdown service')
...@@ -14,7 +14,7 @@ def load_requirements(): ...@@ -14,7 +14,7 @@ def load_requirements():
setup( setup(
name='perxis', name='perxis',
version='1.0.6', version='1.0.7',
description='Perxis python client', description='Perxis python client',
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment