Source code for pysnow.request

# -*- coding: utf-8 -*-

import itertools
import json
import os
import ntpath
import warnings

from pysnow import query

from pysnow.exceptions import (NoRequestExecuted,
                               MultipleResults,
                               NoResults,
                               InvalidUsage,
                               UnexpectedResponse,
                               MissingResult)


[docs]class Request(object): base_path = "api/now" def __init__(self, method, table, **kwargs): """Takes arguments used to perform a HTTP request :param method: HTTP request method :param table: table to operate on """ self.method = method self.table = table self.url_link = None # Updated when a linked request is iterated on self.base_url = kwargs.pop('base_url') self.request_params = kwargs.pop('request_params') self.raise_on_empty = kwargs.pop('raise_on_empty') self.session = kwargs.pop('session') self._last_response = None if method in ('GET', 'DELETE'): self.query = kwargs.pop('query') @property def last_response(self): """Return _last_response after making sure an inner `requests.request` has been performed :raise: :NoRequestExecuted: If no request has been executed :return: last response """ if self._last_response is None: raise NoRequestExecuted("%s hasn't been executed" % self) return self._last_response @last_response.setter def last_response(self, response): """ Sets last_response property :param response: `requests.request` response """ self._last_response = response @property def count(self): """ Returns the number of records the query would yield""" self.request_params.update({'sysparm_count': True}) response = self.session.get(self._get_stats_url(), params=self._get_formatted_query(fields=list(), limit=None, order_by=list(), offset=None)) content = self._get_content(response) return int(content['stats']['count']) @property def status_code(self): """Return last_response.status_code after making sure an inner `requests.request` has been performed :return: status_code of last_response """ return self.last_response.status_code def _all_inner(self, fields, limit, order_by, offset): """Yields all records for the query and follows links if present on the response after validating :return: List of records with content """ response = self.session.get(self._get_table_url(), params=self._get_formatted_query(fields, limit, order_by, offset)) yield self._get_content(response) while 'next' in response.links: self.url_link = response.links['next']['url'] response = self.session.get(self.url_link) yield self._get_content(response)
[docs] def get_all(self, fields=list(), limit=None, order_by=list(), offset=None): """DEPRECATED - see get_multiple()""" warnings.warn("get_all() is deprecated, please use get_multiple() instead", DeprecationWarning) return self.get_multiple(fields, limit, order_by, offset)
[docs] def get_multiple(self, fields=list(), limit=None, order_by=list(), offset=None): """Wrapper method that takes whatever was returned by the _all_inner() generators and chains it in one result The response can be sorted by passing a list of fields to order_by. Example: get_multiple(order_by=['category', '-created_on']) would sort the category field in ascending order, with a secondary sort by created_on in descending order. :param fields: List of fields to return in the result :param limit: Limits the number of records returned :param order_by: Sort response based on certain fields :param offset: A number of records to skip before returning records (for pagination) :return: Iterable chain object """ return itertools.chain.from_iterable(self._all_inner(fields, limit, order_by, offset))
[docs] def get_one(self, fields=list()): """Convenience function for queries returning only one result. Validates response before returning. :param fields: List of fields to return in the result :raise: :MultipleResults: if more than one match is found :return: Record content """ response = self.session.get(self._get_table_url(), params=self._get_formatted_query(fields, limit=None, order_by=list(), offset=None)) content = self._get_content(response) l = len(content) if l > 1: raise MultipleResults('Multiple results for get_one()') if len(content) == 0: return {} return content[0]
[docs] def insert(self, payload): """Inserts a new record with the payload passed as an argument :param payload: The record to create (dict) :return: Created record """ response = self.session.post(self._get_table_url(), data=json.dumps(payload)) return self._get_content(response)
[docs] def delete(self): """Deletes the queried record and returns response content after response validation :raise: :NoResults: if query returned no results :NotImplementedError: if query returned more than one result (currently not supported) :return: Delete response content (Generally always {'Success': True}) """ try: result = self.get_one() if 'sys_id' not in result: raise NoResults() except MultipleResults: raise MultipleResults("Deletion of multiple records is not supported") except NoResults as e: e.args = ('Cannot delete a non-existing record',) raise response = self.session.delete(self._get_table_url(sys_id=result['sys_id'])) return self._get_content(response)
[docs] def update(self, payload): """Updates the queried record with `payload` and returns the updated record after validating the response :param payload: Payload to update the record with :raise: :NoResults: if query returned no results :MultipleResults: if query returned more than one result (currently not supported) :return: The updated record """ try: result = self.get_one() if 'sys_id' not in result: raise NoResults() except MultipleResults: raise MultipleResults("Update of multiple records is not supported") except NoResults as e: e.args = ('Cannot update a non-existing record',) raise if not isinstance(payload, dict): raise InvalidUsage("Update payload must be of type dict") response = self.session.put(self._get_table_url(sys_id=result['sys_id']), data=json.dumps(payload)) return self._get_content(response)
[docs] def clone(self, reset_fields=list()): """Clones the queried record :param reset_fields: Fields to reset :raise: :NoResults: if query returned no results :MultipleResults: if query returned more than one result (currently not supported) :UnexpectedResponse: informs the user about what likely went wrong :return: The cloned record """ if not isinstance(reset_fields, list): raise InvalidUsage("reset_fields must be a list() of fields") try: response = self.get_one() if 'sys_id' not in response: raise NoResults() except MultipleResults: raise MultipleResults('Cloning multiple records is not supported') except NoResults as e: e.args = ('Cannot clone a non-existing record',) raise payload = {} # Iterate over fields in the result for field in response: # Ignore fields in reset_fields if field in reset_fields: continue item = response[field] # Check if the item is of type dict and has a sys_id ref (value) if isinstance(item, dict) and 'value' in item: payload[field] = item['value'] else: payload[field] = item try: return self.insert(payload) except UnexpectedResponse as e: if e.status_code == 403: # User likely attempted to clone a record without resetting a unique field e.args = ('Unable to create clone. Make sure unique fields has been reset.',) raise
[docs] def attach(self, file): """Attaches the queried record with `file` and returns the response after validating the response :param file: File to attach to the record :raise: :NoResults: if query returned no results :MultipleResults: if query returned more than one result (currently not supported) :return: The attachment record metadata """ try: result = self.get_one() if 'sys_id' not in result: raise NoResults() except MultipleResults: raise MultipleResults('Attaching a file to multiple records is not supported') except NoResults: raise NoResults('Attempted to attach file to a non-existing record') if not os.path.isfile(file): raise InvalidUsage("Attachment '%s' must be an existing regular file" % file) response = self.session.post( self._get_attachment_url('upload'), data={ 'table_name': self.table, 'table_sys_id': result['sys_id'], 'file_name': ntpath.basename(file) }, files={'file': open(file, 'rb')}, headers={'content-type': None} # Temporarily override header ) return self._get_content(response)
def _get_content(self, response): """Checks for errors in the response. Returns response content, in bytes. :param response: response object :raise: :UnexpectedResponse: if the server responded with an unexpected response :return: ServiceNow response content """ method = response.request.method self.last_response = response server_error = { 'summary': None, 'details': None } try: content_json = response.json() if 'error' in content_json: e = content_json['error'] if 'message' in e: server_error['summary'] = e['message'] if 'detail' in e: server_error['details'] = e['detail'] except ValueError: content_json = {} if method == 'DELETE': # Make sure the delete operation returned the expected response if response.status_code == 204: return {'success': True} else: raise UnexpectedResponse( 204, response.status_code, method, server_error['summary'], server_error['details'] ) # Make sure the POST operation returned the expected response elif method == 'POST' and response.status_code != 201: raise UnexpectedResponse( 201, response.status_code, method, server_error['summary'], server_error['details'] ) # It seems that Helsinki and later returns status 200 instead of 404 on empty result sets if ('result' in content_json and len(content_json['result']) == 0) or response.status_code == 404: if self.raise_on_empty is True: raise NoResults('Query yielded no results') elif 'error' in content_json: raise UnexpectedResponse( 200, response.status_code, method, server_error['summary'], server_error['details'] ) if 'result' not in content_json: raise MissingResult("The request was successful but the content didn't contain the expected 'result'") return content_json['result'] def _get_table_url(self, **kwargs): return self._get_url('table', item=self.table, **kwargs) def _get_attachment_url(self, action): return self._get_url('attachment', item=action) def _get_stats_url(self): return self._get_url('stats', item=self.table) def _get_url(self, resource, item, sys_id=None): """Takes table and sys_id (if present), and returns a URL :param resource: API resource :param item: API resource item :param sys_id: Record sys_id :return: url string """ url_str = '%(base_url)s/%(base_path)s/%(resource)s/%(item)s' % ( { 'base_url': self.base_url, 'base_path': self.base_path, 'resource': resource, 'item': item } ) if sys_id: return "%s/%s" % (url_str, sys_id) return url_str def _get_formatted_query(self, fields, limit, order_by, offset): """ Converts the query to a ServiceNow-interpretable format :return: ServiceNow query """ if not isinstance(order_by, list): raise InvalidUsage("Argument order_by must be of type list()") if not isinstance(fields, list): raise InvalidUsage("Argument fields must be of type list()") if isinstance(self.query, query.QueryBuilder): sysparm_query = str(self.query) elif isinstance(self.query, dict): # Dict-type query try: items = self.query.iteritems() # Python 2 except AttributeError: items = self.query.items() # Python 3 sysparm_query = '^'.join(['%s=%s' % (field, value) for field, value in items]) elif isinstance(self.query, str): # String-type query sysparm_query = self.query else: raise InvalidUsage("Query must be instance of %s, %s or %s" % (query.QueryBuilder, str, dict)) for field in order_by: if field[0] == '-': sysparm_query += "^ORDERBYDESC%s" % field[1:] else: sysparm_query += "^ORDERBY%s" % field params = {'sysparm_query': sysparm_query} params.update(self.request_params) if limit is not None: params.update({'sysparm_limit': limit, 'sysparm_suppress_pagination_header': True}) if offset is not None: params.update({'sysparm_offset': offset}) if len(fields) > 0: params.update({'sysparm_fields': ",".join(fields)}) return params