Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: shard lock module #853

Merged
merged 9 commits into from
Apr 25, 2023
9 changes: 8 additions & 1 deletion cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ pub struct ClusterImpl {
}

impl ClusterImpl {
pub async fn create(
pub async fn try_new(
node_name: String,
shard_tables_cache: ShardTablesCache,
meta_client: MetaClientRef,
config: ClusterConfig,
runtime: Arc<Runtime>,
) -> Result<Self> {
if let Err(e) = config.etcd_client.validate() {
return InvalidArguments { msg: e }.fail();
}

let inner = Arc::new(Inner::new(shard_tables_cache, meta_client)?);
let connect_options = ConnectOptions::from(&config.etcd_client);
let etcd_client =
Expand All @@ -83,6 +87,9 @@ impl ClusterImpl {
shard_lock_key_prefix,
node_name,
etcd_client,
config.etcd_client.shard_lock_lease_ttl_sec,
config.etcd_client.shard_lock_lease_check_interval.0,
config.etcd_client.rpc_timeout(),
runtime.clone(),
);
Ok(Self {
Expand Down
37 changes: 32 additions & 5 deletions cluster/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::time::Duration;

use common_types::schema::TIMESTAMP_COLUMN;
use common_util::config::ReadableDuration;
use etcd_client::ConnectOptions;
Expand All @@ -25,6 +27,7 @@ impl Default for SchemaConfig {
}

const DEFAULT_ETCD_ROOT_PATH: &str = "/ceresdb";
const MIN_SHARD_LOCK_LEASE_TTL_SEC: u64 = 15;

#[derive(Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
Expand All @@ -34,22 +37,48 @@ pub struct EtcdClientConfig {
/// Root path in the etcd used by the ceresdb server
pub root_path: String,

/// Timeout to connect to etcd cluster
pub connect_timeout: ReadableDuration,
pub rpc_timeout: ReadableDuration,

/// The lease of the shard lock in seconds.
///
/// It should be greater than `shard_lock_lease_check_interval`.
/// NOTE: the rpc timeout to the etcd cluster is determined by it.
pub shard_lock_lease_ttl_sec: u64,
/// The interval of checking whether the shard lock lease is expired
pub shard_lock_lease_check_interval: ReadableDuration,
}

impl EtcdClientConfig {
pub fn validate(&self) -> Result<(), String> {
if self.shard_lock_lease_ttl_sec < MIN_SHARD_LOCK_LEASE_TTL_SEC {
return Err(format!(
"shard_lock_lease_ttl_sec should be greater than {MIN_SHARD_LOCK_LEASE_TTL_SEC}"
));
}

if self.shard_lock_lease_check_interval.0
>= Duration::from_secs(self.shard_lock_lease_ttl_sec)
{
return Err(format!(
"shard_lock_lease_check_interval({}) should be less than shard_lock_lease_ttl_sec({}s)",
self.shard_lock_lease_check_interval, self.shard_lock_lease_ttl_sec,
));
}

Ok(())
}

pub fn rpc_timeout(&self) -> Duration {
Duration::from_secs(self.shard_lock_lease_ttl_sec) / 6
}
}

impl From<&EtcdClientConfig> for ConnectOptions {
fn from(config: &EtcdClientConfig) -> Self {
ConnectOptions::default()
.with_connect_timeout(config.connect_timeout.0)
.with_timeout(config.rpc_timeout.0)
.with_timeout(config.rpc_timeout())
}
}

Expand All @@ -58,10 +87,8 @@ impl Default for EtcdClientConfig {
Self {
server_addrs: vec!["127.0.0.1:2379".to_string()],
root_path: DEFAULT_ETCD_ROOT_PATH.to_string(),

rpc_timeout: ReadableDuration::secs(5),
connect_timeout: ReadableDuration::secs(5),
shard_lock_lease_ttl_sec: 15,
shard_lock_lease_ttl_sec: 30,
shard_lock_lease_check_interval: ReadableDuration::millis(200),
}
}
Expand Down
Loading