Skip to content
Snippets Groups Projects

Добавлен плагин аутентификации

Merged Andrei Biriukov requested to merge feature/auth into master
6 files
+ 170
0
Compare changes
  • Side-by-side
  • Inline
Files
6
perxis/auth.py 0 → 100644
+ 53
0
import time
from typing import Type
import grpc
from oauthlib.oauth2 import Client, OAuth2Token
from requests_oauthlib import OAuth2Session
class OAuth2Plugin(grpc.AuthMetadataPlugin):
_token = None
def __init__(self, client: Type[Client], client_secret: str, token_url: str, audience: str,
signature_header_key: str = 'Authorization', token_type: str = 'Bearer') -> None:
self._client_secret = client_secret
self._token_url = token_url
self._audience = audience
self._signature_header_key = signature_header_key
self._token_type = token_type
self.oauth2 = OAuth2Session(client=client)
def __call__(self, context: grpc.AuthMetadataContext,
callback: grpc.AuthMetadataPluginCallback) -> None:
callback(((self._signature_header_key, f'{self._token_type} {self.token["access_token"]}'),), None)
@property
def token(self) -> OAuth2Token:
def fetch_token() -> OAuth2Token:
return self.oauth2.fetch_token(
token_url=self._token_url,
client_secret=self._client_secret,
audience=self._audience
)
def refresh_token() -> OAuth2Token:
return self.oauth2.refresh_token(
token_url=self._token_url,
client_secret=self._client_secret,
audience=self._audience
)
if self._token is None:
self._token = fetch_token()
if self._token.expires_at and self._token.expires_at < time.time():
if 'refresh_token' in self._token:
self._token = refresh_token()
else:
self._token = fetch_token()
return self._token
Loading