Skip to content
Snippets Groups Projects
Select Git revision
  • 93beed6e6a766d566cc581d415ac23935ed6a13f
  • master default protected
  • bugfix/fix-return-var-in-find
  • feature/upgrade2
  • v1.10.0
  • v1.8.2
  • v1.8.1
  • v1.8.0
  • 1.7.3
  • v1.7.1
  • v1.6.1
  • v1.6.0
  • v1.5.0
  • v1.4.1
  • v1.3.0
  • v1.2.2
  • v1.2.1
  • v1.2.0
  • v1.0.1
  • v1.0.0
  • v0.0.23
  • v0.0.17
  • v0.0.10
  • v0.0.9
24 results

items.py

Blame
  • collections_client.py 3.55 KiB
    
    import logging
    
    import grpc
    
    from perxis.collections import collections_pb2, collections_pb2_grpc
    
    
    ACCESS_TOKEN = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6Ik5UWXhSVE5FTnprek5qSkVPRFkyTmpORk9VVXhRVEUzUmpWRVFqY3pRVE5CTjBJeE1ETTNOUSJ9.eyJodHRwczovL2hvb3AucGVyeC5ydS9pZGVudGl0eSI6eyJncm91cHMiOltdfSwibmlja25hbWUiOiJnb3NoaWstZSIsIm5hbWUiOiJHZW9yZ2l5IEV0ZXJldnNraXkiLCJwaWN0dXJlIjoiaHR0cHM6Ly9hdmF0YXJzLmdpdGh1YnVzZXJjb250ZW50LmNvbS91Lzc2NjAzMzE_dj00IiwidXBkYXRlZF9hdCI6IjIwMjEtMDYtMTFUMTI6MDU6MDkuMjQ3WiIsImlzcyI6Imh0dHBzOi8vYWR3ei5hdXRoMC5jb20vIiwic3ViIjoiZ2l0aHVifDc2NjAzMzEiLCJhdWQiOiJ6d3JaaGZrd0owNlQ3MlE4ZXhOZHRFS0NxTzBja0ZpWCIsImlhdCI6MTYyMzQxMzEwOSwiZXhwIjoxNjIzNDQ5MTA5LCJhdXRoX3RpbWUiOjE2MjM0MTMxMDksImF0X2hhc2giOiIwVTNrRmJTTXpnWllWSVlKTHpMUWNBIiwibm9uY2UiOiJxTGxKZmVMSUtBOUtoMVRoWXE3UkxLSkxCMjF-X29TZCJ9.Z3BYpwyAUQYt5gyGyQSD14Lj5zwegYqfyaA-tsgVrHYdyYzb8VCG8Tb_CbZ6zjpRGpcOSuDCgPo6b8METTUoHTBgWyPgaYXLWeoecE4FjRrFgonL0Yltg0BAMG8hACxTkLsg3q3s1pA0FdUzNesDPRWgoyTQehh7ODwFCV9FLpV3aV1RYwqj3I_qMsfATCoDsn2_msozB2asge9PsH8cblQj5ZdT2JEFTU0ZLI-MFNu6aubpHd3WIgMXmfgmDASrsARx-W59enwZSls01Y3WRLfoQVZ8YqjdnOIXeolfkpr32DSuzcgOP_oxylXSa_grMlCbRBOZd1Z84teYteNU0g'
    SPACE_ID = 'c2qcp9cuaccmpj8lmom0'
    ENV_ID = "master"
    
    
    def create(stub, space_id, env_id, coll_id, name):
        return stub.Create.with_call(
            collections_pb2.CreateRequest(collection=collections_pb2.Collection(space_id=space_id, env_id=env_id, id=coll_id, name=name)),
            metadata=(
                ('authorization', f'{ACCESS_TOKEN}'),
            )
        )
    
    
    def get(stub, space_id, env_id, coll_id):
        return stub.Get.with_call(
            collections_pb2.GetRequest(space_id=space_id, env_id=env_id, collection_id=coll_id),
            metadata=(
                ('authorization', f'{ACCESS_TOKEN}'),
            )
        )
    
    
    def list(stub, space_id, env_id):
        return stub.List.with_call(
            collections_pb2.ListRequest(space_id=space_id, env_id=env_id),
            metadata=(
                ('authorization', f'{ACCESS_TOKEN}'),
            )
        )
    
    
    def update(stub, space_id, env_id, coll_id, name):
        return stub.Update.with_call(
            collections_pb2.UpdateRequest(
                collection=collections_pb2.Collection(space_id=space_id, env_id=env_id, id=coll_id, name=name)),
            metadata=(
                ('authorization', f'{ACCESS_TOKEN}'),
            )
        )
    
    
    def delete(stub, space_id, env_id, coll_id):
        return stub.Delete.with_call(
            collections_pb2.DeleteRequest(space_id=space_id, env_id=env_id, collection_id=coll_id),
            metadata=(
                ('authorization', f'{ACCESS_TOKEN}'),
            )
        )
    
    
    def main() -> None:
        with grpc.insecure_channel('envoy.perxis.pt.perx.ru:80') as channel:
            stub = collections_pb2_grpc.CollectionsStub(channel)
            print('---------- CREATE ----------')
            response, state = create(stub, SPACE_ID, ENV_ID, 'test_coll_id', 'test collection name')
            print(response.created, state)
            created = response.created
            print('---------- GET ----------')
            response, state = get(stub, SPACE_ID, ENV_ID, created.id)
            print(response.collection, state)
            print('---------- LIST ----------')
            response, state = list(stub, SPACE_ID, ENV_ID)
            print(response.collections, state)
            print('---------- UPDATE ----------')
            response, state = update(stub, SPACE_ID, ENV_ID, created.id, 'new test collection name')
            print(state)
            print('---------- DELETE ----------')
            response, state = delete(stub, SPACE_ID, ENV_ID, created.id)
            print(state)
    
    
    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO)
        main()