Skip to content
Snippets Groups Projects
Commit 1b28468a authored by Georgiy Eterevskiy's avatar Georgiy Eterevskiy
Browse files

Migrate code to async

parent 60b05538
No related branches found
No related tags found
No related merge requests found
......@@ -21,23 +21,22 @@ async def _main(
ext_manager_host: str,
content_host: str
):
metadata = grpc.aio.Metadata(
('x-perxis-access', 'system'),
interceptor = header_adder_interceptor(
'x-perxis-access', 'system'
)
@aiocron.crontab('* * * * *', start=False)
async def register_extension():
await ext_manager_stub.RegisterExtensions(manager_pb2.RegisterExtensionsRequest(
extensions=[ext_descriptor]
), metadata=metadata)
))
logger.info(f"Расширение {ext_descriptor.extension} зарегистрировано в perxis")
async with grpc.aio.insecure_channel(ext_manager_host) as extensions_manager_channel:
async with grpc.aio.insecure_channel(ext_manager_host, interceptors=[interceptor]) as extensions_manager_channel:
ext_manager_stub = manager_pb2_grpc.ExtensionManagerStub(extensions_manager_channel)
registered_extensions: manager_pb2.ListExtensionsResponse = await ext_manager_stub.ListExtensions(
manager_pb2.ListExtensionsRequest(), metadata=metadata
manager_pb2.ListExtensionsRequest()
)
# todo enable after fix - https://tracker.yandex.ru/PRXS-1507
......@@ -56,7 +55,7 @@ async def _main(
register_extension.start()
async with grpc.aio.insecure_channel(content_host) as content_channel:
async with grpc.aio.insecure_channel(content_host, interceptors=[interceptor]) as content_channel:
collections_stub = collections_pb2_grpc.CollectionsStub(content_channel)
roles_stub = roles_pb2_grpc.RolesStub(content_channel)
......
......@@ -75,8 +75,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)
]
def _Install(self, request: extension_pb2.InstallRequest, context):
errors_list = self.extension_setup.install(
async def _Install(self, request: extension_pb2.InstallRequest, context):
errors_list = await self.extension_setup.install(
request.space_id, request.env_id, request.force
)
......@@ -93,7 +93,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)]
)
def Install(self, request: extension_pb2.InstallRequest, context):
async def Install(self, request: extension_pb2.InstallRequest, context):
self.logger.info(
"Установка расширения %s для окружения %s пространства %s. %s force" % (
self.extension_id,
......@@ -104,7 +104,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)
try:
response = self._Install(request, context)
response = await self._Install(request, context)
except Exception as e:
response = extension_pb2.InstallResponse(
results=self.ext_request_results_from_exception(e)
......@@ -118,8 +118,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
return response
def _Update(self, request: extension_pb2.UpdateRequest, context):
errors_list = self.extension_setup.update(
async def _Update(self, request: extension_pb2.UpdateRequest, context):
errors_list = await self.extension_setup.update(
request.space_id, request.env_id, request.force
)
......@@ -136,7 +136,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)]
)
def Update(self, request: extension_pb2.UpdateRequest, context):
async def Update(self, request: extension_pb2.UpdateRequest, context):
self.logger.info(
"Обновление расширения %s для окружения %s пространства %s. %s force" % (
self.extension_id,
......@@ -147,7 +147,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)
try:
response = self._Update(request, context)
response = await self._Update(request, context)
except Exception as e:
response = extension_pb2.UpdateResponse(
results=self.ext_request_results_from_exception(e)
......@@ -161,8 +161,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
return response
def _Uninstall(self, request: extension_pb2.UninstallRequest, context):
errors_list: list[str] = self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
async def _Uninstall(self, request: extension_pb2.UninstallRequest, context):
errors_list: list[str] = await self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
......@@ -177,7 +177,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)]
)
def Uninstall(self, request: extension_pb2.UninstallRequest, context):
async def Uninstall(self, request: extension_pb2.UninstallRequest, context):
self.logger.info(
"Удаление расширения %s для окружения %s пространства %s. %s remove" % (
self.extension_id,
......@@ -188,7 +188,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)
try:
response = self._Uninstall(request, context)
response = await self._Uninstall(request, context)
except Exception as e:
response = extension_pb2.UninstallResponse(
results=self.ext_request_results_from_exception(e)
......@@ -202,8 +202,8 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
return response
def _Check(self, request: extension_pb2.CheckRequest, context):
errors_list = self.extension_setup.check(request.space_id, request.env_id)
async def _Check(self, request: extension_pb2.CheckRequest, context):
errors_list = await self.extension_setup.check(request.space_id, request.env_id)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
......@@ -218,7 +218,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)]
)
def Check(self, request: extension_pb2.CheckRequest, context):
async def Check(self, request: extension_pb2.CheckRequest, context):
self.logger.info(
"Проверка расширения %s для окружения %s пространства %s" % (
self.extension_id,
......@@ -228,7 +228,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
)
try:
response = self._Check(request, context)
response = await self._Check(request, context)
except Exception as e:
response = extension_pb2.CheckResponse(
results=self.ext_request_results_from_exception(e)
......@@ -242,7 +242,7 @@ class ExtensionService(extension_pb2_grpc.ExtensionServicer):
return response
def Action(self, request: extension_pb2.ActionRequest, context):
async def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("Unknown action")
......
import logging
import grpc
import json
import time
......@@ -14,6 +16,9 @@ from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.environments import environments_pb2_grpc, environments_pb2
logger = logging.getLogger(__name__)
class ExtensionSetup:
def __init__(
self,
......@@ -53,12 +58,12 @@ class ExtensionSetup:
self.clients = clients
# Работа с ролями
def __remove_roles(self, space_id: str) -> list[str]:
async def __remove_roles(self, space_id: str) -> list[str]:
errors_list: list[str] = []
for role in self.roles:
try:
self.roles_service.Delete.with_call(
await self.roles_service.Delete(
roles_pb2.DeleteRequest(
space_id=space_id, role_id=role.id
)
......@@ -70,12 +75,12 @@ class ExtensionSetup:
return errors_list
def __check_roles(self, space_id: str) -> list[str]:
async def __check_roles(self, space_id: str) -> list[str]:
errors_list = []
for role in self.roles:
try:
self.roles_service.Get.with_call(
await self.roles_service.Get(
roles_pb2.GetRequest(space_id=space_id, role_id=role.id)
)
except grpc.RpcError as e:
......@@ -83,12 +88,12 @@ class ExtensionSetup:
return errors_list
def __update_roles(self, space_id: str, env_id: str) -> list[str]:
async def __update_roles(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
for local_role in self.roles:
try:
get_response, state = self.roles_service.Get.with_call(
get_response = await self.roles_service.Get(
roles_pb2.GetRequest(
space_id=space_id,
role_id=local_role.id
......@@ -135,7 +140,7 @@ class ExtensionSetup:
))
try:
self.roles_service.Update.with_call(
await self.roles_service.Update(
roles_pb2.UpdateRequest(
role=cloned_role
)
......@@ -150,7 +155,7 @@ class ExtensionSetup:
cloned_role = copy.deepcopy(local_role)
cloned_role.space_id = space_id
response, _ = self.roles_service.Create.with_call(
response = await self.roles_service.Create(
roles_pb2.CreateRequest(
role=cloned_role
),
......@@ -163,12 +168,12 @@ class ExtensionSetup:
return errors_list
# Работа с клиентами
def __check_clients(self, space_id: str) -> list[str]:
async def __check_clients(self, space_id: str) -> list[str]:
errors_list = []
for client in self.clients:
try:
self.clients_service.Get.with_call(
await self.clients_service.Get(
clients_pb2.GetRequest(space_id=space_id, id=client.id)
)
except grpc.RpcError as e:
......@@ -176,12 +181,12 @@ class ExtensionSetup:
return errors_list
def __remove_clients(self, space_id: str) -> list[str]:
async def __remove_clients(self, space_id: str) -> list[str]:
errors_list: list[str] = []
for client in self.clients:
try:
self.clients_service.Delete.with_call(
await self.clients_service.Delete(
clients_pb2.DeleteRequest(
space_id=space_id, id=client.id
)
......@@ -193,7 +198,7 @@ class ExtensionSetup:
return errors_list
def __update_clients(self, space_id: str) -> list[str]:
async def __update_clients(self, space_id: str) -> list[str]:
errors_list = []
for local_client in self.clients:
......@@ -201,7 +206,7 @@ class ExtensionSetup:
# Перед обновлением клиента предварительно нужно получить текущую запись чтобы скопировать оттуда
# токены. Иначе после обновления расширения перестанут работать все приложения которые использовали
# токен клиента
get_response, state = self.clients_service.Get.with_call(
get_response = await self.clients_service.Get(
clients_pb2.GetRequest(
space_id=space_id,
id=local_client.id
......@@ -232,7 +237,7 @@ class ExtensionSetup:
tls=client.tls
)
self.clients_service.Update.with_call(
await self.clients_service.Update(
clients_pb2.UpdateRequest(
client=new_client
)
......@@ -244,7 +249,7 @@ class ExtensionSetup:
cloned_client = copy.deepcopy(local_client)
cloned_client.space_id = space_id
response, _ = self.clients_service.Create.with_call(
await self.clients_service.Create(
clients_pb2.CreateRequest(
client=cloned_client
),
......@@ -255,12 +260,12 @@ class ExtensionSetup:
return errors_list
# Работа с коллекциями
def __remove_collections(self, space_id: str, env_id: str) -> list[str]:
async def __remove_collections(self, space_id: str, env_id: str) -> list[str]:
errors_list: list[str] = []
for collection in self.collections:
try:
self.collections_service.Delete.with_call(
await self.collections_service.Delete(
collections_pb2.DeleteRequest(
space_id=space_id, env_id=env_id, collection_id=collection.id
)
......@@ -272,12 +277,12 @@ class ExtensionSetup:
return errors_list
def __check_collections(self, space_id: str, env_id: str) -> list[str]:
async def __check_collections(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
for collection in self.collections:
try:
self.collections_service.Get.with_call(
await self.collections_service.Get(
collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=collection.id)
)
except grpc.RpcError as e:
......@@ -285,7 +290,7 @@ class ExtensionSetup:
return errors_list
def __update_collections(self, space_id: str, env_id: str) -> list[str]:
async def __update_collections(self, space_id: str, env_id: str) -> list[str]:
"""
Метод __обновления__ коллекций. Миграция окружения требуется
только в случае если одна или несколько схем коллекций изменялись. Алгоритм работы:
......@@ -302,7 +307,7 @@ class ExtensionSetup:
for local_collection in self.collections:
try:
# Необходимо получить текущую версию коллекции для того чтобы сравнить схемы
get_response, state = self.collections_service.Get.with_call(
get_response = await self.collections_service.Get(
collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=local_collection.id)
)
......@@ -317,7 +322,7 @@ class ExtensionSetup:
# Коллекция может быть не найдена в случае если она добавлена в новой версии
if not collection:
try:
create_response, state = self.collections_service.Create.with_call(
create_response = await self.collections_service.Create(
collections_pb2.CreateRequest(collection=cloned_collection)
)
......@@ -335,7 +340,7 @@ class ExtensionSetup:
cloned_collection.space_id = space_id
cloned_collection.env_id = env_id
self.collections_service.Update.with_call(
await self.collections_service.Update(
collections_pb2.UpdateRequest(collection=cloned_collection)
)
except grpc.RpcError as e:
......@@ -352,20 +357,20 @@ class ExtensionSetup:
if diff:
need_to_migrate_environment = True
set_schema_error_message = self.__set_collection_schema(
set_schema_error_message = await self.__set_collection_schema(
space_id, env_id, local_collection.id, local_collection.schema
)
if set_schema_error_message:
errors_list.append(set_schema_error_message)
if need_to_migrate_environment:
migrate_environment_error_message = self.__migrate_environment(space_id, env_id)
migrate_environment_error_message = await self.__migrate_environment(space_id, env_id)
if migrate_environment_error_message:
errors_list.append(migrate_environment_error_message)
return errors_list
def __migrate_environment(self, space_id: str, env_id: str) -> typing.Optional[str]:
async def __migrate_environment(self, space_id: str, env_id: str) -> typing.Optional[str]:
# Так как perxis может не сразу выставить коллекции / окружению статус ready операцию необходимо выполнять
# с попытками
......@@ -377,7 +382,7 @@ class ExtensionSetup:
time.sleep(self.__sleep_time)
try:
self.environments_service.Migrate(environments_pb2.MigrateRequest(
await self.environments_service.Migrate(environments_pb2.MigrateRequest(
space_id=space_id,
env_id=env_id,
options=environments_pb2.MigrateOptions(
......@@ -398,7 +403,7 @@ class ExtensionSetup:
return error_message
def __set_collection_schema(self, space_id: str, env_id: str, collection_id: str, schema: str) -> typing.Optional[str]:
async def __set_collection_schema(self, space_id: str, env_id: str, collection_id: str, schema: str) -> typing.Optional[str]:
# Так как perxis может не сразу выставить коллекции / окружению статус ready операцию необходимо выполнять
# с попытками
......@@ -410,7 +415,7 @@ class ExtensionSetup:
time.sleep(self.__sleep_time)
try:
self.collections_service.SetSchema.with_call(
await self.collections_service.SetSchema(
collections_pb2.SetSchemaRequest(
space_id=space_id,
env_id=env_id,
......@@ -432,48 +437,48 @@ class ExtensionSetup:
return error_message
def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
async def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
errors = []
if use_force:
errors += self.__remove_collections(space_id, env_id)
errors += self.__remove_clients(space_id)
errors += self.__remove_roles(space_id)
errors += await self.__remove_collections(space_id, env_id)
errors += await self.__remove_clients(space_id)
errors += await self.__remove_roles(space_id)
errors += self.__update_collections(space_id, env_id)
errors += self.__update_roles(space_id, env_id)
errors += self.__update_clients(space_id)
errors += await self.__update_collections(space_id, env_id)
errors += await self.__update_roles(space_id, env_id)
errors += await self.__update_clients(space_id)
return errors
def update(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
async def update(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
errors = []
# В случае обновление расширения с флагом force нужно предварительно удалить все сущности.
if use_force:
errors += self.__remove_clients(space_id)
errors += self.__remove_roles(space_id)
errors += self.__remove_collections(space_id, env_id)
errors += await self.__remove_clients(space_id)
errors += await self.__remove_roles(space_id)
errors += await self.__remove_collections(space_id, env_id)
errors += self.__update_collections(space_id, env_id)
errors += self.__update_roles(space_id, env_id)
errors += self.__update_clients(space_id)
errors += await self.__update_collections(space_id, env_id)
errors += await self.__update_roles(space_id, env_id)
errors += await self.__update_clients(space_id)
return errors
def check(self, space_id: str, env_id: str) -> list[str]:
errors = self.__check_collections(space_id, env_id)
errors += self.__check_roles(space_id)
errors += self.__check_clients(space_id)
async def check(self, space_id: str, env_id: str) -> list[str]:
errors = await self.__check_collections(space_id, env_id)
errors += await self.__check_roles(space_id)
errors += await self.__check_clients(space_id)
return errors
def uninstall(self, space_id: str, env_id: str, use_remove: bool) -> list[str]:
async def uninstall(self, space_id: str, env_id: str, use_remove: bool) -> list[str]:
errors = []
if use_remove:
errors += self.__remove_collections(space_id, env_id)
errors += self.__remove_clients(space_id)
errors += self.__remove_roles(space_id)
errors += await self.__remove_collections(space_id, env_id)
errors += await self.__remove_clients(space_id)
errors += await self.__remove_roles(space_id)
return errors
......@@ -3,38 +3,38 @@ import collections
class _GenericClientInterceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
grpc.aio.UnaryUnaryClientInterceptor,
grpc.aio.UnaryStreamClientInterceptor,
grpc.aio.StreamUnaryClientInterceptor,
grpc.aio.StreamStreamClientInterceptor,
):
def __init__(self, interceptor_function):
self._fn = interceptor_function
def intercept_unary_unary(self, continuation, client_call_details, request):
async def intercept_unary_unary(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, False
)
response = continuation(new_details, next(new_request_iterator))
response = await continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_unary_stream(self, continuation, client_call_details, request):
async def intercept_unary_stream(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, True
)
response_it = continuation(new_details, next(new_request_iterator))
return postprocess(response_it) if postprocess else response_it
def intercept_stream_unary(
async def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, False
)
response = continuation(new_details, new_request_iterator)
response = await continuation(new_details, new_request_iterator)
return postprocess(response) if postprocess else response
def intercept_stream_stream(
async def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
new_details, new_request_iterator, postprocess = self._fn(
......@@ -51,7 +51,7 @@ def create(intercept_call):
class _ClientCallDetails(
collections.namedtuple(
typename="_ClientCallDetails",
field_names=("method", "timeout", "metadata", "credentials"),
field_names=("method", "timeout", "metadata", "credentials", "wait_for_ready"),
),
grpc.ClientCallDetails,
):
......@@ -77,6 +77,7 @@ def header_adder_interceptor(header, value):
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
return client_call_details, request_iterator, None
......
......@@ -14,7 +14,7 @@ def load_requirements():
setup(
name='perxis',
version='1.1.0',
version='1.2.0',
description='Perxis python client',
long_description=long_description,
long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment