Skip to content

Commit

Permalink
feat: support access etcd with tls
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Oct 10, 2023
1 parent ee70ab4 commit ff8f2fa
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", re
df_operator = { path = "df_operator" }
df_engine_extensions = { path = "df_engine_extensions" }
future_ext = { path = "components/future_ext" }
etcd-client = "0.10.3"
etcd-client = { version = "0.10.3", features = ["tls"] }
env_logger = "0.6"
futures = "0.3"
generic_error = { path = "components/generic_error" }
Expand Down
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
# limitations under the License.

large-error-threshold = 1024
enum-variant-size-threshold = 1024
42 changes: 36 additions & 6 deletions cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{

use async_trait::async_trait;
use common_types::table::ShardId;
use etcd_client::ConnectOptions;
use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions};
use generic_error::BoxError;
use logger::{error, info, warn};
use meta_client::{
Expand All @@ -32,17 +32,19 @@ use meta_client::{
use runtime::{JoinHandle, Runtime};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::{
fs, io,
sync::mpsc::{self, Sender},
time,
};

use crate::{
config::ClusterConfig,
config::{ClusterConfig, EtcdClientConfig},
shard_lock_manager::{ShardLockManager, ShardLockManagerRef},
shard_set::{Shard, ShardRef, ShardSet},
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, InvalidArguments,
MetaClientFailure, OpenShard, OpenShardWithCause, Result, ShardNotFound,
Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause,
InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause,
Result, ShardNotFound,
};

/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
Expand Down Expand Up @@ -72,8 +74,9 @@ impl ClusterImpl {
return InvalidArguments { msg: e }.fail();
}

let inner = Arc::new(Inner::new(shard_set, meta_client)?);
let connect_options = ConnectOptions::from(&config.etcd_client);
let connect_options = build_etcd_connect_options(&config.etcd_client)
.await
.context(InitEtcdClientConfig)?;
let etcd_client =
etcd_client::Client::connect(&config.etcd_client.server_addrs, Some(connect_options))
.await
Expand All @@ -94,6 +97,8 @@ impl ClusterImpl {
config.etcd_client.rpc_timeout(),
runtime.clone(),
);

let inner = Arc::new(Inner::new(shard_set, meta_client)?);
Ok(Self {
inner,
runtime,
Expand Down Expand Up @@ -383,6 +388,31 @@ impl Cluster for ClusterImpl {
}
}

/// Build the connect options for accessing etcd cluster.
async fn build_etcd_connect_options(config: &EtcdClientConfig) -> io::Result<ConnectOptions> {
let connect_options = ConnectOptions::default()
.with_connect_timeout(config.connect_timeout.0)
.with_timeout(config.rpc_timeout());

let tls = &config.tls;
if tls.enable {
let server_ca_cert = fs::read(&tls.ca_cert_path).await?;
let client_cert = fs::read(&tls.client_cert_path).await?;
let client_key = fs::read(&tls.client_key_path).await?;

let ca_cert = Certificate::from_pem(server_ca_cert);
let client_ident = Identity::from_pem(client_cert, client_key);
let tls_options = TlsOptions::new()
.domain_name(&tls.domain)
.ca_certificate(ca_cert)
.identity(client_ident);

Ok(connect_options.with_tls(tls_options))
} else {
Ok(connect_options)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
35 changes: 26 additions & 9 deletions cluster/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::time::Duration;

use common_types::schema::TIMESTAMP_COLUMN;
use etcd_client::ConnectOptions;
use meta_client::meta_impl::MetaClientConfig;
use serde::{Deserialize, Serialize};
use table_engine::ANALYTIC_ENGINE_TYPE;
Expand All @@ -41,6 +40,16 @@ impl Default for SchemaConfig {
const DEFAULT_ETCD_ROOT_PATH: &str = "/ceresdb";
const MIN_SHARD_LOCK_LEASE_TTL_SEC: u64 = 15;

#[derive(Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct TlsConfig {
pub enable: bool,
pub domain: String,
pub ca_cert_path: String,
pub client_key_path: String,
pub client_cert_path: String,
}

#[derive(Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct EtcdClientConfig {
Expand All @@ -52,6 +61,9 @@ pub struct EtcdClientConfig {
/// Timeout to connect to etcd cluster
pub connect_timeout: ReadableDuration,

/// Tls config to access etcd cluster.
pub tls: TlsConfig,

/// The lease of the shard lock in seconds.
///
/// It should be greater than `shard_lock_lease_check_interval`.
Expand Down Expand Up @@ -86,26 +98,31 @@ impl EtcdClientConfig {
}
}

impl From<&EtcdClientConfig> for ConnectOptions {
fn from(config: &EtcdClientConfig) -> Self {
ConnectOptions::default()
.with_connect_timeout(config.connect_timeout.0)
.with_timeout(config.rpc_timeout())
}
}

impl Default for EtcdClientConfig {
fn default() -> Self {
Self {
server_addrs: vec!["127.0.0.1:2379".to_string()],
root_path: DEFAULT_ETCD_ROOT_PATH.to_string(),
tls: TlsConfig::default(),
connect_timeout: ReadableDuration::secs(5),
shard_lock_lease_ttl_sec: 30,
shard_lock_lease_check_interval: ReadableDuration::millis(200),
}
}
}

impl Default for TlsConfig {
fn default() -> Self {
Self {
enable: false,
domain: "".to_string(),
ca_cert_path: "".to_string(),
client_key_path: "".to_string(),
client_cert_path: "".to_string(),
}
}
}

#[derive(Default, Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct ClusterConfig {
Expand Down
6 changes: 6 additions & 0 deletions cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ pub enum Error {
#[snafu(display("Meta client execute failed, err:{source}."))]
MetaClientFailure { source: meta_client::Error },

#[snafu(display("Failed to init etcd client config, err:{source}.\nBacktrace:\n{backtrace}"))]
InitEtcdClientConfig {
source: std::io::Error,
backtrace: Backtrace,
},

#[snafu(display("Etcd client failure, msg:{msg}, err:{source}.\nBacktrace:\n{backtrace}"))]
EtcdClientFailureWithCause {
msg: String,
Expand Down

0 comments on commit ff8f2fa

Please sign in to comment.