Skip to content

Commit

Permalink
Merge branch 'main' into arbitrary-conf-router-test
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir authored Mar 22, 2023
2 parents 3067a76 + 2713e01 commit 5a2a387
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 8 deletions.
44 changes: 37 additions & 7 deletions src/backend/distributed/operations/shard_rebalancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1818,10 +1818,10 @@ static void
RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
{
char transferMode = LookupShardTransferMode(shardReplicationModeOid);
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);

if (list_length(options->relationIdList) == 0)
{
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
return;
}

Expand All @@ -1836,6 +1836,25 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)

List *placementUpdateList = GetRebalanceSteps(options);

if (transferMode == TRANSFER_MODE_AUTOMATIC)
{
/*
* If the shard transfer mode is set to auto, verify up front that logical
* replication can be used to transfer the shards. We throw an error if any
* of the tables lacks a replica identity, which is required for logical
* replication to replicate UPDATE and DELETE commands.
*/
PlacementUpdateEvent *placementUpdate = NULL;
foreach_ptr(placementUpdate, placementUpdateList)
{
Oid relationId = RelationIdForShard(placementUpdate->shardId);
List *colocatedTableList = ColocatedTableList(relationId);
VerifyTablesHaveReplicaIdentity(colocatedTableList);
}
}

EnsureReferenceTablesExistOnAllNodesExtended(transferMode);

if (list_length(placementUpdateList) == 0)
{
return;
Expand Down Expand Up @@ -1916,12 +1935,6 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
EnsureTableOwner(colocatedTableId);
}

if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
{
/* make sure that all tables included in the rebalance have a replica identity */
VerifyTablesHaveReplicaIdentity(colocatedTableList);
}

List *placementUpdateList = GetRebalanceSteps(options);

if (list_length(placementUpdateList) == 0)
Expand All @@ -1930,6 +1943,23 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
return 0;
}

if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
{
/*
* If the shard transfer mode is set to auto, verify up front that logical
* replication can be used to transfer the shards. We throw an error if any
* of the tables lacks a replica identity, which is required for logical
* replication to replicate UPDATE and DELETE commands.
*/
PlacementUpdateEvent *placementUpdate = NULL;
foreach_ptr(placementUpdate, placementUpdateList)
{
relationId = RelationIdForShard(placementUpdate->shardId);
List *colocatedTables = ColocatedTableList(relationId);
VerifyTablesHaveReplicaIdentity(colocatedTables);
}
}

DropOrphanedResourcesInSeparateTransaction();

/* find the name of the shard transfer mode to interpolate in the scheduled command */
Expand Down
34 changes: 33 additions & 1 deletion src/test/regress/expected/shard_rebalancer.out
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,6 @@ SELECT * from master_drain_node('localhost', :worker_2_port);
ERROR: cannot use logical replication to transfer shards of the relation colocated_rebalance_test since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
CONTEXT: while executing command on localhost:xxxxx
-- Make sure shouldhaveshards is false
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
shouldhaveshards
Expand Down Expand Up @@ -2714,6 +2713,39 @@ SELECT sh.logicalrelid, pl.nodeport
(5 rows)

DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
-- verify we detect if one of the tables do not have a replica identity or primary key
-- and error out in case of shard transfer mode = auto
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)

create table table_with_primary_key (a int primary key);
select create_distributed_table('table_with_primary_key','a');
create_distributed_table
---------------------------------------------------------------------

(1 row)

create table table_without_primary_key (a bigint);
select create_distributed_table('table_without_primary_key','a');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- add the second node back, then rebalance
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)

select rebalance_table_shards();
ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
DROP TABLE table_with_primary_key, table_without_primary_key;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;
Expand Down
15 changes: 15 additions & 0 deletions src/test/regress/sql/shard_rebalancer.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,21 @@ SELECT sh.logicalrelid, pl.nodeport

DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;

-- verify we detect if one of the tables do not have a replica identity or primary key
-- and error out in case of shard transfer mode = auto
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);

create table table_with_primary_key (a int primary key);
select create_distributed_table('table_with_primary_key','a');
create table table_without_primary_key (a bigint);
select create_distributed_table('table_without_primary_key','a');

-- add the second node back, then rebalance
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
select 1 from citus_add_node('localhost', :worker_2_port);
select rebalance_table_shards();

DROP TABLE table_with_primary_key, table_without_primary_key;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;
Expand Down

0 comments on commit 5a2a387

Please sign in to comment.