Skip to content

Commit

Permalink
[#15857] YSQL: support IndexScan aggregate pushdown
Browse files Browse the repository at this point in the history
Summary:
Before this change, aggregate pushdown is supported by ForeignScan,
IndexOnlyScan, and YbSeqScan.  Support it for IndexScan.  See D26556 for
a similar change with more detailed summary.

Unlike previous cases, index scan aggregate pushdown involves two parts:

- Binds to the index: nothing special for aggregate pushdown.
- Targets to the table, bound on ybctids received from the index: this
  exercises a different codepath on the DocDB side, so modification is
  needed there.  And because the server side needs modification,
  rolling-upgrade support is added using an auto pg flag on a new GUC
  yb_enable_index_aggregate_pushdown.
Jira: DB-5261

Test Plan:
    ./yb_build.sh fastdebug --gcc11 --java-test TestPgRegressAggregates
    ./yb_build.sh fastdebug --gcc11 \
      --java-test TestPgRegressFeaturePartition
    ./yb_build.sh fastdebug --gcc11 \
      --java-test 'TestPgPushdown#aggregatesVar'

Upgrade test:

    #!/usr/bin/env bash
    set -euxo pipefail

    # Create old release cluster.
    PREFIX=${1:-/tmp}
    if [ ! -d "$PREFIX"/yugabyte-2.18.1.0 ]; then
      curl "https://downloads.yugabyte.com/releases/2.18.1.0/yugabyte-2.18.1.0-b84-linux-x86_64.tar.gz" \
        | tar xzv -C "$PREFIX"
    fi
    "$PREFIX"/yugabyte-2.18.1.0/bin/post_install.sh
    "$PREFIX"/yugabyte-2.18.1.0/bin/yb-ctl destroy
    "$PREFIX"/yugabyte-2.18.1.0/bin/yb-ctl create --rf 3

    # Need to use release build to be compatible with release archives.
    ./yb_build.sh release --sj

    # Rolling upgrade: upgrade masters first.
    for idx in {1..3}; do
      "$PREFIX"/yugabyte-2.18.1.0/bin/yb-ctl stop_node --master "$idx"
      bin/yb-ctl start_node --master "$idx"
    done
    # Rolling upgrade: upgrade one tserver.
    "$PREFIX"/yugabyte-2.18.1.0/bin/yb-ctl stop_node 1
    bin/yb-ctl start_node 1
    # Wait for ysqlsh to be ready (yb-ctl seems to not properly take care of this)
    while ! bin/ysqlsh -c ''; do sleep 1; done
    # Query should succeed without pushdown.
    output=$(bin/ysqlsh -X -v "ON_ERROR_STOP=1" <<EOT
    SHOW yb_enable_index_aggregate_pushdown;
    CREATE TABLE t (i int, j int);
    CREATE INDEX NONCONCURRENTLY ON t (i ASC);
    INSERT INTO t SELECT g, -g FROM generate_series(1, 100) g;
    \set query 'SELECT SUM(j) FROM t WHERE i > 4'
    EXPLAIN :query; :query;
    EOT
    )
    grep "^ off$" <<<"$output"
    grep "Partial Aggregate: true" <<<"$output" && exit 1
    grep "^ -5040$" <<<"$output"
    # Rolling upgrade: upgrade remaining tservers.
    for idx in {2..3}; do
      "$PREFIX"/yugabyte-2.18.1.0/bin/yb-ctl stop_node "$idx"
      bin/yb-ctl start_node "$idx"
    done
    # Rolling upgrade: run ysql upgrade.
    build/latest/bin/yb-admin \
      -master_addresses=127.0.0.1,127.0.0.2,127.0.0.3 \
      upgrade_ysql
    # Rolling upgrade: promote auto-flags.
    build/latest/bin/yb-admin \
      -master_addresses=127.0.0.1,127.0.0.2,127.0.0.3 \
      promote_auto_flags
    for idx in {1..3}; do
      while ! curl "127.0.0.$idx:9000/api/v1/varz" \
        | grep -q 'ysql_yb_enable_index_aggregate_pushdown","value":"true'; do
        sleep 1
      done
    done
    # Query should succeed with pushdown.
    output=$(bin/ysqlsh -X -v "ON_ERROR_STOP=1" <<EOT
    SHOW yb_enable_index_aggregate_pushdown;
    \set query 'SELECT SUM(j) FROM t WHERE i > 4'
    EXPLAIN :query; :query;
    EOT
    )
    grep "^ on$" <<<"$output"
    grep "Partial Aggregate: true" <<<"$output"
    grep "^ -5040$" <<<"$output"

Close: #15857

Reviewers: amartsinchyk

Reviewed By: amartsinchyk

Subscribers: ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D27427
  • Loading branch information
jaki committed Aug 10, 2023
1 parent 3f6911a commit 55eba85
Show file tree
Hide file tree
Showing 20 changed files with 842 additions and 165 deletions.
22 changes: 15 additions & 7 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgPushdown.java
Original file line number Diff line number Diff line change
Expand Up @@ -1370,11 +1370,14 @@ public void aggregatesConst() throws Exception {
/**
 * Ensure pushing down aggregate functions with variables (columns).
 *
 * <p>Every aggregate/column combination is first tested on its own, and then all of the
 * combinations are tested together as a single comma-separated select list.
 *
 * @throws Exception if any of the pushdown testers fails
 */
@Test
public void aggregatesVar() throws Exception {
// Accumulates each "AGG(column)," expression so the full list can be tested at the end.
StringBuilder sb = new StringBuilder();
for (String agg : Arrays.asList("COUNT", "SUM", "MAX", "MIN", "AVG")) {
for (String column : Arrays.asList("id", "v")) {
// Test this single aggregate expression in isolation.
new AggregatePushdownTester(String.format("%s(%s)", agg, column)).test();
// Remember it (with a trailing comma) for the combined test below.
sb.append(String.format("%s(%s),", agg, column));
}
}
// Pass all aggregates at once. Make sure to remove the trailing comma.
new AggregatePushdownTester(sb.substring(0, sb.length() - 1)).test();
}

//
Expand Down Expand Up @@ -1723,22 +1726,27 @@ public void test() throws Exception {
"CREATE TABLE %s (id int PRIMARY KEY, v int)",
tableName));
stmt.executeUpdate(String.format(
"CREATE INDEX %s ON %s (v, id)",
"CREATE INDEX %s ON %s (v ASC, id)",
indexName, tableName));
stmt.executeUpdate(String.format(
"INSERT INTO %s ("
+ "SELECT generate_series, generate_series + 1 FROM generate_series(1, %s)"
+ ");",
tableName, numRowsToInsert));
verifyPushdown(stmt, "" /* hint */);
verifyPushdown(stmt, String.format("/*+SeqScan(%s)*/", tableName));
verifyPushdown(stmt, String.format("/*+IndexOnlyScan(%s %s)*/", tableName, indexName));
verifyPushdown(stmt, "" /* hint */, null /* quals */);
verifyPushdown(stmt, String.format("/*+SeqScan(%s)*/", tableName), null /* quals */);
final String quals = String.format("v > %s", numRowsToInsert / 2);
verifyPushdown(
stmt, String.format("/*+IndexOnlyScan(%s %s)*/", tableName, indexName), quals);
verifyPushdown(stmt, String.format("/*+IndexScan(%s %s)*/", tableName, indexName), quals);
stmt.executeUpdate(String.format("DROP TABLE %s", tableName));
}
}

