Skip to content

Commit

Permalink
Django style filtering review
Browse files Browse the repository at this point in the history
  • Loading branch information
bartonip authored and filak-sap committed Jun 28, 2020
1 parent 61dd89e commit 548c12e
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 105 deletions.
3 changes: 3 additions & 0 deletions pyodata/v2/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,9 @@ def proprty(self, property_name):
def proprties(self):
return list(self._properties.values())

def has_proprty(self, proprty_name):
return proprty_name in self._properties

@classmethod
def from_etree(cls, type_node, config: Config):
name = type_node.get('Name')
Expand Down
272 changes: 170 additions & 102 deletions pyodata/v2/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def execute(self):
if body:
self._logger.debug(' body: %s', body)

params = "&".join("%s=%s" % (k,v) for k,v in self.get_query_params().items())
params = "&".join("%s=%s" % (k, v) for k, v in self.get_query_params().items())
response = self._connection.request(
self.get_method(), url, headers=headers, params=params, data=body)

Expand Down Expand Up @@ -993,145 +993,213 @@ def __ge__(self, value):
def __gt__(self, value):
return GetEntitySetFilter.format_filter(self._proprty, 'gt', value)

class FilterExpression(object):
def __init__(self, *args, **kwargs):
self.expressions = kwargs
self.other = None
self.operator = None

class FilterExpression:
"""A class representing named expression of OData $filter"""

def __init__(self, **kwargs):
self._expressions = kwargs
self._other = None
self._operator = None

@property
def expressions(self):
"""Get expressions where key is property name with the operator suffix
and value is the left hand side operand.
"""

return self._expressions.items()

@property
def other(self):
"""Get an instance of the other operand"""

return self._other

@property
def operator(self):
"""The other operand"""

return self._operator

def __or__(self, other):
self.other = other
self.operator = "or"
if self._other is not None:
raise RuntimeError('The FilterExpression already initialized')

self._other = other
self._operator = "or"
return self

def __and__(self, other):
self.other = other
self.operator = "and"
if self._other is not None:
raise RuntimeError('The FilterExpression already initialized')

self._other = other
self._operator = "and"
return self

class GetEntitySetFilterChainable(object):

class GetEntitySetFilterChainable:
"""
Example expressions
FirstName="Tim"
FirstName__contains="Tim"
FirstName='Tim'
FirstName__contains='Tim'
Age__gt=56
Age__gte=6
Age__lt=78
Age__lte=90
Age__range=(5,9)
FirstName__in=["Tim", "Bob", "Sam"]
FirstName__startswith="Tim"
FirstName__endswith="mothy"
Addresses__Suburb="Chatswood"
Addresses__Suburb__contains="wood"
FirstName__in=['Tim', 'Bob', 'Sam']
FirstName__startswith='Tim'
FirstName__endswith='mothy'
Addresses__Suburb='Chatswood'
Addresses__Suburb__contains='wood'
"""
operators = ["startswith", "endswith", "lt", "lte", "gt", "gte", "contains", "range", "in", "length"]
def __init__(self, request, filter_expressions, exprs):
self.request = request
self.expressions = exprs
self.filter_expressions = filter_expressions

OPERATORS = [
'startswith',
'endswith',
'lt',
'lte',
'gt',
'gte',
'contains',
'range',
'in',
'length',
'eq'
]

def __init__(self, entity_type, filter_expressions, exprs):
self._entity_type = entity_type
self._filter_expressions = filter_expressions
self._expressions = exprs

@property
def expressions(self):
"""Get expressions as a list of tuples where the first item
is a property name with the operator suffix and the second item
is a left hand side value.
"""

return self._expressions.items()

def proprty_obj(self, name):
return self.request._entity_type.proprty(name)
"""Returns a model property for a particular property"""

return self._entity_type.proprty(name)

def _decode_and_combine_filter_expression(self, filter_expression):
filter_expressions = [self._decode_expression(expr, val) for expr, val in filter_expression.expressions]
return self._combine_expressions(filter_expressions)

