Skip to content

Commit

Permalink
Implement latest set of comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tejeswarm committed Mar 11, 2023
1 parent b409abb commit d75fab3
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 206 deletions.
41 changes: 4 additions & 37 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ int PlannerLevel = 0;

static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static bool IsUpdateOrDelete(Query *query);
static bool IsUpdateOrDeleteOrMerge(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
Expand Down Expand Up @@ -153,7 +153,7 @@ distributed_planner(Query *parse,
* We cannot have merge command for this path as well because
* there cannot be recursively planned merge command.
*/
Assert(!ContainsMergeCommandWalker((Node *) parse));
Assert(!IsMergeQuery(parse));

needsDistributedPlanning = true;
}
Expand Down Expand Up @@ -295,39 +295,6 @@ distributed_planner(Query *parse,
}


/*
* ContainsMergeCommandWalker walks over the node and finds if there are any
* Merge command (e.g., CMD_MERGE) in the node.
*/
bool
ContainsMergeCommandWalker(Node *node)
{
#if PG_VERSION_NUM < PG_VERSION_15
return false;
#endif

if (node == NULL)
{
return false;
}

if (IsA(node, Query))
{
Query *query = (Query *) node;
if (IsMergeQuery(query))
{
return true;
}

return query_tree_walker((Query *) node, ContainsMergeCommandWalker, NULL, 0);
}

return expression_tree_walker(node, ContainsMergeCommandWalker, NULL);

return false;
}


/*
* ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
* The function traverses the input query and returns all the range table
Expand Down Expand Up @@ -631,7 +598,7 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
* IsUpdateOrDelete returns true if the query performs an update or delete.
*/
bool
IsUpdateOrDelete(Query *query)
IsUpdateOrDeleteOrMerge(Query *query)
{
return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE ||
Expand Down Expand Up @@ -809,7 +776,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
* if it is planned as a multi shard modify query.
*/
if ((distributedPlan->planningError ||
(IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
(IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan(
distributedPlan))) &&
hasUnresolvedParams)
{
Expand Down
179 changes: 86 additions & 93 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@

#if PG_VERSION_NUM >= PG_VERSION_15

static bool CheckIfRTETypeIsUnsupported(Query *parse,
RangeTblEntry *rangeTableEntry,
DeferredErrorMessage **returnMessage);
static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse,
List *
distTablesList,
PlannerRestrictionContext
*
plannerRestrictionContext);
static bool QueryHasMergeCommand(Query *queryTree);
static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse,
List *rangeTableList,
PlannerRestrictionContext *
Expand Down Expand Up @@ -69,7 +71,7 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
#else

/* For non-MERGE commands it's a no-op */
if (!QueryHasMergeCommand(originalQuery))
if (!IsMergeQuery(originalQuery))
{
return NULL;
}
Expand Down Expand Up @@ -218,16 +220,80 @@ ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesLis
"must be colocated", NULL, NULL);
}

/* Are source and target tables joined on distribution column? */
if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
return NULL;
}


/*
* ErrorIfRTETypeIsUnsupported Checks for types of tables that are not supported, such
* as, reference tables, append-distributed tables and materialized view as target relation.
* Routine returns true for the supported types, and false for everything else, only for
* unsupported types it fills the appropriate error message in the parameter passed.
*/
static bool
CheckIfRTETypeIsUnsupported(Query *parse,
RangeTblEntry *rangeTableEntry,
DeferredErrorMessage **returnMessage)
{
/* skip the regular views as they are replaced with subqueries */
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE command is only supported when distributed "
"tables are joined on their distribution column",
NULL, NULL);
return true;
}

return NULL;
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
{
/* Materialized view or Foreign table as target is not allowed */
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
{
/* Non target relation is ok */
return true;
}
else
{
/* Usually we don't reach this exception as the Postgres parser catches it */
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "MERGE command is not allowed on "
"relation type(relkind:%c)",
rangeTableEntry->relkind);
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
}

if (rangeTableEntry->relkind != RELKIND_RELATION &&
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
{
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
"in MERGE command", rangeTableEntry->relkind);
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}

Assert(rangeTableEntry->relid != 0);

/* Reference tables are not supported yet */
if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE))
{
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE command is not supported on reference "
"tables yet", NULL, NULL);
}

/* Append/Range tables are not supported */
if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) ||
IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED))
{
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"For MERGE command, all the distributed tables "
"must be colocated, for append/range distribution, "
"colocation is not supported", NULL,
"Consider using hash distribution instead");
}

return false;
}


Expand Down Expand Up @@ -294,65 +360,19 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
}

/* RTE Relation can be of various types, check them now */

/* skip the regular views as they are replaced with subqueries */
if (rangeTableEntry->relkind == RELKIND_VIEW)
DeferredErrorMessage *errorMessage = NULL;
if (CheckIfRTETypeIsUnsupported(parse, rangeTableEntry, &errorMessage))
{
continue;
}

