Source code for pysnow.query

# -*- coding: utf-8 -*-

import inspect
from pysnow.exceptions import (QueryEmpty,
                               QueryExpressionError,
                               QueryMissingField,
                               QueryMultipleExpressions,
                               QueryTypeError)


[docs]class QueryBuilder(object): def __init__(self): """Query builder - used for building complex queries""" self._query = [] self.current_field = None self.c_oper = None self.l_oper = None
[docs] def AND(self): """And operator""" return self._add_logical_operator('^')
[docs] def OR(self): """OR operator""" return self._add_logical_operator('^OR')
[docs] def NQ(self): """NQ (new query) operator""" return self._add_logical_operator('^NQ')
[docs] def field(self, field): """ Sets the field to operate on :param field: field (str) to operate on :return: self """ self.current_field = field return self
[docs] def starts_with(self, value): """Query records with the given field starting with the value specified""" return self._add_condition('STARTSWITH', value, types=[str])
[docs] def ends_with(self, value): """Query records with the given field ending with the value specified""" return self._add_condition('ENDSWITH', value, types=[str])
[docs] def contains(self, value): """Query records with the given field containing the value specified""" return self._add_condition('LIKE', value, types=[str])
[docs] def not_contains(self, value): """Query records with the given field not containing the value specified""" return self._add_condition('NOTLIKE', value, types=[str])
[docs] def is_empty(self): """Query records with the given field empty""" return self._add_condition('ISEMPTY', '', types=[str, int])
[docs] def equals(self, data): """ Query records with the given field equalling either: - the value passed (str) - any of the values passed (list) """ if isinstance(data, str): return self._add_condition('=', data, types=[int, str]) elif isinstance(data, list): return self._add_condition('IN', ",".join(map(str, data)), types=[str]) raise QueryTypeError('Expected value of type `str` or `list`, not %s' % type(data))
[docs] def not_equals(self, value): """ Query records with the given field not equalling: - the value specified - any of the values specified (list) """ if isinstance(value, str): return self._add_condition('!=', value, types=[int, str]) elif isinstance(value, list): return self._add_condition('NOT IN', ",".join(value), types=[str]) raise QueryTypeError('Expected value of type `str` or `list`, not %s' % type(value))
[docs] def greater_than(self, value): """Query records with the given field greater than the value specified""" if hasattr(value, 'strftime'): value = value.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(value, str): raise QueryTypeError('Expected value of type `int` or instance of `datetime`, not %s' % type(value)) return self._add_condition('>', value, types=[int, str])
[docs] def less_than(self, value): """Query records with the given field less than the value specified""" if hasattr(value, 'strftime'): value = value.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(value, str): raise QueryTypeError('Expected value of type `int` or instance of `datetime`, not %s' % type(value)) return self._add_condition('<', value, types=[int, str])
[docs] def between(self, start, end): """Query records in a start and end range :param start: `int` or `datetime` object :param end: `int` or `datetime` object :raise: :QueryTypeError: if start or end arguments is of an invalid type :return: self """ if hasattr(start, 'strftime') and hasattr(end, 'strftime'): dt_between = ( 'javascript:gs.dateGenerate("%(start)s")' "@" 'javascript:gs.dateGenerate("%(end)s")' ) % { 'start': start.strftime('%Y-%m-%d %H:%M:%S'), 'end': end.strftime('%Y-%m-%d %H:%M:%S') } elif isinstance(start, int) and isinstance(end, int): dt_between = '%d@%d' % (start, end) else: raise QueryTypeError("Expected `start` and `end` of type `int` " "or instance of `datetime`, not %s and %s" % (type(start), type(end))) return self._add_condition('BETWEEN', dt_between, types=[str])
def _add_condition(self, operator, value, types): """ Appends condition to self._query after performing validation :param operator: operator (str) :param value: value / operand :param types: allowed types :raise: :QueryMissingField: if a field hasn't been set :QueryMultipleExpressions: if a condition already has been set :QueryTypeError: if the value is of an unexpected type :return: self """ if not self.current_field: raise QueryMissingField("Expressions requires a field()") elif not type(value) in types: caller = inspect.currentframe().f_back.f_code.co_name raise QueryTypeError("Invalid type passed to %s() , expected: %s" % (caller, types)) elif self.c_oper: raise QueryMultipleExpressions("Expected logical operator after expression") self.c_oper = inspect.currentframe().f_back.f_code.co_name self._query.append("%(current_field)s%(operator)s%(value)s" % { 'current_field': self.current_field, 'operator': operator, 'value': value}) return self def _add_logical_operator(self, operator): """ Adds a logical operator between conditions in query :param operator: logical operator (str) :raise: :QueryExpressionError: if a expression hasn't been set :return: self """ if not self.c_oper: raise QueryExpressionError("Logical operators must be preceded by an expression") self.current_field = None self.c_oper = None self.l_oper = inspect.currentframe().f_back.f_code.co_name self._query.append(operator) return self def __str__(self): """ String representation of the query object :raise: :QueryEmpty: if there's no conditions defined :QueryMissingField: if field() hasn't been set :QueryExpressionError: if a expression hasn't been set :return: Query string """ if len(self._query) == 0: raise QueryEmpty("At least one condition is required") elif self.current_field is None: raise QueryMissingField("Logical operator expects a field()") elif self.c_oper is None: raise QueryExpressionError("field() expects an expression") return str().join(self._query)
# For backwards compatibility class Query(QueryBuilder): pass