Skip to content

[DNM] An experimental branch to try to push down aggregate functions #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: REL_11_CARTO
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 111 additions & 27 deletions contrib/postgres_fdw/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ static void appendGroupByClause(List *tlist, deparse_expr_cxt *context);
static void appendAggOrderBy(List *orderList, List *targetList,
deparse_expr_cxt *context);
static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
static void deparseRowExpr(RowExpr *node, deparse_expr_cxt *context);
static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno,
deparse_expr_cxt *context);

Expand Down Expand Up @@ -338,8 +339,10 @@ foreign_expr_walker(Node *node,
*/
if (var->varattno < 0 &&
var->varattno != SelfItemPointerAttributeNumber &&
var->varattno != ObjectIdAttributeNumber)
var->varattno != ObjectIdAttributeNumber) {
elog(WARNING, "RTORRE return false 1");
return false;
}

/* Else check the collation */
collation = var->varcollid;
Expand Down Expand Up @@ -407,23 +410,31 @@ foreign_expr_walker(Node *node,
ArrayRef *ar = (ArrayRef *) node;

/* Assignment should not be in restrictions. */
if (ar->refassgnexpr != NULL)
if (ar->refassgnexpr != NULL) {
elog(WARNING, "RTORRE return false 2");
return false;
}

/*
* Recurse to remaining subexpressions. Since the array
* subscripts must yield (noncollatable) integers, they won't
* affect the inner_cxt state.
*/
if (!foreign_expr_walker((Node *) ar->refupperindexpr,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 3");
return false;
}
if (!foreign_expr_walker((Node *) ar->reflowerindexpr,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 4");
return false;
}
if (!foreign_expr_walker((Node *) ar->refexpr,
glob_cxt, &inner_cxt))
elog(WARNING, "RTORRE return false 5"); {
return false;
}

/*
* Array subscripting should yield same collation as input,
Expand All @@ -450,15 +461,19 @@ foreign_expr_walker(Node *node,
* can't be sent to remote because it might have incompatible
* semantics on remote side.
*/
if (!is_shippable(fe->funcid, ProcedureRelationId, fpinfo))
if (!is_shippable(fe->funcid, ProcedureRelationId, fpinfo)) {
elog(WARNING, "RTORRE return false 6");
return false;
}

/*
* Recurse to input subexpressions.
*/
if (!foreign_expr_walker((Node *) fe->args,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 7");
return false;
}

/*
* If function's input collation is not derived from a foreign
Expand All @@ -467,8 +482,10 @@ foreign_expr_walker(Node *node,
if (fe->inputcollid == InvalidOid)
/* OK, inputs are all noncollatable */ ;
else if (inner_cxt.state != FDW_COLLATE_SAFE ||
fe->inputcollid != inner_cxt.collation)
fe->inputcollid != inner_cxt.collation) {
elog(WARNING, "RTORRE return false 8");
return false;
}

/*
* Detect whether node is introducing a collation not derived
Expand Down Expand Up @@ -498,15 +515,19 @@ foreign_expr_walker(Node *node,
* (If the operator is shippable, we assume its underlying
* function is too.)
*/
if (!is_shippable(oe->opno, OperatorRelationId, fpinfo))
if (!is_shippable(oe->opno, OperatorRelationId, fpinfo)) {
elog(WARNING, "RTORRE return false 9");
return false;
}

/*
* Recurse to input subexpressions.
*/
if (!foreign_expr_walker((Node *) oe->args,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 10");
return false;
}

/*
* If operator's input collation is not derived from a foreign
Expand All @@ -515,8 +536,10 @@ foreign_expr_walker(Node *node,
if (oe->inputcollid == InvalidOid)
/* OK, inputs are all noncollatable */ ;
else if (inner_cxt.state != FDW_COLLATE_SAFE ||
oe->inputcollid != inner_cxt.collation)
oe->inputcollid != inner_cxt.collation) {
elog(WARNING, "RTORRE return false 11");
return false;
}

/* Result-collation handling is same as for functions */
collation = oe->opcollid;
Expand All @@ -538,15 +561,19 @@ foreign_expr_walker(Node *node,
/*
* Again, only shippable operators can be sent to remote.
*/
if (!is_shippable(oe->opno, OperatorRelationId, fpinfo))
if (!is_shippable(oe->opno, OperatorRelationId, fpinfo)) {
elog(WARNING, "RTORRE return false 12");
return false;
}

/*
* Recurse to input subexpressions.
*/
if (!foreign_expr_walker((Node *) oe->args,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 13");
return false;
}

/*
* If operator's input collation is not derived from a foreign
Expand All @@ -555,8 +582,10 @@ foreign_expr_walker(Node *node,
if (oe->inputcollid == InvalidOid)
/* OK, inputs are all noncollatable */ ;
else if (inner_cxt.state != FDW_COLLATE_SAFE ||
oe->inputcollid != inner_cxt.collation)
oe->inputcollid != inner_cxt.collation) {
elog(WARNING, "RTORRE return false 14");
return false;
}

/* Output is always boolean and so noncollatable. */
collation = InvalidOid;
Expand All @@ -571,8 +600,10 @@ foreign_expr_walker(Node *node,
* Recurse to input subexpression.
*/
if (!foreign_expr_walker((Node *) r->arg,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 15");
return false;
}

/*
* RelabelType must not introduce a collation not derived from
Expand All @@ -598,8 +629,10 @@ foreign_expr_walker(Node *node,
* Recurse to input subexpressions.
*/
if (!foreign_expr_walker((Node *) b->args,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 16");
return false;
}

/* Output is always boolean and so noncollatable. */
collation = InvalidOid;
Expand All @@ -614,8 +647,10 @@ foreign_expr_walker(Node *node,
* Recurse to input subexpressions.
*/
if (!foreign_expr_walker((Node *) nt->arg,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 17");
return false;
}

/* Output is always boolean and so noncollatable. */
collation = InvalidOid;
Expand All @@ -630,8 +665,10 @@ foreign_expr_walker(Node *node,
* Recurse to input subexpressions.
*/
if (!foreign_expr_walker((Node *) a->elements,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 18");
return false;
}

/*
* ArrayExpr must not introduce a collation not derived from
Expand Down Expand Up @@ -660,8 +697,10 @@ foreign_expr_walker(Node *node,
foreach(lc, l)
{
if (!foreign_expr_walker((Node *) lfirst(lc),
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 19");
return false;
}
}

/*
Expand All @@ -681,16 +720,22 @@ foreign_expr_walker(Node *node,
ListCell *lc;

/* Not safe to pushdown when not in grouping context */
if (!IS_UPPER_REL(glob_cxt->foreignrel))
if (!IS_UPPER_REL(glob_cxt->foreignrel)) {
elog(WARNING, "RTORRE return false 20");
return false;
}

/* Only non-split aggregates are pushable. */
if (agg->aggsplit != AGGSPLIT_SIMPLE)
if (agg->aggsplit != AGGSPLIT_SIMPLE) {
elog(WARNING, "RTORRE return false 21");
return false;
}

/* As usual, it must be shippable. */
if (!is_shippable(agg->aggfnoid, ProcedureRelationId, fpinfo))
if (!is_shippable(agg->aggfnoid, ProcedureRelationId, fpinfo)) {
elog(WARNING, "RTORRE return false 22");
return false;
}

/*
* Recurse to input args. aggdirectargs, aggorder and
Expand All @@ -709,8 +754,10 @@ foreign_expr_walker(Node *node,
n = (Node *) tle->expr;
}

if (!foreign_expr_walker(n, glob_cxt, &inner_cxt))
if (!foreign_expr_walker(n, glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 23");
return false;
}
}

/*
Expand All @@ -737,25 +784,31 @@ foreign_expr_walker(Node *node,
if (srt->sortop != typentry->lt_opr &&
srt->sortop != typentry->gt_opr &&
!is_shippable(srt->sortop, OperatorRelationId,
fpinfo))
fpinfo)) {
elog(WARNING, "RTORRE return false 24");
return false;
}
}
}

/* Check aggregate filter */
if (!foreign_expr_walker((Node *) agg->aggfilter,
glob_cxt, &inner_cxt))
glob_cxt, &inner_cxt)) {
elog(WARNING, "RTORRE return false 25");
return false;
}

/*
* If aggregate's input collation is not derived from a
* foreign Var, it can't be sent to remote.
*/
if (agg->inputcollid == InvalidOid)
if (agg->inputcollid == InvalidOid || inner_cxt.state == FDW_COLLATE_NONE)
/* OK, inputs are all noncollatable */ ;
else if (inner_cxt.state != FDW_COLLATE_SAFE ||
agg->inputcollid != inner_cxt.collation)
agg->inputcollid != inner_cxt.collation) {
elog(WARNING, "RTORRE return false 26");
return false;
}

/*
* Detect whether node is introducing a collation not derived
Expand All @@ -775,12 +828,21 @@ foreign_expr_walker(Node *node,
state = FDW_COLLATE_UNSAFE;
}
break;
default:
case T_RowExpr:
/*
* rtorre: this is a bold move, let's consider it true. Trying to
* cover the st_asmvt(ROW(st_asmvtgeom(...)) case. I guess the
* proper solution is to examine the row expression carefully.
*/
ereport(WARNING, (errmsg_internal("RTORRE in a T_RowExpr")));
return true;
default:

/*
* If it's anything else, assume it's unsafe. This list can be
* expanded later, but don't forget to add deparse support below.
*/
elog(WARNING, "RTORRE unrecognized node type: %d", (int) nodeTag(node));
return false;
}

Expand Down Expand Up @@ -2354,13 +2416,35 @@ deparseExpr(Expr *node, deparse_expr_cxt *context)
case T_Aggref:
deparseAggref((Aggref *) node, context);
break;
case T_RowExpr:
deparseRowExpr((RowExpr *) node, context);
break;
default:
elog(ERROR, "unsupported expression type for deparse: %d",
(int) nodeTag(node));
break;
}
}

static void
deparseRowExpr(RowExpr *node, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
bool first;
ListCell *lc;

appendStringInfoString(buf, "ROW(");
first = true;
foreach(lc, node->args)
{
if (!first)
appendStringInfo(buf, ", ");
deparseExpr((Expr *) lfirst(lc), context);
first = false;
}
appendStringInfoChar(buf, ')');
}

/*
* Deparse given Var node into context->buf.
*
Expand Down