Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload entry root slice to QE #385

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 163 additions & 1 deletion src/backend/optimizer/plan/createplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "optimizer/tlist.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
#include "partitioning/partdesc.h"
#include "partitioning/partprune.h"
#include "utils/lsyscache.h"
#include "utils/uri.h"
Expand Down Expand Up @@ -102,6 +103,11 @@ typedef struct
bool result;
} contain_motion_walk_context;

/*
 * Walker state for should_offload_entry_to_qe_plan_walker(), used when
 * deciding whether the root (entry) slice can be offloaded to a QE.
 */
typedef struct
{
	bool computeOnSlice; /* does root slice contain computation node (Sort, Join, Agg) */
} offload_entry_to_qe_plan_walk_context;

static Plan *create_scan_plan(PlannerInfo *root, Path *best_path,
int flags);
static List *build_path_tlist(PlannerInfo *root, Path *path);
Expand Down Expand Up @@ -9063,4 +9069,160 @@ push_locus_down_after_elide_motion(Plan* plan)
plan = plan->lefttree;
}
}
}
Ray-Eldath marked this conversation as resolved.
Show resolved Hide resolved
}

/*
 * Restore Entry locus to SingleQE in the root slice.
 *
 * This is simply a reverse of push_locus_down_after_elide_motion.  The
 * difference is that it's NOT used when creating a plan: it runs after the
 * plan has been created, to modify it in offload_entry_to_qe().
 */
static void
replace_entry_locus_with_singleqe(Plan *plan)
{
	/* Walk down the root slice, demoting Entry locus to SingleQE. */
	while (plan && plan->locustype == CdbLocusType_Entry)
	{
		plan->locustype = CdbLocusType_SingleQE;

		switch (nodeTag(plan))
		{
			case T_Motion:
				/* Slice boundary: nodes below belong to another slice. */
				return;

			case T_Append:
				{
					ListCell   *lc;

					foreach(lc, ((Append *) plan)->appendplans)
						replace_entry_locus_with_singleqe((Plan *) lfirst(lc));
					break;
				}

			case T_SubqueryScan:
				plan = ((SubqueryScan *) plan)->subplan;
				break;

			case T_NestLoop:
			case T_MergeJoin:
			case T_HashJoin:
				/* fix the inner side, then keep walking the outer side */
				replace_entry_locus_with_singleqe(plan->righttree);
				/* FALLTHROUGH */
			default:
				plan = plan->lefttree;
				break;
		}
	}
}

/*
* Check whether we can safely offload root slice on QD to a QE.
*/
static bool
safe_to_offload_entry_to_qe_rte_walker(List *rtes)
{
ListCell *lc;
foreach(lc, rtes)
{
RangeTblEntry *rte = lfirst_node(RangeTblEntry, lc);
if (rte->rtekind == RTE_RELATION)
{
// check if any partition of a partitioned table is a coordinator-only external/foreign table
yjhjstz marked this conversation as resolved.
Show resolved Hide resolved
if (rte->relkind == RELKIND_PARTITIONED_TABLE)
{
Relation rel;
PartitionDesc desc;

rel = relation_open(rte->relid, NoLock);
desc = RelationGetPartitionDesc(rel, true);
relation_close(rel, NoLock);
for (int i = 0; i < desc->nparts; i++)
{
if (GpPolicyIsEntry(GpPolicyFetch(desc->oids[i])))
return false;
}
return true;
}
else
return !GpPolicyIsEntry(GpPolicyFetch(rte->relid));
}
else if (rte->rtekind == RTE_SUBQUERY)
{
if (!safe_to_offload_entry_to_qe_rte_walker(rte->subquery->rtable))
return false;
}
}
return true;
}

/*
 * Decide whether offloading is worthwhile: return true iff the root (entry)
 * slice sits on top of a Motion and contains computation (Sort, Join or
 * Aggregate) above it.
 */
static bool
should_offload_entry_to_qe_plan_walker(Plan *plan, offload_entry_to_qe_plan_walk_context *ctx)
{
	while (plan != NULL && plan->locustype == CdbLocusType_Entry)
	{
		switch (nodeTag(plan))
		{
			case T_Motion:
				/* slice boundary reached: offload iff we saw computation */
				return ctx->computeOnSlice;

			case T_SubqueryScan:
				plan = ((SubqueryScan *) plan)->subplan;
				break;

			/* joins */
			case T_Join:
			case T_MergeJoin:
			case T_HashJoin:
			case T_NestLoop:
				ctx->computeOnSlice = true;
				/* the inner side may contain a qualifying Motion too */
				if (should_offload_entry_to_qe_plan_walker(plan->righttree, ctx))
					return true;
				plan = plan->lefttree;
				break;

			/* sort */
			case T_Sort:
			/* aggregates */
			case T_Agg:
			case T_WindowAgg:
				ctx->computeOnSlice = true;
				/* FALLTHROUGH */
			default:
				plan = plan->lefttree;
				break;
		}
	}
	return false;
}

