Skip to content
Snippets Groups Projects

Реализация ExtensionService с учётом long polling операций

1 file
+ 24
19
Compare changes
  • Side-by-side
  • Inline
@@ -9,15 +9,16 @@ from perxis.collections import collections_pb2_grpc
@@ -9,15 +9,16 @@ from perxis.collections import collections_pb2_grpc
from perxis.environments import environments_pb2_grpc
from perxis.environments import environments_pb2_grpc
from perxis.roles import roles_pb2_grpc
from perxis.roles import roles_pb2_grpc
from perxis.clients import clients_pb2_grpc
from perxis.clients import clients_pb2_grpc
from perxis.extensions import extension_pb2_grpc, manager_pb2_grpc, manager_pb2
from perxis.common import operation_service_pb2_grpc
 
from perxis.extensions import manager_service_pb2_grpc, manager_service_pb2, extension_service_pb2_grpc
from perxis.interceptors import header_adder_interceptor
from perxis.interceptors import header_adder_interceptor
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
async def _main(
async def _main(
ext_descriptor: manager_pb2.ExtensionDescriptor,
ext_descriptor: manager_service_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer,
ext_manager_host: str,
ext_manager_host: str,
content_host: str
content_host: str
):
):
@@ -25,33 +26,38 @@ async def _main(
@@ -25,33 +26,38 @@ async def _main(
'x-perxis-access', 'system'
'x-perxis-access', 'system'
)
)
@aiocron.crontab('* * * * *', start=False)
async def _register_extension():
async def register_extension():
await ext_manager_stub.RegisterExtensions(manager_service_pb2.RegisterExtensionsRequest(
await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
extensions=[ext_descriptor]
extensions=[ext_descriptor]
))
))
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
 
@aiocron.crontab('* * * * *', start=False)
 
async def register_extension():
 
await _register_extension()
 
async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel:
async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel:
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(extensions_manager_channel)
ext_manager_stub = manager_service_pb2_grpc.ExtensionManagerServiceStub(extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
registered_extensions: manager_service_pb2.ListRegisteredExtensionsResponse = await ext_manager_stub.ListRegisteredExtensions(
manager_pb2.ListExtensionsRequest()
manager_service_pb2.ListRegisteredExtensionsRequest()
)
)
# todo enable after fix - https://tracker.yandex.ru/PRXS-1507
for ext in registered_extensions.extensions:
# for ext in registered_extensions.extensions:
if ext.extension == ext_descriptor.extension:
# if ext.extension == ext_descriptor.extension:
if ext.version != ext_descriptor.version:
# if ext.version != ext_descriptor.version:
await ext_manager_stub.UnregisterExtensions(
# ext_manager_stub.UnregisterExtensions(
manager_service_pb2.UnregisterExtensionsRequest(
# manager_pb2.UnregisterExtensionsRequest(
extensions=[ext_descriptor]
# extensions=[ext_descriptor]
)
# )
)
# )
#
await _register_extension()
# logger.info(
# f"Изменилась версия расширения {ext_descriptor.extension} ({ext.version} -> {ext_descriptor.version})"
logger.info(
# )
f"Изменилась версия расширения {ext_descriptor.extension} ({ext.version} "
 
f"-> {ext_descriptor.version})"
 
)
register_extension.start()
register_extension.start()
@@ -63,19 +69,22 @@ async def _main(
@@ -63,19 +69,22 @@ async def _main(
environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel)
environments_stub = environments_pb2_grpc.EnvironmentsStub(content_channel)
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
extension_pb2_grpc.add_ExtensionServicer_to_server(
servicer_cls(
servicer = servicer_cls(
collections_stub, environments_stub, roles_stub, clients_stub
collections_stub, environments_stub, roles_stub, clients_stub
), server
)
)
 
 
extension_service_pb2_grpc.add_ExtensionServiceServicer_to_server(servicer, server)
 
operation_service_pb2_grpc.add_OperationServiceServicer_to_server(servicer, server)
 
server.add_insecure_port("[::]:50051")
server.add_insecure_port("[::]:50051")
await server.start()
await server.start()
await server.wait_for_termination()
await server.wait_for_termination()
def bootstrap(
def bootstrap(
ext_descriptor: manager_pb2.ExtensionDescriptor,
ext_descriptor: manager_service_pb2.ExtensionDescriptor,
servicer_cls: extension_pb2_grpc.ExtensionServicer,
servicer_cls: extension_service_pb2_grpc.ExtensionServiceServicer,
ext_manager_host: str,
ext_manager_host: str,
content_host: str,
content_host: str,
):
):
Loading