diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 846dd9fab83..fa60d57f645 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -46,6 +46,7 @@ #include "optimizer/tlist.h" #include "parser/parse_clause.h" #include "parser/parsetree.h" +#include "partitioning/partdesc.h" #include "partitioning/partprune.h" #include "utils/lsyscache.h" #include "utils/uri.h" @@ -102,6 +103,11 @@ typedef struct bool result; } contain_motion_walk_context; +typedef struct +{ + bool computeOnSlice; /* does root slice contain computation node (Sort, Join, Agg) */ +} offload_entry_to_qe_plan_walk_context; + static Plan *create_scan_plan(PlannerInfo *root, Path *best_path, int flags); static List *build_path_tlist(PlannerInfo *root, Path *path); @@ -9063,4 +9069,162 @@ push_locus_down_after_elide_motion(Plan* plan) plan = plan->lefttree; } } -} \ No newline at end of file +} + +/* + * Restore Entry locus to SingleQE in the root slice. + * This is simply a reverse of push_locus_down_after_elide_motion. + * The difference is that it's NOT used when creating a plan but rather + * after a plan gets created, it's used to modify the plan in offload_entry_to_qe. + */ +static void +replace_entry_locus_with_singleqe(Plan *plan) +{ + while (plan && (plan->locustype == CdbLocusType_Entry)) + { + plan->locustype = CdbLocusType_SingleQE; + switch (nodeTag(plan)) + { + case T_Motion: + return; + case T_Append: + { + List *subplans = NIL; + ListCell *cell; + subplans = ((Append *) (plan))->appendplans; + foreach(cell, subplans) + { + replace_entry_locus_with_singleqe(lfirst(cell)); + } + break; + } + case T_SubqueryScan: + plan = ((SubqueryScan *)(plan))->subplan; + break; + case T_NestLoop: + case T_MergeJoin: + case T_HashJoin: + replace_entry_locus_with_singleqe(plan->righttree); + /* FALLTHROUGH */ + default: + plan = plan->lefttree; + break; + } + } +} + +/* + * Check whether we can safely offload root slice on QD to a QE. + */ +static bool +safe_to_offload_entry_to_qe_rte_walker(List *rtes) +{ + ListCell *lc; + foreach(lc, rtes) + { + RangeTblEntry *rte = lfirst_node(RangeTblEntry, lc); + if (rte->rtekind == RTE_RELATION) + { + // check if any partition of a partitioned table is a coordinator-only external/foreign table + if (rte->relkind == RELKIND_PARTITIONED_TABLE) + { + Relation rel; + PartitionDesc desc; + + rel = relation_open(rte->relid, NoLock); + desc = RelationGetPartitionDesc(rel, true); + relation_close(rel, NoLock); + for (int i = 0; i < desc->nparts; i++) + { + if (GpPolicyIsEntry(GpPolicyFetch(desc->oids[i]))) + return false; + } + return true; + } + else + return !GpPolicyIsEntry(GpPolicyFetch(rte->relid)); + } + else if (rte->rtekind == RTE_SUBQUERY) + { + if (!safe_to_offload_entry_to_qe_rte_walker(rte->subquery->rtable)) + return false; + } + } + return true; +} + +/* + * Check if there are multiple Motion in which the root slice contains computation (Sort, Join or Aggregate). + */ +static bool +should_offload_entry_to_qe_plan_walker(Plan *plan, offload_entry_to_qe_plan_walk_context *ctx) +{ + while (plan && plan->locustype == CdbLocusType_Entry) + { + switch (nodeTag(plan)) + { + case T_Motion: + return ctx->computeOnSlice; + case T_SubqueryScan: + plan = ((SubqueryScan *) plan)->subplan; + break; + /* join */ + case T_Join: + case T_MergeJoin: + case T_HashJoin: + case T_NestLoop: + ctx->computeOnSlice = true; + if (should_offload_entry_to_qe_plan_walker(plan->righttree, ctx)) + return true; + plan = plan->lefttree; + break; + /* sort */ + case T_Sort: + /* aggregates*/ + case T_Agg: + case T_WindowAgg: + ctx->computeOnSlice = true; + /* FALLTHROUGH */ + default: + plan = plan->lefttree; + break; + } + } + return false; +} + +Plan * +offload_entry_to_qe(PlannerInfo *root, Plan *plan, int sendslice_parallel) +{ + offload_entry_to_qe_plan_walk_context plan_walk_ctx; + plan_walk_ctx.computeOnSlice = false; + + if (root->parse->commandType == CMD_SELECT && + should_offload_entry_to_qe_plan_walker(plan, &plan_walk_ctx) && + safe_to_offload_entry_to_qe_rte_walker(root->parse->rtable) && + !contain_volatile_functions((Node *) root->parse)) + { + CdbPathLocus entrylocus; + PlanSlice *sendSlice; + { + sendSlice = (PlanSlice *) palloc0(sizeof(PlanSlice)); + sendSlice->gangType = GANGTYPE_SINGLETON_READER; + sendSlice->numsegments = 1; + sendSlice->sliceIndex = -1; + sendSlice->parallel_workers = sendslice_parallel; + sendSlice->segindex = gp_session_id % getgpsegmentCount(); + } + + replace_entry_locus_with_singleqe(plan); + + plan = (Plan *) make_union_motion(plan); + ((Motion *) plan)->senderSliceInfo = sendSlice; + + plan->locustype = CdbLocusType_Entry; + CdbPathLocus_MakeEntry(&entrylocus); + if (plan->flow) + pfree(plan->flow); + plan->flow = cdbpathtoplan_create_flow(root, entrylocus); + } + return plan; +} diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2163d9b543a..3b2cb809038 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -594,6 +594,12 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, /* Decorate the top node of the plan with a Flow node. */ top_plan->flow = cdbpathtoplan_create_flow(root, best_path->locus); + /* Modifier: If root slice is executed on QD, try to offload it to a QE */ + if (enable_offload_entry_to_qe && Gp_role == GP_ROLE_DISPATCH) + { + top_plan = offload_entry_to_qe(root, top_plan, best_path->locus.parallel_workers); + } + /* * If creating a plan for a scrollable cursor, make sure it can run * backwards on demand. Add a Material node at the top at need. diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 786d52185b0..f7a1dd2b5cb 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -430,6 +430,7 @@ bool gp_enable_global_deadlock_detector = false; bool gp_enable_predicate_pushdown; int gp_predicate_pushdown_sample_rows; +bool enable_offload_entry_to_qe = false; bool enable_answer_query_using_materialized_views = false; static const struct config_enum_entry gp_log_format_options[] = { @@ -2429,6 +2430,16 @@ struct config_bool ConfigureNamesBool_gp[] = true, NULL, NULL, NULL }, + { + {"enable_offload_entry_to_qe", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Enable plans with operations on coordinator to be offloaded to QEs."), + NULL, + GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_GPDB_NO_SYNC + }, + &enable_offload_entry_to_qe, + false, + NULL, NULL, NULL + }, { {"optimizer_enable_gather_on_segment_for_dml", PGC_USERSET, DEVELOPER_OPTIONS, gettext_noop("Enable DML optimization by enforcing a non-master gather in the optimizer."), diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 13a0c95266c..9fcc979b349 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -80,6 +80,7 @@ extern Result *make_result(List *tlist, Node *resconstantqual, Plan *subplan); extern Plan *add_sort_cost(PlannerInfo *root, Plan *input, double limit_tuples); extern Plan *plan_pushdown_tlist(PlannerInfo *root, Plan *plan, List *tlist); /*CDB*/ +extern Plan *offload_entry_to_qe(PlannerInfo *root, Plan *plan, int sendslice_parallel); /* CBDB */ /* External use of these functions is deprecated: */ extern Sort *make_sort_from_sortclauses(List *sortcls, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index a24949b1b6f..90f054e7f33 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -301,6 +301,7 @@ extern bool enable_parallel; extern int gp_appendonly_insert_files; extern int gp_appendonly_insert_files_tuples_range; extern bool enable_answer_query_using_materialized_views; +extern bool enable_offload_entry_to_qe; /* * gp_enable_multiphase_limit is not cost based. * When set to false, the planner will not use multi-phase limit. diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h index 04256739496..e89222e80e0 100644 --- a/src/include/utils/unsync_guc_name.h +++ b/src/include/utils/unsync_guc_name.h @@ -150,6 +150,7 @@ "enable_seqscan", "enable_sort", "enable_tidscan", + "enable_offload_entry_to_qe", "escape_string_warning", "event_source", "exit_on_error", diff --git a/src/test/regress/expected/offload_entry_to_qe.out b/src/test/regress/expected/offload_entry_to_qe.out new file mode 100644 index 00000000000..ea6dde325a8 --- /dev/null +++ b/src/test/regress/expected/offload_entry_to_qe.out @@ -0,0 +1,935 @@ +set optimizer = off; -- orca is currently unsupported +set enable_offload_entry_to_qe = on; +create temp table tst(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +create index tst_y_idx on tst(y); +insert into tst values (1, 1), (1, 1), (2, 1), (10, 10); +analyze tst; +-- accept Aggregate, Finalize Aggregate, Limit + Finalize Aggregate, WindowAgg, Sort + Unique +explain (costs off, locus) select count(x) from tst where x > 1; + QUERY PLAN +------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Seq Scan on tst + Locus: Hashed + Filter: (x > 1) + Optimizer: Postgres query optimizer +(10 rows) + +explain (costs off, locus) select count(x) from tst; + QUERY PLAN +------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(11 rows) + +explain (costs off, locus) select count(x) from tst limit 1; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Limit + Locus: SingleQE + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(13 rows) + +explain (costs off, locus) select count(x) over () from tst; + QUERY PLAN +------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> WindowAgg + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(9 rows) + +explain (costs off, locus) select count(x) over () from tst group by x limit 1; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Limit + Locus: SingleQE + -> WindowAgg + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> HashAggregate + Locus: Hashed + Group Key: x + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(14 rows) + +explain (costs off, locus) select distinct min(x), max(x) from tst; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Unique + Locus: SingleQE + Group Key: (min(x)), (max(x)) + -> Sort + Locus: SingleQE + Sort Key: (min(x)), (max(x)) + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(17 rows) + +reset enable_offload_entry_to_qe; -- compare results with GUC set to false +select count(x) from tst where x > 1; + count +------- + 2 +(1 row) + +select count(x) from tst; + count +------- + 4 +(1 row) + +select count(x) from tst limit 1; + count +------- + 4 +(1 row) + +select count(x) over () from tst; + count +------- + 4 + 4 + 4 + 4 +(4 rows) + +select count(x) over () from tst group by x limit 1; + count +------- + 3 +(1 row) + +select distinct min(x), max(x) from tst; + min | max +-----+----- + 1 | 10 +(1 row) + +set enable_offload_entry_to_qe = on; +select count(x) from tst where x > 1; + count +------- + 2 +(1 row) + +select count(x) from tst; + count +------- + 4 +(1 row) + +select count(x) from tst limit 1; + count +------- + 4 +(1 row) + +select count(x) over () from tst; + count +------- + 4 + 4 + 4 + 4 +(4 rows) + +select count(x) over () from tst group by x limit 1; + count +------- + 3 +(1 row) + +select distinct min(x), max(x) from tst; + min | max +-----+----- + 1 | 10 +(1 row) + +-- accept Merge Join, Nested Loop and Hash Join +set enable_seqscan = off; +set enable_nestloop = off; +set enable_mergejoin = on; +explain (costs off, locus) select * from tst t1, tst t2 where t1.y = t2.y order by t1.y; + QUERY PLAN +-------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Merge Join + Locus: SingleQE + Merge Cond: (t1.y = t2.y) + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + Merge Key: t1.y + -> Index Scan using tst_y_idx on tst t1 + Locus: Hashed + -> Materialize + Locus: SingleQE + -> Gather Motion 3:1 (slice3; segments: 3) + Locus: SingleQE + Merge Key: t2.y + -> Index Scan using tst_y_idx on tst t2 + Locus: Hashed + Optimizer: Postgres query optimizer +(18 rows) + +set enable_offload_entry_to_qe = off; +select * from tst t1, tst t2 where t1.y = t2.y order by t1.y; + x | y | x | y +----+----+----+---- + 2 | 1 | 2 | 1 + 2 | 1 | 1 | 1 + 2 | 1 | 1 | 1 + 1 | 1 | 2 | 1 + 1 | 1 | 1 | 1 + 1 | 1 | 1 | 1 + 1 | 1 | 2 | 1 + 1 | 1 | 1 | 1 + 1 | 1 | 1 | 1 + 10 | 10 | 10 | 10 +(10 rows) + +set enable_offload_entry_to_qe = on; +select * from tst t1, tst t2 where t1.y = t2.y order by t1.y; + x | y | x | y +----+----+----+---- + 2 | 1 | 2 | 1 + 2 | 1 | 1 | 1 + 2 | 1 | 1 | 1 + 1 | 1 | 2 | 1 + 1 | 1 | 1 | 1 + 1 | 1 | 1 | 1 + 1 | 1 | 2 | 1 + 1 | 1 | 1 | 1 + 1 | 1 | 1 | 1 + 10 | 10 | 10 | 10 +(10 rows) + +reset enable_mergejoin; +reset enable_nestloop; +reset enable_seqscan; +explain (costs off, locus) select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on true; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Nested Loop + Locus: SingleQE + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + -> Values Scan on "*VALUES*" + Locus: General + Optimizer: Postgres query optimizer +(15 rows) + +explain (costs off, locus) select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on t1.x = v.x; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Hash Join + Locus: SingleQE + Hash Cond: ("*VALUES*".column1 = (min(tst.x))) + -> Values Scan on "*VALUES*" + Locus: General + -> Hash + Locus: SingleQE + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(18 rows) + +reset enable_offload_entry_to_qe; +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on true; + x | x +---+---- + 1 | 1 + 1 | 10 +(2 rows) + +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on t1.x = v.x; + x | x +---+--- + 1 | 1 +(1 row) + +set enable_offload_entry_to_qe = on; +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on true; + x | x +---+---- + 1 | 1 + 1 | 10 +(2 rows) + +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on t1.x = v.x; + x | x +---+--- + 1 | 1 +(1 row) + +-- accept InitPlan and SubPlan +explain (costs off, locus) select count(*) from tst where tst.y = (select max(y) from tst); + QUERY PLAN +-------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Aggregate + Locus: SingleQE + InitPlan 1 (returns $1) (slice3) + -> Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice4; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst tst_1 + Locus: Hashed + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Seq Scan on tst + Locus: Hashed + Filter: (y = $1) + Optimizer: Postgres query optimizer +(19 rows) + +explain (costs off, locus) select (select max((select distinct x from tst t2 where t2.x = t1.x))) from tst t1; + QUERY PLAN +----------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst t1 + Locus: Hashed + SubPlan 1 + -> Unique + Locus: OuteryQuery + -> Result + Locus: OuteryQuery + Filter: (t2.x = t1.x) + -> Materialize + Locus: OuteryQuery + -> Broadcast Motion 3:3 (slice3; segments: 3) + Locus: OuteryQuery + -> Seq Scan on tst t2 + Locus: Hashed + SubPlan 2 + -> Result + Locus: General + Optimizer: Postgres query optimizer +(26 rows) + +reset enable_offload_entry_to_qe; +select count(*) from tst where tst.y = (select max(y) from tst); + count +------- + 1 +(1 row) + +select (select max((select distinct x from tst t2 where t2.x = t1.x))) from tst t1; + max +----- + 10 +(1 row) + +set enable_offload_entry_to_qe = on; +select count(*) from tst where tst.y = (select max(y) from tst); + count +------- + 1 +(1 row) + +select (select max((select distinct x from tst t2 where t2.x = t1.x))) from tst t1; + max +----- + 10 +(1 row) + +-- test cte and recursive cte +explain (costs off, locus) +with t(a,b) as (select min(y), max(y) from tst), + unused(a) as (select * from tst) +select t.b, rank() over () from t; -- test Subquery on QD + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> WindowAgg + Locus: SingleQE + -> Subquery Scan on t + Locus: SingleQE + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(15 rows) + +explain (costs off, locus) +with t1(a,b) as (select min(y), max(y) from tst), + t2(a,b) as (select x, count(x) over () from tst group by x) +select * from t1 join t2 on t1.a < t2.a order by t1.a, t2.a; + QUERY PLAN +------------------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Sort + Locus: SingleQE + Sort Key: (min(tst.y)), tst_1.x + -> Nested Loop + Locus: SingleQE + Join Filter: ((min(tst.y)) < tst_1.x) + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + -> Materialize + Locus: SingleQE + -> WindowAgg + Locus: SingleQE + -> Gather Motion 3:1 (slice3; segments: 3) + Locus: SingleQE + -> HashAggregate + Locus: Hashed + Group Key: tst_1.x + -> Seq Scan on tst tst_1 + Locus: Hashed + Optimizer: Postgres query optimizer +(28 rows) + +create table recursive_cte_tst (id int,parentid int,score int) distributed replicated; +insert into recursive_cte_tst values(0, -1, 1); +insert into recursive_cte_tst values(1, 0, 1); +insert into recursive_cte_tst values(2, 0, 2); +insert into recursive_cte_tst values(3, 1, 10); +insert into recursive_cte_tst values(4, 1, 5); +insert into recursive_cte_tst values(5, 2, 1); +insert into recursive_cte_tst values(6, 3, 41); +insert into recursive_cte_tst values(7, 4, 42); +insert into recursive_cte_tst values(8, 5, 42); +insert into recursive_cte_tst values(9, 6, 42); +explain (locus, costs off) with recursive cte as ( + select 1 depth, a.id, a.score from recursive_cte_tst a where id = 0 + union all + select c.depth + 1, k.id, k.score from recursive_cte_tst k inner join cte c on c.id = k.parentid +) +select rank() over (order by avg(score) desc), + avg(score) +from cte group by depth order by avg desc limit 5; -- note that the SingleQE on the left side of RecursiveUnion *isn't* the same SingleQE as the right side + QUERY PLAN +------------------------------------------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Limit + Locus: SingleQE + -> WindowAgg + Locus: SingleQE + Order By: (avg(a.score)) + -> Sort + Locus: SingleQE + Sort Key: (avg(a.score)) DESC + -> HashAggregate + Locus: SingleQE + Group Key: (1) + -> Recursive Union + Locus: SingleQE + -> Gather Motion 1:1 (slice2; segments: 1) + Locus: SingleQE + -> Seq Scan on recursive_cte_tst a + Locus: SegmentGeneral + Filter: (id = 0) + -> Hash Join + Locus: SingleQE + Hash Cond: (c.id = k.parentid) + -> WorkTable Scan on cte c + Locus: SingleQE + -> Hash + Locus: SingleQE + -> Gather Motion 1:1 (slice3; segments: 1) + Locus: SingleQE + -> Seq Scan on recursive_cte_tst k + Locus: SegmentGeneral + Optimizer: Postgres query optimizer +(32 rows) + +reset enable_offload_entry_to_qe; +with t(a,b) as (select min(y), max(y) from tst), + unused(a) as (select * from tst) +select t.b, rank() over () from t; + b | rank +----+------ + 10 | 1 +(1 row) + +with t1(a,b) as (select min(y), max(y) from tst), + t2(a,b) as (select x, count(x) over () from tst group by x) +select * from t1 join t2 on t1.a < t2.a order by t1.a, t2.a; + a | b | a | b +---+----+----+--- + 1 | 10 | 2 | 3 + 1 | 10 | 10 | 3 +(2 rows) + +with recursive cte as ( + select 1 depth, a.id, a.score from recursive_cte_tst a where id = 0 + union all + select c.depth + 1, k.id, k.score from recursive_cte_tst k inner join cte c on c.id = k.parentid +) select rank() over (order by avg(score) desc), avg(score) from cte group by depth order by avg desc limit 5; + rank | avg +------+------------------------ + 1 | 42.0000000000000000 + 2 | 41.6666666666666667 + 3 | 5.3333333333333333 + 4 | 1.5000000000000000 + 5 | 1.00000000000000000000 +(5 rows) + +set enable_offload_entry_to_qe = on; +with t(a,b) as (select min(y), max(y) from tst), + unused(a) as (select * from tst) +select t.b, rank() over () from t; + b | rank +----+------ + 10 | 1 +(1 row) + +with t1(a,b) as (select min(y), max(y) from tst), + t2(a,b) as (select x, count(x) over () from tst group by x) +select * from t1 join t2 on t1.a < t2.a order by t1.a, t2.a; + a | b | a | b +---+----+----+--- + 1 | 10 | 2 | 3 + 1 | 10 | 10 | 3 +(2 rows) + +with recursive cte as ( + select 1 depth, a.id, a.score from recursive_cte_tst a where id = 0 + union all + select c.depth + 1, k.id, k.score from recursive_cte_tst k inner join cte c on c.id = k.parentid +) select rank() over (order by avg(score) desc), avg(score) from cte group by depth order by avg desc limit 5; + rank | avg +------+------------------------ + 1 | 42.0000000000000000 + 2 | 41.6666666666666667 + 3 | 5.3333333333333333 + 4 | 1.5000000000000000 + 5 | 1.00000000000000000000 +(5 rows) + +-- reject pure Limit and pure InitPlan +explain (costs off, locus) select * from tst limit 1; + QUERY PLAN +------------------------------------------------ + Limit + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Limit + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(9 rows) + +create function dummyf(int) returns int as 'select 1;' language sql; +explain (costs off, locus) select min(dummyf(x)) from tst; + QUERY PLAN +-------------------------------------------------------- + Result + Locus: Entry + InitPlan 1 (returns $0) (slice1) + -> Limit + Locus: Entry + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: Entry + -> Result + Locus: Hashed + One-Time Filter: (1 IS NOT NULL) + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(13 rows) + +explain (costs off, locus) select count(*) from tst where tst.x = (select min(dummyf(x)) from tst); + QUERY PLAN +---------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Aggregate + Locus: SingleQE + InitPlan 2 (returns $1) (slice3) + -> Result + Locus: Entry + InitPlan 1 (returns $0) (slice4) + -> Limit + Locus: Entry + -> Gather Motion 3:1 (slice5; segments: 3) + Locus: Entry + -> Result + Locus: Hashed + One-Time Filter: (1 IS NOT NULL) + -> Seq Scan on tst tst_1 + Locus: Hashed + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Seq Scan on tst + Locus: Hashed + Filter: (x = $1) + Optimizer: Postgres query optimizer +(23 rows) + +reset enable_offload_entry_to_qe; +select min(dummyf(x)) from tst; + min +----- + 1 +(1 row) + +select count(*) from tst where tst.x = (select min(dummyf(x)) from tst); + count +------- + 2 +(1 row) + +set enable_offload_entry_to_qe = on; +select min(dummyf(x)) from tst; + min +----- + 1 +(1 row) + +select count(*) from tst where tst.x = (select min(dummyf(x)) from tst); + count +------- + 2 +(1 row) + +-- reject updates +explain (costs off, locus) update tst set x = (select min(x) from tst); + QUERY PLAN +--------------------------------------------------------------- + Update on tst + Locus: Strewn + InitPlan 1 (returns $0) (slice2) + -> Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice3; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst tst_1 + Locus: Hashed + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) + Locus: Hashed + -> Split + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(18 rows) + +-- test functions +explain (costs off, locus) select max(x)::text || ' ' || timeofday() from tst; -- volatile + QUERY PLAN +------------------------------------------------ + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(9 rows) + +explain (costs off, locus) select max(x)::text || ' ' || now() from tst; -- stable + QUERY PLAN +------------------------------------------------------ + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Hashed + -> Seq Scan on tst + Locus: Hashed + Optimizer: Postgres query optimizer +(11 rows) + +-- test write functions +create function mod_dummyf(i int) returns int as $$ +begin +update tst set y = y + 1 where x = $1; +return $1; +end; +$$ language plpgsql stable; +explain (costs off, locus) select mod_dummyf(42); + QUERY PLAN +------------------------------------- + Result + Locus: General + Optimizer: Postgres query optimizer +(3 rows) + +select mod_dummyf(42); -- should fail +ERROR: UPDATE is not allowed in a non-volatile function +CONTEXT: SQL statement "update tst set y = y + 1 where x = $1" +PL/pgSQL function mod_dummyf(integer) line 3 at SQL statement +drop function dummyf; +drop function mod_dummyf; +-- test external table +CREATE EXTERNAL WEB TEMP TABLE tst_exttbl(LIKE tst) EXECUTE 'printf "1\t42\n"' ON COORDINATOR FORMAT 'text'; +CREATE EXTERNAL WEB TEMP TABLE tst_exttbl_all(LIKE tst) EXECUTE 'printf "2\t43\n"' ON ALL FORMAT 'text'; +explain (costs off, locus) select max(e.y) from tst_exttbl e join tst t2 on (e.x = t2.x); + QUERY PLAN +------------------------------------------------------------ + Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Hash Join + Locus: Hashed + Hash Cond: (t2.x = e.x) + -> Seq Scan on tst t2 + Locus: Hashed + -> Hash + Locus: Hashed + -> Redistribute Motion 1:3 (slice2) + Locus: Hashed + Hash Key: e.x + -> Foreign Scan on tst_exttbl e + Locus: Entry + Optimizer: Postgres query optimizer +(17 rows) + +explain (costs off, locus) select max(e.y) from tst_exttbl_all e join tst t2 on (e.x = t2.x); + QUERY PLAN +--------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Strewn + -> Hash Join + Locus: Strewn + Hash Cond: (e.x = t2.x) + -> Foreign Scan on tst_exttbl_all e + Locus: Strewn + -> Hash + Locus: Replicated + -> Broadcast Motion 3:3 (slice3; segments: 3) + Locus: Replicated + -> Seq Scan on tst t2 + Locus: Hashed + Optimizer: Postgres query optimizer +(20 rows) + +reset enable_offload_entry_to_qe; +select max(e.y) from tst_exttbl e join tst t2 on (e.x = t2.x); + max +----- + 42 +(1 row) + +select max(e.y) from tst_exttbl_all e join tst t2 on (e.x = t2.x); + max +----- + 43 +(1 row) + +set enable_offload_entry_to_qe = on; +select max(e.y) from tst_exttbl e join tst t2 on (e.x = t2.x); + max +----- + 42 +(1 row) + +select max(e.y) from tst_exttbl_all e join tst t2 on (e.x = t2.x); + max +----- + 43 +(1 row) + +-- test partitioned table +create temp table part(like tst) distributed by (x) partition by range (x) +( + partition part1 start (0) end (10), + partition part2 start (10) end (20) +); +insert into part select * from tst; +explain (costs off, locus) select min(y), max(y) from part; + QUERY PLAN +------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Strewn + -> Append + Locus: Strewn + -> Seq Scan on part_1_prt_part1 part_1 + Locus: Hashed + -> Seq Scan on part_1_prt_part2 part_2 + Locus: Hashed + Optimizer: Postgres query optimizer +(15 rows) + +reset enable_offload_entry_to_qe; +select min(y), max(y) from part; + min | max +-----+----- + 1 | 10 +(1 row) + +set enable_offload_entry_to_qe = on; +select min(y), max(y) from part; + min | max +-----+----- + 1 | 10 +(1 row) + +-- test partitioned table with external table as partition +ALTER TABLE part EXCHANGE PARTITION part1 WITH TABLE tst_exttbl; +explain (costs off, locus) select min(y), max(y) from part; + QUERY PLAN +------------------------------------------------------- + Aggregate + Locus: Entry + -> Append + Locus: Entry + -> Foreign Scan on part_1_prt_part1 part_1 + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Seq Scan on part_1_prt_part2 part_2 + Locus: Hashed + Optimizer: Postgres query optimizer +(11 rows) + +reset enable_offload_entry_to_qe; +select min(y), max(y) from part; + min | max +-----+----- + 10 | 42 +(1 row) + +set enable_offload_entry_to_qe = on; +select min(y), max(y) from part; + min | max +-----+----- + 10 | 42 +(1 row) + +ALTER TABLE part EXCHANGE PARTITION part1 WITH TABLE tst_exttbl_all; +explain (costs off, locus) select min(y), max(y) from part; + QUERY PLAN +----------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: SingleQE + -> Partial Aggregate + Locus: Strewn + -> Append + Locus: Strewn + -> Foreign Scan on part_1_prt_part1 part_1 + Locus: Strewn + -> Seq Scan on part_1_prt_part2 part_2 + Locus: Hashed + Optimizer: Postgres query optimizer +(15 rows) + +reset enable_offload_entry_to_qe; +select min(y), max(y) from part; + min | max +-----+----- + 10 | 43 +(1 row) + +set enable_offload_entry_to_qe = on; +select min(y), max(y) from part; + min | max +-----+----- + 10 | 43 +(1 row) + diff --git a/src/test/regress/greenplum_schedule b/src/test/regress/greenplum_schedule index e1fb04294a1..e15d5a3c6ee 100755 --- a/src/test/regress/greenplum_schedule +++ b/src/test/regress/greenplum_schedule @@ -301,6 +301,9 @@ test: create_extension_fail # check profile feature test: profile +# check offload entry root slice to QE feature +test: offload_entry_to_qe + # Tests of Answer Query Using Materialized Views. test: aqumv diff --git a/src/test/regress/sql/offload_entry_to_qe.sql b/src/test/regress/sql/offload_entry_to_qe.sql new file mode 100644 index 00000000000..67fd15d1dc6 --- /dev/null +++ b/src/test/regress/sql/offload_entry_to_qe.sql @@ -0,0 +1,188 @@ +set optimizer = off; -- orca is currently unsupported +set enable_offload_entry_to_qe = on; + +create temp table tst(x int, y int); +create index tst_y_idx on tst(y); +insert into tst values (1, 1), (1, 1), (2, 1), (10, 10); +analyze tst; + +-- accept Aggregate, Finalize Aggregate, Limit + Finalize Aggregate, WindowAgg, Sort + Unique +explain (costs off, locus) select count(x) from tst where x > 1; +explain (costs off, locus) select count(x) from tst; +explain (costs off, locus) select count(x) from tst limit 1; +explain (costs off, locus) select count(x) over () from tst; +explain (costs off, locus) select count(x) over () from tst group by x limit 1; +explain (costs off, locus) select distinct min(x), max(x) from tst; +reset enable_offload_entry_to_qe; -- compare results with GUC set to false +select count(x) from tst where x > 1; +select count(x) from tst; +select count(x) from tst limit 1; +select count(x) over () from tst; +select count(x) over () from tst group by x limit 1; +select distinct min(x), max(x) from tst; +set enable_offload_entry_to_qe = on; +select count(x) from tst where x > 1; +select count(x) from tst; +select count(x) from tst limit 1; +select count(x) over () from tst; +select count(x) over () from tst group by x limit 1; +select distinct min(x), max(x) from tst; + +-- accept Merge Join, Nested Loop and Hash Join +set enable_seqscan = off; +set enable_nestloop = off; +set enable_mergejoin = on; +explain (costs off, locus) select * from tst t1, tst t2 where t1.y = t2.y order by t1.y; +set enable_offload_entry_to_qe = off; +select * from tst t1, tst t2 where t1.y = t2.y order by t1.y; +set enable_offload_entry_to_qe = on; +select * from tst t1, tst t2 where t1.y = t2.y order by t1.y; +reset enable_mergejoin; +reset enable_nestloop; +reset enable_seqscan; +explain (costs off, locus) select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on true; +explain (costs off, locus) select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on t1.x = v.x; +reset enable_offload_entry_to_qe; +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on true; +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on t1.x = v.x; +set enable_offload_entry_to_qe = on; +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on true; +select * from (select min(x) x from tst) t1 join (values (1), (10)) v(x) on t1.x = v.x; + +-- accept InitPlan and SubPlan +explain (costs off, locus) select count(*) from tst where tst.y = (select max(y) from tst); +explain (costs off, locus) select (select max((select distinct x from tst t2 where t2.x = t1.x))) from tst t1; +reset enable_offload_entry_to_qe; +select count(*) from tst where tst.y = (select max(y) from tst); +select (select max((select distinct x from tst t2 where t2.x = t1.x))) from tst t1; +set enable_offload_entry_to_qe = on; +select count(*) from tst where tst.y = (select max(y) from tst); +select (select max((select distinct x from tst t2 where t2.x = t1.x))) from tst t1; + +-- test cte and recursive cte +explain (costs off, locus) +with t(a,b) as (select min(y), max(y) from tst), + unused(a) as (select * from tst) +select t.b, rank() over () from t; -- test Subquery on QD + +explain (costs off, locus) +with t1(a,b) as (select min(y), max(y) from tst), + t2(a,b) as (select x, count(x) over () from tst group by x) +select * from t1 join t2 on t1.a < t2.a order by t1.a, t2.a; + +create table recursive_cte_tst (id int,parentid int,score int) distributed replicated; +insert into recursive_cte_tst values(0, -1, 1); +insert into recursive_cte_tst values(1, 0, 1); +insert into recursive_cte_tst values(2, 0, 2); +insert into recursive_cte_tst values(3, 1, 10); +insert into recursive_cte_tst values(4, 1, 5); +insert into recursive_cte_tst values(5, 2, 1); +insert into recursive_cte_tst values(6, 3, 41); +insert into recursive_cte_tst values(7, 4, 42); +insert into recursive_cte_tst values(8, 5, 42); +insert into recursive_cte_tst values(9, 6, 42); +explain (locus, costs off) with recursive cte as ( + select 1 depth, a.id, a.score from recursive_cte_tst a where id = 0 + union all + select c.depth + 1, k.id, k.score from recursive_cte_tst k inner join cte c on c.id = k.parentid +) +select rank() over (order by avg(score) desc), + avg(score) +from cte group by depth order by avg desc limit 5; -- note that the SingleQE on the left side of RecursiveUnion *isn't* the same SingleQE as the right side + +reset enable_offload_entry_to_qe; +with t(a,b) as (select min(y), max(y) from tst), + unused(a) as (select * from tst) +select t.b, rank() over () from t; +with t1(a,b) as (select min(y), max(y) from tst), + t2(a,b) as (select x, count(x) over () from tst group by x) +select * from t1 join t2 on t1.a < t2.a order by t1.a, t2.a; +with recursive cte as ( + select 1 depth, a.id, a.score from recursive_cte_tst a where id = 0 + union all + select c.depth + 1, k.id, k.score from recursive_cte_tst k inner join cte c on c.id = k.parentid +) select rank() over (order by avg(score) desc), avg(score) from cte group by depth order by avg desc limit 5; + +set enable_offload_entry_to_qe = on; +with t(a,b) as (select min(y), max(y) from tst), + unused(a) as (select * from tst) +select t.b, rank() over () from t; +with t1(a,b) as (select min(y), max(y) from tst), + t2(a,b) as (select x, count(x) over () from tst group by x) +select * from t1 join t2 on t1.a < t2.a order by t1.a, t2.a; +with recursive cte as ( + select 1 depth, a.id, a.score from recursive_cte_tst a where id = 0 + union all + select c.depth + 1, k.id, k.score from recursive_cte_tst k inner join cte c on c.id = k.parentid +) select rank() over (order by avg(score) desc), avg(score) from cte group by depth order by avg desc limit 5; + +-- reject pure Limit and pure InitPlan +explain (costs off, locus) select * from tst limit 1; +create function dummyf(int) returns int as 'select 1;' language sql; +explain (costs off, locus) select min(dummyf(x)) from tst; +explain (costs off, locus) select count(*) from tst where tst.x = (select min(dummyf(x)) from tst); +reset enable_offload_entry_to_qe; +select min(dummyf(x)) from tst; +select count(*) from tst where tst.x = (select min(dummyf(x)) from tst); +set enable_offload_entry_to_qe = on; +select min(dummyf(x)) from tst; +select count(*) from tst where tst.x = (select min(dummyf(x)) from tst); + +-- reject updates +explain (costs off, locus) update tst set x = (select min(x) from tst); + +-- test functions +explain (costs off, locus) select max(x)::text || ' ' || timeofday() from tst; -- volatile +explain (costs off, locus) select max(x)::text || ' ' || now() from tst; -- stable + +-- test write functions +create function mod_dummyf(i int) returns int as $$ +begin +update tst set y = y + 1 where x = $1; +return $1; +end; +$$ language plpgsql stable; +explain (costs off, locus) select mod_dummyf(42); +select mod_dummyf(42); -- should fail + +drop function dummyf; +drop function mod_dummyf; + +-- test external table +CREATE EXTERNAL WEB TEMP TABLE tst_exttbl(LIKE tst) EXECUTE 'printf "1\t42\n"' ON COORDINATOR FORMAT 'text'; +CREATE EXTERNAL WEB TEMP TABLE tst_exttbl_all(LIKE tst) EXECUTE 'printf "2\t43\n"' ON ALL FORMAT 'text'; +explain (costs off, locus) select max(e.y) from tst_exttbl e join tst t2 on (e.x = t2.x); +explain (costs off, locus) select max(e.y) from tst_exttbl_all e join tst t2 on (e.x = t2.x); +reset enable_offload_entry_to_qe; +select max(e.y) from tst_exttbl e join tst t2 on (e.x = t2.x); +select max(e.y) from tst_exttbl_all e join tst t2 on (e.x = t2.x); +set enable_offload_entry_to_qe = on; +select max(e.y) from tst_exttbl e join tst t2 on (e.x = t2.x); +select max(e.y) from tst_exttbl_all e join tst t2 on (e.x = t2.x); + +-- test partitioned table +create temp table part(like tst) distributed by (x) partition by range (x) +( + partition part1 start (0) end (10), + partition part2 start (10) end (20) +); +insert into part select * from tst; +explain (costs off, locus) select min(y), max(y) from part; +reset enable_offload_entry_to_qe; +select min(y), max(y) from part; +set enable_offload_entry_to_qe = on; +select min(y), max(y) from part; + +-- test partitioned table with external table as partition +ALTER TABLE part EXCHANGE PARTITION part1 WITH TABLE tst_exttbl; +explain (costs off, locus) select min(y), max(y) from part; +reset enable_offload_entry_to_qe; +select min(y), max(y) from part; +set enable_offload_entry_to_qe = on; +select min(y), max(y) from part; +ALTER TABLE part EXCHANGE PARTITION part1 WITH TABLE tst_exttbl_all; +explain (costs off, locus) select min(y), max(y) from part; +reset enable_offload_entry_to_qe; +select min(y), max(y) from part; +set enable_offload_entry_to_qe = on; +select min(y), max(y) from part;