Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This implements MERGE phase-III #6696

Merged
merged 2 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 7 additions & 226 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,9 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */
int PlannerLevel = 0;

static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree,
List *rangeTableList);
static bool ContainsMergeCommandWalker(Node *node);
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static bool IsUpdateOrDelete(Query *query);
static bool IsUpdateOrDeleteOrMerge(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
Expand Down Expand Up @@ -132,7 +129,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList);


/* Distributed planner hook */
PlannedStmt *
Expand All @@ -156,7 +153,7 @@ distributed_planner(Query *parse,
* We cannot have merge command for this path as well because
* there cannot be recursively planned merge command.
*/
Assert(!ContainsMergeCommandWalker((Node *) parse));
Assert(!IsMergeQuery(parse));

needsDistributedPlanning = true;
}
Expand Down Expand Up @@ -200,12 +197,6 @@ distributed_planner(Query *parse,

if (!fastPathRouterQuery)
{
/*
* Fast path queries cannot have merge command, and we
* prevent the remaining here.
*/
ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList);

/*
* When there are partitioned tables (not applicable to fast path),
* pretend that they are regular tables to avoid unnecessary work
Expand Down Expand Up @@ -304,72 +295,6 @@ distributed_planner(Query *parse,
}


/*
* ErrorIfQueryHasUnsupportedMergeCommand walks over the query tree and bails out
* if there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge,
* looks for all supported combinations, throws an exception if any violations
* are seen.
*/
static void
ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList)
{
/*
* Postgres currently doesn't support Merge queries inside subqueries and
* ctes, but lets be defensive and do query tree walk anyway.
*
* We do not call this path for fast-path queries to avoid this additional
* overhead.
*/
if (!ContainsMergeCommandWalker((Node *) queryTree))
{
/* No MERGE found */
return;
}


/*
* In Citus we have limited support for MERGE, it's allowed
* only if all the tables(target, source or any CTE) tables
* are are local i.e. a combination of Citus local and Non-Citus
* tables (regular Postgres tables).
*/
ErrorIfMergeHasUnsupportedTables(queryTree, rangeTableList);
}


/*
* ContainsMergeCommandWalker walks over the node and finds if there are any
* Merge command (e.g., CMD_MERGE) in the node.
*/
static bool
ContainsMergeCommandWalker(Node *node)
{
#if PG_VERSION_NUM < PG_VERSION_15
return false;
#endif

if (node == NULL)
{
return false;
}

if (IsA(node, Query))
{
Query *query = (Query *) node;
if (IsMergeQuery(query))
{
return true;
}

return query_tree_walker((Query *) node, ContainsMergeCommandWalker, NULL, 0);
}

return expression_tree_walker(node, ContainsMergeCommandWalker, NULL);

return false;
}


/*
* ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
* The function traverses the input query and returns all the range table
Expand Down Expand Up @@ -673,10 +598,11 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
* IsUpdateOrDelete returns true if the query performs an update or delete.
*/
bool
IsUpdateOrDelete(Query *query)
IsUpdateOrDeleteOrMerge(Query *query)
{
return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
query->commandType == CMD_DELETE ||
query->commandType == CMD_MERGE;
}


Expand Down Expand Up @@ -850,7 +776,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
* if it is planned as a multi shard modify query.
*/
if ((distributedPlan->planningError ||
(IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
(IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan(
distributedPlan))) &&
hasUnresolvedParams)
{
Expand Down Expand Up @@ -2611,148 +2537,3 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)
}
}
}


/*
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
* permitted on special relations, such as materialized view, returns true only if
* it's a "source" relation.
*/
bool
IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
{
if (!IsMergeQuery(parse))
{
return false;
}

RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable);

/* Is it a target relation? */
if (targetRte->relid == rte->relid)
{
return false;
}

return true;
}


/*
* ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE
* present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus
* tables (regular Postgres tables), raises an exception for all other combinations.
*/
static void
ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList)
{
ListCell *tableCell = NULL;

foreach(tableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell);
Oid relationId = rangeTableEntry->relid;

switch (rangeTableEntry->rtekind)
{
case RTE_RELATION:
{
/* Check the relation type */
break;
}

case RTE_SUBQUERY:
case RTE_FUNCTION:
case RTE_TABLEFUNC:
case RTE_VALUES:
case RTE_JOIN:
case RTE_CTE:
{
/* Skip them as base table(s) will be checked */
continue;
}

/*
* RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations,
* such as, trigger data; until we find a genuine use case, raise an
* exception.
* RTE_RESULT is a node added by the planner and we shouldn't
* encounter it in the parse tree.
*/
case RTE_NAMEDTUPLESTORE:
case RTE_RESULT:
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported with "
"Tuplestores and results")));
break;
}

default:
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command: Unrecognized range table entry.")));
}
}

/* RTE Relation can be of various types, check them now */

/* skip the regular views as they are replaced with subqueries */
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
continue;
}

if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
{
/* Materialized view or Foreign table as target is not allowed */
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
{
/* Non target relation is ok */
continue;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not allowed "
"on materialized view")));
}
}

if (rangeTableEntry->relkind != RELKIND_RELATION &&
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Unexpected relation type(relkind:%c) in MERGE command",
rangeTableEntry->relkind)));
}

Assert(rangeTableEntry->relid != 0);

/* Distributed tables and Reference tables are not supported yet */
if (IsCitusTableType(relationId, REFERENCE_TABLE) ||
IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on "
"distributed/reference tables yet")));
}

/* Regular Postgres tables and Citus local tables are allowed */
if (!IsCitusTable(relationId) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
continue;
}


/* Any other Citus table type missing ? */
}

/* All the tables are local, supported */
}
5 changes: 3 additions & 2 deletions src/backend/distributed/planner/fast_path_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@
bool EnableFastPathRouterPlanner = true;

static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool ConjunctionContainsColumnFilter(Node *node, Var *column,
Node **distributionKeyValue);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
Node **distributionKeyValue);
static bool ConjunctionContainsColumnFilter(Node *node,
Var *column,
Node **distributionKeyValue);


/*
Expand Down
Loading