private void verifyPushdown(Statement stmt, String hint) throws Exception {
String query = String.format("%s SELECT %s FROM %s", hint, optimizedExpr, tableName);
private void verifyPushdown(Statement stmt, String hint, String quals) throws Exception {
String query = String.format(
"%sSELECT %s FROM %s%s",
hint, optimizedExpr, tableName, (quals != null ? " WHERE " + quals : ""));
verifyStatementMetric(
stmt,
query,
Expand Down
18 changes: 2 additions & 16 deletions src/postgres/src/backend/access/yb_access/yb_lsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,6 @@ ybcingettuple(IndexScanDesc scan, ScanDirection dir)
if (ybscan->quit_scan)
return NULL;

/*
* As of 2023-06-28, aggregate pushdown is only implemented for
* IndexOnlyScan, not IndexScan.
*/
Assert(ybscan->prepare_params.index_only_scan);

scan->xs_recheck = YbNeedsRecheck(ybscan);
if (!ybscan->is_exec_done)
{
Expand All @@ -433,17 +427,9 @@ ybcingettuple(IndexScanDesc scan, ScanDirection dir)
/*
* Aggregate pushdown directly modifies the scan slot rather than
* passing it through xs_hitup or xs_itup.
*
* The index id passed into ybFetchNext is likely not going to be used
* as it is only used for system table scans, which have oid, and there
* shouldn't exist any system table secondary indexes that index the
* oid column.
* TODO(jason): deduplicate with ybcingettuple.
*/
scan->yb_agg_slot =
ybFetchNext(ybscan->handle, scan->yb_agg_slot,
RelationGetRelid(scan->indexRelation));
return !scan->yb_agg_slot->tts_isempty;
return ybc_getnext_aggslot(scan, ybscan->handle,
ybscan->prepare_params.index_only_scan);
}

/*
Expand Down
36 changes: 31 additions & 5 deletions src/postgres/src/backend/access/yb_access/yb_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -2106,8 +2106,8 @@ ybcSetupTargets(YbScanDesc ybScan, YbScanPlan scan_plan, Scan *pg_scan_plan)
* attribute numbers from table-based numbers to index-based ones.
*/
void
YbDmlAppendTargetsAggregate(List *aggrefs, TupleDesc tupdesc,
Relation index, YBCPgStatement handle)
YbDmlAppendTargetsAggregate(List *aggrefs, TupleDesc tupdesc, Relation index,
bool xs_want_itup, YBCPgStatement handle)
{
ListCell *lc;

Expand Down Expand Up @@ -2171,10 +2171,10 @@ YbDmlAppendTargetsAggregate(List *aggrefs, TupleDesc tupdesc,
*/
int attno = castNode(Var, tle->expr)->varoattno;
/*
* For index (only) scans, translate the table-based
* For index only scans, translate the table-based
* attribute number to an index-based one.
*/
if (index)
if (index && xs_want_itup)
attno = YbGetIndexAttnum(attno, index);
Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1);
YBCPgTypeAttrs type_attrs = {attr->atttypmod};
Expand All @@ -2199,6 +2199,11 @@ YbDmlAppendTargetsAggregate(List *aggrefs, TupleDesc tupdesc,
/* Add aggregate operator as scan target. */
HandleYBStatus(YBCPgDmlAppendTarget(handle, op_handle));
}

/* Set ybbasectid in case of non-primary secondary index scan. */
if (index && !xs_want_itup && !index->rd_index->indisprimary)
YbDmlAppendTargetSystem(YBIdxBaseTupleIdAttributeNumber,
handle);
}

/*
Expand Down Expand Up @@ -2368,7 +2373,7 @@ ybcBeginScan(Relation relation,
*/
if (aggrefs != NIL)
YbDmlAppendTargetsAggregate(aggrefs, ybScan->target_desc, index,
ybScan->handle);
xs_want_itup, ybScan->handle);
else
ybcSetupTargets(ybScan, &scan_plan, pg_scan_plan);

Expand Down Expand Up @@ -2541,6 +2546,27 @@ ybc_getnext_indextuple(YbScanDesc ybScan, bool is_forward_scan, bool *recheck)
return ybcFetchNextIndexTuple(ybScan, is_forward_scan);
}

/*
* Fetch the next aggregate-pushdown result for an index scan.
*
* The result of ybFetchNext on the given statement handle is stored directly
* into scan->yb_agg_slot (replacing the previous slot pointer).
*
* scan: index scan descriptor whose yb_agg_slot is read and updated.
* handle: statement handle passed through to ybFetchNext.
* index_only_scan: true for IndexOnlyScan callers; when false (IndexScan),
* scan->xs_hitup is additionally set to a non-NULL dummy value.
*
* Returns true if the slot is non-empty after the fetch, false otherwise.
*/
bool
ybc_getnext_aggslot(IndexScanDesc scan, YBCPgStatement handle,
bool index_only_scan)
{
/*
* As of 2023-08-10, the relid passed into ybFetchNext is not going to
* be used as it is only used when there are system targets, not
* counting the internal ybbasectid lookup to the index.
* YbDmlAppendTargetsAggregate only adds that ybbasectid plus operator
* targets.
* TODO(jason): this may need to be revisited when supporting GROUP BY
* aggregate pushdown where system columns are directly targeted.
*/
scan->yb_agg_slot = ybFetchNext(handle, scan->yb_agg_slot,
InvalidOid /* relid */);
/*
* For IndexScan, hack to make index_getnext think there are tuples.
* NOTE(review): the value 1 is a dummy non-NULL pointer, not a real tuple;
* presumably it must never be dereferenced -- confirm against callers.
*/
if (!index_only_scan)
scan->xs_hitup = (HeapTuple) 1;
return !scan->yb_agg_slot->tts_isempty;
}

void ybc_free_ybscan(YbScanDesc ybscan)
{
Assert(PointerIsValid(ybscan));
Expand Down
20 changes: 2 additions & 18 deletions src/postgres/src/backend/access/ybgin/ybginget.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ ybginDoFirstExec(IndexScanDesc scan, ScanDirection dir)
YbDmlAppendTargetsAggregate(scan->yb_aggrefs,
RelationGetDescr(scan->indexRelation),
scan->indexRelation,
scan->xs_want_itup,
ybso->handle);
else
ybginSetupTargets(scan);
Expand Down Expand Up @@ -600,14 +601,6 @@ ybgingettuple(IndexScanDesc scan, ScanDirection dir)
scan->xs_ctup.t_ybctid = 0;
if (scan->yb_aggrefs)
{
/*
* As of 2023-06-28, aggregate pushdown is only implemented for
* IndexOnlyScan, not IndexScan. Also, this codepath is not exercised
* because such queries hit error "non-default search mode" when
* setting up binds.
*/
Assert(scan->xs_want_itup);

/*
* TODO(jason): don't assume that recheck is needed.
*/
Expand All @@ -616,17 +609,8 @@ ybgingettuple(IndexScanDesc scan, ScanDirection dir)
/*
* Aggregate pushdown directly modifies the scan slot rather than
* passing it through xs_hitup or xs_itup.
*
* The index id passed into ybFetchNext is likely not going to be used
* as it is only used for system table scans, which have oid, and there
* shouldn't exist any system table secondary indexes that index the
* oid column.
* TODO(jason): deduplicate with ybcingettuple.
*/
scan->yb_agg_slot =
ybFetchNext(ybso->handle, scan->yb_agg_slot,
RelationGetRelid(scan->indexRelation));
return !scan->yb_agg_slot->tts_isempty;
return ybc_getnext_aggslot(scan, ybso->handle, scan->xs_want_itup);
}
while (HeapTupleIsValid(tup = ybginFetchNextHeapTuple(scan)))
{
Expand Down
28 changes: 20 additions & 8 deletions src/postgres/src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,7 @@ yb_agg_pushdown_supported(AggState *aggstate)
/* Supported outer plan. */
if (!(IsA(outerPlanState(aggstate), ForeignScanState) ||
IsA(outerPlanState(aggstate), IndexOnlyScanState) ||
IsA(outerPlanState(aggstate), IndexScanState) ||
IsA(outerPlanState(aggstate), YbSeqScanState)))
return;
ss = (ScanState *) outerPlanState(aggstate);
Expand All @@ -1568,7 +1569,17 @@ yb_agg_pushdown_supported(AggState *aggstate)
if (ss->ps.qual)
return;
/* No indexquals that might be rechecked. */
if (IsA(ss, IndexOnlyScanState))
if (IsA(ss, IndexScanState))
{
/* Also check the GUC here. */
if (!yb_enable_index_aggregate_pushdown)
return;

IndexScanState *iss = castNode(IndexScanState, ss);
if (iss->yb_iss_might_recheck)
return;
}
else if (IsA(ss, IndexOnlyScanState))
{
IndexOnlyScanState *ioss = castNode(IndexOnlyScanState, ss);
if (ioss->yb_ioss_might_recheck)
Expand Down Expand Up @@ -2579,15 +2590,16 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
outerPlan = outerPlan(node);

/*
* For YB index only scan outer plan, we need to collect recheck
* information, so set that flag for the index only scan. Ideally, the
* flag is only set for YB relations since, later on, agg pushdown is
* disabled anyway for non-YB relations, but we don't have that information
* at this point: the relation is opened in the IndexOnlyScan node. So set
* the flag in all cases, and move the YB-relation check down there.
* For YB IndexScan/IndexOnlyScan outer plan, we need to collect recheck
* information, so set that eflag. Ideally, the flag is only set for YB
* relations since, later on, agg pushdown is disabled anyway for non-YB
* relations, but we don't have that information at this point: the
* relation is opened in the IndexScan/IndexOnlyScan node. So set the flag
* in all cases, and move the YB-relation check down there.
*/
int yb_eflags = 0;
if (IsYugaByteEnabled() && IsA(outerPlan, IndexOnlyScan))
if (IsYugaByteEnabled() &&
(IsA(outerPlan, IndexScan) || IsA(outerPlan, IndexOnlyScan)))
yb_eflags |= EXEC_FLAG_YB_AGG_PARENT;

outerPlanState(aggstate) = ExecInitNode(outerPlan, estate,
Expand Down
88 changes: 78 additions & 10 deletions src/postgres/src/backend/executor/nodeIndexscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,23 @@ IndexNext(IndexScanState *node)

if (scandesc == NULL)
{
if (IsYugaByteEnabled() && node->yb_iss_aggrefs)
{
/*
* For aggregate pushdown, we only read aggregate results from
* DocDB and pass that up to the aggregate node (agg pushdown
* wouldn't be enabled if we needed to read other expressions). Set
* up a dummy scan slot to hold as many attributes as there are
* pushed aggregates.
*/
TupleDesc tupdesc =
CreateTemplateTupleDesc(list_length(node->yb_iss_aggrefs),
false /* hasoid */);
ExecInitScanTupleSlot(estate, &node->ss, tupdesc);
/* Refresh the local pointer. */
slot = node->ss.ss_ScanTupleSlot;
}

IndexScan *plan = castNode(IndexScan, node->ss.ps.plan);

/*
Expand All @@ -124,11 +141,16 @@ IndexNext(IndexScanState *node)
node->iss_NumOrderByKeys);

node->iss_ScanDesc = scandesc;
scandesc->yb_scan_plan = (Scan *) plan;
scandesc->yb_rel_pushdown =
YbInstantiatePushdownParams(&plan->yb_rel_pushdown, estate);
scandesc->yb_idx_pushdown =
YbInstantiatePushdownParams(&plan->yb_idx_pushdown, estate);

if (IsYugaByteEnabled())
{
scandesc->yb_scan_plan = (Scan *) plan;
scandesc->yb_rel_pushdown =
YbInstantiatePushdownParams(&plan->yb_rel_pushdown, estate);
scandesc->yb_idx_pushdown =
YbInstantiatePushdownParams(&plan->yb_idx_pushdown, estate);
scandesc->yb_aggrefs = node->yb_iss_aggrefs;
}

/*
* If no run-time keys to calculate or they are ready, go ahead and
Expand Down Expand Up @@ -176,6 +198,12 @@ IndexNext(IndexScanState *node)
break;
}
}

/*
* Set reference to slot in scan desc so that YB amgettuple can use it
* during aggregate pushdown.
*/
scandesc->yb_agg_slot = slot;
}

/*
Expand All @@ -198,10 +226,21 @@ IndexNext(IndexScanState *node)
* Note: we pass 'false' because tuples returned by amgetnext are
* pointers onto disk pages and must not be pfree()'d.
*/
ExecStoreBufferHeapTuple(tuple, /* tuple to store */
slot, /* slot to store in */
scandesc->xs_cbuf); /* buffer containing
* tuple */
if (IsYugaByteEnabled() && scandesc->yb_aggrefs)
{
/*
* Slot should have already been updated by YB amgettuple.
*
* Also, index aggregate pushdown currently cannot support recheck,
* and this should have been prevented by earlier logic.
*/
Assert(!scandesc->xs_recheck);
}
else
ExecStoreBufferHeapTuple(tuple, /* tuple to store */
slot, /* slot to store in */
scandesc->xs_cbuf); /* buffer containing
* tuple */

/*
* If the index was lossy, we have to recheck the index quals using
Expand Down Expand Up @@ -1063,9 +1102,17 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
* If we are just doing EXPLAIN (ie, aren't going to run the plan), stop
* here. This allows an index-advisor plugin to EXPLAIN a plan containing
* references to nonexistent indexes.
*
* YB note: For aggregate pushdown, we need recheck knowledge to determine
* whether aggregates can be pushed down or not. At the time of writing,
* - aggregate pushdown only supports YB relations
* - there cannot be a mix of non-YB tables and YB indexes, and vice versa
* Use those assumptions to avoid the perf hit on EXPLAIN non-YB relations.
*/
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return indexstate;
if (!(IsYBRelation(currentRelation) &&
(eflags & EXEC_FLAG_YB_AGG_PARENT)))
return indexstate;

/*
* Open the index relation.
Expand Down Expand Up @@ -1101,6 +1148,27 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
NULL, /* no ArrayKeys */
NULL);

/*
* For aggregate pushdown purposes, using the scan keys, determine ahead of
* beginning the scan whether indexqual recheck might happen, and pass that
* information up to the aggregate node. Only attempt this for YB
* relations since pushdown is not supported otherwise.
*/
if (IsYBRelation(indexstate->iss_RelationDesc) &&
(eflags & EXEC_FLAG_YB_AGG_PARENT))
{
indexstate->yb_iss_might_recheck =
yb_index_might_recheck(currentRelation,
indexstate->iss_RelationDesc,
false /* xs_want_itup */,
indexstate->iss_ScanKeys,
indexstate->iss_NumScanKeys);

/* Got the info for aggregate pushdown. EXPLAIN can return now. */
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return indexstate;
}

/*
* any ORDER BY exprs have to be turned into scankeys in the same way
*/
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/backend/executor/ybc_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ ybcSetupScanTargets(ForeignScanState *node)
YbDmlAppendTargetsAggregate(node->yb_fdw_aggrefs,
RelationGetDescr(ss->ss_currentRelation),
NULL /* index */,
false /* xs_want_itup */,
handle);

/*
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/backend/nodes/nodeFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -4289,6 +4289,8 @@ YbPlanStateTryGetAggrefs(PlanState *ps)
return &castNode(ForeignScanState, ps)->yb_fdw_aggrefs;
case T_IndexOnlyScanState:
return &castNode(IndexOnlyScanState, ps)->yb_ioss_aggrefs;
case T_IndexScanState:
return &castNode(IndexScanState, ps)->yb_iss_aggrefs;
case T_YbSeqScanState:
return &castNode(YbSeqScanState, ps)->aggrefs;
default:
Expand Down
Loading

0 comments on commit 55eba85

Please sign in to comment.