def _process_query_objects(self):
"""Processes FilterExpression objects to OData lookups"""

def process_query_objects(self):
filter_expressions = []
for q in self.filter_expressions:
lhs_expressions = []
rhs_expressions = []
for expr, val in q.expressions.items():
lhs_expressions.append(self.decode_expression(expr, val))
lhs_expressions = self.combine_expressions(lhs_expressions)

if q.other:
for expr, val in q.other.expressions.items():
rhs_expressions.append(self.decode_expression(expr, val))
rhs_expressions = self.combine_expressions(rhs_expressions)

filter_expressions.append(f"({lhs_expressions}) {q.operator} ({rhs_expressions})")

for expr in self._filter_expressions:
lhs_expressions = self._decode_and_combine_filter_expression(expr)

if expr.other is not None:
rhs_expressions = self._decode_and_combine_filter_expression(expr.other)
filter_expressions.append(f'({lhs_expressions}) {expr.operator} ({rhs_expressions})')
else:
filter_expressions.append(lhs_expression)
filter_expressions.append(lhs_expressions)

return filter_expressions

def process_expressions(self):
filter_expressions = []
for expr, val in self.expressions.items():
filter_expressions.append(self.decode_expression(expr, val))
def _process_expressions(self):
filter_expressions = [self._decode_expression(expr, val) for expr, val in self.expressions]

filter_expressions.extend(self._process_query_objects())

filter_expressions.extend(self.process_query_objects())
return filter_expressions

def decode_expression(self, expr, val):
properties = self.request._entity_type._properties.keys()
def _decode_expression(self, expr, val):
field = None
# field_heirarchy = []
operator = "eq"
exprs = expr.split("__")
operator = 'eq'
exprs = expr.split('__')

for part in exprs:
if part in properties:
if self._entity_type.has_proprty(part):
field = part
# field_heirarchy.append(part)
elif part in self.__class__.operators:
elif part in self.__class__.OPERATORS:
operator = part

# field = "/".join(field_heirarchy)
else:
raise ValueError(f'"{part}" is not a valid property or operator')
# field = '/'.join(field_heirarchy)

# target_field = self.proprty_obj(field_heirarchy[-1])
expression = self.build_expression(field, operator, val)
expression = self._build_expression(field, operator, val)

return expression

def combine_expressions(self, expressions):
return " and ".join(expressions)

def build_expression(self, field_name, operator, value):
# pylint: disable=no-self-use
def _combine_expressions(self, expressions):
return ' and '.join(expressions)

# pylint: disable=too-many-return-statements, too-many-branches
def _build_expression(self, field_name, operator, value):
target_field = self.proprty_obj(field_name)
if operator not in ["length", "in", "range"]:

if operator not in ['length', 'in', 'range']:
value = target_field.to_literal(value)
if operator == "lt":
return f"{field_name} lt {value}"
elif operator == "lte":
return f"{field_name} le {value}"
elif operator == "gte":
return f"{field_name} ge {value}"
elif operator == "gt":
return f"{field_name} gt {value}"
elif operator == "startswith":
return f"startswith({field_name}, {value}) eq true"
elif operator == "endswith":
return f"endswith({field_name}, {value}) eq true"
elif operator == "length":

if operator == 'lt':
return f'{field_name} lt {value}'

if operator == 'lte':
return f'{field_name} le {value}'

if operator == 'gte':
return f'{field_name} ge {value}'

if operator == 'gt':
return f'{field_name} gt {value}'

if operator == 'startswith':
return f'startswith({field_name}, {value}) eq true'

if operator == 'endswith':
return f'endswith({field_name}, {value}) eq true'

if operator == 'length':
value = int(value)
return f"length({field_name}) eq {value}"
elif operator in ["contains"]:
return f"substringof({value}, {field_name}) eq true"
elif operator == "range":
if not (isinstance(value, tuple) or isinstance(value, list)):
raise TypeError("Range must be tuple or list not {}".format(type(value)))
return f'length({field_name}) eq {value}'

