From d3fb9288ab290fa3ed1a1a9417e8d91fab9724f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emel=20=C5=9Eim=C5=9Fek?= Date: Wed, 29 Mar 2023 22:03:37 +0300 Subject: [PATCH] Schedule parallel shard moves in background rebalancer by removing task dependencies between shard moves across colocation groups. (#6756) DESCRIPTION: This PR removes the task dependencies between shard moves for which the shards belong to different colocation groups. This change results in scheduling multiple tasks in the RUNNABLE state. Therefore it is possible that the background task monitor can run them concurrently. Previously, all the shard moves planned in a rebalance operation took dependency on each other sequentially. For instance, given the following table and shards colocation group 1 colocation group 2 table1 table2 table3 table4 table 5 shard11 shard21 shard31 shard41 shard51 shard12 shard22 shard32 shard42 shard52 if the rebalancer planner returned the below set of moves ` {move(shard11), move(shard12), move(shard41), move(shard42)}` background rebalancer scheduled them such that they depend on each other sequentially. ``` {move(reftables) if there is any, none} | move( shard11) | move(shard12) | {move(shard41)<--- move(shard12)} This is an artificial dependency move(shard41) | move(shard42) ``` This results in artificial dependencies between otherwise independent moves. Considering that the shards in different colocation groups can be moved concurrently, this PR changes the dependency relationship between the moves as follows: ``` {move(reftables) if there is any, none} {move(reftables) if there is any, none} | | move(shard11) move(shard41) | | move(shard12) move(shard42) ``` --------- Co-authored-by: Jelte Fennema --- .../distributed/operations/shard_rebalancer.c | 193 ++++++++-- src/test/regress/Makefile | 2 +- src/test/regress/citus_tests/run_test.py | 8 + .../regress/expected/background_rebalance.out | 6 + .../background_rebalance_parallel.out | 364 ++++++++++++++++++ src/test/regress/operations_schedule | 1 + src/test/regress/sql/background_rebalance.sql | 2 + .../sql/background_rebalance_parallel.sql | 141 +++++++ 8 files changed, 693 insertions(+), 24 deletions(-) create mode 100644 src/test/regress/expected/background_rebalance_parallel.out create mode 100644 src/test/regress/sql/background_rebalance_parallel.sql diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index c5282202eac..5d30ff8be6c 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -190,6 +190,19 @@ typedef struct WorkerShardStatistics HTAB *statistics; } WorkerShardStatistics; +/* ShardMoveDependencyHashEntry contains the taskId which any new shard move task within the corresponding colocation group must take a dependency on */ +typedef struct ShardMoveDependencyInfo +{ + int64 key; + int64 taskId; +} ShardMoveDependencyInfo; + +typedef struct ShardMoveDependencies +{ + HTAB *colocationDependencies; + HTAB *nodeDependencies; +} ShardMoveDependencies; + char *VariablesToBePassedToNewConnections = NULL; /* static declarations for main logic */ @@ -1898,6 +1911,137 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options) } +/* + * GetColocationId function returns the colocationId of the shard in a PlacementUpdateEvent. 
+ */ +static int64 +GetColocationId(PlacementUpdateEvent *move) +{ + ShardInterval *shardInterval = LoadShardInterval(move->shardId); + + CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry( + shardInterval->relationId); + + return citusTableCacheEntry->colocationId; +} + + +/* + * InitializeShardMoveDependencies function creates the hash maps that we use to track + * the latest moves so that subsequent moves with the same properties must take a dependency + * on them. There are two hash maps. One is for tracking the latest move scheduled in a + * given colocation group and the other one is for tracking the latest move which involves + * a given node either as its source node or its target node. + */ +static ShardMoveDependencies +InitializeShardMoveDependencies() +{ + ShardMoveDependencies shardMoveDependencies; + shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64, + ShardMoveDependencyInfo, + "colocationDependencyHashMap", + 6); + shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64, + ShardMoveDependencyInfo, + "nodeDependencyHashMap", + 6); + + return shardMoveDependencies; +} + + +/* + * GenerateTaskMoveDependencyList creates and returns a List of taskIds that + * the move must take a dependency on. + */ +static int64 * +GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, + ShardMoveDependencies shardMoveDependencies, int *nDepends) +{ + HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, + "shardMoveDependencyList", 0); + + bool found; + + /* Check if there exists a move in the same colocation group scheduled earlier. */ + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found); + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } + + /* Check if there exists a move scheduled earlier whose source or target node + * overlaps with the current move's source node. */ + shardMoveDependencyInfo = hash_search( + shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER, + &found); + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } + + /* Check if there exists a move scheduled earlier whose source or target node + * overlaps with the current move's target node. */ + shardMoveDependencyInfo = hash_search( + shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER, + &found); + + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } + + *nDepends = hash_get_num_entries(dependsList); + + int64 *dependsArray = NULL; + + if (*nDepends > 0) + { + HASH_SEQ_STATUS seq; + + dependsArray = palloc((*nDepends) * sizeof(int64)); + + hash_seq_init(&seq, dependsList); + int i = 0; + int64 *dependsTaskId; + + while ((dependsTaskId = (int64 *) hash_seq_search(&seq)) != NULL) + { + dependsArray[i++] = *dependsTaskId; + } + } + + return dependsArray; +} + + +/* + * UpdateShardMoveDependencies function updates the dependency maps with the latest move's taskId. 
+ */ +static void +UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, + ShardMoveDependencies shardMoveDependencies) +{ + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); + shardMoveDependencyInfo->taskId = taskId; + + shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies, + &move->sourceNode->nodeId, HASH_ENTER, NULL); + + shardMoveDependencyInfo->taskId = taskId; + + shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies, + &move->targetNode->nodeId, HASH_ENTER, NULL); + + shardMoveDependencyInfo->taskId = taskId; +} + + /* * RebalanceTableShardsBackground rebalances the shards for the relations * inside the relationIdList across the different workers. It does so using our @@ -1974,18 +2118,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo StringInfoData buf = { 0 }; initStringInfo(&buf); - /* - * Currently we only have two tasks that any move can depend on: - * - replicating reference tables - * - the previous move - * - * prevJobIdx tells what slot to write the id of the task into. We only use both slots - * if we are actually replicating reference tables. - */ - int64 prevJobId[2] = { 0 }; - int prevJobIdx = 0; - List *referenceTableIdList = NIL; + int64 replicateRefTablesTaskId = 0; if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { @@ -2001,15 +2135,15 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo appendStringInfo(&buf, "SELECT pg_catalog.replicate_reference_tables(%s)", quote_literal_cstr(shardTranferModeLabel)); - BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, - prevJobIdx, prevJobId); - prevJobId[prevJobIdx] = task->taskid; - prevJobIdx++; + BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0, + NULL); + replicateRefTablesTaskId = task->taskid; } PlacementUpdateEvent *move = NULL; - bool first = true; - int prevMoveIndex = prevJobIdx; + + ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies(); + foreach_ptr(move, placementUpdateList) { resetStringInfo(&buf); @@ -2021,14 +2155,27 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo move->targetNode->nodeId, quote_literal_cstr(shardTranferModeLabel)); - BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, - prevJobIdx, prevJobId); - prevJobId[prevMoveIndex] = task->taskid; - if (first) + int64 colocationId = GetColocationId(move); + + int nDepends = 0; + + int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId, + shardMoveDependencies, + &nDepends); + + if (nDepends == 0 && replicateRefTablesTaskId > 0) { - first = false; - prevJobIdx++; + nDepends = 1; + dependsArray = palloc(nDepends * sizeof(int64)); + dependsArray[0] = replicateRefTablesTaskId; } + + BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, + nDepends, + dependsArray); + + UpdateShardMoveDependencies(move, colocationId, task->taskid, + shardMoveDependencies); } ereport(NOTICE, diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index d9700df8020..2de83ec576e 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -223,7 +223,7 @@ check-follower-cluster: all -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS) check-operations: all - 
$(pg_regress_multi_check) --load-extension=citus \ + $(pg_regress_multi_check) --load-extension=citus --worker-count=6 \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/operations_schedule $(EXTRA_TESTS) check-columnar: all diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 25c9d4ecd5d..d686a71c5ee 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -110,6 +110,14 @@ def extra_tests(self): "multi_mx_function_table_reference", ], ), + "background_rebalance_parallel": TestDeps( + None, + [ + "multi_test_helpers", + "multi_cluster_management", + ], + worker_count=6, + ), "multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]), "multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]), "multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]), diff --git a/src/test/regress/expected/background_rebalance.out b/src/test/regress/expected/background_rebalance.out index c82078d6f2f..e4495ccf90d 100644 --- a/src/test/regress/expected/background_rebalance.out +++ b/src/test/regress/expected/background_rebalance.out @@ -291,6 +291,12 @@ SELECT state, details from citus_rebalance_status(); finished | {"tasks": [], "task_state_counts": {"done": 2}} (1 row) +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + -- Remove coordinator again to allow rerunning of this test SELECT 1 FROM citus_remove_node('localhost', :master_port); ?column? diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out new file mode 100644 index 00000000000..862beb57e41 --- /dev/null +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -0,0 +1,364 @@ +/* + Test to check if the background tasks scheduled by the background rebalancer + has the correct dependencies. +*/ +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO WARNING; +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */ +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */ +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg2 (b int primary key); +SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */ +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +/* Add two new node so that we can rebalance */ +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640 + table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639 + table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640 + table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639 + table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640 + table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639 + table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640 + table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639 + table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640 + table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639 + table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640 + table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639 +(12 rows) + +SELECT * FROM citus_rebalance_start(); + citus_rebalance_start +--------------------------------------------------------------------- + 17777 +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +/*Check that a move is dependent on + 1. any other move scheduled earlier in its colocation group. + 2. any other move scheduled earlier whose source node or target + node overlaps with the current moves nodes. */ +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + shardid | colocationid +--------------------------------------------------------------------- + 85674000 | 50050 + 85674001 | 50050 + 85674002 | 50050 + 85674003 | 50050 + 85674004 | 50050 + 85674005 | 50050 + 85674006 | 50050 + 85674007 | 50050 + 85674008 | 50051 + 85674009 | 50051 + 85674010 | 50051 + 85674011 | 50051 + 85674012 | 50051 + 85674013 | 50051 + 85674014 | 50051 + 85674015 | 50051 + 85674016 | 50052 + 85674017 | 50052 + 85674018 | 50052 + 85674019 | 50052 + 85674020 | 50052 + 85674021 | 50052 + 85674022 | 50052 + 85674023 | 50052 +(24 rows) + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') + 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto') + 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') + 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') + 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT 
pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') + 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') + 1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') +(7 rows) + +/* Check that if there is a reference table that needs to be synched to a node, + any move without a dependency must depend on the move task for reference table. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +/* Drain worker_3 so that we can move only one colocation group to worker_3 + to create an unbalance that would cause parallel rebalancing. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +CREATE TABLE ref_table(a int PRIMARY KEY); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +/* Move all the shards of Colocation group 3 to worker_3.*/ +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + master_move_shard_placement +--------------------------------------------------------------------- + + + + +(4 rows) + +CALL citus_cleanup_orphaned_resources(); +/* Activate and new nodes so that we can rebalance. */ +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM citus_rebalance_start(); + citus_rebalance_start +--------------------------------------------------------------------- + 17778 +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + shardid | colocationid +--------------------------------------------------------------------- + 85674000 | 50050 + 85674001 | 50050 + 85674002 | 50050 + 85674003 | 50050 + 85674004 | 50050 + 85674005 | 50050 + 85674006 | 50050 + 85674007 | 50050 + 85674008 | 50051 + 85674009 | 50051 + 85674010 | 50051 + 85674011 | 50051 + 85674012 | 50051 + 85674013 | 50051 + 85674014 | 50051 + 85674015 | 50051 + 85674016 | 50052 + 85674017 | 50052 + 85674018 | 50052 + 85674019 | 50052 + 85674020 | 50052 + 85674021 | 50052 + 85674022 | 50052 + 85674023 | 50052 + 85674024 | 50053 +(25 rows) + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') + 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto') + 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') + 1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') + 1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') + 1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') +(6 rows) + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_4_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_5_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_6_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- keep the rest of the tests inact that depends node/group ids +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git 
a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 15afd9e1847..f5e77c835bf 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -5,6 +5,7 @@ test: shard_rebalancer_unit test: shard_rebalancer test: background_rebalance test: worker_copy_table_to_node +test: background_rebalance_parallel test: foreign_key_to_reference_shard_rebalance test: multi_move_mx test: shard_move_deferred_delete diff --git a/src/test/regress/sql/background_rebalance.sql b/src/test/regress/sql/background_rebalance.sql index 4d105655b8d..59b296576ed 100644 --- a/src/test/regress/sql/background_rebalance.sql +++ b/src/test/regress/sql/background_rebalance.sql @@ -104,6 +104,8 @@ SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical'); SELECT citus_rebalance_wait(); SELECT state, details from citus_rebalance_status(); +SELECT public.wait_for_resource_cleanup(); + -- Remove coordinator again to allow rerunning of this test SELECT 1 FROM citus_remove_node('localhost', :master_port); SELECT public.wait_until_metadata_sync(30000); diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql new file mode 100644 index 00000000000..8c5fb5bb122 --- /dev/null +++ b/src/test/regress/sql/background_rebalance_parallel.sql @@ -0,0 +1,141 @@ +/* + Test to check if the background tasks scheduled by the background rebalancer + has the correct dependencies. +*/ +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO WARNING; + +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; + +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; + +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + +/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */ +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none'); + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); + +SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1'); + +/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */ +CREATE TABLE table1_colg2 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg2 (b int primary key); + +SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2'); + +/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */ +CREATE TABLE table1_colg3 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with 
=> 'none'); + +CREATE TABLE table2_colg3 (b int primary key); + +SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3'); + + +/* Add two new node so that we can rebalance */ +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + +SELECT * FROM citus_rebalance_start(); + +SELECT citus_rebalance_wait(); + +/*Check that a move is dependent on + 1. any other move scheduled earlier in its colocation group. + 2. any other move scheduled earlier whose source node or target + node overlaps with the current moves nodes. */ +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC; + + +/* Check that if there is a reference table that needs to be synched to a node, + any move without a dependency must depend on the move task for reference table. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_4_port); +SELECT public.wait_for_resource_cleanup(); +SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true); + +/* Drain worker_3 so that we can move only one colocation group to worker_3 + to create an unbalance that would cause parallel rebalancing. */ +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + +CALL citus_cleanup_orphaned_resources(); + +CREATE TABLE ref_table(a int PRIMARY KEY); + +SELECT create_reference_table('ref_table'); + +/* Move all the shards of Colocation group 3 to worker_3.*/ +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + +CALL citus_cleanup_orphaned_resources(); + +/* Activate and new nodes so that we can rebalance. 
*/ +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + +SELECT * FROM citus_rebalance_start(); + +SELECT citus_rebalance_wait(); + +SELECT S.shardid, P.colocationid +FROM pg_dist_shard S, pg_dist_partition P +WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC; + +SELECT D.task_id, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC; + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_3_port); +select citus_remove_node('localhost', :worker_4_port); +select citus_remove_node('localhost', :worker_5_port); +select citus_remove_node('localhost', :worker_6_port); +-- keep the rest of the tests inact that depends node/group ids +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; +
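The scheduling rules introduced by this patch can be illustrated outside of Postgres. Below is a minimal, standalone C sketch, not part of the patch itself, that models the bookkeeping done by GenerateTaskMoveDependencyList and UpdateShardMoveDependencies using plain arrays instead of dynahash tables. The LatestTaskMap type, its helper functions, and the sample move list are hypothetical and exist only for illustration; the fallback dependency on the reference-table replication task is mentioned in a comment but omitted from the code for brevity.

```c
/*
 * Illustrative sketch (assumed names, not Citus source): each move depends on
 * the latest earlier task in its colocation group and on the latest earlier
 * task touching its source or target node; otherwise it has no dependency
 * (in the real code it would then fall back to the reference-table task, if any).
 */
#include <stdio.h>

#define MAX_KEYS 64

typedef struct LatestTaskMap
{
	long keys[MAX_KEYS];
	long taskIds[MAX_KEYS];
	int count;
} LatestTaskMap;

/* Return the latest taskId recorded for key, or 0 if none exists. */
static long
LookupLatestTask(LatestTaskMap *map, long key)
{
	for (int i = 0; i < map->count; i++)
	{
		if (map->keys[i] == key)
		{
			return map->taskIds[i];
		}
	}
	return 0;
}

/* Record taskId as the latest task for key, overwriting any earlier entry. */
static void
RecordLatestTask(LatestTaskMap *map, long key, long taskId)
{
	for (int i = 0; i < map->count; i++)
	{
		if (map->keys[i] == key)
		{
			map->taskIds[i] = taskId;
			return;
		}
	}
	map->keys[map->count] = key;
	map->taskIds[map->count] = taskId;
	map->count++;
}

int
main(void)
{
	/* {colocationId, sourceNodeId, targetNodeId}; two colocation groups on disjoint node pairs */
	long moves[][3] = {
		{ 1, 50, 52 },		/* like move(shard11) */
		{ 1, 51, 52 },		/* like move(shard12) */
		{ 2, 53, 55 },		/* like move(shard41) */
		{ 2, 54, 55 },		/* like move(shard42) */
	};
	int moveCount = sizeof(moves) / sizeof(moves[0]);

	LatestTaskMap colocationDeps = { .count = 0 };
	LatestTaskMap nodeDeps = { .count = 0 };
	long nextTaskId = 1000;

	for (int i = 0; i < moveCount; i++)
	{
		long taskId = nextTaskId++;
		long depends[3];
		int nDepends = 0;

		long colocationDep = LookupLatestTask(&colocationDeps, moves[i][0]);
		long sourceDep = LookupLatestTask(&nodeDeps, moves[i][1]);
		long targetDep = LookupLatestTask(&nodeDeps, moves[i][2]);

		/* deduplicate, mimicking the hash-set semantics of dependsList */
		if (colocationDep != 0)
		{
			depends[nDepends++] = colocationDep;
		}
		if (sourceDep != 0 && sourceDep != colocationDep)
		{
			depends[nDepends++] = sourceDep;
		}
		if (targetDep != 0 && targetDep != colocationDep && targetDep != sourceDep)
		{
			depends[nDepends++] = targetDep;
		}

		printf("task %ld depends on:", taskId);
		for (int d = 0; d < nDepends; d++)
		{
			printf(" %ld", depends[d]);
		}
		printf("\n");

		/* this move becomes the latest task for its colocation group and both nodes */
		RecordLatestTask(&colocationDeps, moves[i][0], taskId);
		RecordLatestTask(&nodeDeps, moves[i][1], taskId);
		RecordLatestTask(&nodeDeps, moves[i][2], taskId);
	}

	return 0;
}
```

Run against the sample moves above, the sketch prints two independent chains (1001 depends on 1000, 1003 depends on 1002), matching the parallel dependency diagram in the commit description, whereas the pre-patch scheduler would have chained all four moves sequentially.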