Skip to content

Commit

Permalink
Offload entry root slice to QE
Browse files Browse the repository at this point in the history
For some queries, Sort, (Finalized) Aggregate and Join are executed on
QD, which increase workload on a single point. This PR alleviates this
by offloading these operators to a QE. Specifically, it checks whether
the root slice (the slice of the Plan before it meets the first Motion)
contains these operators in should_offload_entry_to_qe_plan_walker, then
it checks whether the offloading can be performed safely in
safe_to_offload_entry_to_qe_rte_walker by walking the range table.
This implementation specifically does not rely on RelOptInfo or anything
postgres-optimizer-specific so that we can port this to Orca in the
future.
  • Loading branch information
Ray-Eldath committed Mar 18, 2024
1 parent ee4d648 commit 6bab2a3
Show file tree
Hide file tree
Showing 9 changed files with 1,311 additions and 1 deletion.
166 changes: 165 additions & 1 deletion src/backend/optimizer/plan/createplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "optimizer/tlist.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
#include "partitioning/partdesc.h"
#include "partitioning/partprune.h"
#include "utils/lsyscache.h"
#include "utils/uri.h"
Expand Down Expand Up @@ -102,6 +103,11 @@ typedef struct
bool result;
} contain_motion_walk_context;

typedef struct
{
bool computeOnSlice; /* does root slice contain computation node (Sort, Join, Agg) */
} offload_entry_to_qe_plan_walk_context;

static Plan *create_scan_plan(PlannerInfo *root, Path *best_path,
int flags);
static List *build_path_tlist(PlannerInfo *root, Path *path);
Expand Down Expand Up @@ -9063,4 +9069,162 @@ push_locus_down_after_elide_motion(Plan* plan)
plan = plan->lefttree;
}
}
}
}

/*
* Restore Entry locus to SingleQE in the root slice.
* This is simply a reverse of push_locus_down_after_elide_motion.
* The difference is that it's NOT used when creating a plan but rather
* after a plan gets created, it's used to modify the plan in offload_entry_to_qe.
*/
static void
replace_entry_locus_with_singleqe(Plan *plan)
{
while (plan && (plan->locustype == CdbLocusType_Entry))
{
plan->locustype = CdbLocusType_SingleQE;
switch (nodeTag(plan))
{
case T_Motion:
return;
case T_Append:
{
List *subplans = NIL;
ListCell *cell;
subplans = ((Append *) (plan))->appendplans;
foreach(cell, subplans)
{
replace_entry_locus_with_singleqe(lfirst(cell));
}
break;
}
case T_SubqueryScan:
plan = ((SubqueryScan *)(plan))->subplan;
break;
case T_NestLoop:
case T_MergeJoin:
case T_HashJoin:
replace_entry_locus_with_singleqe(plan->righttree);
/* FALLTHROUGH */
default:
plan = plan->lefttree;
break;
}
}
}

/*
* Check whether we can safely offload root slice on QD to a QE.
*/
static bool
safe_to_offload_entry_to_qe_rte_walker(List *rtes)
{
ListCell *lc;
foreach(lc, rtes)
{
RangeTblEntry *rte = lfirst_node(RangeTblEntry, lc);
if (rte->rtekind == RTE_RELATION)
{
// check if any partition of a partitioned table is a coordinator-only external/foreign table
if (rte->relkind == RELKIND_PARTITIONED_TABLE)
{
Relation rel;
PartitionDesc desc;

rel = relation_open(rte->relid, NoLock);
desc = RelationGetPartitionDesc(rel, true);
relation_close(rel, NoLock);
for (int i = 0; i < desc->nparts; i++)
{
if (GpPolicyIsEntry(GpPolicyFetch(desc->oids[i])))
return false;
}
return true;
}
else
return !GpPolicyIsEntry(GpPolicyFetch(rte->relid));
}
else if (rte->rtekind == RTE_SUBQUERY)
{
if (!safe_to_offload_entry_to_qe_rte_walker(rte->subquery->rtable))
return false;
}
}
return true;
}

/*
* Check if there are multiple Motion in which the root slice contains computation (Sort, Join or Aggregate).
*/
static bool
should_offload_entry_to_qe_plan_walker(Plan *plan, offload_entry_to_qe_plan_walk_context *ctx)
{
while (plan && plan->locustype == CdbLocusType_Entry)
{
switch (nodeTag(plan))
{
case T_Motion:
return ctx->computeOnSlice;
case T_SubqueryScan:
plan = ((SubqueryScan *) plan)->subplan;
break;
/* join */
case T_Join:
case T_MergeJoin:
case T_HashJoin:
case T_NestLoop:
ctx->computeOnSlice = true;
if (should_offload_entry_to_qe_plan_walker(plan->righttree, ctx))
return true;
plan = plan->lefttree;
break;
/* sort */
case T_Sort:
/* aggregates*/
case T_Agg:
case T_WindowAgg:
ctx->computeOnSlice = true;
/* FALLTHROUGH */
default:
plan = plan->lefttree;
break;
}
}
return false;
}

