Skip to content

Commit

Permalink
Add big serial test
Browse files Browse the repository at this point in the history
  • Loading branch information
tejeswarm committed Mar 10, 2023
1 parent 3af32e7 commit b409abb
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 49 deletions.
11 changes: 6 additions & 5 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,

if (multiShardQuery)
{
deferredError = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
if (deferredError)
{
return deferredError;
Expand All @@ -149,8 +150,8 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
}

return NULL;
Expand Down Expand Up @@ -627,7 +628,7 @@ MergeActionListSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals,

if (commandType != CMD_INSERT &&
MasterIrreducibleExpression((Node *) targetEntry->expr,
&hasVarArgument, &hasBadCoalesce))
&hasVarArgument, &hasBadCoalesce))
{
Assert(hasVarArgument || hasBadCoalesce);
}
Expand Down
12 changes: 6 additions & 6 deletions src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
multiShardQuery,
Oid *distributedTableId);
static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext
*
plannerRestrictionContext);
PlannerRestrictionContext
*
plannerRestrictionContext);
static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
Expand Down Expand Up @@ -1279,7 +1279,7 @@ ErrorIfOnConflictNotSupported(Query *queryTree)
*/
static DeferredErrorMessage *
MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *plannerRestrictionContext)
{
DeferredErrorMessage *errorMessage = NULL;
RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery);
Expand Down Expand Up @@ -1874,8 +1874,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (*planningError)
{
/*
* For MERGE, we do _not_ plan anything other than Router job, let's
* not continue further down the lane in distributed planning, simply
* For MERGE, we do _not_ plan any other router job than the MERGE job itself,
* let's not continue further down the lane in distributed planning, simply
* bail out.
*/
if (IsMergeQuery(originalQuery))
Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/merge_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#include "distributed/errormessage.h"

extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery,
bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
#endif /* MERGE_PLANNER_H */
51 changes: 39 additions & 12 deletions src/test/regress/expected/merge.out
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ MERGE INTO t1
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (pg_res.id, pg_res.val);
-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id;
id | val
---------------------------------------------------------------------
Expand Down Expand Up @@ -1235,11 +1234,7 @@ WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED THEN
INSERT VALUES(fn_source.id, fn_source.source);
DEBUG: function does not have co-located tables
DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying)
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT f.id, f.source FROM merge_schema.f_dist() f(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
RESET client_min_messages;
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
-- Should be equal
Expand Down Expand Up @@ -2426,6 +2421,37 @@ USING source_json sdn
ON sda.id = sdn.id
WHEN matched THEN
UPDATE SET z = immutable_hash(sdn.z);
-- Test bigserial
CREATE TABLE source_serial (id integer, z int, d bigserial);
CREATE TABLE target_serial (id integer, z int, d bigserial);
INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i;
SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_serial$$)
create_distributed_table | create_distributed_table
---------------------------------------------------------------------
|
(1 row)

MERGE INTO target_serial sda
USING source_serial sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (id, z);
SELECT count(*) from source_serial;
count
---------------------------------------------------------------------
101
(1 row)

SELECT count(*) from target_serial;
count
---------------------------------------------------------------------
101
(1 row)

--
-- Error and Unsupported scenarios
--
Expand Down Expand Up @@ -2609,9 +2635,8 @@ $$;
-- relation which will have unexpected/suprising results.
MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON
t1.id = s1.id AND s1.id = 2
WHEN NOT matched THEN
INSERT (id, val)
VALUES (s1.id , random());
WHEN matched THEN
UPDATE SET id = s1.id, val = random();
ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE
-- Test preventing "ON" join condition from writing to the database
BEGIN;
Expand Down Expand Up @@ -3103,7 +3128,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 78 other objects
NOTICE: drop cascades to 80 other objects
DETAIL: drop cascades to function insert_data()
drop cascades to table pg_result
drop cascades to table local_local
Expand Down Expand Up @@ -3173,9 +3198,11 @@ drop cascades to function pa_compare_tables()
drop cascades to table source_json
drop cascades to table target_json
drop cascades to function immutable_hash(integer)
drop cascades to table source_serial
drop cascades to table target_serial
drop cascades to table pg
drop cascades to table t1_4000118
drop cascades to table s1_4000119
drop cascades to table t1_4000126
drop cascades to table s1_4000127
drop cascades to table t1
drop cascades to table s1
drop cascades to table dist_colocated
Expand Down
46 changes: 37 additions & 9 deletions src/test/regress/expected/merge_arbitrary.out
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,25 @@ SELECT * FROM target_cj ORDER BY 1;

ROLLBACK;
-- Test PREPARE
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
INSERT VALUES(s1, s2);
PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
UPDATE SET t2 = t2 + 1;
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
Expand All @@ -86,3 +84,33 @@ SELECT * FROM prept;
100 | 12
(1 row)

-- Test local tables
INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause
INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause
INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause
INSERT INTO t1 VALUES(1, 0); -- Will be deleted
INSERT INTO t1 VALUES(2, 0); -- Will be updated
INSERT INTO t1 VALUES(5, 0); -- Will be intact
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
SELECT * FROM t1 order by id;
id | val
---------------------------------------------------------------------
2 | 1
3 | 1
4 | 1
5 | 0
6 | 1
(5 rows)

46 changes: 46 additions & 0 deletions src/test/regress/expected/merge_arbitrary_create.out
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE;
CREATE SCHEMA merge_arbitrary_schema;
SET search_path TO merge_arbitrary_schema;
Expand All @@ -24,3 +31,42 @@ SELECT create_distributed_table('source_cj2', 'sid2');

(1 row)

CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1');
create_distributed_table | create_distributed_table
---------------------------------------------------------------------
|
(1 row)

PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
-- Citus local tables
CREATE TABLE t1(id int, val int);
CREATE TABLE s1(id int, val int);
SELECT citus_add_local_table_to_metadata('t1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------

(1 row)

SELECT citus_add_local_table_to_metadata('s1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------

(1 row)

6 changes: 6 additions & 0 deletions src/test/regress/expected/merge_arbitrary_create_0.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
9 changes: 5 additions & 4 deletions src/test/regress/sql/merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ MERGE INTO t1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (pg_res.id, pg_res.val);

-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id;
SELECT * INTO merge_result FROM t1 order by id;

Expand Down Expand Up @@ -1482,6 +1481,9 @@ ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (id, z);

SELECT count(*) from source_serial;
SELECT count(*) from target_serial;

--
-- Error and Unsupported scenarios
--
Expand Down Expand Up @@ -1615,9 +1617,8 @@ $$;
-- relation which will have unexpected/suprising results.
MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON
t1.id = s1.id AND s1.id = 2
WHEN NOT matched THEN
INSERT (id, val)
VALUES (s1.id , random());
WHEN matched THEN
UPDATE SET id = s1.id, val = random();

-- Test preventing "ON" join condition from writing to the database
BEGIN;
Expand Down
47 changes: 35 additions & 12 deletions src/test/regress/sql/merge_arbitrary.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,28 @@ SELECT * FROM target_cj ORDER BY 1;
ROLLBACK;

-- Test PREPARE
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);

INSERT INTO prept VALUES(100, 0);

INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);

PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
INSERT VALUES(s1, s2);

PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
UPDATE SET t2 = t2 + 1;

INSERT INTO prept VALUES(100, 0);

INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);

EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
Expand All @@ -77,3 +74,29 @@ EXECUTE insert(1); EXECUTE delete(0);

-- Should have the counter as 12 (6 * 2)
SELECT * FROM prept;

-- Test local tables
INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause
INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause
INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause

INSERT INTO t1 VALUES(1, 0); -- Will be deleted
INSERT INTO t1 VALUES(2, 0); -- Will be updated
INSERT INTO t1 VALUES(5, 0); -- Will be intact

WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)

WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);

SELECT * FROM t1 order by id;
Loading

0 comments on commit b409abb

Please sign in to comment.