Skip to content

Commit

Permalink
fix: make open shard idempotent (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi authored Oct 24, 2022
1 parent 118affb commit 09ee196
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
19 changes: 14 additions & 5 deletions cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,21 @@ impl Inner {
msg: "missing shard info in the request",
})?;

if self.shard_tables_cache.contains(shard_info.id) {
OpenShard {
shard_id: shard_info.id,
msg: "shard is already opened",
if let Some(tables_of_shard) = self.shard_tables_cache.get(shard_info.id) {
if tables_of_shard.shard_info.version == shard_info.version {
info!(
"No need to open the exactly same shard again, shard_info:{:?}",
shard_info
);
return Ok(tables_of_shard);
}
.fail()?;
ensure!(
tables_of_shard.shard_info.version < shard_info.version,
OpenShard {
shard_id: shard_info.id,
msg: format!("open a shard with a smaller version, curr_shard_info:{:?}, new_shard_info:{:?}", tables_of_shard.shard_info, shard_info),
}
);
}

let req = GetTablesOfShardsRequest {
Expand Down
10 changes: 5 additions & 5 deletions cluster/src/shard_tables_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ impl ShardTablesCache {
self.inner.read().unwrap().all_shard_infos()
}

// Check whether the cache contains the shard.
pub fn contains(&self, shard_id: ShardId) -> bool {
self.inner.read().unwrap().contains(shard_id)
// Get the shard by its id.
pub fn get(&self, shard_id: ShardId) -> Option<TablesOfShard> {
self.inner.read().unwrap().get(shard_id)
}

/// Remove the shard.
Expand Down Expand Up @@ -144,8 +144,8 @@ impl Inner {
.collect()
}

fn contains(&self, shard_id: ShardId) -> bool {
self.tables_by_shard.contains_key(&shard_id)
fn get(&self, shard_id: ShardId) -> Option<TablesOfShard> {
self.tables_by_shard.get(&shard_id).cloned()
}

fn remove(&mut self, shard_id: ShardId) -> Option<TablesOfShard> {
Expand Down

0 comments on commit 09ee196

Please sign in to comment.