From d75fab354f0b67eceb01e1de536235ae9eccfc9f Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Mon, 6 Mar 2023 18:46:12 -0800 Subject: [PATCH] Implement latest set of comments --- .../distributed/planner/distributed_planner.c | 41 +--- .../distributed/planner/merge_planner.c | 179 +++++++++--------- .../planner/multi_router_planner.c | 5 +- .../planner/query_pushdown_planning.c | 12 +- src/backend/distributed/utils/citus_clauses.c | 15 ++ src/include/distributed/distributed_planner.h | 1 - src/test/regress/expected/merge.out | 81 ++++++-- src/test/regress/expected/merge_arbitrary.out | 18 +- .../expected/merge_arbitrary_create.out | 6 +- src/test/regress/expected/pg15.out | 31 +-- src/test/regress/sql/merge.sql | 42 +++- src/test/regress/sql/merge_arbitrary.sql | 18 +- .../regress/sql/merge_arbitrary_create.sql | 6 +- src/test/regress/sql/pg15.sql | 25 ++- 14 files changed, 274 insertions(+), 206 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 262258d7f9f..17b63ee0ab9 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -77,7 +77,7 @@ int PlannerLevel = 0; static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); -static bool IsUpdateOrDelete(Query *query); +static bool IsUpdateOrDeleteOrMerge(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, @@ -153,7 +153,7 @@ distributed_planner(Query *parse, * We cannot have merge command for this path as well because * there cannot be recursively planned merge command. */ - Assert(!ContainsMergeCommandWalker((Node *) parse)); + Assert(!IsMergeQuery(parse)); needsDistributedPlanning = true; } @@ -295,39 +295,6 @@ distributed_planner(Query *parse, } -/* - * ContainsMergeCommandWalker walks over the node and finds if there are any - * Merge command (e.g., CMD_MERGE) in the node. - */ -bool -ContainsMergeCommandWalker(Node *node) -{ - #if PG_VERSION_NUM < PG_VERSION_15 - return false; - #endif - - if (node == NULL) - { - return false; - } - - if (IsA(node, Query)) - { - Query *query = (Query *) node; - if (IsMergeQuery(query)) - { - return true; - } - - return query_tree_walker((Query *) node, ContainsMergeCommandWalker, NULL, 0); - } - - return expression_tree_walker(node, ContainsMergeCommandWalker, NULL); - - return false; -} - - /* * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker. * The function traverses the input query and returns all the range table @@ -631,7 +598,7 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) * IsUpdateOrDelete returns true if the query performs an update or delete. */ bool -IsUpdateOrDelete(Query *query) +IsUpdateOrDeleteOrMerge(Query *query) { return query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE || @@ -809,7 +776,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan( + (IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan( distributedPlan))) && hasUnresolvedParams) { diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index a04d982f652..50cded6a33e 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -27,13 +27,15 @@ #if PG_VERSION_NUM >= PG_VERSION_15 +static bool CheckIfRTETypeIsUnsupported(Query *parse, + RangeTblEntry *rangeTableEntry, + DeferredErrorMessage **returnMessage); static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List * distTablesList, PlannerRestrictionContext * plannerRestrictionContext); -static bool QueryHasMergeCommand(Query *queryTree); static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, PlannerRestrictionContext * @@ -69,7 +71,7 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, #else /* For non-MERGE commands it's a no-op */ - if (!QueryHasMergeCommand(originalQuery)) + if (!IsMergeQuery(originalQuery)) { return NULL; } @@ -218,16 +220,80 @@ ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesLis "must be colocated", NULL, NULL); } - /* Are source and target tables joined on distribution column? */ - if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) + return NULL; +} + + +/* + * ErrorIfRTETypeIsUnsupported Checks for types of tables that are not supported, such + * as, reference tables, append-distributed tables and materialized view as target relation. + * Routine returns true for the supported types, and false for everything else, only for + * unsupported types it fills the appropriate error message in the parameter passed. + */ +static bool +CheckIfRTETypeIsUnsupported(Query *parse, + RangeTblEntry *rangeTableEntry, + DeferredErrorMessage **returnMessage) +{ + /* skip the regular views as they are replaced with subqueries */ + if (rangeTableEntry->relkind == RELKIND_VIEW) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is only supported when distributed " - "tables are joined on their distribution column", - NULL, NULL); + return true; } - return NULL; + if (rangeTableEntry->relkind == RELKIND_MATVIEW || + rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + { + /* Materialized view or Foreign table as target is not allowed */ + if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + { + /* Non target relation is ok */ + return true; + } + else + { + /* Usually we don't reach this exception as the Postgres parser catches it */ + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "MERGE command is not allowed on " + "relation type(relkind:%c)", + rangeTableEntry->relkind); + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + } + + if (rangeTableEntry->relkind != RELKIND_RELATION && + rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " + "in MERGE command", rangeTableEntry->relkind); + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + + Assert(rangeTableEntry->relid != 0); + + /* Reference tables are not supported yet */ + if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) + { + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported on reference " + "tables yet", NULL, NULL); + } + + /* Append/Range tables are not supported */ + if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) || + IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED)) + { + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated, for append/range distribution, " + "colocation is not supported", NULL, + "Consider using hash distribution instead"); + } + + return false; } @@ -294,65 +360,19 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, } /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) + DeferredErrorMessage *errorMessage = NULL; + if (CheckIfRTETypeIsUnsupported(parse, rangeTableEntry, &errorMessage)) { continue; } - - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + else { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) - { - /* Non target relation is ok */ - continue; - } - else + if (errorMessage) { - /* Usually we don't reach this exception as the Postgres parser catches it */ - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, - "MERGE command is not allowed on " - "relation type(relkind:%c)", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); + return errorMessage; } } - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " - "in MERGE command", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - - Assert(rangeTableEntry->relid != 0); - - /* Reference tables are not supported yet */ - if (IsCitusTableType(relationId, REFERENCE_TABLE)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported on reference " - "tables yet", NULL, NULL); - } - - /* Append/Range tables are not supported */ - if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || - IsCitusTableType(relationId, RANGE_DISTRIBUTED)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated, for append/range distribution, " - "colocation is not supported", NULL, - "Consider using hash distribution instead"); - } - /* * For now, save all distributed tables, later (below) we will * check for supported combination(s). @@ -389,35 +409,12 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, } /* Ensure all distributed tables are indeed co-located and joined on distribution column */ - return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList, + return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, + distTablesList, restrictionContext); } -/* - * QueryHasMergeCommand walks over the query tree and returns false if there - * is no Merge command (e.g., CMD_MERGE), true otherwise. - */ -static bool -QueryHasMergeCommand(Query *queryTree) -{ - /* - * Postgres currently doesn't support Merge queries inside subqueries and - * ctes, but lets be defensive and do query tree walk anyway. - * - * We do not call this path for fast-path queries to avoid this additional - * overhead. - */ - if (!ContainsMergeCommandWalker((Node *) queryTree)) - { - /* No MERGE found */ - return false; - } - - return true; -} - - /* * IsPartitionColumnInMerge returns true if the given column is a partition column. * The function uses FindReferencedTableColumn to find the original relation @@ -461,7 +458,8 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu * value into the target which is not from the source table, if so, it * raises an exception. * Note: Inserting random values other than the joined column values will - * result in unexpected behaviour of rows ending up in incorrect shards. + * result in unexpected behaviour of rows ending up in incorrect shards, to + * prevent such mishaps, we disallow such inserts here. */ static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) @@ -475,7 +473,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) MergeAction *action = NULL; foreach_ptr(action, query->mergeActionList) { - /* Skip MATCHED clauses */ + /* Skip MATCHED clause as INSERTS are not allowed in it*/ if (action->matched) { continue; @@ -501,11 +499,6 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) TargetEntry *targetEntry = NULL; foreach_ptr(targetEntry, action->targetList) { - if (targetEntry->resjunk) - { - continue; - } - AttrNumber originalAttrNo = targetEntry->resno; /* skip processing of target table non-partition columns */ @@ -516,7 +509,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) foundDistributionColumn = true; - if (targetEntry->expr->type == T_Var) + if (IsA(targetEntry->expr, Var)) { if (IsPartitionColumnInMergeSource(targetEntry->expr, query, true)) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 36df763203e..407aeaf6510 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1310,8 +1310,9 @@ MultiShardUpdateDeleteSupported(Query *originalQuery, } else { - errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, - plannerRestrictionContext); + errorMessage = DeferErrorIfUnsupportedSubqueryPushdown( + originalQuery, + plannerRestrictionContext); } return errorMessage; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 5cae1949744..cbe6a36062e 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -591,10 +591,16 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, } else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) { + StringInfo errorMessage = makeStringInfo(); + bool isMergeCmd = IsMergeQuery(originalQuery); + appendStringInfo(errorMessage, + "%s" + "only supported when all distributed tables are " + "co-located and joined on their distribution columns", + isMergeCmd ? "MERGE command is " : "complex joins are "); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "complex joins are only supported when all distributed tables are " - "co-located and joined on their distribution columns", - NULL, NULL); + errorMessage->data, NULL, NULL); } /* we shouldn't allow reference tables in the FROM clause when the query has sublinks */ diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index c48239548ed..ca05ed1d5b6 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -56,6 +56,21 @@ RequiresCoordinatorEvaluation(Query *query) return false; } + /* + * Currently all MERGE queries are routed to the workers, other + * than aggregating the tuple result, we shouldn't expect any + * evaluation at the coordinator. Moreover, the routine relies on + * the logic that any function present in the entire query tree might + * need a coordinator evalution, but this logic doesn't fully work + * for MERGE which has multiple query-statements in merge-actions. + * A function present in INSERT/UPDATE/DELETE will gets evaluated + * at the worker and not at the coordinator. + */ + if (IsMergeQuery(query)) + { + return false; + } + return FindNodeMatchingCheckFunction((Node *) query, CitusIsMutableFunction); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 47ba032efb8..412859449a5 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -254,6 +254,5 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); -extern bool ContainsMergeCommandWalker(Node *node); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 6f991f92c03..e466650dee6 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -2425,7 +2425,8 @@ WHEN matched THEN CREATE TABLE source_serial (id integer, z int, d bigserial); CREATE TABLE target_serial (id integer, z int, d bigserial); INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i; -SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id'); +SELECT create_distributed_table('source_serial', 'id'), + create_distributed_table('target_serial', 'id'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. @@ -2452,16 +2453,60 @@ SELECT count(*) from target_serial; 101 (1 row) +SELECT count(distinct d) from source_serial; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(distinct d) from target_serial; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Test set operations +CREATE TABLE target_set(t1 int, t2 int); +CREATE TABLE source_set(s1 int, s2 int); +SELECT create_distributed_table('target_set', 't1'), + create_distributed_table('source_set', 's1'); + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +INSERT INTO target_set VALUES(1, 0); +INSERT INTO source_set VALUES(1, 1); +INSERT INTO source_set VALUES(2, 2); +MERGE INTO target_set +USING (SELECT * FROM source_set UNION SELECT * FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * FROM target_set ORDER BY 1, 2; + t1 | t2 +--------------------------------------------------------------------- + 1 | 100 + 2 | +(2 rows) + -- -- Error and Unsupported scenarios -- --- zero shard query as source_json is zero shard -EXPLAIN MERGE INTO target_json sda -USING (SELECT * FROM source_json WHERE false) sdn -ON sda.id = sdn.id AND sda.id = 2 -WHEN NOT matched THEN - INSERT (id, z) VALUES (sdn.id, 5); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +MERGE INTO target_set +USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; +ERROR: cannot pushdown the subquery since not all subqueries in the UNION have the partition column in the same position +DETAIL: Each leaf query of the UNION should return the partition column in the same position and all joins must be on the partition column +MERGE INTO target_set +USING (SELECT 2 as s3, source_set.* FROM (SELECT * FROM source_set LIMIT 1) as foo LEFT JOIN source_set USING( s1)) AS foo +ON target_set.t1 = foo.s1 +WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 +WHEN NOT MATCHED THEN INSERT VALUES(s1, s3); +ERROR: cannot push down this subquery +DETAIL: Limit clause is currently unsupported when a subquery references a column from another query -- modifying CTE not supported EXPLAIN WITH cte_1 AS (DELETE FROM target_json) @@ -2470,7 +2515,7 @@ USING source_json sdn ON sda.id = sdn.id WHEN NOT matched THEN INSERT (id, z) VALUES (sdn.id, 5); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Grouping sets not supported MERGE INTO citus_target t USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq @@ -2663,7 +2708,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Joining on partition columns with CTE WITH s1_res AS ( SELECT * FROM s1 @@ -2676,7 +2721,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Constant Join condition WITH s1_res AS ( SELECT * FROM s1 @@ -2689,7 +2734,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- With a single WHEN clause, which causes a non-left join WITH s1_res AS ( SELECT * FROM s1 @@ -2698,7 +2743,7 @@ WITH s1_res AS ( WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- -- Reference tables -- @@ -2928,7 +2973,7 @@ WHEN MATCHED THEN UPDATE SET val = dist_colocated.val WHEN NOT MATCHED THEN INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Both the source and target must be distributed MERGE INTO dist_target USING (SELECT 100 id) AS source @@ -3128,7 +3173,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 80 other objects +NOTICE: drop cascades to 82 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -3200,9 +3245,11 @@ drop cascades to table target_json drop cascades to function immutable_hash(integer) drop cascades to table source_serial drop cascades to table target_serial +drop cascades to table target_set +drop cascades to table source_set drop cascades to table pg -drop cascades to table t1_4000126 -drop cascades to table s1_4000127 +drop cascades to table t1_4000134 +drop cascades to table s1_4000135 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_colocated diff --git a/src/test/regress/expected/merge_arbitrary.out b/src/test/regress/expected/merge_arbitrary.out index 25742a0f64f..954b3f3c42e 100644 --- a/src/test/regress/expected/merge_arbitrary.out +++ b/src/test/regress/expected/merge_arbitrary.out @@ -51,10 +51,10 @@ SELECT * FROM target_cj ORDER BY 1; ROLLBACK; -- Test PREPARE -PREPARE insert(int) AS +PREPARE insert(int, int, int) AS MERGE INTO prept -USING preps -ON prept.t1 = preps.s1 +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 WHEN MATCHED THEN UPDATE SET t2 = t2 + $1 WHEN NOT MATCHED THEN @@ -70,13 +70,13 @@ WHEN MATCHED THEN INSERT INTO prept VALUES(100, 0); INSERT INTO preps VALUES(100, 0); INSERT INTO preps VALUES(200, 0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); -- sixth time -EXECUTE insert(1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); -- Should have the counter as 12 (6 * 2) SELECT * FROM prept; t1 | t2 diff --git a/src/test/regress/expected/merge_arbitrary_create.out b/src/test/regress/expected/merge_arbitrary_create.out index a423bccb0ed..9b2444f1758 100644 --- a/src/test/regress/expected/merge_arbitrary_create.out +++ b/src/test/regress/expected/merge_arbitrary_create.out @@ -39,10 +39,10 @@ SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps' | (1 row) -PREPARE insert(int) AS +PREPARE insert(int, int, int) AS MERGE INTO prept -USING preps -ON prept.t1 = preps.s1 +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 WHEN MATCHED THEN UPDATE SET t2 = t2 + $1 WHEN NOT MATCHED THEN diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index d92686b932a..7fc102dbb12 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -419,29 +419,36 @@ SELECT create_distributed_table('tbl2', 'x'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- also, not inside subqueries & ctes WITH targq AS ( SELECT * FROM tbl2 ) MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column --- crashes on beta3, fixed on 15 stable ---WITH foo AS ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) SELECT * FROM foo; ---COPY ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) TO stdout; +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +WITH foo AS ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) SELECT * FROM foo; +ERROR: MERGE not supported in WITH query +COPY ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) TO stdout; +ERROR: MERGE not supported in COPY +MERGE INTO tbl1 t +USING tbl2 +ON (true) +WHEN MATCHED THEN + DO NOTHING; +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns MERGE INTO tbl1 t USING tbl2 ON (true) WHEN MATCHED THEN UPDATE SET x = (SELECT count(*) FROM tbl2); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: updating the distribution column is not allowed in MERGE actions -- test numeric types with negative scale CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int); INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index caf0429fde8..d11a6c13dbf 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1473,7 +1473,8 @@ WHEN matched THEN CREATE TABLE source_serial (id integer, z int, d bigserial); CREATE TABLE target_serial (id integer, z int, d bigserial); INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i; -SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id'); +SELECT create_distributed_table('source_serial', 'id'), + create_distributed_table('target_serial', 'id'); MERGE INTO target_serial sda USING source_serial sdn @@ -1484,16 +1485,43 @@ WHEN NOT matched THEN SELECT count(*) from source_serial; SELECT count(*) from target_serial; +SELECT count(distinct d) from source_serial; +SELECT count(distinct d) from target_serial; + +-- Test set operations +CREATE TABLE target_set(t1 int, t2 int); +CREATE TABLE source_set(s1 int, s2 int); + +SELECT create_distributed_table('target_set', 't1'), + create_distributed_table('source_set', 's1'); + +INSERT INTO target_set VALUES(1, 0); +INSERT INTO source_set VALUES(1, 1); +INSERT INTO source_set VALUES(2, 2); + +MERGE INTO target_set +USING (SELECT * FROM source_set UNION SELECT * FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * FROM target_set ORDER BY 1, 2; + -- -- Error and Unsupported scenarios -- --- zero shard query as source_json is zero shard -EXPLAIN MERGE INTO target_json sda -USING (SELECT * FROM source_json WHERE false) sdn -ON sda.id = sdn.id AND sda.id = 2 -WHEN NOT matched THEN - INSERT (id, z) VALUES (sdn.id, 5); +MERGE INTO target_set +USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; + +MERGE INTO target_set +USING (SELECT 2 as s3, source_set.* FROM (SELECT * FROM source_set LIMIT 1) as foo LEFT JOIN source_set USING( s1)) AS foo +ON target_set.t1 = foo.s1 +WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 +WHEN NOT MATCHED THEN INSERT VALUES(s1, s3); + -- modifying CTE not supported EXPLAIN diff --git a/src/test/regress/sql/merge_arbitrary.sql b/src/test/regress/sql/merge_arbitrary.sql index e78964acf08..dcbe523bac1 100644 --- a/src/test/regress/sql/merge_arbitrary.sql +++ b/src/test/regress/sql/merge_arbitrary.sql @@ -40,10 +40,10 @@ SELECT * FROM target_cj ORDER BY 1; ROLLBACK; -- Test PREPARE -PREPARE insert(int) AS +PREPARE insert(int, int, int) AS MERGE INTO prept -USING preps -ON prept.t1 = preps.s1 +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 WHEN MATCHED THEN UPDATE SET t2 = t2 + $1 WHEN NOT MATCHED THEN @@ -63,14 +63,14 @@ INSERT INTO prept VALUES(100, 0); INSERT INTO preps VALUES(100, 0); INSERT INTO preps VALUES(200, 0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); -EXECUTE insert(1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); -- sixth time -EXECUTE insert(1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); -- Should have the counter as 12 (6 * 2) SELECT * FROM prept; diff --git a/src/test/regress/sql/merge_arbitrary_create.sql b/src/test/regress/sql/merge_arbitrary_create.sql index f93fe9dfd14..edf9b0d9d79 100644 --- a/src/test/regress/sql/merge_arbitrary_create.sql +++ b/src/test/regress/sql/merge_arbitrary_create.sql @@ -24,10 +24,10 @@ CREATE TABLE preps(s1 int, s2 int); SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); -PREPARE insert(int) AS +PREPARE insert(int, int, int) AS MERGE INTO prept -USING preps -ON prept.t1 = preps.s1 +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 WHEN MATCHED THEN UPDATE SET t2 = t2 + $1 WHEN NOT MATCHED THEN diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index 121b41f8684..ac8062c65e8 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -269,16 +269,21 @@ WITH targq AS ( MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; --- crashes on beta3, fixed on 15 stable ---WITH foo AS ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) SELECT * FROM foo; - ---COPY ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) TO stdout; +WITH foo AS ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) SELECT * FROM foo; + +COPY ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) TO stdout; + +MERGE INTO tbl1 t +USING tbl2 +ON (true) +WHEN MATCHED THEN + DO NOTHING; MERGE INTO tbl1 t USING tbl2