refactor: implement the distributed shard lock
ZuLiangWang committed Mar 20, 2023
1 parent d71e875 commit 6206b3b
Showing 9 changed files with 426 additions and 14 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -77,6 +77,7 @@ datafusion-expr = "18.0.0"
 datafusion-optimizer = "18.0.0"
 datafusion-proto = "18.0.0"
 df_operator = { path = "df_operator" }
+etcd-client = "0.10.3"
 env_logger = "0.6"
 futures = "0.3"
 xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", features = [
@@ -133,6 +134,7 @@ clap = { workspace = true }
 cluster = { workspace = true }
 common_util = { workspace = true }
 df_operator = { workspace = true }
+etcd-client = { workspace = true }
 interpreters = { workspace = true }
 log = { workspace = true }
 logger = { workspace = true }
2 changes: 2 additions & 0 deletions cluster/Cargo.toml
@@ -15,6 +15,7 @@ async-trait = { workspace = true }
 ceresdbproto = { workspace = true }
 common_types = { workspace = true }
 common_util = { workspace = true }
+etcd-client = { workspace = true }
 log = { workspace = true }
 meta_client = { workspace = true }
 rust-fsm = "0.6.0"
@@ -23,3 +24,4 @@ serde_json = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
 tokio = { workspace = true }
+
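The new etcd-client dependency provides the lease and lock primitives that the shard lock is built on. A minimal standalone sketch of those primitives, not part of this commit, assuming a local etcd at 127.0.0.1:2379:

```rust
use etcd_client::{Client, LockOptions};

#[tokio::main]
async fn main() -> Result<(), etcd_client::Error> {
    // Connect to a local etcd instance.
    let mut client = Client::connect(["127.0.0.1:2379"], None).await?;

    // A lease makes the lock self-expiring: if the holder crashes and stops
    // renewing it, etcd releases the lock after the TTL (here 10s).
    let lease = client.lease_grant(10, None).await?;

    // Block until the lock is acquired, tied to the lease above.
    let lock = client
        .lock("/shard_lock/0", Some(LockOptions::new().with_lease(lease.id())))
        .await?;

    // Release the lock explicitly, using the key returned on acquisition.
    client.unlock(lock.key().to_vec()).await?;
    Ok(())
}
```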
77 changes: 68 additions & 9 deletions cluster/src/cluster_impl.rs
@@ -32,9 +32,10 @@ use tokio::{
 };
 
 use crate::{
-    config::ClusterConfig, shard_tables_cache::ShardTablesCache, topology::ClusterTopology,
-    Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, OpenShard,
-    OpenShardWithCause, Result, ShardNotFound, TableNotFound,
+    config::ClusterConfig, shard_lock_manager::ShardLockManager,
+    shard_tables_cache::ShardTablesCache, topology::ClusterTopology, CloseShardWithCause, Cluster,
+    ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, OpenShard, OpenShardWithCause,
+    Result, ShardNotFound, TableNotFound,
 };
 
 /// ClusterImpl is an implementation of [`Cluster`] based on [`MetaClient`].
@@ -49,6 +50,7 @@ pub struct ClusterImpl {
     config: ClusterConfig,
     heartbeat_handle: Mutex<Option<JoinHandle<()>>>,
     stop_heartbeat_tx: Mutex<Option<Sender<()>>>,
+    shard_lock_manager: Arc<ShardLockManager>,
 }
 
 impl ClusterImpl {
@@ -57,15 +59,20 @@ impl ClusterImpl {
         meta_client: MetaClientRef,
         config: ClusterConfig,
         runtime: Arc<Runtime>,
+        etcd_client: etcd_client::Client,
+        endpoint: String,
+        grpc_port: u16,
     ) -> Result<Self> {
-        let inner = Inner::new(shard_tables_cache, meta_client)?;
-
+        let cache_ref = Arc::new(shard_tables_cache);
+        let inner = Arc::new(Inner::new(cache_ref, meta_client)?);
+        let shard_lock_manager = ShardLockManager::new(endpoint, grpc_port, etcd_client);
         Ok(Self {
-            inner: Arc::new(inner),
+            inner,
             runtime,
             config,
             heartbeat_handle: Mutex::new(None),
             stop_heartbeat_tx: Mutex::new(None),
+            shard_lock_manager: Arc::new(shard_lock_manager),
         })
     }

@@ -115,13 +122,13 @@
 }
 
 struct Inner {
-    shard_tables_cache: ShardTablesCache,
+    shard_tables_cache: Arc<ShardTablesCache>,
     meta_client: MetaClientRef,
     topology: RwLock<ClusterTopology>,
 }
 
 impl Inner {
-    fn new(shard_tables_cache: ShardTablesCache, meta_client: MetaClientRef) -> Result<Self> {
+    fn new(shard_tables_cache: Arc<ShardTablesCache>, meta_client: MetaClientRef) -> Result<Self> {
         Ok(Self {
             shard_tables_cache,
             meta_client,
@@ -223,6 +230,11 @@ impl Inner {
                 shard_id: shard_info.id,
             })?;
 
+        info!(
+            "Get tables of shard from meta, shard_id:{}, resp:{:?}",
+            shard_info.id, resp
+        );
+
         ensure!(
             resp.tables_by_shard.len() == 1,
             OpenShard {
@@ -348,11 +360,58 @@
     }
 
     async fn open_shard(&self, req: &OpenShardRequest) -> Result<TablesOfShard> {
+        let shard_id = req.shard.as_ref().unwrap().id;
+
+        // Acquire the shard's distributed lock before opening it. If the lock
+        // is lost later, the callback closes the shard on this node.
+        let inner = self.inner.clone();
+        let callback = move |shard_id| {
+            inner
+                .close_shard(&CloseShardRequest { shard_id })
+                .map(|_| true)
+        };
+
+        let granted = self
+            .shard_lock_manager
+            .clone()
+            .grant_lock(shard_id, callback)
+            .await
+            .box_err()
+            .context(OpenShardWithCause { shard_id })?;
+
+        ensure!(
+            granted,
+            OpenShard {
+                shard_id,
+                msg: format!("failed to grant shard lock, shard_id:{shard_id}"),
+            }
+        );
+
+        info!("Shard lock granted, shard_id:{shard_id}");
         self.inner.open_shard(req).await
     }
 
     async fn close_shard(&self, req: &CloseShardRequest) -> Result<TablesOfShard> {
-        self.inner.close_shard(req)
+        let tables_of_shard = self.inner.close_shard(req)?;
+
+        // Only release the distributed lock after the shard is closed locally.
+        self.shard_lock_manager
+            .clone()
+            .revoke_lock(req.shard_id)
+            .await
+            .box_err()
+            .context(CloseShardWithCause {
+                shard_id: req.shard_id,
+            })?;
+
+        Ok(tables_of_shard)
     }
 
     async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> {
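The callback passed to grant_lock above exists so a node can react when it silently loses the lock, for example when its etcd lease expires. A hypothetical sketch of the keep-alive loop that would drive such a callback; the real shard_lock_manager.rs is among the diffs not rendered on this page, so every name and signature here is an assumption:

```rust
use std::time::Duration;

use etcd_client::Client;

/// Renew the lease backing a shard lock; if renewal fails, notify the owner
/// so it can close the shard before another node opens it. (Sketch only.)
async fn keep_lease_alive(
    mut client: Client,
    lease_id: i64,
    shard_id: u32,
    on_lost: impl Fn(u32),
) -> Result<(), etcd_client::Error> {
    let (mut keeper, mut stream) = client.lease_keep_alive(lease_id).await?;
    loop {
        tokio::time::sleep(Duration::from_secs(3)).await;
        if keeper.keep_alive().await.is_err() {
            on_lost(shard_id);
            return Ok(());
        }
        match stream.message().await {
            // A positive TTL means the lease is still alive.
            Ok(Some(resp)) if resp.ttl() > 0 => continue,
            // The lease expired or the stream broke: the lock is gone.
            _ => {
                on_lost(shard_id);
                return Ok(());
            }
        }
    }
}
```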
10 changes: 10 additions & 0 deletions cluster/src/lib.rs
@@ -28,12 +28,16 @@ pub mod config;
+pub mod shard_lock_manager;
 pub mod shard_tables_cache;
 // FIXME: Remove this lint ignore when the topology about schema tables is
 // finished.
 #[allow(dead_code)]
 pub mod topology;
 
 #[derive(Debug, Snafu)]
 #[snafu(visibility = "pub")]
 pub enum Error {
+    #[snafu(display("Internal error, msg:{}, source:{}.", msg, source))]
+    Internal { msg: String, source: GenericError },
+
     #[snafu(display("Build meta client failed, err:{}.", source))]
     BuildMetaClient { source: meta_client::Error },
 
@@ -61,6 +65,12 @@ pub enum Error {
         source: GenericError,
     },
 
+    #[snafu(display("Failed to close shard, shard_id:{}, source:{}.", shard_id, source))]
+    CloseShardWithCause {
+        shard_id: ShardId,
+        source: GenericError,
+    },
+
     #[snafu(display("Shard not found, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
     ShardNotFound { msg: String, backtrace: Backtrace },
 
(Diffs for the remaining changed files, including cluster/src/shard_lock_manager.rs, were not loaded.)
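Since shard_lock_manager.rs itself is not rendered, here is a hedged sketch of what a manager exposing the grant_lock/revoke_lock surface used in cluster_impl.rs might look like, built on the etcd primitives shown earlier. Everything below (field names, TTL, key layout) is an assumption inferred from the call sites, not the committed code; the keep-alive task and the close-shard callback are omitted (see the previous sketch):

```rust
use std::collections::HashMap;

use etcd_client::{Client, LockOptions};
use tokio::sync::Mutex;

type ShardId = u32;

/// Hypothetical sketch, not the committed implementation.
pub struct ShardLockManager {
    // Identity of this node; a real implementation might record it as the
    // lock owner's value in etcd.
    endpoint: String,
    grpc_port: u16,
    client: Mutex<Client>,
    // Lock key returned by etcd for each granted shard; needed by revoke_lock.
    held: Mutex<HashMap<ShardId, Vec<u8>>>,
}

impl ShardLockManager {
    pub fn new(endpoint: String, grpc_port: u16, client: Client) -> Self {
        Self {
            endpoint,
            grpc_port,
            client: Mutex::new(client),
            held: Mutex::new(HashMap::new()),
        }
    }

    /// Acquire the distributed lock for `shard_id`; returns true on success.
    pub async fn grant_lock(&self, shard_id: ShardId) -> Result<bool, etcd_client::Error> {
        let mut client = self.client.lock().await;
        // Tie the lock to a lease so it auto-releases if this node dies.
        let lease = client.lease_grant(10, None).await?;
        let resp = client
            .lock(
                format!("/shard_lock/{shard_id}"),
                Some(LockOptions::new().with_lease(lease.id())),
            )
            .await?;
        self.held.lock().await.insert(shard_id, resp.key().to_vec());
        Ok(true)
    }

    /// Release the lock acquired by `grant_lock`, if any.
    pub async fn revoke_lock(&self, shard_id: ShardId) -> Result<(), etcd_client::Error> {
        if let Some(key) = self.held.lock().await.remove(&shard_id) {
            self.client.lock().await.unlock(key).await?;
        }
        Ok(())
    }
}
```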