if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
else
{
/* Materialized view or Foreign table as target is not allowed */
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
{
/* Non target relation is ok */
continue;
}
else
if (errorMessage)
{
/* Usually we don't reach this exception as the Postgres parser catches it */
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage,
"MERGE command is not allowed on "
"relation type(relkind:%c)", rangeTableEntry->relkind);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data,
NULL, NULL);
return errorMessage;
}
}

if (rangeTableEntry->relkind != RELKIND_RELATION &&
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
{
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
"in MERGE command", rangeTableEntry->relkind);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data,
NULL, NULL);
}

Assert(rangeTableEntry->relid != 0);

/* Reference tables are not supported yet */
if (IsCitusTableType(relationId, REFERENCE_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE command is not supported on reference "
"tables yet", NULL, NULL);
}

/* Append/Range tables are not supported */
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) ||
IsCitusTableType(relationId, RANGE_DISTRIBUTED))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"For MERGE command, all the distributed tables "
"must be colocated, for append/range distribution, "
"colocation is not supported", NULL,
"Consider using hash distribution instead");
}

/*
* For now, save all distributed tables, later (below) we will
* check for supported combination(s).
Expand Down Expand Up @@ -389,35 +409,12 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
}

/* Ensure all distributed tables are indeed co-located and joined on distribution column */
return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList,
return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse,
distTablesList,
restrictionContext);
}


/*
* QueryHasMergeCommand walks over the query tree and returns false if there
* is no Merge command (e.g., CMD_MERGE), true otherwise.
*/
static bool
QueryHasMergeCommand(Query *queryTree)
{
/*
* Postgres currently doesn't support Merge queries inside subqueries and
* ctes, but lets be defensive and do query tree walk anyway.
*
* We do not call this path for fast-path queries to avoid this additional
* overhead.
*/
if (!ContainsMergeCommandWalker((Node *) queryTree))
{
/* No MERGE found */
return false;
}

return true;
}


/*
* IsPartitionColumnInMerge returns true if the given column is a partition column.
* The function uses FindReferencedTableColumn to find the original relation
Expand Down Expand Up @@ -461,7 +458,8 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu
* value into the target which is not from the source table, if so, it
* raises an exception.
* Note: Inserting random values other than the joined column values will
* result in unexpected behaviour of rows ending up in incorrect shards.
* result in unexpected behaviour of rows ending up in incorrect shards, to
* prevent such mishaps, we disallow such inserts here.
*/
static DeferredErrorMessage *
InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
Expand All @@ -475,7 +473,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
MergeAction *action = NULL;
foreach_ptr(action, query->mergeActionList)
{
/* Skip MATCHED clauses */
/* Skip MATCHED clause as INSERTS are not allowed in it*/
if (action->matched)
{
continue;
Expand All @@ -501,11 +499,6 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, action->targetList)
{
if (targetEntry->resjunk)
{
continue;
}

AttrNumber originalAttrNo = targetEntry->resno;

/* skip processing of target table non-partition columns */
Expand All @@ -516,7 +509,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)

foundDistributionColumn = true;

if (targetEntry->expr->type == T_Var)
if (IsA(targetEntry->expr, Var))
{
if (IsPartitionColumnInMergeSource(targetEntry->expr, query, true))
{
Expand Down
5 changes: 3 additions & 2 deletions src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,9 @@ MultiShardUpdateDeleteSupported(Query *originalQuery,
}
else
{
errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(
originalQuery,
plannerRestrictionContext);
}

return errorMessage;
Expand Down
12 changes: 9 additions & 3 deletions src/backend/distributed/planner/query_pushdown_planning.c
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,16 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
}
else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
{
StringInfo errorMessage = makeStringInfo();
bool isMergeCmd = IsMergeQuery(originalQuery);
appendStringInfo(errorMessage,
"%s"
"only supported when all distributed tables are "
"co-located and joined on their distribution columns",
isMergeCmd ? "MERGE command is " : "complex joins are ");

return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"complex joins are only supported when all distributed tables are "
"co-located and joined on their distribution columns",
NULL, NULL);
errorMessage->data, NULL, NULL);
}

/* we shouldn't allow reference tables in the FROM clause when the query has sublinks */
Expand Down
15 changes: 15 additions & 0 deletions src/backend/distributed/utils/citus_clauses.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ RequiresCoordinatorEvaluation(Query *query)
return false;
}

/*
* Currently all MERGE queries are routed to the workers, other
* than aggregating the tuple result, we shouldn't expect any
* evaluation at the coordinator. Moreover, the routine relies on
* the logic that any function present in the entire query tree might
* need a coordinator evalution, but this logic doesn't fully work
* for MERGE which has multiple query-statements in merge-actions.
* A function present in INSERT/UPDATE/DELETE will gets evaluated
* at the worker and not at the coordinator.
*/
if (IsMergeQuery(query))
{
return false;
}

return FindNodeMatchingCheckFunction((Node *) query, CitusIsMutableFunction);
}

Expand Down
1 change: 0 additions & 1 deletion src/include/distributed/distributed_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,5 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId,
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool ContainsMergeCommandWalker(Node *node);

#endif /* DISTRIBUTED_PLANNER_H */
Loading

0 comments on commit d75fab3

Please sign in to comment.