Skip to content

Commit

Permalink
Make ConjunctionContainsColumnFilter() static again, and rearrange th…
Browse files Browse the repository at this point in the history
…e code in MergeQuerySupported()

Minor: Restore the original format in the comments section.
  • Loading branch information
tejeswarm committed Mar 1, 2023
1 parent e230fb3 commit c0e8912
Show file tree
Hide file tree
Showing 18 changed files with 728 additions and 110 deletions.
5 changes: 4 additions & 1 deletion src/backend/distributed/planner/fast_path_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ bool EnableFastPathRouterPlanner = true;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
Node **distributionKeyValue);
static bool ConjunctionContainsColumnFilter(Node *node,
Var *column,
Node **distributionKeyValue);


/*
Expand Down Expand Up @@ -292,7 +295,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey)
*
* If the conjuction contains column filter which is const, distributionKeyValue is set.
*/
bool
static bool
ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue)
{
if (node == NULL)
Expand Down
244 changes: 203 additions & 41 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,41 @@
#include <stddef.h>

#include "postgres.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"

#include "distributed/pg_version_constants.h"
#include "distributed/citus_clauses.h"
#include "distributed/listutils.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_router_planner.h"
#include "distributed/listutils.h"
#include "distributed/pg_version_constants.h"


#if PG_VERSION_NUM >= PG_VERSION_15

static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse,
List *
distTablesList,
PlannerRestrictionContext
*
plannerRestrictionContext);
static bool QueryHasMergeCommand(Query *queryTree);
static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse,
List *rangeTableList,
PlannerRestrictionContext *
restrictionContext);
static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse,
List *distTablesList,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool
skipOuterVars);
#if PG_VERSION_NUM >= PG_VERSION_15
static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte);
skipOuterVars);
static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query,
RangeTblEntry *resultRte);

static DeferredErrorMessage * MergeActionListSupported(Oid resultRelationId,
FromExpr *joinTree, Node *quals,
List *targetList,
CmdType commandType);
#endif


Expand All @@ -48,6 +61,13 @@ DeferredErrorMessage *
MergeQuerySupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15

return NULL;

#else

/* For non-MERGE commands it's a no-op */
if (!QueryHasMergeCommand(originalQuery))
{
Expand All @@ -71,24 +91,22 @@ MergeQuerySupported(Query *originalQuery,
plannerRestrictionContext);
if (deferredError)
{
return deferredError;
/* MERGE's unsupported combination, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}

Oid resultRelationId = resultRte->relid;
deferredError =
TargetlistAndFunctionsSupported(resultRelationId,
originalQuery->jointree,
originalQuery->jointree->quals,
originalQuery->targetList,
originalQuery->commandType,
originalQuery->returningList);
MergeActionListSupported(resultRelationId,
originalQuery->jointree,
originalQuery->jointree->quals,
originalQuery->targetList,
originalQuery->commandType);
if (deferredError)
{
return deferredError;
}

#if PG_VERSION_NUM >= PG_VERSION_15

/*
* MERGE is a special case where we have multiple modify statements
* within itself. Check each INSERT/UPDATE/DELETE individually.
Expand All @@ -98,30 +116,32 @@ MergeQuerySupported(Query *originalQuery,
{
Assert(originalQuery->returningList == NULL);
deferredError =
TargetlistAndFunctionsSupported(resultRelationId,
originalQuery->jointree,
action->qual,
action->targetList,
action->commandType,
originalQuery->returningList);
MergeActionListSupported(resultRelationId,
originalQuery->jointree,
action->qual,
action->targetList,
action->commandType);
if (deferredError)
{
return deferredError;
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
}

deferredError =
InsertPartitionColumnMatchesSource(originalQuery, resultRte);
if (deferredError)
{
return deferredError;
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}

#endif

return NULL;

#endif
}


/*
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
* permitted on special relations, such as materialized view, returns true only if
Expand All @@ -147,6 +167,8 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
}


#if PG_VERSION_NUM >= PG_VERSION_15

/*
* ErrorIfDistTablesNotColocated Checks to see if
*
Expand All @@ -158,8 +180,9 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
* If any of the conditions are not met, it raises an exception.
*/
static DeferredErrorMessage *
ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
PlannerRestrictionContext *plannerRestrictionContext)
ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesList,
PlannerRestrictionContext *
plannerRestrictionContext)
{
/* All MERGE tables must be distributed */
if (list_length(distTablesList) < 2)
Expand All @@ -170,7 +193,7 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
}

