From c0e891295cd8c6e5303340a3a744a4aabc745a2b Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Fri, 24 Feb 2023 17:00:47 -0800 Subject: [PATCH] Make ConjunctionContainsColumnFilter() static again, and rearrange the code in MergeQuerySupported() Minor: Restore the original format in the comments section. --- .../planner/fast_path_router_planner.c | 5 +- .../distributed/planner/merge_planner.c | 244 ++++++++++++++--- .../planner/multi_physical_planner.c | 17 +- .../planner/multi_router_planner.c | 16 +- .../relation_restriction_equivalence.c | 68 +++-- src/include/distributed/distributed_planner.h | 3 - .../distributed/multi_router_planner.h | 7 +- .../relation_restriction_equivalence.h | 12 +- src/test/regress/create_schedule | 1 + src/test/regress/expected/merge.out | 246 +++++++++++++++++- src/test/regress/expected/merge_arbitrary.out | 45 ++++ .../regress/expected/merge_arbitrary_0.out | 6 + .../expected/merge_arbitrary_create.out | 26 ++ src/test/regress/expected/pgmerge.out | 4 +- src/test/regress/sql/merge.sql | 85 ++++++ src/test/regress/sql/merge_arbitrary.sql | 40 +++ .../regress/sql/merge_arbitrary_create.sql | 12 + src/test/regress/sql_schedule | 1 + 18 files changed, 728 insertions(+), 110 deletions(-) create mode 100644 src/test/regress/expected/merge_arbitrary.out create mode 100644 src/test/regress/expected/merge_arbitrary_0.out create mode 100644 src/test/regress/expected/merge_arbitrary_create.out create mode 100644 src/test/regress/sql/merge_arbitrary.sql create mode 100644 src/test/regress/sql/merge_arbitrary_create.sql diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index e7d91a101c7..ecb62478acc 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -56,6 +56,9 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue); +static bool ConjunctionContainsColumnFilter(Node *node, + Var *column, + Node **distributionKeyValue); /* @@ -292,7 +295,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * * If the conjuction contains column filter which is const, distributionKeyValue is set. */ -bool +static bool ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue) { if (node == NULL) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index bf6360e160e..6ca626d3a42 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -12,28 +12,41 @@ #include #include "postgres.h" +#include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "parser/parsetree.h" +#include "utils/lsyscache.h" -#include "distributed/pg_version_constants.h" +#include "distributed/citus_clauses.h" +#include "distributed/listutils.h" #include "distributed/merge_planner.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_router_planner.h" -#include "distributed/listutils.h" +#include "distributed/pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_15 + +static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, + List * + distTablesList, + PlannerRestrictionContext + * + plannerRestrictionContext); static bool QueryHasMergeCommand(Query *queryTree); static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, PlannerRestrictionContext * restrictionContext); -static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse, - List *distTablesList, - PlannerRestrictionContext * - plannerRestrictionContext); static bool IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool - skipOuterVars); -#if PG_VERSION_NUM >= PG_VERSION_15 -static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte); + skipOuterVars); +static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, + RangeTblEntry *resultRte); + +static DeferredErrorMessage * MergeActionListSupported(Oid resultRelationId, + FromExpr *joinTree, Node *quals, + List *targetList, + CmdType commandType); #endif @@ -48,6 +61,13 @@ DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext) { + /* function is void for pre-15 versions of Postgres */ + #if PG_VERSION_NUM < PG_VERSION_15 + + return NULL; + + #else + /* For non-MERGE commands it's a no-op */ if (!QueryHasMergeCommand(originalQuery)) { @@ -71,24 +91,22 @@ MergeQuerySupported(Query *originalQuery, plannerRestrictionContext); if (deferredError) { - return deferredError; + /* MERGE's unsupported combination, raise the exception */ + RaiseDeferredError(deferredError, ERROR); } Oid resultRelationId = resultRte->relid; deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - originalQuery->jointree, - originalQuery->jointree->quals, - originalQuery->targetList, - originalQuery->commandType, - originalQuery->returningList); + MergeActionListSupported(resultRelationId, + originalQuery->jointree, + originalQuery->jointree->quals, + originalQuery->targetList, + originalQuery->commandType); if (deferredError) { return deferredError; } - #if PG_VERSION_NUM >= PG_VERSION_15 - /* * MERGE is a special case where we have multiple modify statements * within itself. Check each INSERT/UPDATE/DELETE individually. @@ -98,15 +116,15 @@ MergeQuerySupported(Query *originalQuery, { Assert(originalQuery->returningList == NULL); deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - originalQuery->jointree, - action->qual, - action->targetList, - action->commandType, - originalQuery->returningList); + MergeActionListSupported(resultRelationId, + originalQuery->jointree, + action->qual, + action->targetList, + action->commandType); if (deferredError) { - return deferredError; + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); } } @@ -114,14 +132,16 @@ MergeQuerySupported(Query *originalQuery, InsertPartitionColumnMatchesSource(originalQuery, resultRte); if (deferredError) { - return deferredError; + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); } - #endif - return NULL; + + #endif } + /* * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is * permitted on special relations, such as materialized view, returns true only if @@ -147,6 +167,8 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) } +#if PG_VERSION_NUM >= PG_VERSION_15 + /* * ErrorIfDistTablesNotColocated Checks to see if * @@ -158,8 +180,9 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) * If any of the conditions are not met, it raises an exception. */ static DeferredErrorMessage * -ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, - PlannerRestrictionContext *plannerRestrictionContext) +ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesList, + PlannerRestrictionContext * + plannerRestrictionContext) { /* All MERGE tables must be distributed */ if (list_length(distTablesList) < 2) @@ -170,7 +193,7 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, } /* All distributed tables must be colocated */ - if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY)) + if (!AllRelationsInRTEListColocated(distTablesList)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "For MERGE command, all the distributed tables " @@ -347,8 +370,9 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, NULL, NULL); } - /* Ensure all distributed tables are indeed co-located */ - return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext); + /* Ensure all distributed tables are indeed co-located and joined on distribution column */ + return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList, + restrictionContext); } @@ -359,11 +383,6 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, static bool QueryHasMergeCommand(Query *queryTree) { - /* function is void for pre-15 versions of Postgres */ - #if PG_VERSION_NUM < PG_VERSION_15 - return false; - #else - /* * Postgres currently doesn't support Merge queries inside subqueries and * ctes, but lets be defensive and do query tree walk anyway. @@ -378,7 +397,6 @@ QueryHasMergeCommand(Query *queryTree) } return true; - #endif } @@ -420,8 +438,6 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu } -#if PG_VERSION_NUM >= PG_VERSION_15 - /* * InsertPartitionColumnMatchesSource check to see if MERGE is inserting a * value into the target which is not from the source table, if so, it @@ -516,4 +532,150 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) return NULL; } + +/* + * MergeActionListSupported Checks WHEN/ON clause actions to see what functions are allowed, if + * we are updating distribution column, etc. + */ +static DeferredErrorMessage * +MergeActionListSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals, + List *targetList, CmdType commandType) +{ + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + if (IsCitusTable(resultRelationId)) + { + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); + } + + ListCell *targetEntryCell = NULL; + bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ + bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ + foreach(targetEntryCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* skip resjunk entries: UPDATE adds some for ctid, etc. */ + if (targetEntry->resjunk) + { + continue; + } + + bool targetEntryPartitionColumn = false; + AttrNumber targetColumnAttrNumber = InvalidAttrNumber; + + /* reference tables do not have partition column */ + if (partitionColumn == NULL) + { + targetEntryPartitionColumn = false; + } + else + { + if (commandType == CMD_UPDATE) + { + /* + * Note that it is not possible to give an alias to + * UPDATE table SET ... + */ + if (targetEntry->resname) + { + targetColumnAttrNumber = get_attnum(resultRelationId, + targetEntry->resname); + if (targetColumnAttrNumber == partitionColumn->varattno) + { + targetEntryPartitionColumn = true; + } + } + } + } + + if (targetEntryPartitionColumn && + TargetEntryChangesValue(targetEntry, partitionColumn, joinTree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "updating the distribution column is not " + "allowed in MERGE actions", + NULL, NULL); + } + + if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in MERGE actions on distributed " + "tables must not be VOLATILE", + NULL, NULL); + } + else if (MasterIrreducibleExpression((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + + if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + NodeIsFieldStore)) + { + /* DELETE cannot do field indirection already */ + Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "inserting or modifying composite type fields is not " + "supported", NULL, + "Use the column name to insert or update the composite " + "type as a single value"); + } + } + + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "functions used in the %s clause of MERGE " + "queries on distributed tables must not be VOLATILE", + (commandType == CMD_MERGE) ? "ON" : "WHEN"); + + /* + * Check the condition, convert list of expressions into expression tree for further processing + */ + if (quals) + { + if (IsA(quals, List)) + { + quals = (Node *) make_ands_explicit((List *) quals); + } + + if (FindNodeMatchingCheckFunction((Node *) quals, CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + else if (MasterIrreducibleExpression(quals, &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + } + + if (hasVarArgument) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "STABLE functions used in MERGE queries " + "cannot be called with column references", + NULL, NULL); + } + + if (hasBadCoalesce) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in CASE or " + "COALESCE statements", + NULL, NULL); + } + + if (quals != NULL && nodeTag(quals) == T_CurrentOfExpr) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run MERGE actions with cursors", + NULL, NULL); + } + + return NULL; +} + + #endif diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 3349f717518..901e9de17df 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2225,17 +2225,14 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* - * For left joins we don't care about the shards pruned for - * the right hand side. If the right hand side would prune - * to a smaller set we should still send it to all tables - * of the left hand side. However if the right hand side is - * bigger than the left hand side we don't have to send the - * query to any shard that is not matching anything on the - * left hand side. + * For left joins we don't care about the shards pruned for the right hand side. + * If the right hand side would prune to a smaller set we should still send it to + * all tables of the left hand side. However if the right hand side is bigger than + * the left hand side we don't have to send the query to any shard that is not + * matching anything on the left hand side. * - * Instead we will simply skip any RelationRestriction if it - * is an OUTER join and the table is part of the non-outer - * side of the join. + * Instead we will simply skip any RelationRestriction if it is an OUTER join and + * the table is part of the non-outer side of the join. */ if (IsInnerTableOfOuterJoin(relationRestriction)) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 623b3cf0266..dff542fded8 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -126,7 +126,6 @@ static bool IsTidColumn(Node *node); static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableId); -static bool NodeIsFieldStore(Node *node); static DeferredErrorMessage * MultiShardUpdateDeleteMergeSupported(Query *originalQuery, PlannerRestrictionContext * @@ -135,12 +134,8 @@ static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQu PlannerRestrictionContext * plannerRestrictionContext); static bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); -static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, - bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context); -static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, - FromExpr *joinTree); static Job * RouterInsertJob(Query *originalQuery); static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry); static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree); @@ -894,7 +889,7 @@ IsLocallyAccessibleCitusLocalTable(Oid relationId) /* * NodeIsFieldStore returns true if given Node is a FieldStore object. */ -static bool +bool NodeIsFieldStore(Node *node) { return node && IsA(node, FieldStore); @@ -920,10 +915,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer plannerRestrictionContext); if (error) { - /* - * For MERGE, we do not do recursive plannning, simply bail out. - */ - RaiseDeferredError(error, ERROR); + return error; } error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId); @@ -1464,7 +1456,7 @@ IsMergeQuery(Query *query) * which do, but for now we just error out. That makes both the code and user-education * easier. */ -static bool +bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce) { WalkerState data; @@ -1612,7 +1604,7 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context) * expression is a value that is implied by the qualifiers of the join * tree, or the target entry sets a different column. */ -static bool +bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree) { bool isColumnValueChanged = true; diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index f92f79da6da..6650995cf3e 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -151,6 +151,9 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass secondClass); static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, Index *partitionKeyIndex); +static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * + restrictionContext); +static bool AllRelationsInListColocated(List *relationList); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids @@ -381,8 +384,7 @@ SafeToPushdownUnionSubquery(Query *originalQuery, return false; } - if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList, - RESTRICTION_CONTEXT)) + if (!AllRelationsInRestrictionContextColocated(restrictionContext)) { /* distribution columns are equal, but tables are not co-located */ return false; @@ -1918,34 +1920,56 @@ FindQueryContainingRTEIdentityInternal(Node *node, /* - * AllRelationsInListColocated determines whether all of the relations in the - * given list are co-located. - * Note: The list can be of dofferent types, which is specified by ListEntryType + * AllRelationsInRestrictionContextColocated determines whether all of the relations in the + * given relation restrictions list are co-located. */ -bool -AllRelationsInListColocated(List *relationList, ListEntryType entryType) +static bool +AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext) { - void *varPtr = NULL; - RangeTblEntry *rangeTableEntry = NULL; RelationRestriction *relationRestriction = NULL; - int initialColocationId = INVALID_COLOCATION_ID; + List *relationIdList = NIL; /* check whether all relations exists in the main restriction list */ - foreach_ptr(varPtr, relationList) + foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) { - Oid relationId = InvalidOid; + relationIdList = lappend_oid(relationIdList, relationRestriction->relationId); + } + + return AllRelationsInListColocated(relationIdList); +} + + +/* + * AllRelationsInRTEListColocated determines whether all of the relations in the + * given RangeTableEntry list are co-located. + */ +bool +AllRelationsInRTEListColocated(List *rangeTableEntryList) +{ + RangeTblEntry *rangeTableEntry = NULL; + List *relationIdList = NIL; + + foreach_ptr(rangeTableEntry, rangeTableEntryList) + { + relationIdList = lappend_oid(relationIdList, rangeTableEntry->relid); + } + + return AllRelationsInListColocated(relationIdList); +} - if (entryType == RANGETABLE_ENTRY) - { - rangeTableEntry = (RangeTblEntry *) varPtr; - relationId = rangeTableEntry->relid; - } - else if (entryType == RESTRICTION_CONTEXT) - { - relationRestriction = (RelationRestriction *) varPtr; - relationId = relationRestriction->relationId; - } +/* + * AllRelationsInListColocated determines whether all of the relations in the + * given list are co-located. + */ +static bool +AllRelationsInListColocated(List *relationList) +{ + int initialColocationId = INVALID_COLOCATION_ID; + Oid relationId = InvalidOid; + + foreach_oid(relationId, relationList) + { if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index a7b0f67cf39..47ba032efb8 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -254,9 +254,6 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); -extern bool ConjunctionContainsColumnFilter(Node *node, - Var *column, - Node **distributionKeyValue); extern bool ContainsMergeCommandWalker(Node *node); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index b6c9cd5a05e..5c25087be5b 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -106,7 +106,10 @@ extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelation List *targetList, CmdType commandType, List *returningList); - - +extern bool NodeIsFieldStore(Node *node); +extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, + FromExpr *joinTree); +extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument, + bool *badCoalesce); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 4fd9c7015e5..e0e716c7e81 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -17,15 +17,6 @@ #define SINGLE_RTE_INDEX 1 -/* - * Represents the pointer type that's being passed in the list. - */ -typedef enum ListEntryType -{ - RANGETABLE_ENTRY, /* RangeTblEntry */ - RESTRICTION_CONTEXT /* RelationRestriction */ -} ListEntryType; - extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -63,6 +54,5 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities); -extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType); - +extern bool AllRelationsInRTEListColocated(List *rangeTableEntryList); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index 82dfa24755a..db2ae92be62 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -13,3 +13,4 @@ test: arbitrary_configs_truncate_create test: arbitrary_configs_truncate_cascade_create test: arbitrary_configs_truncate_partition_create test: arbitrary_configs_alter_table_add_constraint_without_name_create +test: merge_arbitrary_create diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index e925989cc6b..35be03b1cb1 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -2220,9 +2220,231 @@ SELECT pa_compare_tables(); (1 row) ROLLBACK; +CREATE TABLE source_json( id integer, z int, d jsonb); +CREATE TABLE target_json( id integer, z int, d jsonb); +INSERT INTO source_json SELECT i,i FROM generate_series(0,100)i; +SELECT create_distributed_table('target_json','id'), create_distributed_table('source_json', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_json$$) + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +-- single shard query given source_json is filtered and Postgres is smart to pushdown +-- filter to the target_json as well +EXPLAIN MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE id = 1) sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000091 sda (cost=0.00..50.56 rows=0 width=0) + -> Nested Loop Left Join (cost=0.00..50.56 rows=36 width=10) + Join Filter: (sda.id = source_json.id) + -> Seq Scan on source_json_4000095 source_json (cost=0.00..25.00 rows=6 width=4) + Filter: (id = 1) + -> Materialize (cost=0.00..25.03 rows=6 width=10) + -> Seq Scan on target_json_4000091 sda (cost=0.00..25.00 rows=6 width=10) + Filter: (id = 1) +(13 rows) + +-- zero shard query as filters do not match +EXPLAIN MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE id = 1) sdn +ON sda.id = sdn.id AND sda.id = 2 +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 0 + Tasks Shown: All +(3 rows) + +-- join for source_json is happening at at different place +EXPLAIN MERGE INTO target_json sda +USING source_json s1 LEFT JOIN (SELECT * FROM source_json) s2 USING(z) +ON sda.id = s1.id AND s1.id = s2.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (s2.id, 5); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000091 sda (cost=825.41..1617.41 rows=0 width=0) + -> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10) + Merge Cond: (s1.id = sda.id) + Join Filter: (s1.id = source_json.id) + -> Sort (cost=742.04..760.04 rows=7200 width=8) + Sort Key: s1.id + -> Merge Left Join (cost=166.75..280.75 rows=7200 width=8) + Merge Cond: (s1.z = source_json.z) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: s1.z + -> Seq Scan on source_json_4000095 s1 (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: source_json.z + -> Seq Scan on source_json_4000095 source_json (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000091 sda (cost=0.00..22.00 rows=1200 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000092 sda (cost=825.41..1617.41 rows=0 width=0) + -> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10) + Merge Cond: (s1.id = sda.id) + Join Filter: (s1.id = source_json.id) + -> Sort (cost=742.04..760.04 rows=7200 width=8) + Sort Key: s1.id + -> Merge Left Join (cost=166.75..280.75 rows=7200 width=8) + Merge Cond: (s1.z = source_json.z) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: s1.z + -> Seq Scan on source_json_4000096 s1 (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: source_json.z + -> Seq Scan on source_json_4000096 source_json (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000092 sda (cost=0.00..22.00 rows=1200 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000093 sda (cost=825.41..1617.41 rows=0 width=0) + -> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10) + Merge Cond: (s1.id = sda.id) + Join Filter: (s1.id = source_json.id) + -> Sort (cost=742.04..760.04 rows=7200 width=8) + Sort Key: s1.id + -> Merge Left Join (cost=166.75..280.75 rows=7200 width=8) + Merge Cond: (s1.z = source_json.z) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: s1.z + -> Seq Scan on source_json_4000097 s1 (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: source_json.z + -> Seq Scan on source_json_4000097 source_json (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000093 sda (cost=0.00..22.00 rows=1200 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000094 sda (cost=825.41..1617.41 rows=0 width=0) + -> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10) + Merge Cond: (s1.id = sda.id) + Join Filter: (s1.id = source_json.id) + -> Sort (cost=742.04..760.04 rows=7200 width=8) + Sort Key: s1.id + -> Merge Left Join (cost=166.75..280.75 rows=7200 width=8) + Merge Cond: (s1.z = source_json.z) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: s1.z + -> Seq Scan on source_json_4000098 s1 (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=8) + Sort Key: source_json.z + -> Seq Scan on source_json_4000098 source_json (cost=0.00..22.00 rows=1200 width=8) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000094 sda (cost=0.00..22.00 rows=1200 width=10) +(79 rows) + +-- update JSON column +EXPLAIN MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET d = '{"a" : 5}'; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000091 sda (cost=166.75..280.75 rows=0 width=0) + -> Merge Join (cost=166.75..280.75 rows=7200 width=6) + Merge Cond: (sda.id = sdn.id) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000091 sda (cost=0.00..22.00 rows=1200 width=10) + -> Sort (cost=83.37..86.37 rows=1200 width=4) + Sort Key: sdn.id + -> Seq Scan on source_json_4000095 sdn (cost=0.00..22.00 rows=1200 width=4) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000092 sda (cost=166.75..280.75 rows=0 width=0) + -> Merge Join (cost=166.75..280.75 rows=7200 width=6) + Merge Cond: (sda.id = sdn.id) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000092 sda (cost=0.00..22.00 rows=1200 width=10) + -> Sort (cost=83.37..86.37 rows=1200 width=4) + Sort Key: sdn.id + -> Seq Scan on source_json_4000096 sdn (cost=0.00..22.00 rows=1200 width=4) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000093 sda (cost=166.75..280.75 rows=0 width=0) + -> Merge Join (cost=166.75..280.75 rows=7200 width=6) + Merge Cond: (sda.id = sdn.id) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000093 sda (cost=0.00..22.00 rows=1200 width=10) + -> Sort (cost=83.37..86.37 rows=1200 width=4) + Sort Key: sdn.id + -> Seq Scan on source_json_4000097 sdn (cost=0.00..22.00 rows=1200 width=4) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_json_4000094 sda (cost=166.75..280.75 rows=0 width=0) + -> Merge Join (cost=166.75..280.75 rows=7200 width=6) + Merge Cond: (sda.id = sdn.id) + -> Sort (cost=83.37..86.37 rows=1200 width=10) + Sort Key: sda.id + -> Seq Scan on target_json_4000094 sda (cost=0.00..22.00 rows=1200 width=10) + -> Sort (cost=83.37..86.37 rows=1200 width=4) + Sort Key: sdn.id + -> Seq Scan on source_json_4000098 sdn (cost=0.00..22.00 rows=1200 width=4) +(47 rows) + +CREATE FUNCTION immutable_hash(int) RETURNS int +AS 'SELECT hashtext( ($1 + $1)::text);' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET z = immutable_hash(sdn.z); -- -- Error and Unsupported scenarios -- +-- zero shard query as source_json is zero shard +EXPLAIN MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE false) sdn +ON sda.id = sdn.id AND sda.id = 2 +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +-- modifying CTE not supported +EXPLAIN +WITH cte_1 AS (DELETE FROM target_json) +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- Grouping sets not supported MERGE INTO citus_target t USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq @@ -2290,7 +2512,7 @@ MERGE INTO target_cj t UPDATE SET tid = tid + 9, src = src || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid1, 'inserted by merge', val1); -ERROR: modifying the partition value of rows is not allowed +ERROR: updating the distribution column is not allowed in MERGE actions ROLLBACK; -- Foreign table as target MERGE INTO foreign_table @@ -2382,13 +2604,22 @@ BEGIN RETURN TRUE; END; $$; +-- Test functions executing in MERGE statement. This is to prevent the functions from +-- doing a random sql, which may be executed in a remote node or modifying the target +-- relation which will have unexpected/suprising results. +MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON + t1.id = s1.id AND s1.id = 2 + WHEN NOT matched THEN + INSERT (id, val) + VALUES (s1.id , random()); +ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE -- Test preventing "ON" join condition from writing to the database BEGIN; MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) WHEN MATCHED THEN UPDATE SET val = t1.val + s1.val; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the ON clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; -- Test preventing WHEN clause(s) from writing to the database BEGIN; @@ -2396,7 +2627,7 @@ MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 2 WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET val = t1.val + s1.val; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHEN clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; -- Joining on partition columns with sub-query MERGE INTO t1 @@ -2872,7 +3103,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 75 other objects +NOTICE: drop cascades to 78 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -2939,9 +3170,12 @@ drop cascades to table citus_pa_target drop cascades to table pg_pa_source drop cascades to table citus_pa_source drop cascades to function pa_compare_tables() +drop cascades to table source_json +drop cascades to table target_json +drop cascades to function immutable_hash(integer) drop cascades to table pg -drop cascades to table t1_4000110 -drop cascades to table s1_4000111 +drop cascades to table t1_4000118 +drop cascades to table s1_4000119 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_colocated diff --git a/src/test/regress/expected/merge_arbitrary.out b/src/test/regress/expected/merge_arbitrary.out new file mode 100644 index 00000000000..705dbff631c --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary.out @@ -0,0 +1,45 @@ +SET search_path TO merge_arbitrary_schema; +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-2 | 0 + 2 | source-2 | 0 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 20 + 2 | source-1 | 20 + 3 | target | 0 +(4 rows) + +ROLLBACK; diff --git a/src/test/regress/expected/merge_arbitrary_0.out b/src/test/regress/expected/merge_arbitrary_0.out new file mode 100644 index 00000000000..a7e3fbf2062 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/expected/merge_arbitrary_create.out b/src/test/regress/expected/merge_arbitrary_create.out new file mode 100644 index 00000000000..37dcc1d2637 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary_create.out @@ -0,0 +1,26 @@ +DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; +CREATE SCHEMA merge_arbitrary_schema; +SET search_path TO merge_arbitrary_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 6000000; +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); +SELECT create_distributed_table('target_cj', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj1', 'sid1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj2', 'sid2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index 0bedf356f9d..3ae4ac0d652 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -910,7 +910,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHEN clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; -- Test preventing ON condition from writing to the database BEGIN; @@ -918,7 +918,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) WHEN MATCHED THEN UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the ON clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index b74fb43ac5d..6785d2a0e96 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1422,10 +1422,86 @@ MERGE INTO citus_pa_target t SELECT pa_compare_tables(); ROLLBACK; +CREATE TABLE source_json( id integer, z int, d jsonb); +CREATE TABLE target_json( id integer, z int, d jsonb); + +INSERT INTO source_json SELECT i,i FROM generate_series(0,100)i; + +SELECT create_distributed_table('target_json','id'), create_distributed_table('source_json', 'id'); + +-- single shard query given source_json is filtered and Postgres is smart to pushdown +-- filter to the target_json as well +EXPLAIN MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE id = 1) sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + +-- zero shard query as filters do not match +EXPLAIN MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE id = 1) sdn +ON sda.id = sdn.id AND sda.id = 2 +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + +-- join for source_json is happening at at different place +EXPLAIN MERGE INTO target_json sda +USING source_json s1 LEFT JOIN (SELECT * FROM source_json) s2 USING(z) +ON sda.id = s1.id AND s1.id = s2.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (s2.id, 5); + +-- update JSON column +EXPLAIN MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET d = '{"a" : 5}'; + +CREATE FUNCTION immutable_hash(int) RETURNS int +AS 'SELECT hashtext( ($1 + $1)::text);' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; + +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET z = immutable_hash(sdn.z); + +-- Test bigserial +CREATE TABLE source_serial (id integer, z int, d bigserial); +CREATE TABLE target_serial (id integer, z int, d bigserial); +INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i; +SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id'); + +MERGE INTO target_serial sda +USING source_serial sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (id, z); + -- -- Error and Unsupported scenarios -- +-- zero shard query as source_json is zero shard +EXPLAIN MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE false) sdn +ON sda.id = sdn.id AND sda.id = 2 +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + +-- modifying CTE not supported +EXPLAIN +WITH cte_1 AS (DELETE FROM target_json) +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + -- Grouping sets not supported MERGE INTO citus_target t USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq @@ -1534,6 +1610,15 @@ BEGIN END; $$; +-- Test functions executing in MERGE statement. This is to prevent the functions from +-- doing a random sql, which may be executed in a remote node or modifying the target +-- relation which will have unexpected/suprising results. +MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON + t1.id = s1.id AND s1.id = 2 + WHEN NOT matched THEN + INSERT (id, val) + VALUES (s1.id , random()); + -- Test preventing "ON" join condition from writing to the database BEGIN; MERGE INTO t1 diff --git a/src/test/regress/sql/merge_arbitrary.sql b/src/test/regress/sql/merge_arbitrary.sql new file mode 100644 index 00000000000..eddf26b7ceb --- /dev/null +++ b/src/test/regress/sql/merge_arbitrary.sql @@ -0,0 +1,40 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +SET search_path TO merge_arbitrary_schema; +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); + +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); + +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; diff --git a/src/test/regress/sql/merge_arbitrary_create.sql b/src/test/regress/sql/merge_arbitrary_create.sql new file mode 100644 index 00000000000..bd6d30b9f0a --- /dev/null +++ b/src/test/regress/sql/merge_arbitrary_create.sql @@ -0,0 +1,12 @@ +DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; +CREATE SCHEMA merge_arbitrary_schema; +SET search_path TO merge_arbitrary_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 6000000; +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); + +SELECT create_distributed_table('target_cj', 'tid'); +SELECT create_distributed_table('source_cj1', 'sid1'); +SELECT create_distributed_table('source_cj2', 'sid2'); diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index f07f7af9a6e..272a84effff 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -14,3 +14,4 @@ test: arbitrary_configs_truncate test: arbitrary_configs_truncate_cascade test: arbitrary_configs_truncate_partition test: arbitrary_configs_alter_table_add_constraint_without_name +test: merge_arbitrary