From b409abbae083bd36db8dedd4eb87dc0b0254e076 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Thu, 2 Mar 2023 09:15:24 -0800 Subject: [PATCH] Add big serial test --- .../distributed/planner/merge_planner.c | 11 ++-- .../planner/multi_router_planner.c | 12 ++--- src/include/distributed/merge_planner.h | 3 +- src/test/regress/expected/merge.out | 51 ++++++++++++++----- src/test/regress/expected/merge_arbitrary.out | 46 +++++++++++++---- .../expected/merge_arbitrary_create.out | 46 +++++++++++++++++ .../expected/merge_arbitrary_create_0.out | 6 +++ src/test/regress/sql/merge.sql | 9 ++-- src/test/regress/sql/merge_arbitrary.sql | 47 ++++++++++++----- .../regress/sql/merge_arbitrary_create.sql | 38 ++++++++++++++ 10 files changed, 220 insertions(+), 49 deletions(-) create mode 100644 src/test/regress/expected/merge_arbitrary_create_0.out diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index a5c3aff1575..a04d982f652 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -138,8 +138,9 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, if (multiShardQuery) { - deferredError = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, - plannerRestrictionContext); + deferredError = + DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); if (deferredError) { return deferredError; @@ -149,8 +150,8 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "a join with USING causes an internal naming " - "conflict, use ON instead", NULL, NULL); + "a join with USING causes an internal naming " + "conflict, use ON instead", NULL, NULL); } return NULL; @@ -627,7 +628,7 @@ MergeActionListSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals, if (commandType != CMD_INSERT && MasterIrreducibleExpression((Node *) targetEntry->expr, - &hasVarArgument, &hasBadCoalesce)) + &hasVarArgument, &hasBadCoalesce)) { Assert(hasVarArgument || hasBadCoalesce); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 70cf7c52308..36df763203e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -127,9 +127,9 @@ static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableId); static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery, - PlannerRestrictionContext - * - plannerRestrictionContext); + PlannerRestrictionContext + * + plannerRestrictionContext); static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -1279,7 +1279,7 @@ ErrorIfOnConflictNotSupported(Query *queryTree) */ static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext) { DeferredErrorMessage *errorMessage = NULL; RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery); @@ -1874,8 +1874,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon if (*planningError) { /* - * For MERGE, we do _not_ plan anything other than Router job, let's - * not continue further down the lane in distributed planning, simply + * For MERGE, we do _not_ plan any other router job than the MERGE job itself, + * let's not continue further down the lane in distributed planning, simply * bail out. */ if (IsMergeQuery(originalQuery)) diff --git a/src/include/distributed/merge_planner.h b/src/include/distributed/merge_planner.h index 7238762af8a..243be14d014 100644 --- a/src/include/distributed/merge_planner.h +++ b/src/include/distributed/merge_planner.h @@ -19,7 +19,8 @@ #include "distributed/errormessage.h" extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); -extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, bool multiShardQuery, +extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, + bool multiShardQuery, PlannerRestrictionContext * plannerRestrictionContext); #endif /* MERGE_PLANNER_H */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 35be03b1cb1..6f991f92c03 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -311,7 +311,6 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (pg_res.id, pg_res.val); --- Two rows with id 2 and val incremented, id 3, and id 1 is deleted SELECT * FROM t1 order by id; id | val --------------------------------------------------------------------- @@ -1235,11 +1234,7 @@ WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); -DEBUG: function does not have co-located tables -DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying) -DEBUG: -DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source) -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -2426,6 +2421,37 @@ USING source_json sdn ON sda.id = sdn.id WHEN matched THEN UPDATE SET z = immutable_hash(sdn.z); +-- Test bigserial +CREATE TABLE source_serial (id integer, z int, d bigserial); +CREATE TABLE target_serial (id integer, z int, d bigserial); +INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i; +SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_serial$$) + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +MERGE INTO target_serial sda +USING source_serial sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (id, z); +SELECT count(*) from source_serial; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) from target_serial; + count +--------------------------------------------------------------------- + 101 +(1 row) + -- -- Error and Unsupported scenarios -- @@ -2609,9 +2635,8 @@ $$; -- relation which will have unexpected/suprising results. MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON t1.id = s1.id AND s1.id = 2 - WHEN NOT matched THEN - INSERT (id, val) - VALUES (s1.id , random()); + WHEN matched THEN + UPDATE SET id = s1.id, val = random(); ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE -- Test preventing "ON" join condition from writing to the database BEGIN; @@ -3103,7 +3128,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 78 other objects +NOTICE: drop cascades to 80 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -3173,9 +3198,11 @@ drop cascades to function pa_compare_tables() drop cascades to table source_json drop cascades to table target_json drop cascades to function immutable_hash(integer) +drop cascades to table source_serial +drop cascades to table target_serial drop cascades to table pg -drop cascades to table t1_4000118 -drop cascades to table s1_4000119 +drop cascades to table t1_4000126 +drop cascades to table s1_4000127 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_colocated diff --git a/src/test/regress/expected/merge_arbitrary.out b/src/test/regress/expected/merge_arbitrary.out index e64b35f9cf6..25742a0f64f 100644 --- a/src/test/regress/expected/merge_arbitrary.out +++ b/src/test/regress/expected/merge_arbitrary.out @@ -51,27 +51,25 @@ SELECT * FROM target_cj ORDER BY 1; ROLLBACK; -- Test PREPARE -CREATE TABLE prept(t1 int, t2 int); -CREATE TABLE preps(s1 int, s2 int); -INSERT INTO prept VALUES(100, 0); -INSERT INTO preps VALUES(100, 0); -INSERT INTO preps VALUES(200, 0); PREPARE insert(int) AS MERGE INTO prept USING preps ON prept.t1 = preps.s1 WHEN MATCHED THEN - UPDATE SET t2 = t2 + $1 + UPDATE SET t2 = t2 + $1 WHEN NOT MATCHED THEN - INSERT VALUES(s1, s2); + INSERT VALUES(s1, s2); PREPARE delete(int) AS MERGE INTO prept USING preps ON prept.t1 = preps.s1 WHEN MATCHED AND prept.t2 = $1 THEN - DELETE + DELETE WHEN MATCHED THEN - UPDATE SET t2 = t2 + 1; + UPDATE SET t2 = t2 + 1; +INSERT INTO prept VALUES(100, 0); +INSERT INTO preps VALUES(100, 0); +INSERT INTO preps VALUES(200, 0); EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0); @@ -86,3 +84,33 @@ SELECT * FROM prept; 100 | 12 (1 row) +-- Test local tables +INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause +INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause +INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause +INSERT INTO t1 VALUES(1, 0); -- Will be deleted +INSERT INTO t1 VALUES(2, 0); -- Will be updated +INSERT INTO t1 VALUES(5, 0); -- Will be intact +WITH s1_res AS ( + SELECT * FROM s1 +) +MERGE INTO t1 + USING s1_res ON (s1_res.id = t1.id) + WHEN MATCHED AND s1_res.val = 0 THEN + DELETE + WHEN MATCHED THEN + UPDATE SET val = t1.val + 1 + WHEN NOT MATCHED THEN + INSERT (id, val) VALUES (s1_res.id, s1_res.val); +SELECT * FROM t1 order by id; + id | val +--------------------------------------------------------------------- + 2 | 1 + 3 | 1 + 4 | 1 + 5 | 0 + 6 | 1 +(5 rows) + diff --git a/src/test/regress/expected/merge_arbitrary_create.out b/src/test/regress/expected/merge_arbitrary_create.out index 37dcc1d2637..a423bccb0ed 100644 --- a/src/test/regress/expected/merge_arbitrary_create.out +++ b/src/test/regress/expected/merge_arbitrary_create.out @@ -1,3 +1,10 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; CREATE SCHEMA merge_arbitrary_schema; SET search_path TO merge_arbitrary_schema; @@ -24,3 +31,42 @@ SELECT create_distributed_table('source_cj2', 'sid2'); (1 row) +CREATE TABLE prept(t1 int, t2 int); +CREATE TABLE preps(s1 int, s2 int); +SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +PREPARE insert(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + $1 +WHEN NOT MATCHED THEN + INSERT VALUES(s1, s2); +PREPARE delete(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED AND prept.t2 = $1 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; +-- Citus local tables +CREATE TABLE t1(id int, val int); +CREATE TABLE s1(id int, val int); +SELECT citus_add_local_table_to_metadata('t1'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('s1'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/merge_arbitrary_create_0.out b/src/test/regress/expected/merge_arbitrary_create_0.out new file mode 100644 index 00000000000..a7e3fbf2062 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary_create_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 6785d2a0e96..caf0429fde8 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -224,7 +224,6 @@ MERGE INTO t1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (pg_res.id, pg_res.val); --- Two rows with id 2 and val incremented, id 3, and id 1 is deleted SELECT * FROM t1 order by id; SELECT * INTO merge_result FROM t1 order by id; @@ -1482,6 +1481,9 @@ ON sda.id = sdn.id WHEN NOT matched THEN INSERT (id, z) VALUES (id, z); +SELECT count(*) from source_serial; +SELECT count(*) from target_serial; + -- -- Error and Unsupported scenarios -- @@ -1615,9 +1617,8 @@ $$; -- relation which will have unexpected/suprising results. MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON t1.id = s1.id AND s1.id = 2 - WHEN NOT matched THEN - INSERT (id, val) - VALUES (s1.id , random()); + WHEN matched THEN + UPDATE SET id = s1.id, val = random(); -- Test preventing "ON" join condition from writing to the database BEGIN; diff --git a/src/test/regress/sql/merge_arbitrary.sql b/src/test/regress/sql/merge_arbitrary.sql index 0b84070a1ad..e78964acf08 100644 --- a/src/test/regress/sql/merge_arbitrary.sql +++ b/src/test/regress/sql/merge_arbitrary.sql @@ -40,31 +40,28 @@ SELECT * FROM target_cj ORDER BY 1; ROLLBACK; -- Test PREPARE -CREATE TABLE prept(t1 int, t2 int); -CREATE TABLE preps(s1 int, s2 int); - -INSERT INTO prept VALUES(100, 0); - -INSERT INTO preps VALUES(100, 0); -INSERT INTO preps VALUES(200, 0); - PREPARE insert(int) AS MERGE INTO prept USING preps ON prept.t1 = preps.s1 WHEN MATCHED THEN - UPDATE SET t2 = t2 + $1 + UPDATE SET t2 = t2 + $1 WHEN NOT MATCHED THEN - INSERT VALUES(s1, s2); + INSERT VALUES(s1, s2); PREPARE delete(int) AS MERGE INTO prept USING preps ON prept.t1 = preps.s1 WHEN MATCHED AND prept.t2 = $1 THEN - DELETE + DELETE WHEN MATCHED THEN - UPDATE SET t2 = t2 + 1; + UPDATE SET t2 = t2 + 1; + +INSERT INTO prept VALUES(100, 0); + +INSERT INTO preps VALUES(100, 0); +INSERT INTO preps VALUES(200, 0); EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0); @@ -77,3 +74,29 @@ EXECUTE insert(1); EXECUTE delete(0); -- Should have the counter as 12 (6 * 2) SELECT * FROM prept; + +-- Test local tables +INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause +INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause +INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause + +INSERT INTO t1 VALUES(1, 0); -- Will be deleted +INSERT INTO t1 VALUES(2, 0); -- Will be updated +INSERT INTO t1 VALUES(5, 0); -- Will be intact + +WITH s1_res AS ( + SELECT * FROM s1 +) +MERGE INTO t1 + USING s1_res ON (s1_res.id = t1.id) + + WHEN MATCHED AND s1_res.val = 0 THEN + DELETE + WHEN MATCHED THEN + UPDATE SET val = t1.val + 1 + WHEN NOT MATCHED THEN + INSERT (id, val) VALUES (s1_res.id, s1_res.val); + +SELECT * FROM t1 order by id; diff --git a/src/test/regress/sql/merge_arbitrary_create.sql b/src/test/regress/sql/merge_arbitrary_create.sql index bd6d30b9f0a..f93fe9dfd14 100644 --- a/src/test/regress/sql/merge_arbitrary_create.sql +++ b/src/test/regress/sql/merge_arbitrary_create.sql @@ -1,3 +1,11 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; CREATE SCHEMA merge_arbitrary_schema; SET search_path TO merge_arbitrary_schema; @@ -10,3 +18,33 @@ CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); SELECT create_distributed_table('target_cj', 'tid'); SELECT create_distributed_table('source_cj1', 'sid1'); SELECT create_distributed_table('source_cj2', 'sid2'); + +CREATE TABLE prept(t1 int, t2 int); +CREATE TABLE preps(s1 int, s2 int); + +SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + +PREPARE insert(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + $1 +WHEN NOT MATCHED THEN + INSERT VALUES(s1, s2); + +PREPARE delete(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED AND prept.t2 = $1 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; + +-- Citus local tables +CREATE TABLE t1(id int, val int); +CREATE TABLE s1(id int, val int); + +SELECT citus_add_local_table_to_metadata('t1'); +SELECT citus_add_local_table_to_metadata('s1');