/* All distributed tables must be colocated */
if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY))
if (!AllRelationsInRTEListColocated(distTablesList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"For MERGE command, all the distributed tables "
Expand Down Expand Up @@ -347,8 +370,9 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
NULL, NULL);
}

/* Ensure all distributed tables are indeed co-located */
return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext);
/* Ensure all distributed tables are indeed co-located and joined on distribution column */
return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList,
restrictionContext);
}


Expand All @@ -359,11 +383,6 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
static bool
QueryHasMergeCommand(Query *queryTree)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15
return false;
#else

/*
* Postgres currently doesn't support Merge queries inside subqueries and
* ctes, but lets be defensive and do query tree walk anyway.
Expand All @@ -378,7 +397,6 @@ QueryHasMergeCommand(Query *queryTree)
}

return true;
#endif
}


Expand Down Expand Up @@ -420,8 +438,6 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu
}


#if PG_VERSION_NUM >= PG_VERSION_15

/*
* InsertPartitionColumnMatchesSource check to see if MERGE is inserting a
* value into the target which is not from the source table, if so, it
Expand Down Expand Up @@ -516,4 +532,150 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
return NULL;
}


/*
* MergeActionListSupported Checks WHEN/ON clause actions to see what functions are allowed, if
* we are updating distribution column, etc.
*/
static DeferredErrorMessage *
MergeActionListSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals,
List *targetList, CmdType commandType)
{
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
if (IsCitusTable(resultRelationId))
{
partitionColumn = PartitionColumn(resultRelationId, rangeTableId);
}

ListCell *targetEntryCell = NULL;
bool hasVarArgument = false; /* A STABLE function is passed a Var argument */
bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */
foreach(targetEntryCell, targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);

/* skip resjunk entries: UPDATE adds some for ctid, etc. */
if (targetEntry->resjunk)
{
continue;
}

bool targetEntryPartitionColumn = false;
AttrNumber targetColumnAttrNumber = InvalidAttrNumber;

/* reference tables do not have partition column */
if (partitionColumn == NULL)
{
targetEntryPartitionColumn = false;
}
else
{
if (commandType == CMD_UPDATE)
{
/*
* Note that it is not possible to give an alias to
* UPDATE table SET ...
*/
if (targetEntry->resname)
{
targetColumnAttrNumber = get_attnum(resultRelationId,
targetEntry->resname);
if (targetColumnAttrNumber == partitionColumn->varattno)
{
targetEntryPartitionColumn = true;
}
}
}
}

if (targetEntryPartitionColumn &&
TargetEntryChangesValue(targetEntry, partitionColumn, joinTree))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"updating the distribution column is not "
"allowed in MERGE actions",
NULL, NULL);
}

if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
CitusIsVolatileFunction))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"functions used in MERGE actions on distributed "
"tables must not be VOLATILE",
NULL, NULL);
}
else if (MasterIrreducibleExpression((Node *) targetEntry->expr,
&hasVarArgument, &hasBadCoalesce))
{
Assert(hasVarArgument || hasBadCoalesce);
}

if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
NodeIsFieldStore))
{
/* DELETE cannot do field indirection already */
Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"inserting or modifying composite type fields is not "
"supported", NULL,
"Use the column name to insert or update the composite "
"type as a single value");
}
}

StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "functions used in the %s clause of MERGE "
"queries on distributed tables must not be VOLATILE",
(commandType == CMD_MERGE) ? "ON" : "WHEN");

/*
* Check the condition, convert list of expressions into expression tree for further processing
*/
if (quals)
{
if (IsA(quals, List))
{
quals = (Node *) make_ands_explicit((List *) quals);
}

if (FindNodeMatchingCheckFunction((Node *) quals, CitusIsVolatileFunction))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
else if (MasterIrreducibleExpression(quals, &hasVarArgument, &hasBadCoalesce))
{
Assert(hasVarArgument || hasBadCoalesce);
}
}

if (hasVarArgument)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"STABLE functions used in MERGE queries "
"cannot be called with column references",
NULL, NULL);
}

if (hasBadCoalesce)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"non-IMMUTABLE functions are not allowed in CASE or "
"COALESCE statements",
NULL, NULL);
}

if (quals != NULL && nodeTag(quals) == T_CurrentOfExpr)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot run MERGE actions with cursors",
NULL, NULL);
}

return NULL;
}


#endif
Loading

0 comments on commit c0e8912

Please sign in to comment.