Plan *
offload_entry_to_qe(PlannerInfo *root, Plan *plan, int sendslice_parallel)
{
offload_entry_to_qe_plan_walk_context plan_walk_ctx;
plan_walk_ctx.computeOnSlice = false;

if (root->parse->commandType == CMD_SELECT &&
should_offload_entry_to_qe_plan_walker(plan, &plan_walk_ctx) &&
safe_to_offload_entry_to_qe_rte_walker(root->parse->rtable) &&
!contain_volatile_functions((Node *) root->parse))
{
CdbPathLocus entrylocus;
PlanSlice *sendSlice;
{
sendSlice = (PlanSlice *) palloc0(sizeof(PlanSlice));
sendSlice->gangType = GANGTYPE_SINGLETON_READER;
sendSlice->numsegments = 1;
sendSlice->sliceIndex = -1;
sendSlice->parallel_workers = sendslice_parallel;
sendSlice->segindex = gp_session_id % getgpsegmentCount();
}

replace_entry_locus_with_singleqe(plan);

plan = (Plan *) make_union_motion(plan);
((Motion *) plan)->senderSliceInfo = sendSlice;

plan->locustype = CdbLocusType_Entry;
CdbPathLocus_MakeEntry(&entrylocus);
if (plan->flow)
pfree(plan->flow);
plan->flow = cdbpathtoplan_create_flow(root, entrylocus);
}
return plan;
}
6 changes: 6 additions & 0 deletions src/backend/optimizer/plan/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
/* Decorate the top node of the plan with a Flow node. */
top_plan->flow = cdbpathtoplan_create_flow(root, best_path->locus);

/* Modifier: If root slice is executed on QD, try to offload it to a QE */
if (enable_offload_entry_to_qe && Gp_role == GP_ROLE_DISPATCH)
{
top_plan = offload_entry_to_qe(root, top_plan, best_path->locus.parallel_workers);
}

/*
* If creating a plan for a scrollable cursor, make sure it can run
* backwards on demand. Add a Material node at the top at need.
Expand Down
11 changes: 11 additions & 0 deletions src/backend/utils/misc/guc_gp.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ bool gp_enable_global_deadlock_detector = false;
bool gp_enable_predicate_pushdown;
int gp_predicate_pushdown_sample_rows;

bool enable_offload_entry_to_qe = false;
bool enable_answer_query_using_materialized_views = false;

static const struct config_enum_entry gp_log_format_options[] = {
Expand Down Expand Up @@ -2429,6 +2430,16 @@ struct config_bool ConfigureNamesBool_gp[] =
true,
NULL, NULL, NULL
},
{
{"enable_offload_entry_to_qe", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Enable plans with operations on coordinator to be offloaded to QEs."),
NULL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_GPDB_NO_SYNC
},
&enable_offload_entry_to_qe,
false,
NULL, NULL, NULL
},
{
{"optimizer_enable_gather_on_segment_for_dml", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Enable DML optimization by enforcing a non-master gather in the optimizer."),
Expand Down
1 change: 1 addition & 0 deletions src/include/optimizer/planmain.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extern Result *make_result(List *tlist, Node *resconstantqual, Plan *subplan);
extern Plan *add_sort_cost(PlannerInfo *root, Plan *input,
double limit_tuples);
extern Plan *plan_pushdown_tlist(PlannerInfo *root, Plan *plan, List *tlist); /*CDB*/
extern Plan *offload_entry_to_qe(PlannerInfo *root, Plan *plan, int sendslice_parallel); /* CBDB */

/* External use of these functions is deprecated: */
extern Sort *make_sort_from_sortclauses(List *sortcls,
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ extern bool enable_parallel;
extern int gp_appendonly_insert_files;
extern int gp_appendonly_insert_files_tuples_range;
extern bool enable_answer_query_using_materialized_views;
extern bool enable_offload_entry_to_qe;
/*
* gp_enable_multiphase_limit is not cost based.
* When set to false, the planner will not use multi-phase limit.
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/unsync_guc_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
"enable_seqscan",
"enable_sort",
"enable_tidscan",
"enable_offload_entry_to_qe",
"escape_string_warning",
"event_source",
"exit_on_error",
Expand Down
Loading

0 comments on commit 6bab2a3

Please sign in to comment.