From 30309aca0f84b49b6d243287cd6e975c8506e6c1 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Tue, 21 Mar 2023 14:45:03 -0700 Subject: [PATCH] Refactor some of the planning code to accomodate a new planning path for MERGE SQL --- .../distributed/planner/distributed_planner.c | 153 +++++++++++++----- .../distributed/planner/merge_planner.c | 17 ++ src/include/distributed/merge_planner.h | 5 + 3 files changed, 134 insertions(+), 41 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 866f7353a12..eb9e2178690 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -34,6 +34,7 @@ #include "distributed/intermediate_results.h" #include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" +#include "distributed/merge_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/distributed_planner.h" @@ -68,6 +69,17 @@ #include "utils/syscache.h" +/* RouterPlanType is used to determine the router plan to invoke */ +typedef enum RouterPlanType +{ + INSERT_SELECT_INTO_CITUS_TABLE, + INSERT_SELECT_INTO_LOCAL_TABLE, + DML_QUERY, + SELECT_QUERY, + MERGE_QUERY, + REPLAN_WITH_BOUND_PARAMETERS +} RouterPlanType; + static List *plannerRestrictionContextList = NIL; int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */ static uint64 NextPlanId = 1; @@ -129,6 +141,9 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); static void WarnIfListHasForeignDistributedTable(List *rangeTableList); +static RouterPlanType GetRouterPlanType(Query *query, + Query *originalQuery, + bool hasUnresolvedParams); /* Distributed planner hook */ @@ -881,6 +896,51 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, } +/* + * GetRouterPlanType checks the parse tree to return appropriate plan type. + */ +static RouterPlanType +GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams) +{ + if (!IsModifyCommand(originalQuery)) + { + return SELECT_QUERY; + } + + Oid targetRelationId = ModifyQueryResultRelationId(query); + + EnsureModificationsCanRunOnRelation(targetRelationId); + EnsurePartitionTableNotReplicated(targetRelationId); + + /* Check the type of modification being done */ + + if (InsertSelectIntoCitusTable(originalQuery)) + { + if (hasUnresolvedParams) + { + return REPLAN_WITH_BOUND_PARAMETERS; + } + return INSERT_SELECT_INTO_CITUS_TABLE; + } + else if (InsertSelectIntoLocalTable(originalQuery)) + { + if (hasUnresolvedParams) + { + return REPLAN_WITH_BOUND_PARAMETERS; + } + return INSERT_SELECT_INTO_LOCAL_TABLE; + } + else if (IsMergeQuery(originalQuery)) + { + return MERGE_QUERY; + } + else + { + return DML_QUERY; + } +} + + /* * CreateDistributedPlan generates a distributed plan for a query. * It goes through 3 steps: @@ -898,64 +958,71 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina DistributedPlan *distributedPlan = NULL; bool hasCtes = originalQuery->cteList != NIL; - if (IsModifyCommand(originalQuery)) - { - Oid targetRelationId = ModifyQueryResultRelationId(query); - - EnsureModificationsCanRunOnRelation(targetRelationId); + /* Step 1: Try router planner */ - EnsurePartitionTableNotReplicated(targetRelationId); + RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery, + hasUnresolvedParams); - if (InsertSelectIntoCitusTable(originalQuery)) + switch (routerPlan) + { + case INSERT_SELECT_INTO_CITUS_TABLE: { - if (hasUnresolvedParams) - { - /* - * Unresolved parameters can cause performance regressions in - * INSERT...SELECT when the partition column is a parameter - * because we don't perform any additional pruning in the executor. - */ - return NULL; - } - distributedPlan = - CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext, + CreateInsertSelectPlan(planId, + originalQuery, + plannerRestrictionContext, boundParams); + break; } - else if (InsertSelectIntoLocalTable(originalQuery)) + + case INSERT_SELECT_INTO_LOCAL_TABLE: { - if (hasUnresolvedParams) - { - /* - * Unresolved parameters can cause performance regressions in - * INSERT...SELECT when the partition column is a parameter - * because we don't perform any additional pruning in the executor. - */ - return NULL; - } distributedPlan = - CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams, + CreateInsertSelectIntoLocalTablePlan(planId, + originalQuery, + boundParams, hasUnresolvedParams, plannerRestrictionContext); + break; } - else + + case DML_QUERY: { /* modifications are always routed through the same planner/executor */ distributedPlan = CreateModifyPlan(originalQuery, query, plannerRestrictionContext); + break; } - } - else - { - /* - * For select queries we, if router executor is enabled, first try to - * plan the query as a router query. If not supported, otherwise try - * the full blown plan/optimize/physical planning process needed to - * produce distributed query plans. - */ - distributedPlan = CreateRouterPlan(originalQuery, query, - plannerRestrictionContext); + case MERGE_QUERY: + { + distributedPlan = + CreateMergePlan(originalQuery, query, plannerRestrictionContext); + break; + } + + case REPLAN_WITH_BOUND_PARAMETERS: + { + /* + * Unresolved parameters can cause performance regressions in + * INSERT...SELECT when the partition column is a parameter + * because we don't perform any additional pruning in the executor. + */ + return NULL; + } + + case SELECT_QUERY: + { + /* + * For select queries we, if router executor is enabled, first try to + * plan the query as a router query. If not supported, otherwise try + * the full blown plan/optimize/physical planning process needed to + * produce distributed query plans. + */ + distributedPlan = + CreateRouterPlan(originalQuery, query, plannerRestrictionContext); + break; + } } /* the functions above always return a plan, possibly with an error */ @@ -996,6 +1063,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina boundParams); Assert(originalQuery != NULL); + /* Step 2: Generate subplans for CTEs and complex subqueries */ + /* * Plan subqueries and CTEs that cannot be pushed down by recursively * calling the planner and return the resulting plans to subPlanList. @@ -1096,6 +1165,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina query->cteList = NIL; Assert(originalQuery->cteList == NIL); + /* Step 3: Try Logical planner */ + MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query, plannerRestrictionContext); MultiLogicalPlanOptimize(logicalPlan); diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 4839d5725f6..46a2484bdcd 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -54,6 +54,23 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid #endif +/* + * CreateMergePlan attempts to create a plan for the given MERGE SQL + * statement. If planning fails ->planningError is set to a description + * of the failure. + */ +DistributedPlan * +CreateMergePlan(Query *originalQuery, Query *query, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* + * For now, this is a place holder until we isolate the merge + * planning into it's own code-path. + */ + return CreateModifyPlan(originalQuery, query, plannerRestrictionContext); +} + + /* * MergeQuerySupported does check for a MERGE command in the query, if it finds * one, it will verify the below criteria diff --git a/src/include/distributed/merge_planner.h b/src/include/distributed/merge_planner.h index 243be14d014..158f26861e9 100644 --- a/src/include/distributed/merge_planner.h +++ b/src/include/distributed/merge_planner.h @@ -17,10 +17,15 @@ #include "nodes/parsenodes.h" #include "distributed/distributed_planner.h" #include "distributed/errormessage.h" +#include "distributed/multi_physical_planner.h" extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, bool multiShardQuery, PlannerRestrictionContext * plannerRestrictionContext); +extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query, + PlannerRestrictionContext * + plannerRestrictionContext); + #endif /* MERGE_PLANNER_H */