
refactor: implement the distributed lock of shard #706

Merged: 13 commits, Apr 21, 2023
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files (such as Cargo.lock) are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -67,7 +67,7 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.2"
ceresdbproto = "1.0.3"
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
@@ -78,6 +78,7 @@ common_util = { path = "common_util" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" }
df_operator = { path = "df_operator" }
etcd-client = "0.10.3"
env_logger = "0.6"
futures = "0.3"
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", features = [
@@ -137,6 +138,7 @@ clap = { workspace = true }
cluster = { workspace = true }
common_util = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
log = { workspace = true }
logger = { workspace = true }
3 changes: 1 addition & 2 deletions Makefile
@@ -33,8 +33,7 @@ test:
cd $(DIR); cargo test --workspace -- --test-threads=4

integration-test:
# TODO: restore it as `make run` after we fix the clustering integration test.
cd $(DIR)/integration_tests; make run-local
cd $(DIR)/integration_tests; make run

# grcov needs build first, then run test
build-ut:
2 changes: 2 additions & 0 deletions cluster/Cargo.toml
@@ -15,8 +15,10 @@ async-trait = { workspace = true }
ceresdbproto = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
etcd-client = { workspace = true }
log = { workspace = true }
meta_client = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
99 changes: 73 additions & 26 deletions cluster/src/cluster_impl.rs
@@ -8,15 +8,17 @@ use std::{
use async_trait::async_trait;
use ceresdbproto::{
meta_event::{
CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest,
DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, UpdateShardInfo,
CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest,
OpenTableOnShardRequest, UpdateShardInfo,
},
meta_service::TableInfo as TableInfoPb,
};
use common_types::table::ShardId;
use common_util::{
error::BoxError,
runtime::{JoinHandle, Runtime},
};
use etcd_client::ConnectOptions;
use log::{error, info, warn};
use meta_client::{
types::{
@@ -32,9 +34,12 @@ use tokio::{
};

use crate::{
config::ClusterConfig, shard_tables_cache::ShardTablesCache, topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, Internal, MetaClientFailure, OpenShard,
OpenShardWithCause, Result, ShardNotFound, TableNotFound,
config::ClusterConfig,
shard_lock_manager::{ShardLockManager, ShardLockManagerRef},
shard_tables_cache::ShardTablesCache,
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, Internal,
MetaClientFailure, OpenShard, OpenShardWithCause, Result, ShardNotFound, TableNotFound,
};

/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
@@ -49,23 +54,43 @@ pub struct ClusterImpl {
config: ClusterConfig,
heartbeat_handle: Mutex<Option<JoinHandle<()>>>,
stop_heartbeat_tx: Mutex<Option<Sender<()>>>,
shard_lock_manager: ShardLockManagerRef,
}

impl ClusterImpl {
pub fn new(
pub async fn create(
node_name: String,
shard_tables_cache: ShardTablesCache,
meta_client: MetaClientRef,
config: ClusterConfig,
runtime: Arc<Runtime>,
) -> Result<Self> {
let inner = Inner::new(shard_tables_cache, meta_client)?;

let inner = Arc::new(Inner::new(shard_tables_cache, meta_client)?);
let connect_options = ConnectOptions::from(&config.etcd_client);
let etcd_client =
etcd_client::Client::connect(&config.etcd_client.server_addrs, Some(connect_options))
.await
.context(EtcdClientFailureWithCause {
msg: "failed to connect to etcd",
})?;

let shard_lock_key_prefix = Self::shard_lock_key_prefix(
&config.etcd_client.root_path,
&config.meta_client.cluster_name,
);
let shard_lock_manager = ShardLockManager::new(
shard_lock_key_prefix,
node_name,
etcd_client,
runtime.clone(),
);
Ok(Self {
inner: Arc::new(inner),
inner,
runtime,
config,
heartbeat_handle: Mutex::new(None),
stop_heartbeat_tx: Mutex::new(None),
shard_lock_manager: Arc::new(shard_lock_manager),
})
}

@@ -109,8 +134,9 @@ impl ClusterImpl {
self.config.meta_client.lease.0 / 2
}

pub fn shard_tables_cache(&self) -> &ShardTablesCache {
&self.inner.shard_tables_cache
fn shard_lock_key_prefix(root_path: &str, cluster_name: &str) -> String {
const SHARD_LOCK_KEY: &str = "shards";
format!("{root_path}/{cluster_name}/{SHARD_LOCK_KEY}")
}
}

@@ -187,12 +213,7 @@ impl Inner {
Ok(resp)
}

async fn open_shard(&self, req: &OpenShardRequest) -> Result<TablesOfShard> {
let shard_info = req.shard.as_ref().context(OpenShard {
shard_id: 0u32,
msg: "missing shard info in the request",
})?;

async fn open_shard(&self, shard_info: &ShardInfo) -> Result<TablesOfShard> {
if let Some(tables_of_shard) = self.shard_tables_cache.get(shard_info.id) {
if tables_of_shard.shard_info.version == shard_info.version {
info!(
Expand Down Expand Up @@ -245,11 +266,11 @@ impl Inner {
Ok(tables_of_shard)
}

fn close_shard(&self, req: &CloseShardRequest) -> Result<TablesOfShard> {
fn close_shard(&self, shard_id: ShardId) -> Result<TablesOfShard> {
self.shard_tables_cache
.remove(req.shard_id)
.remove(shard_id)
.with_context(|| ShardNotFound {
msg: format!("close non-existent shard, shard_id:{}", req.shard_id),
msg: format!("close non-existent shard, shard_id:{shard_id}"),
})
}

@@ -286,7 +307,7 @@ impl Inner {

self.shard_tables_cache.try_insert_table_to_shard(
update_shard_info.prev_version,
ShardInfo::from(curr_shard_info),
ShardInfo::from(&curr_shard_info),
TableInfo::try_from(table_info)
.box_err()
.context(Internal {
@@ -312,7 +333,7 @@ impl Inner {

self.shard_tables_cache.try_remove_table_from_shard(
update_shard_info.prev_version,
ShardInfo::from(curr_shard_info),
ShardInfo::from(&curr_shard_info),
TableInfo::try_from(table_info)
.box_err()
.context(Internal {
@@ -355,12 +376,12 @@ impl Cluster for ClusterImpl {
Ok(())
}

async fn open_shard(&self, req: &OpenShardRequest) -> Result<TablesOfShard> {
self.inner.open_shard(req).await
async fn open_shard(&self, shard_info: &ShardInfo) -> Result<TablesOfShard> {
self.inner.open_shard(shard_info).await
}

async fn close_shard(&self, req: &CloseShardRequest) -> Result<TablesOfShard> {
self.inner.close_shard(req)
async fn close_shard(&self, shard_id: ShardId) -> Result<TablesOfShard> {
self.inner.close_shard(shard_id)
}

async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> {
@@ -386,4 +407,30 @@ impl Cluster for ClusterImpl {
async fn fetch_nodes(&self) -> Result<ClusterNodesResp> {
self.inner.fetch_nodes().await
}

fn shard_lock_manager(&self) -> ShardLockManagerRef {
self.shard_lock_manager.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_format_shard_lock_key_prefix() {
let cases = vec![
(
("/ceresdb", "defaultCluster"),
"/ceresdb/defaultCluster/shards",
),
(("/", "defaultCluster"), "/defaultCluster/shards"),
(("/", ""), "/shards"),
];

for ((root_path, cluster_name), expected) in cases {
let actual = ClusterImpl::shard_lock_key_prefix(root_path, cluster_name);
assert_eq!(actual, expected);
}
}
}
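
The diff above wires a ShardLockManager into ClusterImpl::create, but the lock acquisition itself lives in the new shard_lock_manager module, which is not shown in this excerpt. Below is a minimal sketch, not the PR's actual implementation, of how a per-shard lock can be built on etcd-client's lease and lock APIs; the helper name acquire_shard_lock, the 200 ms refresh loop, and the key layout comment are illustrative assumptions.

use std::time::Duration;

use etcd_client::{Client, LockOptions};

// Hypothetical helper: acquire the lock for one shard and keep its lease
// alive in the background. Not the PR's ShardLockManager internals.
async fn acquire_shard_lock(
    client: &mut Client,
    key_prefix: &str,
    shard_id: u32,
    ttl_sec: i64,
) -> Result<Vec<u8>, etcd_client::Error> {
    // Grant a lease so etcd releases the lock automatically if this node
    // dies and stops refreshing it (the role of shard_lock_lease_ttl_sec).
    let lease = client.lease_grant(ttl_sec, None).await?;
    let lease_id = lease.id();

    // Refresh the lease periodically while the shard stays open
    // (the role of shard_lock_lease_check_interval).
    let (mut keeper, _responses) = client.lease_keep_alive(lease_id).await?;
    tokio::spawn(async move {
        loop {
            if keeper.keep_alive().await.is_err() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    });

    // Assumed lock key layout, extending shard_lock_key_prefix:
    // {root_path}/{cluster_name}/shards/{shard_id}
    let lock_key = format!("{key_prefix}/{shard_id}");
    let resp = client
        .lock(lock_key, Some(LockOptions::new().with_lease(lease_id)))
        .await?;

    // Keep the returned key: it is what client.unlock(key) expects later.
    Ok(resp.key().to_vec())
}

If the holder crashes, the lease expires after its TTL and another node can acquire the same shard lock, which is the failover behavior the new shard_lock_lease_ttl_sec and shard_lock_lease_check_interval settings are documented to control.
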
48 changes: 47 additions & 1 deletion cluster/src/config.rs
@@ -1,6 +1,8 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use common_types::schema::TIMESTAMP_COLUMN;
use common_util::config::ReadableDuration;
use etcd_client::ConnectOptions;
use meta_client::meta_impl::MetaClientConfig;
use serde::{Deserialize, Serialize};
use table_engine::ANALYTIC_ENGINE_TYPE;
@@ -22,9 +24,53 @@ impl Default for SchemaConfig {
}
}

const DEFAULT_ETCD_ROOT_PATH: &str = "/ceresdb";

#[derive(Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct EtcdClientConfig {
/// The etcd server addresses
pub server_addrs: Vec<String>,
/// Root path in the etcd used by the ceresdb server
pub root_path: String,

pub connect_timeout: ReadableDuration,
pub rpc_timeout: ReadableDuration,

/// The lease of the shard lock in seconds.
///
/// It should be greater than `shard_lock_lease_check_interval`.
pub shard_lock_lease_ttl_sec: u64,
/// The interval of checking whether the shard lock lease is expired
pub shard_lock_lease_check_interval: ReadableDuration,
}

impl From<&EtcdClientConfig> for ConnectOptions {
fn from(config: &EtcdClientConfig) -> Self {
ConnectOptions::default()
.with_connect_timeout(config.connect_timeout.0)
.with_timeout(config.rpc_timeout.0)
}
}

impl Default for EtcdClientConfig {
fn default() -> Self {
Self {
server_addrs: vec!["127.0.0.1:2379".to_string()],
root_path: DEFAULT_ETCD_ROOT_PATH.to_string(),

rpc_timeout: ReadableDuration::secs(5),
connect_timeout: ReadableDuration::secs(5),
shard_lock_lease_ttl_sec: 15,
shard_lock_lease_check_interval: ReadableDuration::millis(200),
}
}
}

#[derive(Default, Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct ClusterConfig {
pub cmd_channel_buffer_size: usize,
pub meta_client: MetaClientConfig,
pub etcd_client: EtcdClientConfig,
}
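
To illustrate the new configuration surface, here is a hedged sketch of deserializing an etcd_client section with values matching the defaults above. It assumes the toml crate (not a dependency added by this diff), that ReadableDuration parses human-readable strings such as "5s" and "200ms", and that the struct is reachable as cluster::config::EtcdClientConfig.

use cluster::config::EtcdClientConfig;

// Hypothetical example, not part of the PR.
fn example_etcd_client_config() -> EtcdClientConfig {
    let text = r#"
        server_addrs = ["127.0.0.1:2379"]
        root_path = "/ceresdb"
        connect_timeout = "5s"
        rpc_timeout = "5s"
        shard_lock_lease_ttl_sec = 15
        shard_lock_lease_check_interval = "200ms"
    "#;
    // Any field omitted here falls back to EtcdClientConfig::default()
    // because of #[serde(default)].
    toml::from_str(text).expect("valid etcd_client config")
}

As the field's doc comment requires, shard_lock_lease_ttl_sec should stay greater than shard_lock_lease_check_interval so the lease is refreshed well before it expires.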