Source code for pysnow.oauth_client

# -*- coding: utf-8 -*-

import warnings

from oauthlib.oauth2 import LegacyApplicationClient
from oauthlib.oauth2.rfc6749.errors import OAuth2Error
from requests_oauthlib import OAuth2Session

from .client import Client
from .exceptions import InvalidUsage, MissingToken, TokenCreateError

warnings.simplefilter("always", DeprecationWarning)


[docs]class OAuthClient(Client): """Pysnow `Client` with extras for oauth session and token handling. :param client_id: client_id from ServiceNow :param client_secret: client_secret from ServiceNow :param token_updater: function called when a token has been refreshed :param kwargs: kwargs passed along to :class:`pysnow.Client` """ token = None def __init__(self, client_id=None, client_secret=None, token_updater=None, **kwargs): if not (client_secret and client_id): raise InvalidUsage('You must supply a client_id and client_secret') if kwargs.get('session') or kwargs.get('user'): warnings.warn('pysnow.OAuthClient manages sessions internally, ' 'provided user / password credentials or sessions will be ignored.') # Forcibly set session, user and password kwargs['session'] = OAuth2Session(client=LegacyApplicationClient(client_id=client_id)) kwargs['user'] = None kwargs['password'] = None super(OAuthClient, self).__init__(**kwargs) self.token_updater = token_updater self.client_id = client_id self.client_secret = client_secret self.token_url = "%s/oauth_token.do" % self.base_url def _get_oauth_session(self): """Creates a new OAuth session :return: - OAuth2Session object """ return self._get_session( OAuth2Session( client_id=self.client_id, token=self.token, token_updater=self.token_updater, auto_refresh_url=self.token_url, auto_refresh_kwargs={ "client_id": self.client_id, "client_secret": self.client_secret } ) )
[docs] def set_token(self, token): """Validate and set token :param token: the token (dict) to set """ if not token: self.token = None return expected_keys = set(("token_type", "refresh_token", "access_token", "scope", "expires_in", "expires_at")) if not isinstance(token, dict) or not expected_keys <= set(token): raise InvalidUsage("Token should contain a dictionary obtained using fetch_token()") self.token = token
def _legacy_request(self, *args, **kwargs): """Makes sure token has been set, then calls parent to create a new :class:`pysnow.LegacyRequest` object :param args: args to pass along to _legacy_request() :param kwargs: kwargs to pass along to _legacy_request() :return: - :class:`pysnow.LegacyRequest` object :raises: - MissingToken: If token hasn't been set """ if isinstance(self.token, dict): self.session = self._get_oauth_session() return super(OAuthClient, self)._legacy_request(*args, **kwargs) raise MissingToken("You must set_token() before creating a legacy request with OAuthClient")
[docs] def resource(self, api_path=None, base_path='/api/now', chunk_size=None): """Overrides :meth:`resource` provided by :class:`pysnow.Client` with extras for OAuth :param api_path: Path to the API to operate on :param base_path: (optional) Base path override :param chunk_size: Response stream parser chunk size (in bytes) :return: - :class:`Resource` object :raises: - InvalidUsage: If a path fails validation """ if isinstance(self.token, dict): self.session = self._get_oauth_session() return super(OAuthClient, self).resource(api_path, base_path) raise MissingToken("You must set_token() before creating a resource with OAuthClient")
[docs] def generate_token(self, user, password): """Takes user and password credentials and generates a new token :param user: user :param password: password :return: - dictionary containing token data :raises: - TokenCreateError: If there was an error generating the new token """ try: return dict(self.session.fetch_token(token_url=self.token_url, username=user, password=password, client_id=self.client_id, client_secret=self.client_secret)) except OAuth2Error as e: raise TokenCreateError(error=e.error, description=e.description)