Skip to content

Multicorn aggregation/grouping pushdown support #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 78 additions & 17 deletions pg_es_fdw/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@
from multicorn import ForeignDataWrapper
from multicorn.utils import log_to_postgres as log2pg

from ._es_query import quals_to_es
from ._es_query import _PG_TO_ES_AGG_FUNCS, quals_to_es


class ElasticsearchFDW(ForeignDataWrapper):
""" Elastic Search Foreign Data Wrapper """

@property
def rowid_column(self):
""" Returns a column name which will act as a rowid column for
delete/update operations.
"""Returns a column name which will act as a rowid column for
delete/update operations.

This can be either an existing column name, or a made-up one. This
column name should be subsequently present in every returned
resultset. """
This can be either an existing column name, or a made-up one. This
column name should be subsequently present in every returned
resultset."""

return self._rowid_column

Expand Down Expand Up @@ -73,8 +73,8 @@ def __init__(self, options, columns):
self.scroll_id = None

def get_rel_size(self, quals, columns):
""" Helps the planner by returning costs.
Returns a tuple of the form (number of rows, average row width) """
"""Helps the planner by returning costs.
Returns a tuple of the form (number of rows, average row width)"""

try:
query, _ = self._get_query(quals)
Expand All @@ -93,23 +93,41 @@ def get_rel_size(self, quals, columns):
)
return (0, 0)

def explain(self, quals, columns, sortkeys=None, verbose=False):
query, _ = self._get_query(quals)
def can_pushdown_upperrel(self):
    """Describe the upper-relation operations this wrapper can push down.

    Returns a dict with two entries:

    * ``groupby_supported`` -- always ``True``.
    * ``agg_functions`` -- the module-level ``_PG_TO_ES_AGG_FUNCS`` mapping
      (the same object, not a copy).
    """
    return {
        "groupby_supported": True,
        "agg_functions": _PG_TO_ES_AGG_FUNCS,
    }

def explain(
self,
quals,
columns,
sortkeys=None,
aggs=None,
group_clauses=None,
verbose=False,
):
query, _ = self._get_query(quals, aggs=aggs, group_clauses=group_clauses)
return [
"Elasticsearch query to %s" % self.client,
"Query: %s" % json.dumps(query),
"Query: %s" % json.dumps(query, indent=4),
]

def execute(self, quals, columns):
def execute(self, quals, columns, aggs=None, group_clauses=None):
""" Execute the query """

try:
query, query_string = self._get_query(quals)
query, query_string = self._get_query(
quals, aggs=aggs, group_clauses=group_clauses
)

is_aggregation = aggs or group_clauses

if query:
response = self.client.search(
size=self.scroll_size,
scroll=self.scroll_duration,
size=self.scroll_size if not is_aggregation else 0,
scroll=self.scroll_duration if not is_aggregation else None,
body=query,
**self.arguments
)
Expand All @@ -118,7 +136,13 @@ def execute(self, quals, columns):
size=self.scroll_size, scroll=self.scroll_duration, **self.arguments
)

if not response["hits"]["hits"]:
if not response["hits"]["hits"] and not is_aggregation:
return

if is_aggregation:
yield from self._handle_aggregation_response(
query, response, aggs, group_clauses
)
return

while True:
Expand Down Expand Up @@ -221,7 +245,7 @@ def delete(self, document_id):
)
return (0, 0)

def _get_query(self, quals):
def _get_query(self, quals, aggs=None, group_clauses=None):
ignore_columns = []
if self.query_column:
ignore_columns.append(self.query_column)
Expand All @@ -230,10 +254,16 @@ def _get_query(self, quals):

query = quals_to_es(
quals,
aggs=aggs,
group_clauses=group_clauses,
ignore_columns=ignore_columns,
column_map={self._rowid_column: "_id"} if self._rowid_column else None,
)

if group_clauses is not None:
# Configure pagination for GROUP BY's
query["aggs"]["group_buckets"]["composite"]["size"] = self.scroll_size

if not self.query_column:
return query, None

Expand Down Expand Up @@ -283,3 +313,34 @@ def _convert_response_column(self, column, row_data):
if isinstance(value, (list, dict)):
return json.dumps(value)
return value

def _handle_aggregation_response(self, query, response, aggs, group_clauses):
    """Yield result rows (dicts) from an Elasticsearch aggregation response.

    Without grouping (``group_clauses is None``) exactly one row is yielded,
    mapping every name in ``aggs`` to the ``"value"`` of the entry with the
    same name in ``response["aggregations"]``.

    With grouping, one row is yielded per bucket of the ``group_buckets``
    aggregation: every column in ``group_clauses`` is read from the bucket's
    ``"key"`` and, when ``aggs`` is not ``None``, every aggregate is read
    from the bucket entry of the same name. While the response carries an
    ``after_key``, that key is written into ``query`` (which is mutated in
    place) and the search is re-issued through ``self.client.search`` to
    fetch the next page.

    :param query: the request body that produced ``response``
    :param response: the search response to unpack
    :param aggs: iterable of aggregate names, or ``None`` for pure grouping
    :param group_clauses: iterable of grouping column names, or ``None``
    """
    if group_clauses is None:
        aggregations = response["aggregations"]
        yield {agg_name: aggregations[agg_name]["value"] for agg_name in aggs}
        return

    while True:
        group_buckets = response["aggregations"]["group_buckets"]
        buckets = group_buckets["buckets"]

        for bucket in buckets:
            key = bucket["key"]
            row = {column: key[column] for column in group_clauses}

            if aggs is not None:
                for agg_name in aggs:
                    row[agg_name] = bucket[agg_name]["value"]

            yield row

        # A missing after_key marks the last page. The empty-page check
        # keeps us from re-requesting forever should a response ever carry
        # an after_key without any buckets.
        if not buckets or "after_key" not in group_buckets:
            break

        query["aggs"]["group_buckets"]["composite"]["after"] = group_buckets[
            "after_key"
        ]
        response = self.client.search(size=0, body=query, **self.arguments)
