Check before logicalrep for rebalancer, error if needed #6754

Merged
merged 5 commits into from
Mar 21, 2023
44 changes: 37 additions & 7 deletions src/backend/distributed/operations/shard_rebalancer.c
@@ -1818,10 +1818,10 @@ static void
 RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
 {
     char transferMode = LookupShardTransferMode(shardReplicationModeOid);
-    EnsureReferenceTablesExistOnAllNodesExtended(transferMode);

     if (list_length(options->relationIdList) == 0)
     {
+        EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
         return;
     }

@@ -1836,6 +1836,25 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)

     List *placementUpdateList = GetRebalanceSteps(options);

+    if (transferMode == TRANSFER_MODE_AUTOMATIC)
+    {
+        /*
+         * If the shard transfer mode is set to auto, we should check beforehand
+         * if we are able to use logical replication to transfer shards or not.
+         * We throw an error if any of the tables do not have a replica identity, which
+         * is required for logical replication to replicate UPDATE and DELETE commands.
+         */
+        PlacementUpdateEvent *placementUpdate = NULL;
+        foreach_ptr(placementUpdate, placementUpdateList)
+        {
+            Oid relationId = RelationIdForShard(placementUpdate->shardId);
+            List *colocatedTableList = ColocatedTableList(relationId);
+            VerifyTablesHaveReplicaIdentity(colocatedTableList);
+        }
+    }
Comment on lines +1839 to +1854
Contributor

This check should be done even before planning the moves and before calling EnsureReferenceTablesExistOnAllNodesExtended. We do this already for the background rebalancer:

    const char shardTransferMode = LookupShardTransferMode(shardReplicationModeOid);
    List *colocatedTableList = NIL;
    Oid relationId = InvalidOid;
    foreach_oid(relationId, options->relationIdList)
    {
        colocatedTableList = list_concat(colocatedTableList,
                                         ColocatedTableList(relationId));
    }
    Oid colocatedTableId = InvalidOid;
    foreach_oid(colocatedTableId, colocatedTableList)
    {
        EnsureTableOwner(colocatedTableId);
    }

Contributor Author

I think what we actually want to check is the move list itself. Otherwise we would throw the logical replication error even for already-balanced tables that lack a replica identity or primary key, although we were never going to move their shards. If we move the check to somewhere before planning the moves, we might even throw that error instead of the "no moves available" notice. I think we should do the same for the background rebalancer: call VerifyTablesHaveReplicaIdentity for each element in placementUpdateList.

Member

It does seem useful to indicate to users that their future rebalances will fail by throwing the error even when there are no moves, because otherwise they will find out when they actually do need to rebalance. I don't feel strongly about this though.
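
The two positions in this thread differ only in which tables get inspected. The sketch below is a self-contained model of that difference and not Citus code: `ModelTable`, `FirstOffenderAmongAllTables`, and `FirstOffenderAmongMovedTables` are hypothetical names invented for illustration, and a table's replica identity and planned moves are reduced to two booleans.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of a distributed table; this is not a Citus struct. */
typedef struct ModelTable
{
    const char *name;
    bool hasReplicaIdentity; /* a PRIMARY KEY or REPLICA IDENTITY exists */
    bool hasPlannedMove;     /* the planner wants to move one of its shards */
} ModelTable;

/*
 * Up-front variant (the reviewer's suggestion): inspect every table included
 * in the rebalance. Returns the index of the first table without a replica
 * identity, or -1 if there is none.
 */
int
FirstOffenderAmongAllTables(const ModelTable *tables, size_t count)
{
    for (size_t i = 0; i < count; i++)
    {
        if (!tables[i].hasReplicaIdentity)
        {
            return (int) i;
        }
    }
    return -1;
}

/*
 * Move-list variant (the author's suggestion): inspect only tables that the
 * planned moves actually touch. Same return convention as above.
 */
int
FirstOffenderAmongMovedTables(const ModelTable *tables, size_t count)
{
    for (size_t i = 0; i < count; i++)
    {
        if (tables[i].hasPlannedMove && !tables[i].hasReplicaIdentity)
        {
            return (int) i;
        }
    }
    return -1;
}
```

For a table that is already balanced and has no primary key, the up-front variant reports it while the move-list variant does not. That is the trade-off discussed above: an early warning about future rebalances versus no error for tables whose shards stay where they are.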


+
+    EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
+
     if (list_length(placementUpdateList) == 0)
     {
         return;
@@ -1916,12 +1935,6 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
         EnsureTableOwner(colocatedTableId);
     }

-    if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
-    {
-        /* make sure that all tables included in the rebalance have a replica identity*/
-        VerifyTablesHaveReplicaIdentity(colocatedTableList);
-    }
-
     List *placementUpdateList = GetRebalanceSteps(options);

     if (list_length(placementUpdateList) == 0)
@@ -1930,6 +1943,23 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
         return 0;
     }

+    if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
+    {
+        /*
+         * If the shard transfer mode is set to auto, we should check beforehand
+         * if we are able to use logical replication to transfer shards or not.
+         * We throw an error if any of the tables do not have a replica identity, which
+         * is required for logical replication to replicate UPDATE and DELETE commands.
+         */
+        PlacementUpdateEvent *placementUpdate = NULL;
+        foreach_ptr(placementUpdate, placementUpdateList)
+        {
+            relationId = RelationIdForShard(placementUpdate->shardId);
+            List *colocatedTables = ColocatedTableList(relationId);
+            VerifyTablesHaveReplicaIdentity(colocatedTables);
+        }
+    }
+
     DropOrphanedResourcesInSeparateTransaction();

     /* find the name of the shard transfer mode to interpolate in the scheduled command */
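
The ordering that the RebalanceTableShardsBackground hunks above arrive at (plan first, return early on an empty plan, then verify replica identities only in auto mode) can be modeled in a few lines. This is a minimal sketch under stated assumptions and not the Citus implementation: `ModelTransferMode`, `ModelOutcome`, `ModelMove`, and `ModelRebalanceBackground` are hypothetical names, and each planned move is reduced to one boolean that says whether all of its colocated tables have a replica identity.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the three shard transfer modes named in the HINT. */
typedef enum ModelTransferMode
{
    MODEL_MODE_AUTO,
    MODEL_MODE_FORCE_LOGICAL,
    MODEL_MODE_BLOCK_WRITES
} ModelTransferMode;

/* Hypothetical outcomes of one rebalance attempt. */
typedef enum ModelOutcome
{
    MODEL_NO_MOVES,
    MODEL_ERROR_NO_REPLICA_IDENTITY,
    MODEL_SCHEDULED
} ModelOutcome;

/* One planned shard move, reduced to the single fact the check cares about. */
typedef struct ModelMove
{
    bool colocatedTablesHaveReplicaIdentity;
} ModelMove;

ModelOutcome
ModelRebalanceBackground(ModelTransferMode mode, const ModelMove *moves,
                         size_t moveCount)
{
    /* An empty plan returns first, so it can never raise the error. */
    if (moveCount == 0)
    {
        return MODEL_NO_MOVES;
    }

    /* Only auto mode insists on a replica identity for the moved tables. */
    if (mode == MODEL_MODE_AUTO)
    {
        for (size_t i = 0; i < moveCount; i++)
        {
            if (!moves[i].colocatedTablesHaveReplicaIdentity)
            {
                return MODEL_ERROR_NO_REPLICA_IDENTITY;
            }
        }
    }

    return MODEL_SCHEDULED;
}
```

In this model, a plan containing a move for a table without a primary key fails in auto mode but is scheduled under force_logical or block_writes, which matches the HINT text in the expected test output.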
34 changes: 33 additions & 1 deletion src/test/regress/expected/shard_rebalancer.out
@@ -1482,7 +1482,6 @@ SELECT * from master_drain_node('localhost', :worker_2_port);
 ERROR: cannot use logical replication to transfer shards of the relation colocated_rebalance_test since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
 DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
 HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
-CONTEXT: while executing command on localhost:xxxxx
 -- Make sure shouldhaveshards is false
 select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
 shouldhaveshards
@@ -2714,6 +2713,39 @@ SELECT sh.logicalrelid, pl.nodeport
 (5 rows)

 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
+-- verify we detect if one of the tables do not have a replica identity or primary key
+-- and error out in case of shard transfer mode = auto
+SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
+?column?
+---------------------------------------------------------------------
+1
+(1 row)
+
+create table table_with_primary_key (a int primary key);
+select create_distributed_table('table_with_primary_key','a');
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+create table table_without_primary_key (a bigint);
+select create_distributed_table('table_without_primary_key','a');
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- add the second node back, then rebalance
+ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
+select 1 from citus_add_node('localhost', :worker_2_port);
+?column?
+---------------------------------------------------------------------
+1
+(1 row)
+
+select rebalance_table_shards();
+ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
+DROP TABLE table_with_primary_key, table_without_primary_key;
 \c - - - :worker_1_port
 SET citus.enable_ddl_propagation TO OFF;
 REVOKE ALL ON SCHEMA public FROM testrole;
15 changes: 15 additions & 0 deletions src/test/regress/sql/shard_rebalancer.sql
@@ -1497,6 +1497,21 @@ SELECT sh.logicalrelid, pl.nodeport

 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;

+-- verify we detect if one of the tables do not have a replica identity or primary key
+-- and error out in case of shard transfer mode = auto
+SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
+
+create table table_with_primary_key (a int primary key);
+select create_distributed_table('table_with_primary_key','a');
+create table table_without_primary_key (a bigint);
+select create_distributed_table('table_without_primary_key','a');
+
+-- add the second node back, then rebalance
+ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
+select 1 from citus_add_node('localhost', :worker_2_port);
+select rebalance_table_shards();
+
+DROP TABLE table_with_primary_key, table_without_primary_key;
 \c - - - :worker_1_port
 SET citus.enable_ddl_propagation TO OFF;
 REVOKE ALL ON SCHEMA public FROM testrole;