if operator in ['contains']:
return f'substringof({value}, {field_name}) eq true'

if operator == 'range':
if not isinstance(value, (tuple, list)):
raise TypeError('Range must be tuple or list not {}'.format(type(value)))

if len(value) != 2:
raise ValueError("Only two items can be passed in a range.")

x = target_field.to_literal(value[0])
y = target_field.to_literal(value[1])
return f"{field_name} gte {x} and {field_name} lte {y}"
elif operator == "in":
literal_values = []
for v in value:
val = target_field.to_literal(v)
literal_values.append(f"{field_name} eq {val}")
return " or ".join(literal_values)
elif operator == "eq":
return f"{field_name} eq {value}"
else:
raise ValueError(f"Invalid expression {operator}")
raise ValueError('Only two items can be passed in a range.')

low_bound = target_field.to_literal(value[0])
high_bound = target_field.to_literal(value[1])

return f'{field_name} gte {low_bound} and {field_name} lte {high_bound}'

def as_filter_string(self):
expressions = self.process_expressions()
result = self.combine_expressions(expressions)
if operator == 'in':
literal_values = (f'{field_name} eq {target_field.to_literal(item)}' for item in value)
return ' or '.join(literal_values)

if operator == 'eq':
return f'{field_name} eq {value}'

raise ValueError(f'Invalid expression {operator}')

def __str__(self):
expressions = self._process_expressions()
result = self._combine_expressions(expressions)
return quote(result)


class GetEntitySetRequest(QueryRequest):
"""GET on EntitySet"""

Expand All @@ -1144,18 +1212,18 @@ def __getattr__(self, name):
proprty = self._entity_type.proprty(name)
return GetEntitySetFilter(proprty)

def set_filter(self, filter_val):
filter_text = self._filter + " and " if self._filter else ""
def _set_filter(self, filter_val):
filter_text = self._filter + ' and ' if self._filter else ''
filter_text += filter_val
self._filter = filter_text

def filter(self, *args, **kwargs):
if len(args) and isinstance(args[0], str):
if args and len(args) == 1 and isinstance(args[0], str):
self._filter = args[0]
return self
else:
self.set_filter(GetEntitySetFilterChainable(self, args, kwargs).as_filter_string())
return self
self._set_filter(str(GetEntitySetFilterChainable(self._entity_type, args, kwargs)))

return self


class EntitySetProxy:
Expand Down
22 changes: 21 additions & 1 deletion tests/test_model_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
from pyodata.v2.model import Schema, Typ, StructTypeProperty, Types, EntityType, EdmStructTypeSerializer, \
Association, AssociationSet, EndRole, AssociationSetEndRole, TypeInfo, MetadataBuilder, ParserError, PolicyWarning, \
PolicyIgnore, Config, PolicyFatal, NullType, NullAssociation, current_timezone
PolicyIgnore, Config, PolicyFatal, NullType, NullAssociation, current_timezone, StructType
from pyodata.exceptions import PyODataException, PyODataModelError, PyODataParserError
from tests.conftest import assert_logging_policy

Expand Down Expand Up @@ -1404,3 +1404,23 @@ def test_missing_property_referenced_in_annotation(mock_warning, xml_builder_fac
)).build()

assert mock_warning.called is False


def test_struct_type_has_property_initial_instance():
struct_type = StructType('Name', 'Label', False)

assert struct_type.has_proprty('proprty') == False


def test_struct_type_has_property_no():
struct_type = StructType('Name', 'Label', False)
struct_type._properties['foo'] = 'ugly test hack'

assert not struct_type.has_proprty('proprty')


def test_struct_type_has_property_yes():
struct_type = StructType('Name', 'Label', False)
struct_type._properties['proprty'] = 'ugly test hack'

assert struct_type.has_proprty('proprty')
Loading

0 comments on commit 548c12e

Please sign in to comment.