Skip to content

Commit

Permalink
Refactor some of the planning code to accomodate a new planning path …
Browse files Browse the repository at this point in the history
…for MERGE SQL
  • Loading branch information
tejeswarm committed Mar 22, 2023
1 parent e1f1d63 commit da7db53
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 41 deletions.
153 changes: 112 additions & 41 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/distributed_planner.h"
Expand Down Expand Up @@ -68,6 +69,17 @@
#include "utils/syscache.h"


/* RouterPlanType is used to determine the router plan to invoke */
typedef enum RouterPlanType
{
INSERT_SELECT_INTO_CITUS_TABLE,
INSERT_SELECT_INTO_LOCAL_TABLE,
DML_QUERY,
SELECT_QUERY,
MERGE_QUERY,
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;

static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1;
Expand Down Expand Up @@ -129,6 +141,9 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static RouterPlanType GetRouterPlanType(Query *query,
Query *originalQuery,
bool hasUnresolvedParams);


/* Distributed planner hook */
Expand Down Expand Up @@ -881,6 +896,51 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
}


/*
* GetRouterPlanType checks the parse tree to return appropriate plan type.
*/
static RouterPlanType
GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams)
{
if (!IsModifyCommand(originalQuery))
{
return SELECT_QUERY;
}

Oid targetRelationId = ModifyQueryResultRelationId(query);

EnsureModificationsCanRunOnRelation(targetRelationId);
EnsurePartitionTableNotReplicated(targetRelationId);

/* Check the type of modification being done */

if (InsertSelectIntoCitusTable(originalQuery))
{
if (hasUnresolvedParams)
{
return REPLAN_WITH_BOUND_PARAMETERS;
}
return INSERT_SELECT_INTO_CITUS_TABLE;
}
else if (InsertSelectIntoLocalTable(originalQuery))
{
if (hasUnresolvedParams)
{
return REPLAN_WITH_BOUND_PARAMETERS;
}
return INSERT_SELECT_INTO_LOCAL_TABLE;
}
else if (IsMergeQuery(originalQuery))
{
return MERGE_QUERY;
}
else
{
return DML_QUERY;
}
}


/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps:
Expand All @@ -898,64 +958,71 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
DistributedPlan *distributedPlan = NULL;
bool hasCtes = originalQuery->cteList != NIL;

if (IsModifyCommand(originalQuery))
{
Oid targetRelationId = ModifyQueryResultRelationId(query);

EnsureModificationsCanRunOnRelation(targetRelationId);
/* Step 1: Try router planner */

EnsurePartitionTableNotReplicated(targetRelationId);
RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
hasUnresolvedParams);

if (InsertSelectIntoCitusTable(originalQuery))
switch (routerPlan)
{
case INSERT_SELECT_INTO_CITUS_TABLE:
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}

distributedPlan =
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
CreateInsertSelectPlan(planId,
originalQuery,
plannerRestrictionContext,
boundParams);
break;
}
else if (InsertSelectIntoLocalTable(originalQuery))

case INSERT_SELECT_INTO_LOCAL_TABLE:
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
distributedPlan =
CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams,
CreateInsertSelectIntoLocalTablePlan(planId,
originalQuery,
boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
break;
}
else

case DML_QUERY:
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}
else
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/

distributedPlan = CreateRouterPlan(originalQuery, query,
plannerRestrictionContext);
case MERGE_QUERY:
{
distributedPlan =
CreateMergePlan(originalQuery, query, plannerRestrictionContext);
break;
}

case REPLAN_WITH_BOUND_PARAMETERS:
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}

case SELECT_QUERY:
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/
distributedPlan =
CreateRouterPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}

/* the functions above always return a plan, possibly with an error */
Expand Down Expand Up @@ -996,6 +1063,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
boundParams);
Assert(originalQuery != NULL);

/* Step 2: Generate subplans for CTEs and complex subqueries */

/*
* Plan subqueries and CTEs that cannot be pushed down by recursively
* calling the planner and return the resulting plans to subPlanList.
Expand Down Expand Up @@ -1096,6 +1165,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
query->cteList = NIL;
Assert(originalQuery->cteList == NIL);

/* Step 3: Try Logical planner */

MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan);
Expand Down
17 changes: 17 additions & 0 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
#endif


/*
* CreateMergePlan attempts to create a plan for the given MERGE SQL
* statement. If planning fails ->planningError is set to a description
* of the failure.
*/
DistributedPlan *
CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
/*
* For now, this is a place holder until we isolate the merge
* planning into it's own code-path.
*/
return CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
}


/*
* MergeQuerySupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria
Expand Down
5 changes: 5 additions & 0 deletions src/include/distributed/merge_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
#include "nodes/parsenodes.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/multi_physical_planner.h"

extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery,
bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);

#endif /* MERGE_PLANNER_H */

0 comments on commit da7db53

Please sign in to comment.