/*
 * offload_entry_to_qe
 *		If the root (entry) slice of a SELECT plan performs computation on
 *		the QD, rewrite the plan so that work runs on a single QE instead,
 *		leaving only a gathering Motion on the QD.
 *
 * Returns the (possibly replaced) top plan node.
 */
Plan *
offload_entry_to_qe(PlannerInfo *root, Plan *plan, int sendslice_parallel)
{
	offload_entry_to_qe_plan_walk_context plan_walk_ctx;

	plan_walk_ctx.computeOnSlice = false;

	if (root->parse->commandType == CMD_SELECT &&
		should_offload_entry_to_qe_plan_walker(plan, &plan_walk_ctx) &&
		safe_to_offload_entry_to_qe_rte_walker(root->parse->rtable) &&
		!contain_volatile_functions((Node *) root->parse))
	{
		PlanSlice  *sendSlice;
		CdbPathLocus entrylocus;

		/*
		 * Build the sender slice: a singleton reader gang pinned to one
		 * segment, picked from the session id so that concurrent sessions
		 * spread across segments.
		 */
		sendSlice = (PlanSlice *) palloc0(sizeof(PlanSlice));
		sendSlice->gangType = GANGTYPE_SINGLETON_READER;
		sendSlice->numsegments = 1;
		sendSlice->sliceIndex = -1;
		sendSlice->parallel_workers = sendslice_parallel;
		sendSlice->segindex = gp_session_id % getgpsegmentCount();

		/* The old root slice now runs on a QE: demote Entry to SingleQE. */
		replace_entry_locus_with_singleqe(plan);

		/* Gather the result back to the QD through a union Motion. */
		plan = (Plan *) make_union_motion(plan);
		((Motion *) plan)->senderSliceInfo = sendSlice;

		/* The Motion's receiving side still runs on the QD (Entry locus). */
		plan->locustype = CdbLocusType_Entry;
		CdbPathLocus_MakeEntry(&entrylocus);
		if (plan->flow)
			pfree(plan->flow);
		plan->flow = cdbpathtoplan_create_flow(root, entrylocus);
	}

	return plan;
}
6 changes: 6 additions & 0 deletions src/backend/optimizer/plan/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
/* Decorate the top node of the plan with a Flow node. */
top_plan->flow = cdbpathtoplan_create_flow(root, best_path->locus);

/* Modifier: If root slice is executed on QD, try to offload it to a QE */
if (enable_offload_entry_to_qe && Gp_role == GP_ROLE_DISPATCH)
{
top_plan = offload_entry_to_qe(root, top_plan, best_path->locus.parallel_workers);
}

/*
* If creating a plan for a scrollable cursor, make sure it can run
* backwards on demand. Add a Material node at the top at need.
Expand Down
11 changes: 11 additions & 0 deletions src/backend/utils/misc/guc_gp.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ bool gp_enable_global_deadlock_detector = false;
bool gp_enable_predicate_pushdown;
int gp_predicate_pushdown_sample_rows;

bool enable_offload_entry_to_qe = false;
bool enable_answer_query_using_materialized_views = false;

static const struct config_enum_entry gp_log_format_options[] = {
Expand Down Expand Up @@ -2429,6 +2430,16 @@ struct config_bool ConfigureNamesBool_gp[] =
true,
NULL, NULL, NULL
},
{
{"enable_offload_entry_to_qe", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Enable plans with operations on coordinator to be offloaded to QEs."),
NULL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_GPDB_NO_SYNC
},
&enable_offload_entry_to_qe,
false,
NULL, NULL, NULL
},
{
{"optimizer_enable_gather_on_segment_for_dml", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Enable DML optimization by enforcing a non-master gather in the optimizer."),
Expand Down
1 change: 1 addition & 0 deletions src/include/optimizer/planmain.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extern Result *make_result(List *tlist, Node *resconstantqual, Plan *subplan);
extern Plan *add_sort_cost(PlannerInfo *root, Plan *input,
double limit_tuples);
extern Plan *plan_pushdown_tlist(PlannerInfo *root, Plan *plan, List *tlist); /*CDB*/
extern Plan *offload_entry_to_qe(PlannerInfo *root, Plan *plan, int sendslice_parallel); /* CBDB */

/* External use of these functions is deprecated: */
extern Sort *make_sort_from_sortclauses(List *sortcls,
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ extern bool enable_parallel;
extern int gp_appendonly_insert_files;
extern int gp_appendonly_insert_files_tuples_range;
extern bool enable_answer_query_using_materialized_views;
extern bool enable_offload_entry_to_qe;
/*
* gp_enable_multiphase_limit is not cost based.
* When set to false, the planner will not use multi-phase limit.
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/unsync_guc_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
"enable_seqscan",
"enable_sort",
"enable_tidscan",
"enable_offload_entry_to_qe",
"escape_string_warning",
"event_source",
"exit_on_error",
Expand Down
Loading
Loading