49 changes: 45 additions & 4 deletions pg_es_fdw/_es_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
"<=": "lte",
}

# Lookup from the aggregate function name found in a pushed-down aggregate's
# "function" entry to the Elasticsearch aggregation type emitted for it.
_PG_TO_ES_AGG_FUNCS = {
    # These aggregations carry the same name on both sides.
    **{func: func for func in ("avg", "max", "min", "sum")},
    "count": "value_count",
}


def _base_qual_to_es(col, op, value, column_map=None):
if column_map:
Expand Down Expand Up @@ -65,14 +73,47 @@ def _qual_to_es(qual, column_map=None):
}
}
else:
return _base_qual_to_es(
qual.field_name, qual.operator, qual.value, column_map
)
return _base_qual_to_es(qual.field_name, qual.operator, qual.value, column_map)


def quals_to_es(quals, ignore_columns=None, column_map=None):
def quals_to_es(
quals, aggs=None, group_clauses=None, ignore_columns=None, column_map=None
):
"""Convert a list of Multicorn quals to an ElasticSearch query"""
ignore_columns = ignore_columns or []

# Aggregation/grouping queries
if aggs is not None:
aggs_query = {
agg_name: {
_PG_TO_ES_AGG_FUNCS[agg_props["function"]]: {
"field": agg_props["column"]
}
}
for agg_name, agg_props in aggs.items()
}

if group_clauses is None:
return {"aggs": aggs_query}

if group_clauses is not None:
group_query = {
"group_buckets": {
"composite": {
"sources": [
{column: {"terms": {"field": column}}}
for column in group_clauses
]
}
}
}

if aggs is not None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we be in a situation where aggs is None and group_clauses isn't? Is it basically something like SELECT a, b, c FROM T GROUP BY a, b, c which is the same as SELECT DISTINCT a, b, c FROM T?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can, that is a good example. Here's a concrete one:

sgr@localhost:splitgraph> explain select column5, column4 from es.iris group by column4, column5
+--------------------------------------------------------------------------------------------+
| QUERY PLAN                                                                                 |
|--------------------------------------------------------------------------------------------|
| Foreign Scan  (cost=1.00..1.00 rows=1 width=1)                                             |
|   Multicorn: Elasticsearch query to <Elasticsearch([{'host': 'es01-test', 'port': 9200}])> |
|   Multicorn: Query: {                                                                      |
|     "aggs": {                                                                              |
|         "group_buckets": {                                                                 |
|             "composite": {                                                                 |
|                 "sources": [                                                               |
|                     {                                                                      |
|                         "column5": {                                                       |
|                             "terms": {                                                     |
|                                 "field": "column5"                                         |
|                             }                                                              |
|                         }                                                                  |
|                     },                                                                     |
|                     {                                                                      |
|                         "column4": {                                                       |
|                             "terms": {                                                     |
|                                 "field": "column4"                                         |
|                             }                                                              |
|                         }                                                                  |
|                     }                                                                      |
|                 ],                                                                         |
|                 "size": 1000                                                               |
|             }                                                                              |
|         }                                                                                  |
|     }                                                                                      |
| }                                                                                          |
+--------------------------------------------------------------------------------------------+
EXPLAIN
Time: 0.012s
sgr@localhost:splitgraph> select column5, column4 from es.iris group by column4, column5
+-----------------+---------+
| column5         | column4 |
|-----------------+---------|
| Iris-setosa     | 0.1     |
| Iris-setosa     | 0.2     |
| Iris-setosa     | 0.3     |
| Iris-setosa     | 0.4     |
| Iris-setosa     | 0.5     |
| Iris-setosa     | 0.6     |
| Iris-versicolor | 1.0     |
| Iris-versicolor | 1.1     |
| Iris-versicolor | 1.2     |
| Iris-versicolor | 1.3     |
| Iris-versicolor | 1.4     |
| Iris-versicolor | 1.5     |
| Iris-versicolor | 1.6     |
| Iris-versicolor | 1.7     |
| Iris-versicolor | 1.8     |
| Iris-virginica  | 1.4     |
| Iris-virginica  | 1.5     |
| Iris-virginica  | 1.6     |
| Iris-virginica  | 1.7     |
| Iris-virginica  | 1.8     |
| Iris-virginica  | 1.9     |
| Iris-virginica  | 2.0     |
| Iris-virginica  | 2.1     |
| Iris-virginica  | 2.2     |
| Iris-virginica  | 2.3     |
| Iris-virginica  | 2.4     |
| Iris-virginica  | 2.5     |
+-----------------+---------+

group_query["group_buckets"]["aggregations"] = aggs_query

return {"aggs": group_query}

# Regular query
return {
"query": {
"bool": {
Expand Down