Select Git revision
collections_client.py 3.55 KiB
import logging
import grpc
from perxis.collections import collections_pb2, collections_pb2_grpc
ACCESS_TOKEN = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6Ik5UWXhSVE5FTnprek5qSkVPRFkyTmpORk9VVXhRVEUzUmpWRVFqY3pRVE5CTjBJeE1ETTNOUSJ9.eyJodHRwczovL2hvb3AucGVyeC5ydS9pZGVudGl0eSI6eyJncm91cHMiOltdfSwibmlja25hbWUiOiJnb3NoaWstZSIsIm5hbWUiOiJHZW9yZ2l5IEV0ZXJldnNraXkiLCJwaWN0dXJlIjoiaHR0cHM6Ly9hdmF0YXJzLmdpdGh1YnVzZXJjb250ZW50LmNvbS91Lzc2NjAzMzE_dj00IiwidXBkYXRlZF9hdCI6IjIwMjEtMDYtMTFUMTI6MDU6MDkuMjQ3WiIsImlzcyI6Imh0dHBzOi8vYWR3ei5hdXRoMC5jb20vIiwic3ViIjoiZ2l0aHVifDc2NjAzMzEiLCJhdWQiOiJ6d3JaaGZrd0owNlQ3MlE4ZXhOZHRFS0NxTzBja0ZpWCIsImlhdCI6MTYyMzQxMzEwOSwiZXhwIjoxNjIzNDQ5MTA5LCJhdXRoX3RpbWUiOjE2MjM0MTMxMDksImF0X2hhc2giOiIwVTNrRmJTTXpnWllWSVlKTHpMUWNBIiwibm9uY2UiOiJxTGxKZmVMSUtBOUtoMVRoWXE3UkxLSkxCMjF-X29TZCJ9.Z3BYpwyAUQYt5gyGyQSD14Lj5zwegYqfyaA-tsgVrHYdyYzb8VCG8Tb_CbZ6zjpRGpcOSuDCgPo6b8METTUoHTBgWyPgaYXLWeoecE4FjRrFgonL0Yltg0BAMG8hACxTkLsg3q3s1pA0FdUzNesDPRWgoyTQehh7ODwFCV9FLpV3aV1RYwqj3I_qMsfATCoDsn2_msozB2asge9PsH8cblQj5ZdT2JEFTU0ZLI-MFNu6aubpHd3WIgMXmfgmDASrsARx-W59enwZSls01Y3WRLfoQVZ8YqjdnOIXeolfkpr32DSuzcgOP_oxylXSa_grMlCbRBOZd1Z84teYteNU0g'
SPACE_ID = 'c2qcp9cuaccmpj8lmom0'
ENV_ID = "master"
def create(stub, space_id, env_id, coll_id, name):
return stub.Create.with_call(
collections_pb2.CreateRequest(collection=collections_pb2.Collection(space_id=space_id, env_id=env_id, id=coll_id, name=name)),
metadata=(
('authorization', f'{ACCESS_TOKEN}'),
)
)
def get(stub, space_id, env_id, coll_id):
return stub.Get.with_call(
collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=coll_id),
metadata=(
('authorization', f'{ACCESS_TOKEN}'),
)
)
def list(stub, space_id, env_id):
return stub.List.with_call(
collections_pb2.ListRequest(space_id=space_id, env_id=env_id),
metadata=(
('authorization', f'{ACCESS_TOKEN}'),
)
)
def update(stub, space_id, env_id, coll_id, name):
return stub.Update.with_call(
collections_pb2.UpdateRequest(
collection=collections_pb2.Collection(space_id=space_id, env_id=env_id, id=coll_id, name=name)),
metadata=(
('authorization', f'{ACCESS_TOKEN}'),
)
)
def delete(stub, space_id, env_id, coll_id):
return stub.Delete.with_call(
collections_pb2.DeleteRequest(space_id=space_id, env_id=env_id, collection_id=coll_id),
metadata=(
('authorization', f'{ACCESS_TOKEN}'),
)
)
def main() -> None:
with grpc.insecure_channel('envoy.perxis.pt.perx.ru:80') as channel:
stub = collections_pb2_grpc.CollectionsStub(channel)
print('---------- CREATE ----------')
response, state = create(stub, SPACE_ID, ENV_ID, 'test_coll_id', 'test collection name')
print(response.created, state)
created = response.created
print('---------- GET ----------')
response, state = get(stub, SPACE_ID, ENV_ID, created.id)
print(response.collection, state)
print('---------- LIST ----------')
response, state = list(stub, SPACE_ID, ENV_ID)
print(response.collections, state)
print('---------- UPDATE ----------')
response, state = update(stub, SPACE_ID, ENV_ID, created.id, 'new test collection name')
print(state)
print('---------- DELETE ----------')
response, state = delete(stub, SPACE_ID, ENV_ID, created.id)
print(state)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
main()