Skip to content

ES Agg pushdown v2 #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pg_es_fdw/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from multicorn import ForeignDataWrapper
from multicorn.utils import log_to_postgres as log2pg

from ._es_query import _PG_TO_ES_AGG_FUNCS, quals_to_es
from ._es_query import _PG_TO_ES_AGG_FUNCS, _OPERATORS_SUPPORTED, quals_to_es


class ElasticsearchFDW(ForeignDataWrapper):
Expand Down Expand Up @@ -97,6 +97,7 @@ def can_pushdown_upperrel(self):
return {
"groupby_supported": True,
"agg_functions": _PG_TO_ES_AGG_FUNCS,
"operators_supported": _OPERATORS_SUPPORTED,
}

def explain(
Expand Down Expand Up @@ -319,6 +320,12 @@ def _handle_aggregation_response(self, query, response, aggs, group_clauses):
result = {}

for agg_name in aggs:
if agg_name == "count.*":
# COUNT(*) is a special case, since it doesn't have a
# corresponding aggregation primitive in ES
result[agg_name] = response["hits"]["total"]["value"]
continue

result[agg_name] = response["aggregations"][agg_name]["value"]
yield result
else:
Expand All @@ -331,6 +338,12 @@ def _handle_aggregation_response(self, query, response, aggs, group_clauses):

if aggs is not None:
for agg_name in aggs:
if agg_name == "count.*":
# In general case with GROUP BY clauses COUNT(*)
# is taken from the bucket's doc_count field
result[agg_name] = bucket["doc_count"]
continue

result[agg_name] = bucket[agg_name]["value"]

yield result
Expand Down
63 changes: 48 additions & 15 deletions pg_es_fdw/_es_query.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
try:
from multicorn import ANY
except ImportError:
Expand All @@ -18,8 +19,31 @@
"min": "min",
"sum": "sum",
"count": "value_count",
"count.*": None # not mapped to a particular function
}

_OPERATORS_SUPPORTED = [">", ">=", "<", "<=", "=", "<>", "!=", "~~"]


def _convert_pattern_match_to_es(expr):
def _pg_es_pattern_map(matchobj):
if matchobj.group(0) == "%":
return "*"
elif matchobj.group(0) == "_":
return "?"
elif matchobj.group(0) == "\%":
return "%"
elif matchobj.group(0) == "\_":
return "_"
elif matchobj.group(0) == "*":
return "\\*"
elif matchobj.group(0) == "?":
return "\\?"
elif matchobj.group(0) == "\\\\":
return "\\"

return re.sub(r'\\\\|\\?%|\\?_|\*|\?', _pg_es_pattern_map, fr"{expr}")


def _base_qual_to_es(col, op, value, column_map=None):
if column_map:
Expand All @@ -43,7 +67,7 @@ def _base_qual_to_es(col, op, value, column_map=None):
return {"bool": {"must_not": {"term": {col: value}}}}

if op == "~~":
return {"match": {col: value.replace("%", "*")}}
return {"wildcard": {col: _convert_pattern_match_to_es(value)}}

# For unknown operators, get everything
return {"match_all": {}}
Expand Down Expand Up @@ -82,6 +106,18 @@ def quals_to_es(
"""Convert a list of Multicorn quals to an ElasticSearch query"""
ignore_columns = ignore_columns or []

query = {
"query": {
"bool": {
"must": [
_qual_to_es(q, column_map)
for q in quals
if q.field_name not in ignore_columns
]
}
}
}

# Aggregation/grouping queries
if aggs is not None:
aggs_query = {
Expand All @@ -91,10 +127,18 @@ def quals_to_es(
}
}
for agg_name, agg_props in aggs.items()
if agg_name != "count.*"
}

if group_clauses is None:
return {"aggs": aggs_query}
if "count.*" in aggs:
# There is no particular COUNT(*) equivalent in ES, instead
# for plain aggregations (e.g. no grouping statements), we need
# to enable the track_total_hits option in order to get an
# accuate number of matched docs.
query["track_total_hits"] = True

query["aggs"] = aggs_query

if group_clauses is not None:
group_query = {
Expand All @@ -111,17 +155,6 @@ def quals_to_es(
if aggs is not None:
group_query["group_buckets"]["aggregations"] = aggs_query

return {"aggs": group_query}
query["aggs"] = group_query

# Regular query
return {
"query": {
"bool": {
"must": [
_qual_to_es(q, column_map)
for q in quals
if q.field_name not in ignore_columns
]
}
}
}
return query