Skip to content
Snippets Groups Projects
Commit eb0e5fe8 authored by Georgiy Eterevskiy's avatar Georgiy Eterevskiy
Browse files

Add plugin `Api-Key` for auth

parent 5468e6f2
No related branches found
No related tags found
1 merge request!13Add plugin `Api-Key` for auth
import logging
import grpc
from perxis.auth import APIKeyPlugin
from perxis.collections import collections_pb2_grpc, collections_pb2
API_KEY = '0Xp7bYBqYmIUqwZcNRMtRCtDBaheFdAc'
SPACE_ID = 'c2qcp9cuaccmpj8lmom0'
ENV_ID = 'master'
TARGET = 'envoy.perxis.pt.perx.ru:443'
def main():
api_key_plugin = APIKeyPlugin(API_KEY)
call_credentials = grpc.metadata_call_credentials(api_key_plugin, name='api-key')
channel_credentials = grpc.ssl_channel_credentials()
composite_credentials = grpc.composite_channel_credentials(
channel_credentials, call_credentials
)
with grpc.secure_channel(TARGET, composite_credentials) as channel:
stub = collections_pb2_grpc.CollectionsStub(channel)
collections = stub.List(collections_pb2.ListRequest(space_id=SPACE_ID, env_id=ENV_ID))
print(collections)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
main()
...@@ -50,3 +50,16 @@ class OAuth2Plugin(grpc.AuthMetadataPlugin): ...@@ -50,3 +50,16 @@ class OAuth2Plugin(grpc.AuthMetadataPlugin):
self._token = fetch_token() self._token = fetch_token()
return self._token return self._token
class APIKeyPlugin(grpc.AuthMetadataPlugin):
_token = None
def __init__(self, token: str, signature_header_key: str = 'authorization', token_type: str = 'API-Key') -> None:
self._token = token
self._signature_header_key = signature_header_key
self._token_type = token_type
def __call__(self, context: grpc.AuthMetadataContext,
callback: grpc.AuthMetadataPluginCallback) -> None:
callback(((self._signature_header_key, f'{self._token_type} {self._token}'),), None)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment