Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor some of the planning code to accommodate a new planning path for MERGE SQL #6786

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 112 additions & 41 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/distributed_planner.h"
Expand Down Expand Up @@ -68,6 +69,17 @@
#include "utils/syscache.h"


/*
 * RouterPlanType is used to determine the router plan to invoke.
 *
 * GetRouterPlanType() classifies a statement into one of these values and
 * CreateDistributedPlan() switches on the result to pick a planning function.
 */
typedef enum RouterPlanType
{
/* INSERT ... SELECT into a Citus table; planned by CreateInsertSelectPlan() */
INSERT_SELECT_INTO_CITUS_TABLE,
/* INSERT ... SELECT into a local table; planned by CreateInsertSelectIntoLocalTablePlan() */
INSERT_SELECT_INTO_LOCAL_TABLE,
/* any other modification that is not a MERGE; planned by CreateModifyPlan() */
DML_QUERY,
/* not a modify command; planned by CreateRouterPlan() */
SELECT_QUERY,
/* MERGE statement; planned by CreateMergePlan() */
MERGE_QUERY,
/*
 * INSERT ... SELECT that still has unresolved parameters; the caller returns
 * NULL instead of building a plan for it
 */
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;

static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1;
Expand Down Expand Up @@ -129,6 +141,9 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static RouterPlanType GetRouterPlanType(Query *query,
Query *originalQuery,
bool hasUnresolvedParams);


/* Distributed planner hook */
Expand Down Expand Up @@ -881,6 +896,51 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
}


/*
 * GetRouterPlanType checks the parse tree to return the appropriate plan type.
 *
 * The statement kind is decided by looking at originalQuery, while the
 * result relation of a modification is looked up from query.
 *
 * Return values:
 *  - SELECT_QUERY when originalQuery is not a modify command; no relation
 *    checks are performed in that case.
 *  - INSERT_SELECT_INTO_CITUS_TABLE / INSERT_SELECT_INTO_LOCAL_TABLE for the
 *    two INSERT ... SELECT flavors, provided there are no unresolved
 *    parameters.
 *  - REPLAN_WITH_BOUND_PARAMETERS for either INSERT ... SELECT flavor when
 *    hasUnresolvedParams is true.
 *  - MERGE_QUERY for MERGE statements (hasUnresolvedParams is not consulted).
 *  - DML_QUERY for every other modification.
 *
 * For every modify command the target relation is passed through
 * EnsureModificationsCanRunOnRelation() and
 * EnsurePartitionTableNotReplicated() before the command is classified.
 * NOTE(review): those helpers presumably raise an error for unsupported
 * targets, which would make this function more than a pure getter -- confirm
 * against their definitions.
 */
static RouterPlanType
GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams)
{
	if (!IsModifyCommand(originalQuery))
	{
		return SELECT_QUERY;
	}

	/* the target relation is validated before any classification happens */
	Oid targetRelationId = ModifyQueryResultRelationId(query);

	EnsureModificationsCanRunOnRelation(targetRelationId);
	EnsurePartitionTableNotReplicated(targetRelationId);

	/* Check the type of modification being done */

	if (InsertSelectIntoCitusTable(originalQuery))
	{
		if (hasUnresolvedParams)
		{
			return REPLAN_WITH_BOUND_PARAMETERS;
		}

		return INSERT_SELECT_INTO_CITUS_TABLE;
	}

	if (InsertSelectIntoLocalTable(originalQuery))
	{
		if (hasUnresolvedParams)
		{
			return REPLAN_WITH_BOUND_PARAMETERS;
		}

		return INSERT_SELECT_INTO_LOCAL_TABLE;
	}

	if (IsMergeQuery(originalQuery))
	{
		return MERGE_QUERY;
	}

	return DML_QUERY;
}


/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps:
Expand All @@ -898,64 +958,71 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
DistributedPlan *distributedPlan = NULL;
bool hasCtes = originalQuery->cteList != NIL;

if (IsModifyCommand(originalQuery))
{
Oid targetRelationId = ModifyQueryResultRelationId(query);

EnsureModificationsCanRunOnRelation(targetRelationId);
/* Step 1: Try router planner */

EnsurePartitionTableNotReplicated(targetRelationId);
RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
hasUnresolvedParams);

if (InsertSelectIntoCitusTable(originalQuery))
switch (routerPlan)
{
case INSERT_SELECT_INTO_CITUS_TABLE:
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}

distributedPlan =
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
CreateInsertSelectPlan(planId,
originalQuery,
plannerRestrictionContext,
boundParams);
break;
}
else if (InsertSelectIntoLocalTable(originalQuery))

case INSERT_SELECT_INTO_LOCAL_TABLE:
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
distributedPlan =
CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams,
CreateInsertSelectIntoLocalTablePlan(planId,
originalQuery,
boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
break;
}
else

case DML_QUERY:
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}
else
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/

distributedPlan = CreateRouterPlan(originalQuery, query,
plannerRestrictionContext);
case MERGE_QUERY:
{
distributedPlan =
CreateMergePlan(originalQuery, query, plannerRestrictionContext);
break;
}

case REPLAN_WITH_BOUND_PARAMETERS:
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}

case SELECT_QUERY:
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/
distributedPlan =
CreateRouterPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}

/* the functions above always return a plan, possibly with an error */
Expand Down Expand Up @@ -996,6 +1063,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
boundParams);
Assert(originalQuery != NULL);

/* Step 2: Generate subplans for CTEs and complex subqueries */

/*
* Plan subqueries and CTEs that cannot be pushed down by recursively
* calling the planner and return the resulting plans to subPlanList.
Expand Down Expand Up @@ -1096,6 +1165,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
query->cteList = NIL;
Assert(originalQuery->cteList == NIL);

/* Step 3: Try Logical planner */

MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan);
Expand Down
17 changes: 17 additions & 0 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
#endif


/*
 * CreateMergePlan builds the distributed plan for a MERGE SQL statement.
 * When planning fails, the returned plan carries a description of the
 * failure in ->planningError.
 */
DistributedPlan *
CreateMergePlan(Query *originalQuery, Query *query,
				PlannerRestrictionContext *plannerRestrictionContext)
{
	/*
	 * Placeholder: until merge planning is isolated into its own
	 * code-path, simply delegate to the generic modify planner.
	 */
	DistributedPlan *mergePlan = CreateModifyPlan(originalQuery, query,
												  plannerRestrictionContext);

	return mergePlan;
}


/*
* MergeQuerySupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria
Expand Down
5 changes: 5 additions & 0 deletions src/include/distributed/merge_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
#include "nodes/parsenodes.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/multi_physical_planner.h"

extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery,
bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);

#endif /* MERGE_PLANNER_H */