Skip to content
Snippets Groups Projects
Commit 515465ca authored by Podosochnyy Maxim's avatar Podosochnyy Maxim
Browse files

Добавлен базовый класс для сервиса расширений и модуль для установки зависимостей расширения

parent 3add9084
Branches
Tags
No related merge requests found
import grpc
from perxis.extensions import extension_pb2, extension_pb2_grpc
from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.environments import environments_pb2_grpc
from perxis.extensions.extension_setup import ExtensionSetup
class ExtensionService(extension_pb2_grpc.ExtensionServicer):
extension_id: str
collections: list[collections_pb2.Collection] = []
roles: list[roles_pb2.Role] = []
clients: list[clients_pb2.Client] = []
def __init__(self,
collections_service: collections_pb2_grpc.CollectionsStub,
environments_service: environments_pb2_grpc.EnvironmentsStub,
roles_service: roles_pb2_grpc.RolesStub,
clients_service: clients_pb2_grpc.ClientsStub,
):
self.collections_service = collections_service
self.environments_service = environments_service
self.roles_service = roles_service
self.clients_service = clients_service
self.extension_setup = ExtensionSetup(
self.collections_service, self.environments_service,
self.roles_service, self.clients_service
)
for collection in self.collections or []:
self.extension_setup.add_collection(collection)
for role in self.roles or []:
self.extension_setup.add_role(role)
for client in self.clients or []:
self.extension_setup.add_client(client)
def Install(self, request: extension_pb2.InstallRequest, context):
errors_list = self.extension_setup.install(
request.space_id, request.env_id, request.force
)
if errors_list:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("; ".join(errors_list))
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.InstallResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
def Update(self, request: extension_pb2.UpdateRequest, context):
errors_list = self.extension_setup.update(
request.space_id, request.env_id, request.force
)
if errors_list:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("; ".join(errors_list))
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.UpdateResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
def Uninstall(self, request: extension_pb2.UninstallRequest, context):
errors_list: list[str] = self.extension_setup.uninstall(request.space_id, request.env_id, request.remove)
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.UninstallResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
def Check(self, request: extension_pb2.CheckRequest, context):
errors_list = self.extension_setup.check(request.space_id, request.env_id)
if errors_list:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("; ".join(errors_list))
response_state = extension_pb2.ExtensionRequestResult.State.OK \
if not errors_list \
else extension_pb2.ExtensionRequestResult.State.ERROR
return extension_pb2.CheckResponse(
results=[extension_pb2.ExtensionRequestResult(
extension=self.extension_id,
state=response_state,
error="; ".join(errors_list) if errors_list else None,
msg="Ok" if not errors_list else None
)]
)
def Action(self, request: extension_pb2.ActionRequest, context):
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("Unknown action")
return None
import grpc
import json
import time
import copy
import typing
from deepdiff import DeepDiff
from perxis.collections import collections_pb2_grpc, collections_pb2
from perxis.roles import roles_pb2_grpc, roles_pb2
from perxis.clients import clients_pb2_grpc, clients_pb2
from perxis.environments import environments_pb2_grpc, environments_pb2
class ExtensionSetup:
def __init__(
self,
collections_service: collections_pb2_grpc.CollectionsStub,
environments_service: environments_pb2_grpc.EnvironmentsStub,
roles_service: roles_pb2_grpc.RolesStub,
clients_service: clients_pb2_grpc.ClientsStub,
):
self.collections = []
self.clients = []
self.roles = []
self.roles_service = roles_service
self.clients_service = clients_service
self.collections_service = collections_service
self.environments_service = environments_service
self.__max_attempts_count = 5
self.__sleep_time = 1
def add_collection(self, collection: collections_pb2.Collection):
self.collections.append(collection)
def set_collections(self, collections: list[collections_pb2.Collection]):
self.collections = collections
def add_role(self, role: roles_pb2.Role):
self.roles.append(role)
def set_roles(self, roles: list[roles_pb2.Role]):
self.roles = roles
def add_client(self, client: clients_pb2.Client):
self.clients.append(client)
def set_clients(self, clients: list[clients_pb2.Client]):
self.clients = clients
# Работа с ролями
def __remove_roles(self, space_id: str) -> list[str]:
errors_list: list[str] = []
for role in self.roles:
try:
self.roles_service.Delete.with_call(
roles_pb2.DeleteRequest(
space_id=space_id, role_id=role.id
)
)
except grpc.RpcError as e:
# Если роли не существует считать это ошибкой не надо
if "not found" not in e.details():
errors_list.append(f"Не удалось удалить роль {role.id}, {e.details()}")
return errors_list
def __check_roles(self, space_id: str) -> list[str]:
errors_list = []
for role in self.roles:
try:
self.roles_service.Get.with_call(
roles_pb2.GetRequest(space_id=space_id, role_id=role.id)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить роль {role.id}, ошибка {e.details()}")
return errors_list
def __update_roles(self, space_id: str) -> list[str]:
errors_list = []
for local_role in self.roles:
try:
cloned_role = copy.deepcopy(local_role)
cloned_role.space_id = space_id
# Полностью замещать данные роли тем что имеется в расширении
# TODO: нужно ли мержить права чтобы не терять изменения?
self.roles_service.Update.with_call(
roles_pb2.UpdateRequest(
role=cloned_role
)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось обновить роль {local_role.id}, {e.details()}")
return errors_list
def __create_roles(self, space_id: str) -> list[str]:
errors_list = []
for local_role in self.roles:
try:
cloned_role = copy.deepcopy(local_role)
cloned_role.space_id = space_id
response, _ = self.roles_service.Create.with_call(
roles_pb2.CreateRequest(
role=cloned_role
),
)
except grpc.RpcError as e:
# На этапе install считается что ролей __нет__. При install с указанием force роли предварительно
# удаляются
errors_list.append(f"Не удалось создать роль {local_role.id}, {e.details()}")
return errors_list
# Работа с клиентами
def __check_clients(self, space_id: str) -> list[str]:
errors_list = []
for client in self.clients:
try:
self.clients_service.Get.with_call(
clients_pb2.GetRequest(space_id=space_id, id=client.id)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить клиент {client.id}, ошибка {e.details()}")
return errors_list
def __remove_clients(self, space_id: str) -> list[str]:
errors_list: list[str] = []
for client in self.clients:
try:
self.clients_service.Delete.with_call(
clients_pb2.DeleteRequest(
space_id=space_id, id=client.id
)
)
except grpc.RpcError as e:
# Отсутствие клиента ошибкой не считается
if "not found" not in e.details():
errors_list.append(f"Не удалось удалить клиент {client.id}, {e.details()}")
return errors_list
def __update_clients(self, space_id: str) -> list[str]:
errors_list = []
for local_client in self.clients:
try:
# Перед обновлением клиента предварительно нужно получить текущую запись чтобы скопировать оттуда
# токены. Иначе после обновления расширения перестанут работать все приложения которые использовали
# токен клиента
get_response, state = self.clients_service.Get.with_call(
clients_pb2.GetRequest(
space_id=space_id,
id=local_client.id
)
)
client = get_response.client
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить клиент {local_client.id}, {e.details()}")
continue
try:
# Нужно чтобы у клиента каждый раз не слетали данные токенов при переустановке
# свойства oauth, api_key и tls должны браться из __созданного__ клиента
new_client = clients_pb2.Client(
id=local_client.id,
space_id=space_id,
name=local_client.name,
description=local_client.description,
disabled=client.disabled,
role_id=local_client.role_id,
oauth=client.oauth,
api_key=client.api_key,
tls=client.tls
)
self.clients_service.Update.with_call(
clients_pb2.UpdateRequest(
client=new_client
)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось обновить клиент {local_client.id}, {e.details()}")
return errors_list
def __create_clients(self, space_id: str) -> list[str]:
errors_list = []
for local_client in self.clients:
try:
cloned_client = copy.deepcopy(local_client)
cloned_client.space_id = space_id
response, _ = self.clients_service.Create.with_call(
clients_pb2.CreateRequest(
client=cloned_client
),
)
except grpc.RpcError as e:
# Как и с ролями считается что при install записей быть не должно в любом случае т.к. force всё удаляет
errors_list.append(f"Не удалось создать клиент {local_client.id}, {e.details()}")
return errors_list
# Работа с коллекциями
def __remove_collections(self, space_id: str, env_id: str) -> list[str]:
errors_list: list[str] = []
for collection in self.collections:
try:
self.collections_service.Delete.with_call(
collections_pb2.DeleteRequest(
space_id=space_id, env_id=env_id, collection_id=collection.id
)
)
except grpc.RpcError as e:
# Отсутствие коллекции это не ошибка
if "not found" not in e.details():
errors_list.append(f"Не удалось удалить коллекцию {collection.id}, {e.details()}")
return errors_list
def __check_collections(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
for collection in self.collections:
try:
self.collections_service.Get.with_call(
collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=collection.id)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить коллекцию {collection.id}, ошибка {e.details()}")
return errors_list
def __update_collections(self, space_id: str, env_id: str) -> list[str]:
"""
Метод __обновления__ коллекций подразумевает что сами коллекции уже созданы. Миграция окружения требуется
только в случае если одна или несколько схем коллекций изменялись. Алгоритм работы:
1. Получить фактически существующую коллекцию из БД
2. Обновить её в perxis
3. Сравнить схему коллекции в расширении и в perxis
4. Если схема изменена - обновить схему в perxis
5. Если обновлялась хотя бы одна схема в perxis - запустить миграцию окружения
"""
need_to_migrate_environment = False
errors_list = []
for local_collection in self.collections:
try:
# Необходимо получить текущую версию коллекции для того чтобы сравнить схемы
get_response, state = self.collections_service.Get.with_call(
collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=local_collection.id)
)
collection = get_response.collection
except grpc.RpcError as e:
errors_list.append(f"Не удалось получить коллекцию {local_collection.id}, {e.details()}")
continue
try:
cloned_collection = copy.deepcopy(local_collection)
cloned_collection.space_id = space_id
cloned_collection.env_id = env_id
self.collections_service.Update.with_call(
collections_pb2.UpdateRequest(collection=cloned_collection)
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось обновить коллекцию {local_collection.id}, {e.details()}")
continue
diff = DeepDiff(
json.loads(collection.schema or "{}"),
json.loads(local_collection.schema or "{}"),
ignore_numeric_type_changes=True,
exclude_paths=["root['loaded']"]
)
if diff:
set_schema_error_message = self.__set_collection_schema(
space_id, env_id, local_collection.id, local_collection.schema
)
if set_schema_error_message:
errors_list.append(set_schema_error_message)
if need_to_migrate_environment:
migrate_environment_error_message = self.__migrate_environment(space_id, env_id)
if migrate_environment_error_message:
errors_list.append(migrate_environment_error_message)
return errors_list
def __migrate_environment(self, space_id: str, env_id: str) -> typing.Optional[str]:
# Так как perxis может не сразу выставить коллекции / окружению статус ready операцию необходимо выполнять
# с попытками
attempt = 0
is_ok = False
error_message = None
while attempt <= self.__max_attempts_count and not is_ok:
time.sleep(self.__sleep_time)
try:
self.environments_service.Migrate(environments_pb2.MigrateRequest(
space_id=space_id,
env_id=env_id
))
is_ok = True
except grpc.RpcError as e:
# Если не удалось мигрировать окружение по любой причине кроме подготовки - это ошибка
if "is preparing" not in e.details():
error_message = e.details()
# Для принудительного выхода из цикла
attempt = self.__max_attempts_count
attempt += 1
return error_message
def __set_collection_schema(self, space_id: str, env_id: str, collection_id: str, schema: str) -> typing.Optional[str]:
# Так как perxis может не сразу выставить коллекции / окружению статус ready операцию необходимо выполнять
# с попытками
attempt = 0
is_ok = False
error_message = None
while attempt <= self.__max_attempts_count and not is_ok:
time.sleep(self.__sleep_time)
try:
self.collections_service.SetSchema.with_call(
collections_pb2.SetSchemaRequest(
space_id=space_id,
env_id=env_id,
collection_id=collection_id,
schema=schema
)
)
is_ok = True
except grpc.RpcError as e:
# Если не удалось установить схему по любой причине кроме подготовки - это ошибка
if "is preparing" not in e.details():
error_message = e.details()
# Для принудительного выхода из цикла
attempt = self.__max_attempts_count
attempt += 1
return error_message
def __create_collections(self, space_id: str, env_id: str) -> list[str]:
errors_list = []
for local_collection in self.collections:
try:
cloned_collection = copy.deepcopy(local_collection)
cloned_collection.space_id = space_id
cloned_collection.env_id = env_id
response, _ = self.collections_service.Create.with_call(
collections_pb2.CreateRequest(
collection=cloned_collection
),
)
except grpc.RpcError as e:
errors_list.append(f"Не удалось создать коллекцию {local_collection.id}, {e.details()}")
# Если коллекцию создать не удалось (по любой причине) дальнейшая обработка коллекции смысла
# не имеет
continue
set_schema_error_message = self.__set_collection_schema(
space_id, env_id, local_collection.id, local_collection.schema
)
if set_schema_error_message:
errors_list.append(set_schema_error_message)
# Миграция окружения нужна в любом случае т.к. все коллекции были __созданы__
migrate_environment_error_message = self.__migrate_environment(space_id, env_id)
if migrate_environment_error_message:
errors_list.append(migrate_environment_error_message)
return errors_list
def install(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
errors = []
if use_force:
errors += self.__remove_collections(space_id, env_id)
errors += self.__remove_clients(space_id)
errors += self.__remove_roles(space_id)
errors += self.__create_collections(space_id, env_id)
errors += self.__create_roles(space_id)
errors += self.__create_clients(space_id)
return errors
def update(self, space_id: str, env_id: str, use_force: bool) -> list[str]:
errors = []
# В случае обновление расширения с флагом force нужно предварительно удалить все сущности.
# Фактически это переустановка а не удаление
if use_force:
errors += self.__remove_clients(space_id)
errors += self.__remove_roles(space_id)
errors += self.__remove_collections(space_id, env_id)
errors += self.__create_collections(space_id, env_id)
errors += self.__create_roles(space_id)
errors += self.__create_clients(space_id)
else:
errors += self.__update_collections(space_id, env_id)
errors += self.__update_roles(space_id)
errors += self.__update_clients(space_id)
return errors
def check(self, space_id: str, env_id: str) -> list[str]:
errors = self.__check_collections(space_id, env_id)
errors += self.__check_roles(space_id)
errors += self.__check_clients(space_id)
return errors
def uninstall(self, space_id: str, env_id: str, use_remove: bool) -> list[str]:
errors = []
if use_remove:
errors += self.__remove_collections(space_id, env_id)
errors += self.__remove_clients(space_id)
errors += self.__remove_roles(space_id)
return errors
......@@ -10,3 +10,4 @@ requests==2.28.1
requests-oauthlib==1.3.1
six==1.16.0
urllib3==1.26.12
deepdiff==6.3.0
......@@ -14,7 +14,7 @@ def load_requirements():
setup(
name='perxis',
version='0.0.13',
version='0.0.14',
description='Perxis python client',
long_description=long_description,
long_description_content_type='text/markdown',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment