Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Mar 27, 2023
1 parent da7db53 commit 24603b3
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 41 deletions.
4 changes: 4 additions & 0 deletions src/test/regress/citus_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ def extra_tests(self):
"multi_mx_function_table_reference",
],
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
"multi_simple_queries": TestDeps("base_schedule"),
}

if not (test_file_name or test_file_path):
Expand Down
8 changes: 8 additions & 0 deletions src/test/regress/expected/multi_data_types.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
-- create, distribute, INSERT, SELECT and UPDATE
-- ===================================================================
SET citus.next_shard_id TO 530000;
-- Given that other test files depend on the existence of types created in this file,
-- we cannot drop them at the end. Instead, we drop them at the beginning of the test
-- to make this file runnable multiple times via run_test.py.
BEGIN;
SET LOCAL client_min_messages TO WARNING;
DROP TYPE IF EXISTS test_composite_type, other_composite_type, bug_status CASCADE;
DROP OPERATOR FAMILY IF EXISTS cats_op_fam USING hash;
COMMIT;
-- create a custom type...
CREATE TYPE test_composite_type AS (
i integer,
Expand Down
73 changes: 54 additions & 19 deletions src/test/regress/expected/multi_modifying_xacts.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
SET citus.next_shard_id TO 1200000;
SET citus.next_placement_id TO 1200000;
CREATE SCHEMA multi_modifying_xacts;
SET search_path TO multi_modifying_xacts;
-- ===================================================================
-- test end-to-end modification functionality
-- ===================================================================
Expand Down Expand Up @@ -190,7 +192,7 @@ ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- but the DDL should correctly roll back
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='multi_modifying_xacts.labs'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | bigint | not null
Expand Down Expand Up @@ -339,7 +341,7 @@ CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
END;
$rli$ LANGUAGE plpgsql;
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
SELECT * FROM run_command_on_placements('multi_modifying_xacts.researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE multi_modifying_xacts.reject_large_id()')
ORDER BY nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
Expand Down Expand Up @@ -498,6 +500,7 @@ AND s.logicalrelid = 'objects'::regclass;

-- create trigger on one worker to reject certain values
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
Expand All @@ -514,6 +517,7 @@ AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- test partial failure; worker_1 succeeds, 2 fails
-- in this case, we expect the transaction to abort
\set VERBOSITY terse
Expand Down Expand Up @@ -551,6 +555,7 @@ DELETE FROM objects;
-- there cannot be errors on different shards at different times
-- because the first failure will fail the whole transaction
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
Expand All @@ -567,6 +572,7 @@ AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
Expand Down Expand Up @@ -602,12 +608,14 @@ AND (s.logicalrelid = 'objects'::regclass OR

-- what if the failures happen at COMMIT time?
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON objects_1200003;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- should be the same story as before, just at COMMIT time
-- as we use 2PC, the transaction is rollbacked
BEGIN;
Expand Down Expand Up @@ -644,12 +652,14 @@ WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'objects'::regclass;
-- what if all nodes have failures at COMMIT time?
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON labs_1200002;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- reduce the log level for differences between PG14 and PG15
-- in PGconn->errorMessage
-- relevant PG commit b15f254466aefbabcbed001929f6e09db59fd158
Expand Down Expand Up @@ -688,8 +698,10 @@ AND (s.logicalrelid = 'objects'::regclass OR

-- what if one shard (objects) succeeds but another (labs) completely fails?
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON objects_1200003;
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200004;
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
Expand Down Expand Up @@ -833,6 +845,7 @@ SELECT * FROM reference_modifying_xacts;

-- lets fail on of the workers at before the commit time
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
Expand All @@ -849,6 +862,7 @@ AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
Expand All @@ -860,12 +874,14 @@ ERROR: illegal value
COMMIT;
-- lets fail one of the workers at COMMIT time
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
Expand All @@ -890,8 +906,10 @@ ORDER BY s.logicalrelid, sp.shardstate;

-- for the time-being drop the constraint
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
-- distributed table
Expand Down Expand Up @@ -923,6 +941,7 @@ INSERT INTO hash_modifying_xacts VALUES (2, 2);
ABORT;
-- lets fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
Expand All @@ -939,6 +958,7 @@ AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
Expand All @@ -955,13 +975,15 @@ SELECT * FROM reference_modifying_xacts WHERE key = 55;
-- now lets fail on of the workers for the hash distributed table table
-- when there is a reference table involved
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- the trigger is on execution time
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
Expand Down Expand Up @@ -994,11 +1016,13 @@ ORDER BY s.logicalrelid, sp.shardstate;
-- and ensure that hash distributed table's
-- change is rollbacked as well
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
BEGIN;
-- to expand participant to include all worker nodes
Expand Down Expand Up @@ -1127,8 +1151,10 @@ SELECT count(*) FROM pg_dist_transaction;
-- in which we'll make the remote host unavailable
-- first create the new user on all nodes
CREATE USER test_user;
GRANT ALL ON SCHEMA multi_modifying_xacts TO test_user;
-- now connect back to the master with the new user
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200015;
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
Expand All @@ -1148,21 +1174,24 @@ SELECT create_distributed_table('numbers_hash_failure_test', 'key');

-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.override_table_visibility TO false;
\dt reference_failure_test_1200015
List of relations
Schema | Name | Type | Owner
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
public | reference_failure_test_1200015 | table | test_user
multi_modifying_xacts | reference_failure_test_1200015 | table | test_user
(1 row)

-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
SET search_path TO multi_modifying_xacts;
ALTER USER test_user RENAME TO test_user_new;
NOTICE: not propagating ALTER ROLE ... RENAME TO commands to worker nodes
-- connect back to master and query the reference table
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
Expand Down Expand Up @@ -1277,14 +1306,17 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin

-- break the other node as well
\c - :default_user - :worker_2_port
SET search_path TO multi_modifying_xacts;
ALTER USER test_user RENAME TO test_user_new;
NOTICE: not propagating ALTER ROLE ... RENAME TO commands to worker nodes
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
-- fails on all shard placements
INSERT INTO numbers_hash_failure_test VALUES (2,2);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200020;
SET citus.next_placement_id TO 1200033;
-- unbreak both nodes by renaming the user back to the original name
Expand All @@ -1297,6 +1329,7 @@ SELECT * FROM run_command_on_workers('ALTER USER test_user_new RENAME TO test_us

DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
reference_failure_test, numbers_hash_failure_test;
REVOKE ALL ON SCHEMA multi_modifying_xacts FROM test_user;
DROP USER test_user;
-- set up foreign keys to test transactions with co-located and reference tables
BEGIN;
Expand All @@ -1322,7 +1355,9 @@ SELECT create_reference_table('itemgroups');

(1 row)

SET client_min_messages TO WARNING;
DROP TABLE IF EXISTS users ;
RESET client_min_messages;
CREATE TABLE users (
id int PRIMARY KEY,
name text,
Expand Down Expand Up @@ -1354,18 +1389,18 @@ JOIN
USING (shardid)
ORDER BY
id;
id | shard_name | nodename | nodeport
---------------------------------------------------------------------
1 | users_1200022 | localhost | 57637
2 | users_1200025 | localhost | 57638
3 | users_1200023 | localhost | 57638
4 | users_1200023 | localhost | 57638
5 | users_1200022 | localhost | 57637
6 | users_1200024 | localhost | 57637
7 | users_1200023 | localhost | 57638
8 | users_1200022 | localhost | 57637
9 | users_1200025 | localhost | 57638
10 | users_1200022 | localhost | 57637
id | shard_name | nodename | nodeport
---------------------------------------------------------------------
1 | multi_modifying_xacts.users_1200022 | localhost | 57637
2 | multi_modifying_xacts.users_1200025 | localhost | 57638
3 | multi_modifying_xacts.users_1200023 | localhost | 57638
4 | multi_modifying_xacts.users_1200023 | localhost | 57638
5 | multi_modifying_xacts.users_1200022 | localhost | 57637
6 | multi_modifying_xacts.users_1200024 | localhost | 57637
7 | multi_modifying_xacts.users_1200023 | localhost | 57638
8 | multi_modifying_xacts.users_1200022 | localhost | 57637
9 | multi_modifying_xacts.users_1200025 | localhost | 57638
10 | multi_modifying_xacts.users_1200022 | localhost | 57637
(10 rows)

END;
Expand Down Expand Up @@ -1546,5 +1581,5 @@ SELECT name FROM labs WHERE id = 1001;
(1 row)

RESET citus.function_opens_transaction_block;
DROP FUNCTION insert_abort();
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifying_xacts CASCADE;
2 changes: 2 additions & 0 deletions src/test/regress/expected/multi_mx_copy_data.out
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
--
-- MULTI_MX_COPY_DATA
--
TRUNCATE lineitem_mx, orders_mx;
\set nation_data_file :abs_srcdir '/data/nation.data'
\set client_side_copy_command '\\copy nation_hash FROM ' :'nation_data_file' ' with delimiter '''|''';'
:client_side_copy_command
Expand Down Expand Up @@ -161,3 +162,4 @@ SET search_path TO public;
:client_side_copy_command
\set client_side_copy_command '\\copy supplier_mx FROM ' :'supplier_data_file' ' with delimiter '''|''';'
:client_side_copy_command
DROP TABLE citus_mx_test_schema.nation_hash_replicated;
3 changes: 3 additions & 0 deletions src/test/regress/expected/multi_mx_modifying_xacts.out
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,6 @@ SELECT * FROM labs_mx WHERE id = 8;
---------------------------------------------------------------------
(0 rows)

TRUNCATE objects_mx, labs_mx, researchers_mx;
DROP TRIGGER reject_bad_mx ON labs_mx_1220102;
DROP FUNCTION reject_bad_mx;
4 changes: 4 additions & 0 deletions src/test/regress/expected/multi_mx_router_planner.out
Original file line number Diff line number Diff line change
Expand Up @@ -1460,3 +1460,7 @@ DEBUG: query has a single distribution column value: 1
51
(6 rows)

SET client_min_messages to WARNING;
TRUNCATE articles_hash_mx, company_employees_mx, articles_single_shard_hash_mx;
DROP MATERIALIZED VIEW mv_articles_hash_mx_error;
DROP TABLE authors_hash_mx;
4 changes: 2 additions & 2 deletions src/test/regress/expected/multi_router_planner.out
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,7 @@ SELECT master_create_empty_shard('articles_append') AS shard_id \gset
UPDATE pg_dist_shard SET shardmaxvalue = 100, shardminvalue=1 WHERE shardid = :shard_id;
-- we execute the query within a function to consolidate the error messages
-- between different executors
CREATE FUNCTION raise_failed_execution_router(query text) RETURNS void AS $$
CREATE OR REPLACE FUNCTION raise_failed_execution_router(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
Expand Down Expand Up @@ -2452,7 +2452,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
840017 | 1 | localhost | 57637
Expand Down
Loading

0 comments on commit 24603b3

Please sign in to comment.