Skip to content

Commit

Permalink
Add support for before_/after_aggregation events.
Browse files Browse the repository at this point in the history
Closes #1057.
  • Loading branch information
nicolaiarocci committed Apr 24, 2018
1 parent c158225 commit 8c3588f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Development

Version 0.8
~~~~~~~~~~~
- New: ``before_aggregation`` and ``after_aggregation`` event hooks allow to
attach custom callbacks to aggregation endpoints. Closes #1057.
- Fix: Crash with Cerberus 1.2. Closes #1137.
- Docs: Add link to the Eve course. It was authored by the project author, and
it is hosted by TalkPython Training.
Expand Down
35 changes: 35 additions & 0 deletions docs/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,38 @@ notified of such a disastrous occurrence by hooking a callback function to the
hit by the DELETE after having retrieved the original document. NOTE: those two event are useful in order to
perform some business logic before the actual remove operation given the look up and the list of originals
.. _aggregation_hooks:
Aggregation event hooks
~~~~~~~~~~~~~~~~~~~~~~~
You can also attach one or more callbacks to your aggregation endpoints. The
``before_aggregation`` event is fired when an aggregation is about to be
performed. Any attached callback function will receive both the endpoint name
and the aggregation pipeline as arguments. The pipeline can then be altered if
needed.
.. code-block:: pycon
>>> def on_aggregate(endpoint, pipeline):
... pipeline.append({"$unwind": "$tags"})
>>> app = Eve()
>>> app.before_aggregation += on_aggregate
The ``after_aggregation`` event is fired when the aggregation has been
performed. An attached callback function could leverage this event to modify
the documents before they are returned to the client.
.. code-block:: pycon
>>> def alter_documents(endpoint, documents):
... for document in documents:
... document['hello'] = 'well, hello!'
>>> app = Eve()
>>> app.after_aggregation += alter_documents
For more information on aggregation support, see :ref:`aggregation`
.. admonition:: Please note
Expand Down Expand Up @@ -2204,6 +2236,8 @@ to a keyword of your liking, just set ``QUERY_AGGREGATION`` in your settings.
You can also set all options natively supported by PyMongo. For more
informations on aggregation see :ref:`datasource`.
Custom callback functions can be attached to the ``before_aggregation`` and ``after_aggregation`` event hooks. For more information, see :ref:`aggregation_hooks`.
Limitations
~~~~~~~~~~~
``HATEOAS`` is not available at aggregation endpoints. This should not
Expand Down Expand Up @@ -2237,6 +2271,7 @@ A single endpoint cannot serve both regular and aggregation results. However,
since it is possible to setup multiple endpoints all serving from the same
datasource (see :ref:`source`), similar functionality can be easily achieved.
MongoDB and SQL Support
------------------------
Support for single or multiple MongoDB database/servers comes out of the box.
Expand Down
4 changes: 4 additions & 0 deletions eve/methods/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,15 @@ def parse_again(st_value, key, value):
req_pipeline.append(skip)
req_pipeline.append(limit)

getattr(app, "before_aggregation")(resource, req_pipeline)

cursor = app.data.aggregate(resource, req_pipeline, options)

for document in cursor:
documents.append(document)

getattr(app, "after_aggregation")(resource, documents)

response[config.ITEMS] = documents

# PyMongo's CommandCursor does not return a count, so we cannot
Expand Down
78 changes: 78 additions & 0 deletions eve/tests/methods/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,9 @@ def test_get_aggregation_endpoint(self):
]
)

self.devent = DummyEvent(lambda: True)
self.app.before_aggregation += self.devent

self.app.register_resource(
'aggregate_test', {
'datasource': {
Expand All @@ -1333,6 +1336,7 @@ def test_get_aggregation_endpoint(self):

response, status = self.get('aggregate_test?aggregate=ciao')
self.assert400(status)
self.assertTrue(self.devent.called is None)

def assertOutput(doc, count, id):
self.assertEqual(doc['count'], count)
Expand All @@ -1345,6 +1349,7 @@ def assertOutput(doc, count, id):
assertOutput(docs[0], 3, 'cat')
assertOutput(docs[1], 2, 'dog')
assertOutput(docs[2], 1, 'mouse')
self.assertEqual('aggregate_test', self.devent.called[0])

response, status = self.get('aggregate_test?aggregate={"$field1":2}')
self.assert200(status)
Expand All @@ -1353,6 +1358,7 @@ def assertOutput(doc, count, id):
assertOutput(docs[0], 6, 'cat')
assertOutput(docs[1], 4, 'dog')
assertOutput(docs[2], 2, 'mouse')
self.assertEqual('aggregate_test', self.devent.called[0])

# this will return 0 for all documents 'count' fields as no $field1
# will be gien with the query (actually, no query will be there at all)
Expand All @@ -1363,10 +1369,12 @@ def assertOutput(doc, count, id):
self.assertEqual(docs[0]['count'], 0)
self.assertEqual(docs[1]['count'], 0)
self.assertEqual(docs[2]['count'], 0)
self.assertEqual('aggregate_test', self.devent.called[0])

# malformed field name is ignored
response, status = self.get('aggregate_test?aggregate={"field1":1}')
self.assert200(status)
self.assertEqual('aggregate_test', self.devent.called[0])

# unknown field is ignored
response, status = self.get('aggregate_test?aggregate={"$unknown":1}')
Expand Down Expand Up @@ -2045,6 +2053,76 @@ def test_on_fetched_item_contacts(self):
self.assertEqual(self.item_id, str(self.devent.called[0][id_field]))
self.assertEqual(1, len(self.devent.called))

def test_get_before_aggregation_hook(self):
_db = self.connection[MONGO_DBNAME]
_db.aggregate_test.insert_many(
[
{"x": 1, "tags": ["dog", "cat"]},
{"x": 2, "tags": ["cat"]},
{"x": 2, "tags": ["mouse", "cat", "dog"]},
{"x": 3, "tags": []}
]
)

self.app.before_aggregation += self.devent

self.app.register_resource(
'aggregate_test', {
'datasource': {
'aggregation': {
'pipeline': [
{"$unwind": "$tags"},
{"$group": {"_id": "$tags", "count": {"$sum":
"$field1"}}},
],
}
}
}
)

response, status = self.get('aggregate_test?aggregate=ciao')
self.assert400(status)
self.assertTrue(self.devent.called is None)

response, status = self.get('aggregate_test?aggregate={"$field1":1}')
self.assert200(status)
self.assertEqual('aggregate_test', self.devent.called[0])

def test_get_after_aggregation_hook(self):
_db = self.connection[MONGO_DBNAME]
_db.aggregate_test.insert_many(
[
{"x": 1, "tags": ["dog", "cat"]},
{"x": 2, "tags": ["cat"]},
{"x": 2, "tags": ["mouse", "cat", "dog"]},
{"x": 3, "tags": []}
]
)

self.app.after_aggregation += self.devent

self.app.register_resource(
'aggregate_test', {
'datasource': {
'aggregation': {
'pipeline': [
{"$unwind": "$tags"},
{"$group": {"_id": "$tags", "count": {"$sum":
"$field1"}}},
],
}
}
}
)

response, status = self.get('aggregate_test?aggregate=ciao')
self.assert400(status)
self.assertTrue(self.devent.called is None)

response, status = self.get('aggregate_test?aggregate={"$field1":1}')
self.assert200(status)
self.assertEqual('aggregate_test', self.devent.called[0])

def get_resource(self):
return self.test_client.get(self.known_resource_url)

Expand Down

0 comments on commit 8c3588f

Please sign in to comment.