Skip to content
Snippets Groups Projects
Commit 3fd99447 authored by Georgiy Eterevskiy's avatar Georgiy Eterevskiy
Browse files

Add support different grant types, add support updating of token without refresh_token

parent c009fef0
No related branches found
No related tags found
1 merge request!3Добавлен плагин аутентификации
...@@ -52,14 +52,33 @@ stub.FooRpc(request, credentials=call_credentials) ...@@ -52,14 +52,33 @@ stub.FooRpc(request, credentials=call_credentials)
```python ```python
import grpc import grpc
from oauthlib.oauth2 import Client
from oauthlib.oauth2.rfc6749.parameters import prepare_token_request
from auth import OAuth2Plugin from auth import OAuth2Plugin
from users.users_pb2 import GetRequest from users.users_pb2 import GetRequest
from users.users_pb2_grpc import UsersStub from users.users_pb2_grpc import UsersStub
# Могут быть использованы как встроенные клиенты, такие как WebApplicationClient, BackendApplicationClient,
# так и допускается реализовать собственный класс с кастомным поведением
class ExtendedClient(Client):
def __init__(self, client_id, grant_type, username, password, **kwargs):
self.grant_type = grant_type
self.username = username
self.password = password
super(ExtendedClient, self).__init__(client_id, **kwargs)
def prepare_request_body(self, body='', scope=None, include_client_id=False, **kwargs):
kwargs['client_id'] = self.client_id
kwargs['include_client_id'] = include_client_id
return prepare_token_request(self.grant_type, body=body, username=self.username,
password=self.password, scope=scope, **kwargs)
oauth2_plugin = OAuth2Plugin( oauth2_plugin = OAuth2Plugin(
client_id='client_id', client_secret='client_secret', client=ExtendedClient(client_id='client_id', grant_type='password', username='user', password='pass'),
token_url='https://example.com/oauth/token', audience='audience' client_secret='client_secret',
token_url='https://example.com/oauth/token',
audience='audience'
) )
call_credentials = grpc.metadata_call_credentials(oauth2_plugin) call_credentials = grpc.metadata_call_credentials(oauth2_plugin)
......
import time import time
from typing import Type
import grpc import grpc
from oauthlib.oauth2 import BackendApplicationClient, OAuth2Token from oauthlib.oauth2 import Client, OAuth2Token
from requests_oauthlib import OAuth2Session from requests_oauthlib import OAuth2Session
class OAuth2Plugin(grpc.AuthMetadataPlugin): class OAuth2Plugin(grpc.AuthMetadataPlugin):
_token = None _token = None
def __init__(self, client_id: str, client_secret: str, token_url: str, audience: str, def __init__(self, client: Type[Client], client_secret: str, token_url: str, audience: str,
signature_header_key: str = 'Authorization', token_type: str = 'Bearer') -> None: signature_header_key: str = 'Authorization', token_type: str = 'Bearer') -> None:
self._client_id = client_id
self._client_secret = client_secret self._client_secret = client_secret
self._token_url = token_url self._token_url = token_url
self._audience = audience self._audience = audience
self._signature_header_key = signature_header_key self._signature_header_key = signature_header_key
self._token_type = token_type self._token_type = token_type
self.oauth2 = OAuth2Session(client=BackendApplicationClient(client_id=client_id)) self.oauth2 = OAuth2Session(client=client)
def __call__(self, context: grpc.AuthMetadataContext, def __call__(self, context: grpc.AuthMetadataContext,
callback: grpc.AuthMetadataPluginCallback) -> None: callback: grpc.AuthMetadataPluginCallback) -> None:
...@@ -27,17 +27,27 @@ class OAuth2Plugin(grpc.AuthMetadataPlugin): ...@@ -27,17 +27,27 @@ class OAuth2Plugin(grpc.AuthMetadataPlugin):
@property @property
def token(self) -> OAuth2Token: def token(self) -> OAuth2Token:
if self._token is None: def fetch_token() -> OAuth2Token:
self._token = self.oauth2.fetch_token( return self.oauth2.fetch_token(
token_url=self._token_url,
client_secret=self._client_secret,
audience=self._audience
)
def refresh_token() -> OAuth2Token:
return self.oauth2.refresh_token(
token_url=self._token_url, token_url=self._token_url,
client_secret=self._client_secret, client_secret=self._client_secret,
audience=self._audience audience=self._audience
) )
else:
if self._token.expires_at and self._token.expires_at < time.time(): if self._token is None:
self._token = self.oauth2.refresh_token( self._token = fetch_token()
token_url=self._token_url,
client_secret=self._client_secret, if self._token.expires_at and self._token.expires_at < time.time():
audience=self._audience if 'refresh_token' in self._token:
) self._token = refresh_token()
else:
self._token = fetch_token()
return self._token return self._token
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment