diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index c5282202eac..5d30ff8be6c 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -190,6 +190,19 @@ typedef struct WorkerShardStatistics
 	HTAB *statistics;
 } WorkerShardStatistics;
 
+/* ShardMoveDependencyInfo contains the taskId that any new shard move task within the corresponding colocation group must take a dependency on. */
+typedef struct ShardMoveDependencyInfo
+{
+	int64 key;
+	int64 taskId;
+} ShardMoveDependencyInfo;
+
+typedef struct ShardMoveDependencies
+{
+	HTAB *colocationDependencies;
+	HTAB *nodeDependencies;
+} ShardMoveDependencies;
+
 char *VariablesToBePassedToNewConnections = NULL;
 
 /* static declarations for main logic */
@@ -1898,6 +1911,137 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
 }
 
 
+/*
+ * GetColocationId returns the colocationId of the shard in a PlacementUpdateEvent.
+ */
+static int64
+GetColocationId(PlacementUpdateEvent *move)
+{
+	ShardInterval *shardInterval = LoadShardInterval(move->shardId);
+
+	CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(
+		shardInterval->relationId);
+
+	return citusTableCacheEntry->colocationId;
+}
+
+
+/*
+ * InitializeShardMoveDependencies creates the hash maps that we use to track
+ * the latest moves, so that subsequent moves with the same properties take a
+ * dependency on them. There are two hash maps: one tracks the latest move
+ * scheduled in a given colocation group, and the other tracks the latest move
+ * that involves a given node either as its source or its target node.
+ */
+static ShardMoveDependencies
+InitializeShardMoveDependencies()
+{
+	ShardMoveDependencies shardMoveDependencies;
+	shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
+																				   ShardMoveDependencyInfo,
+																				   "colocationDependencyHashMap",
+																				   6);
+	shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
+																			 ShardMoveDependencyInfo,
+																			 "nodeDependencyHashMap",
+																			 6);
+
+	return shardMoveDependencies;
+}
+
+
+/*
+ * GenerateTaskMoveDependencyList creates and returns an array of taskIds that
+ * the given move must take a dependency on.
+ */
+static int64 *
+GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
+							   ShardMoveDependencies shardMoveDependencies, int *nDepends)
+{
+	HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
+														   "shardMoveDependencyList", 0);
+
+	bool found;
+
+	/* Check if there exists a move in the same colocation group scheduled earlier. */
+	ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
+		shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);
+
+	if (found)
+	{
+		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
+	}
+
+	/* Check if there exists a move scheduled earlier whose source or target node
+	 * overlaps with the current move's source node. */
+	shardMoveDependencyInfo = hash_search(
+		shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
+		&found);
+
+	if (found)
+	{
+		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
+	}
+
+	/* Check if there exists a move scheduled earlier whose source or target node
+	 * overlaps with the current move's target node. */
+	shardMoveDependencyInfo = hash_search(
+		shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
+		&found);
+
+	if (found)
+	{
+		hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
+	}
+
+	*nDepends = hash_get_num_entries(dependsList);
+
+	int64 *dependsArray = NULL;
+
+	if (*nDepends > 0)
+	{
+		HASH_SEQ_STATUS seq;
+
+		dependsArray = palloc((*nDepends) * sizeof(int64));
+
+		hash_seq_init(&seq, dependsList);
+		int i = 0;
+		int64 *dependsTaskId;
+
+		while ((dependsTaskId = (int64 *) hash_seq_search(&seq)) != NULL)
+		{
+			dependsArray[i++] = *dependsTaskId;
+		}
+	}
+
+	return dependsArray;
+}
+
+
+/*
+ * UpdateShardMoveDependencies updates the dependency maps with the latest move's taskId.
+ */
+static void
+UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
+							ShardMoveDependencies shardMoveDependencies)
+{
+	ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
+		shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
+	shardMoveDependencyInfo->taskId = taskId;
+
+	shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
+										  &move->sourceNode->nodeId, HASH_ENTER, NULL);
+
+	shardMoveDependencyInfo->taskId = taskId;
+
+	shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
+										  &move->targetNode->nodeId, HASH_ENTER, NULL);
+
+	shardMoveDependencyInfo->taskId = taskId;
+}
+
+
 /*
  * RebalanceTableShardsBackground rebalances the shards for the relations
  * inside the relationIdList across the different workers. It does so using our
@@ -1974,18 +2118,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
 	StringInfoData buf = { 0 };
 	initStringInfo(&buf);
 
-	/*
-	 * Currently we only have two tasks that any move can depend on:
-	 *   - replicating reference tables
-	 *   - the previous move
-	 *
-	 * prevJobIdx tells what slot to write the id of the task into. We only use both slots
-	 * if we are actually replicating reference tables.
-	 */
-	int64 prevJobId[2] = { 0 };
-	int prevJobIdx = 0;
-
 	List *referenceTableIdList = NIL;
+	int64 replicateRefTablesTaskId = 0;
 
 	if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
 	{
@@ -2001,15 +2135,15 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
 		appendStringInfo(&buf,
 						 "SELECT pg_catalog.replicate_reference_tables(%s)",
 						 quote_literal_cstr(shardTranferModeLabel));
-		BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
-													  prevJobIdx, prevJobId);
-		prevJobId[prevJobIdx] = task->taskid;
-		prevJobIdx++;
+		BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
+													  NULL);
+		replicateRefTablesTaskId = task->taskid;
 	}
 
 	PlacementUpdateEvent *move = NULL;
-	bool first = true;
-	int prevMoveIndex = prevJobIdx;
+
+	ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies();
+
 	foreach_ptr(move, placementUpdateList)
 	{
 		resetStringInfo(&buf);
@@ -2021,14 +2155,27 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
 						 move->targetNode->nodeId,
 						 quote_literal_cstr(shardTranferModeLabel));
-		BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
-													  prevJobIdx, prevJobId);
-		prevJobId[prevMoveIndex] = task->taskid;
-		if (first)
+		int64 colocationId = GetColocationId(move);
+
+		int nDepends = 0;
+
+		int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
+															 shardMoveDependencies,
+															 &nDepends);
+
+		if (nDepends == 0 && replicateRefTablesTaskId > 0)
 		{
-			first = false;
-			prevJobIdx++;
+			nDepends = 1;
+			dependsArray = palloc(nDepends * sizeof(int64));
+			dependsArray[0] = replicateRefTablesTaskId;
 		}
+
+		BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
+													  nDepends,
+													  dependsArray);
+
+		UpdateShardMoveDependencies(move, colocationId, task->taskid,
+									shardMoveDependencies);
 	}
 
 	ereport(NOTICE,
diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile
index d9700df8020..2de83ec576e 100644
--- a/src/test/regress/Makefile
+++ b/src/test/regress/Makefile
@@ -223,7 +223,7 @@ check-follower-cluster: all
 	-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)
 
 check-operations: all
-	$(pg_regress_multi_check) --load-extension=citus \
+	$(pg_regress_multi_check) --load-extension=citus --worker-count=6 \
 	-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/operations_schedule $(EXTRA_TESTS)
 
 check-columnar: all
diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py
index 25c9d4ecd5d..d686a71c5ee 100755
--- a/src/test/regress/citus_tests/run_test.py
+++ b/src/test/regress/citus_tests/run_test.py
@@ -110,6 +110,14 @@ def extra_tests(self):
             "multi_mx_function_table_reference",
         ],
     ),
+    "background_rebalance_parallel": TestDeps(
+        None,
+        [
+            "multi_test_helpers",
+            "multi_cluster_management",
+        ],
+        worker_count=6,
+    ),
     "multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
     "multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
     "multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
diff --git a/src/test/regress/expected/background_rebalance.out b/src/test/regress/expected/background_rebalance.out
index c82078d6f2f..e4495ccf90d 100644
--- a/src/test/regress/expected/background_rebalance.out
+++ b/src/test/regress/expected/background_rebalance.out
@@ -291,6 +291,12 @@ SELECT state, details from citus_rebalance_status();
  finished | {"tasks": [], "task_state_counts": {"done": 2}}
 (1 row)
 
+SELECT
public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + -- Remove coordinator again to allow rerunning of this test SELECT 1 FROM citus_remove_node('localhost', :master_port); ?column? diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out new file mode 100644 index 00000000000..862beb57e41 --- /dev/null +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -0,0 +1,364 @@ +/* + Test to check if the background tasks scheduled by the background rebalancer + has the correct dependencies. +*/ +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO WARNING; +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */ +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */ +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg2 (b int primary key); +SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */ +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +/* Add two new node so that we can rebalance */ +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640 + table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639 + table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640 + table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639 + table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640 + table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639 + table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640 + table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639 + table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640 + table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639 + table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640 + table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639 +(12 rows) + +SELECT * FROM citus_rebalance_start(); + citus_rebalance_start +--------------------------------------------------------------------- + 17777 +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +/*Check that a move is dependent on + 1. any other move scheduled earlier in its colocation group. + 2. any other move scheduled earlier whose source node or target + node overlaps with the current moves nodes. */ +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + shardid | colocationid +--------------------------------------------------------------------- + 85674000 | 50050 + 85674001 | 50050 + 85674002 | 50050 + 85674003 | 50050 + 85674004 | 50050 + 85674005 | 50050 + 85674006 | 50050 + 85674007 | 50050 + 85674008 | 50051 + 85674009 | 50051 + 85674010 | 50051 + 85674011 | 50051 + 85674012 | 50051 + 85674013 | 50051 + 85674014 | 50051 + 85674015 | 50051 + 85674016 | 50052 + 85674017 | 50052 + 85674018 | 50052 + 85674019 | 50052 + 85674020 | 50052 + 85674021 | 50052 + 85674022 | 50052 + 85674023 | 50052 +(24 rows) + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') + 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') + 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') + 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') + 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT 
pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') + 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') + 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') +(7 rows) + +/* Check that if there is a reference table that needs to be synched to a node, + any move without a dependency must depend on the move task for reference table. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +/* Drain worker_3 so that we can move only one colocation group to worker_3 + to create an unbalance that would cause parallel rebalancing. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +CREATE TABLE ref_table(a int PRIMARY KEY); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +/* Move all the shards of Colocation group 3 to worker_3.*/ +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + master_move_shard_placement +--------------------------------------------------------------------- + + + + +(4 rows) + +CALL citus_cleanup_orphaned_resources(); +/* Activate and new nodes so that we can rebalance. */ +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM citus_rebalance_start(); + citus_rebalance_start +--------------------------------------------------------------------- + 17778 +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + shardid | colocationid +--------------------------------------------------------------------- + 85674000 | 50050 + 85674001 | 50050 + 85674002 | 50050 + 85674003 | 50050 + 85674004 | 50050 + 85674005 | 50050 + 85674006 | 50050 + 85674007 | 50050 + 85674008 | 50051 + 85674009 | 50051 + 85674010 | 50051 + 85674011 | 50051 + 85674012 | 50051 + 85674013 | 50051 + 85674014 | 50051 + 85674015 | 50051 + 85674016 | 50052 + 85674017 | 50052 + 85674018 | 50052 + 85674019 | 50052 + 85674020 | 50052 + 85674021 | 50052 + 85674022 | 50052 + 85674023 | 50052 + 85674024 | 50053 +(25 rows) + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') + 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') + 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') + 1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') + 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') + 1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') +(6 rows) + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_4_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_5_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_6_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- keep the rest of the tests inact that depends node/group ids +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git 
a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 15afd9e1847..f5e77c835bf 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -5,6 +5,7 @@ test: shard_rebalancer_unit test: shard_rebalancer test: background_rebalance test: worker_copy_table_to_node +test: background_rebalance_parallel test: foreign_key_to_reference_shard_rebalance test: multi_move_mx test: shard_move_deferred_delete diff --git a/src/test/regress/sql/background_rebalance.sql b/src/test/regress/sql/background_rebalance.sql index 4d105655b8d..59b296576ed 100644 --- a/src/test/regress/sql/background_rebalance.sql +++ b/src/test/regress/sql/background_rebalance.sql @@ -104,6 +104,8 @@ SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical'); SELECT citus_rebalance_wait(); SELECT state, details from citus_rebalance_status(); +SELECT public.wait_for_resource_cleanup(); + -- Remove coordinator again to allow rerunning of this test SELECT 1 FROM citus_remove_node('localhost', :master_port); SELECT public.wait_until_metadata_sync(30000); diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql new file mode 100644 index 00000000000..8c5fb5bb122 --- /dev/null +++ b/src/test/regress/sql/background_rebalance_parallel.sql @@ -0,0 +1,141 @@ +/* + Test to check if the background tasks scheduled by the background rebalancer + has the correct dependencies. +*/ +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO WARNING; + +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; + +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; + +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + +/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */ +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); + +SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); + +/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */ +CREATE TABLE table1_colg2 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg2 (b int primary key); + +SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); + +/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */ +CREATE TABLE table1_colg3 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with 
=> 'none'); + +CREATE TABLE table2_colg3 (b int primary key); + +SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); + + +/* Add two new node so that we can rebalance */ +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + +SELECT * FROM citus_rebalance_start(); + +SELECT citus_rebalance_wait(); + +/*Check that a move is dependent on + 1. any other move scheduled earlier in its colocation group. + 2. any other move scheduled earlier whose source node or target + node overlaps with the current moves nodes. */ +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; + + +/* Check that if there is a reference table that needs to be synched to a node, + any move without a dependency must depend on the move task for reference table. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); +SELECT public.wait_for_resource_cleanup(); +SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); + +/* Drain worker_3 so that we can move only one colocation group to worker_3 + to create an unbalance that would cause parallel rebalancing. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + +CALL citus_cleanup_orphaned_resources(); + +CREATE TABLE ref_table(a int PRIMARY KEY); + +SELECT create_reference_table('ref_table'); + +/* Move all the shards of Colocation group 3 to worker_3.*/ +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + +CALL citus_cleanup_orphaned_resources(); + +/* Activate and new nodes so that we can rebalance. 
*/ +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + +SELECT * FROM citus_rebalance_start(); + +SELECT citus_rebalance_wait(); + +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_3_port); +select citus_remove_node('localhost', :worker_4_port); +select citus_remove_node('localhost', :worker_5_port); +select citus_remove_node('localhost', :worker_6_port); +-- keep the rest of the tests inact that depends node/group ids +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; +
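
Note on the dependency scheme in shard_rebalancer.c above: GenerateTaskMoveDependencyList and UpdateShardMoveDependencies together make each scheduled move depend on the latest earlier move in its colocation group and on the latest earlier move that used either of its nodes as source or target; only when a move has no such dependency does it fall back to the replicate_reference_tables task, if one was scheduled. The following standalone sketch is not the Citus implementation: it replaces the two Postgres HTAB hash maps with fixed-size arrays, folds each lookup and its later update into one helper, and uses made-up taskIds, nodeIds and colocationIds chosen to mirror the first four moves of job 17777 in background_rebalance_parallel.out.

/*
 * Standalone sketch of the dependency bookkeeping; NOT the Citus code.
 * Fixed-size arrays stand in for the colocation and node hash maps.
 */
#include <stdio.h>

#define MAX_KEYS 64

typedef struct DependencyEntry
{
	long key;
	long taskId;
	int used;
} DependencyEntry;

/*
 * Return the taskId currently stored for key (or -1 if none), then store
 * newTaskId for that key. This combines one lookup from
 * GenerateTaskMoveDependencyList with the matching update from
 * UpdateShardMoveDependencies.
 */
static long
LookupAndUpdate(DependencyEntry *map, long key, long newTaskId)
{
	for (int i = 0; i < MAX_KEYS; i++)
	{
		if (map[i].used && map[i].key == key)
		{
			long previousTaskId = map[i].taskId;
			map[i].taskId = newTaskId;
			return previousTaskId;
		}
	}
	for (int i = 0; i < MAX_KEYS; i++)
	{
		if (!map[i].used)
		{
			map[i].used = 1;
			map[i].key = key;
			map[i].taskId = newTaskId;
			break;
		}
	}
	return -1;
}

int
main(void)
{
	/* hypothetical schedule: { colocationId, sourceNodeId, targetNodeId } */
	long moves[4][3] = {
		{ 50050, 51, 52 },	/* task 1000 */
		{ 50050, 50, 53 },	/* task 1001: same colocation group as 1000 */
		{ 50051, 51, 52 },	/* task 1002: same source/target nodes as 1000 */
		{ 50051, 50, 53 },	/* task 1003: colocation of 1002, nodes of 1001 */
	};

	DependencyEntry colocationDependencies[MAX_KEYS] = { 0 };
	DependencyEntry nodeDependencies[MAX_KEYS] = { 0 };

	for (int i = 0; i < 4; i++)
	{
		long taskId = 1000 + i;

		/* the three lookups GenerateTaskMoveDependencyList performs */
		long candidates[3] = {
			LookupAndUpdate(colocationDependencies, moves[i][0], taskId),
			LookupAndUpdate(nodeDependencies, moves[i][1], taskId),
			LookupAndUpdate(nodeDependencies, moves[i][2], taskId),
		};

		printf("task %ld depends on:", taskId);

		/* deduplicate, since the real code collects taskIds into a hash set */
		for (int c = 0; c < 3; c++)
		{
			int seenBefore = 0;
			for (int p = 0; p < c; p++)
			{
				seenBefore |= (candidates[p] == candidates[c]);
			}
			if (candidates[c] >= 0 && !seenBefore)
			{
				printf(" %ld", candidates[c]);
			}
		}
		printf("\n");
	}

	return 0;
}

Running the sketch reports no dependency for task 1000, task 1001 and 1002 each depending on 1000, and task 1003 depending on 1002 and 1001, which matches the pg_dist_background_task_depend rows for job 17777 shown above. The reference-table fallback in RebalanceTableShardsBackground is intentionally left out of the sketch.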