
Commit

Schedule parallel shard moves in background rebalancer by removing task dependencies between shard moves across colocation groups. (#6756)

DESCRIPTION: This PR removes the task dependencies between shard moves whose
shards belong to different colocation groups. As a result, multiple tasks can
be in the RUNNABLE state at the same time, and the background task monitor can
run them concurrently.

Previously, all the shard moves planned in a rebalance operation depended on
each other sequentially. For instance, consider the following tables and shards:

- colocation group 1: table1 (shard11, shard12), table2 (shard21, shard22), table3 (shard31, shard32)
- colocation group 2: table4 (shard41, shard42), table5 (shard51, shard52)

Suppose the rebalance planner returned the following set of moves:
`{move(shard11), move(shard12), move(shard41), move(shard42)}`

The background rebalancer scheduled them so that each move depended on the
previous one:
```
      {move(reftables) if any, otherwise none}
               |
      move(shard11)
               |
      move(shard12)
               |      <--- artificial dependency: move(shard41) depends on move(shard12)
      move(shard41)
               |
      move(shard42)
```
This results in artificial dependencies between otherwise independent
moves.

Considering that the shards in different colocation groups can be moved
concurrently, this PR changes the dependency relationship between the
moves as follows:

```
      {move(reftables) if any, otherwise none}        {move(reftables) if any, otherwise none}
               |                                               |
      move(shard11)                                   move(shard41)
               |                                               |
      move(shard12)                                   move(shard42)
```
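
To make the new rule concrete, here is a small, self-contained sketch in plain C (not the Citus implementation; the node numbers and task ids are invented for illustration). It mirrors the scheduling logic added to shard_rebalancer.c below: each move depends on the latest earlier move in its colocation group and on the latest earlier move that touches its source or target node, and a move with no such predecessor waits only for the reference-table replication task, if one was scheduled.

```
#include <stdio.h>

#define MAX_IDS 64

typedef struct Move
{
	int colocationId;
	int sourceNode;
	int targetNode;
} Move;

int
main(void)
{
	/* the four moves from the example above; node assignments are made up */
	Move moves[] = {
		{ 1, 1, 2 },	/* move(shard11): node 1 -> node 2 */
		{ 1, 3, 4 },	/* move(shard12): node 3 -> node 4 */
		{ 2, 5, 6 },	/* move(shard41): node 5 -> node 6 */
		{ 2, 1, 6 },	/* move(shard42): node 1 -> node 6 */
	};
	int moveCount = sizeof(moves) / sizeof(moves[0]);
	int refTablesTaskId = 1;	/* 0 would mean "no reference table task" */

	/* latest task id seen per colocation group and per node, 0 = none */
	int latestTaskInGroup[MAX_IDS] = { 0 };
	int latestTaskOnNode[MAX_IDS] = { 0 };

	for (int i = 0; i < moveCount; i++)
	{
		int taskId = refTablesTaskId + 1 + i;
		int deps[3];
		int depCount = 0;

		int groupDep = latestTaskInGroup[moves[i].colocationId];
		int sourceDep = latestTaskOnNode[moves[i].sourceNode];
		int targetDep = latestTaskOnNode[moves[i].targetNode];

		if (groupDep != 0)
			deps[depCount++] = groupDep;
		if (sourceDep != 0 && sourceDep != groupDep)
			deps[depCount++] = sourceDep;
		if (targetDep != 0 && targetDep != groupDep && targetDep != sourceDep)
			deps[depCount++] = targetDep;

		/* an otherwise independent move only waits for reference table replication */
		if (depCount == 0 && refTablesTaskId != 0)
			deps[depCount++] = refTablesTaskId;

		printf("task %d depends on:", taskId);
		for (int d = 0; d < depCount; d++)
			printf(" %d", deps[d]);
		printf("\n");

		/* record this task as the latest one for its group and both nodes */
		latestTaskInGroup[moves[i].colocationId] = taskId;
		latestTaskOnNode[moves[i].sourceNode] = taskId;
		latestTaskOnNode[moves[i].targetNode] = taskId;
	}

	return 0;
}
```

With these inputs the two colocation groups form independent chains; the only cross-group edge is that move(shard42) also waits for move(shard11), because both touch node 1 and the new code also serializes moves that share a source or target node.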

---------

Co-authored-by: Jelte Fennema <jelte.fennema@microsoft.com>
emelsimsek and JelteF authored Mar 29, 2023
1 parent ce4bcf6 commit d3fb928
Showing 8 changed files with 693 additions and 24 deletions.
193 changes: 170 additions & 23 deletions src/backend/distributed/operations/shard_rebalancer.c
@@ -190,6 +190,19 @@ typedef struct WorkerShardStatistics
HTAB *statistics;
} WorkerShardStatistics;

/*
 * ShardMoveDependencyInfo stores the taskId that any new shard move task
 * within the corresponding colocation group must take a dependency on.
 */
typedef struct ShardMoveDependencyInfo
{
int64 key;
int64 taskId;
} ShardMoveDependencyInfo;

typedef struct ShardMoveDependencies
{
HTAB *colocationDependencies;
HTAB *nodeDependencies;
} ShardMoveDependencies;

char *VariablesToBePassedToNewConnections = NULL;

/* static declarations for main logic */
@@ -1898,6 +1911,137 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
}


/*
* GetColocationId function returns the colocationId of the shard in a PlacementUpdateEvent.
*/
static int64
GetColocationId(PlacementUpdateEvent *move)
{
ShardInterval *shardInterval = LoadShardInterval(move->shardId);

CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(
shardInterval->relationId);

return citusTableCacheEntry->colocationId;
}


/*
* InitializeShardMoveDependencies function creates the hash maps that we use to track
* the latest moves so that subsequent moves with the same properties must take a dependency
* on them. There are two hash maps. One is for tracking the latest move scheduled in a
* given colocation group and the other one is for tracking the latest move which involves
* a given node either as its source node or its target node.
*/
static ShardMoveDependencies
InitializeShardMoveDependencies()
{
ShardMoveDependencies shardMoveDependencies;
shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"colocationDependencyHashMap",
6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"nodeDependencyHashMap",
6);

return shardMoveDependencies;
}


/*
 * GenerateTaskMoveDependencyList returns an array of taskIds that the given
 * move must take a dependency on, and sets *nDepends to its length.
 */
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
ShardMoveDependencies shardMoveDependencies, int *nDepends)
{
HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
"shardMoveDependencyList", 0);

bool found;

/* Check if there exists a move in the same colocation group scheduled earlier. */
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);

if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}

/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's source node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
&found);

if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}

/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's target node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
&found);


if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}

*nDepends = hash_get_num_entries(dependsList);

int64 *dependsArray = NULL;

if (*nDepends > 0)
{
HASH_SEQ_STATUS seq;

dependsArray = palloc((*nDepends) * sizeof(int64));

hash_seq_init(&seq, dependsList);
int i = 0;
int64 *dependsTaskId;

while ((dependsTaskId = (int64 *) hash_seq_search(&seq)) != NULL)
{
dependsArray[i++] = *dependsTaskId;
}
}

return dependsArray;
}
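
As an aside for readers less familiar with the PostgreSQL dynahash API used above (hash_search, hash_seq_init, hash_seq_search), the sketch below shows the pattern these helpers rely on. It is not part of this commit, the entry type and names are invented, and it assumes it runs inside a PostgreSQL backend, e.g. in an extension function.

```
#include "postgres.h"
#include "utils/hsearch.h"

typedef struct LatestTaskEntry
{
	int64 key;		/* colocation id or node id */
	int64 taskId;	/* latest task scheduled for that key */
} LatestTaskEntry;

static void
DynahashPatternExample(void)
{
	HASHCTL info = { 0 };
	info.keysize = sizeof(int64);
	info.entrysize = sizeof(LatestTaskEntry);

	/* HASH_BLOBS: hash the key as a fixed-size binary value */
	HTAB *latestTasks = hash_create("latest task per key", 32, &info,
									HASH_ELEM | HASH_BLOBS);

	int64 key = 42;
	bool found = false;

	/*
	 * HASH_ENTER returns the existing entry and sets found = true when the
	 * key is already present; otherwise it inserts a fresh entry (key copied
	 * in, the rest uninitialized) and sets found = false. The code above uses
	 * 'found' to detect whether an earlier move exists for the same
	 * colocation group or node.
	 */
	LatestTaskEntry *entry = hash_search(latestTasks, &key, HASH_ENTER, &found);
	if (!found)
	{
		entry->taskId = 0;
	}

	/* scan all entries, the same pattern used above to walk dependsList */
	HASH_SEQ_STATUS status;
	hash_seq_init(&status, latestTasks);
	while ((entry = hash_seq_search(&status)) != NULL)
	{
		elog(DEBUG1, "key " INT64_FORMAT " -> task " INT64_FORMAT,
			 entry->key, entry->taskId);
	}
}
```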


/*
* UpdateShardMoveDependencies function updates the dependency maps with the latest move's taskId.
*/
static void
UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
ShardMoveDependencies shardMoveDependencies)
{
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;

shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->sourceNode->nodeId, HASH_ENTER, NULL);

shardMoveDependencyInfo->taskId = taskId;

shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->targetNode->nodeId, HASH_ENTER, NULL);

shardMoveDependencyInfo->taskId = taskId;
}
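
Putting the three helpers together: the scheduling loop in RebalanceTableShardsBackground, shown further down in this diff, follows roughly the shape below. This is a condensed sketch, with the construction of the shard-move SQL command and the surrounding setup elided, not the new code verbatim.

```
ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies();

PlacementUpdateEvent *move = NULL;
foreach_ptr(move, placementUpdateList)
{
	/* ... build the shard-move command into buf ... */

	int64 colocationId = GetColocationId(move);
	int nDepends = 0;

	/* depend on earlier moves in the same colocation group or on the same nodes */
	int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
														 shardMoveDependencies,
														 &nDepends);

	/* an otherwise independent move still waits for reference table replication */
	if (nDepends == 0 && replicateRefTablesTaskId > 0)
	{
		nDepends = 1;
		dependsArray = palloc(nDepends * sizeof(int64));
		dependsArray[0] = replicateRefTablesTaskId;
	}

	BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
												  nDepends, dependsArray);

	/* remember this task as the latest move for its colocation group and nodes */
	UpdateShardMoveDependencies(move, colocationId, task->taskid,
								shardMoveDependencies);
}
```

Because each move only depends on a handful of earlier tasks instead of the single previous move, the background task monitor is free to run moves from different colocation groups, on disjoint nodes, in parallel.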


/*
* RebalanceTableShardsBackground rebalances the shards for the relations
* inside the relationIdList across the different workers. It does so using our
@@ -1974,18 +2118,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
StringInfoData buf = { 0 };
initStringInfo(&buf);

/*
* Currently we only have two tasks that any move can depend on:
* - replicating reference tables
* - the previous move
*
* prevJobIdx tells what slot to write the id of the task into. We only use both slots
* if we are actually replicating reference tables.
*/
int64 prevJobId[2] = { 0 };
int prevJobIdx = 0;

List *referenceTableIdList = NIL;
int64 replicateRefTablesTaskId = 0;

if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
{
@@ -2001,15 +2135,15 @@
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevJobIdx] = task->taskid;
prevJobIdx++;
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
NULL);
replicateRefTablesTaskId = task->taskid;
}

PlacementUpdateEvent *move = NULL;
bool first = true;
int prevMoveIndex = prevJobIdx;

ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies();

foreach_ptr(move, placementUpdateList)
{
resetStringInfo(&buf);
@@ -2021,14 +2155,27 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
move->targetNode->nodeId,
quote_literal_cstr(shardTranferModeLabel));

BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevMoveIndex] = task->taskid;
if (first)
int64 colocationId = GetColocationId(move);

int nDepends = 0;

int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
shardMoveDependencies,
&nDepends);

if (nDepends == 0 && replicateRefTablesTaskId > 0)
{
first = false;
prevJobIdx++;
nDepends = 1;
dependsArray = palloc(nDepends * sizeof(int64));
dependsArray[0] = replicateRefTablesTaskId;
}

BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
nDepends,
dependsArray);

UpdateShardMoveDependencies(move, colocationId, task->taskid,
shardMoveDependencies);
}

ereport(NOTICE,
2 changes: 1 addition & 1 deletion src/test/regress/Makefile
@@ -223,7 +223,7 @@ check-follower-cluster: all
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)

check-operations: all
$(pg_regress_multi_check) --load-extension=citus \
$(pg_regress_multi_check) --load-extension=citus --worker-count=6 \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/operations_schedule $(EXTRA_TESTS)

check-columnar: all
8 changes: 8 additions & 0 deletions src/test/regress/citus_tests/run_test.py
@@ -110,6 +110,14 @@ def extra_tests(self):
"multi_mx_function_table_reference",
],
),
"background_rebalance_parallel": TestDeps(
None,
[
"multi_test_helpers",
"multi_cluster_management",
],
worker_count=6,
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
6 changes: 6 additions & 0 deletions src/test/regress/expected/background_rebalance.out
@@ -291,6 +291,12 @@ SELECT state, details from citus_rebalance_status();
finished | {"tasks": [], "task_state_counts": {"done": 2}}
(1 row)

SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------

(1 row)

-- Remove coordinator again to allow rerunning of this test
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?
