Skip to content

Commit

Permalink
fix: Update EtcdElectionClient keep_alive/observe behavior (#8058)
Browse files Browse the repository at this point in the history
Approved-By: yezizp2012
  • Loading branch information
shanicky authored Feb 22, 2023
1 parent 6c5f68f commit 8e499c7
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
1 change: 0 additions & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ profile:
- use: kafka
persist-data: true


3etcd-3meta:
steps:
- use: etcd
Expand Down
12 changes: 11 additions & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;

use risingwave_common::config::{load_config, MetaBackend};
use risingwave_common::config::{load_config, MetaBackend, RwConfig};
use tracing::info;

/// Start meta node
Expand Down Expand Up @@ -202,6 +202,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
MetaBackend::Mem => MetaStoreBackend::Mem,
};

validate_config(&config);

let max_heartbeat_interval =
Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
let barrier_interval = Duration::from_millis(config.streaming.barrier_interval_ms as u64);
Expand Down Expand Up @@ -271,3 +273,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
}
})
}

fn validate_config(config: &RwConfig) {
if config.meta.meta_leader_lease_secs <= 1 {
let error_msg = "meta leader lease secs should be larger than 1";
tracing::error!(error_msg);
panic!("{}", error_msg);
}
}
45 changes: 35 additions & 10 deletions src/meta/src/rpc/election_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,25 @@ impl ElectionClient for EtcdElectionClient {

let mut ticker = time::interval(Duration::from_secs(1));

// timeout controller, when keep alive fails for more than a certain period of time
// before it is considered a complete failure
let mut timeout = time::interval(Duration::from_secs((ttl / 2) as u64));
timeout.reset();

loop {
tokio::select! {
biased;

_ = timeout.tick() => {
tracing::warn!("lease {} keep alive timeout", lease_id);
keep_alive_fail_tx.send(()).unwrap();
break;
}

_ = ticker.tick() => {
if let Err(err) = keeper.keep_alive().await {
tracing::error!("keep alive for lease {} failed {}", lease_id, err);
keep_alive_fail_tx.send(()).unwrap();
break;
tracing::warn!("keep alive for lease {} failed {}", lease_id, err);
continue
}

match resp_stream.message().await {
Expand All @@ -164,16 +174,23 @@ impl ElectionClient for EtcdElectionClient {
keep_alive_fail_tx.send(()).unwrap();
break;
}

timeout.reset();
},
Ok(None) => {
tracing::warn!("lease keeper for lease {} response stream closed unexpected", lease_id);
keep_alive_fail_tx.send(()).unwrap();
break;

// try to re-create lease keeper, with timeout as ttl / 2
if let Ok(Ok((keeper_, resp_stream_))) = time::timeout(Duration::from_secs((ttl / 2) as u64), lease_client.keep_alive(lease_id)).await {
keeper = keeper_;
resp_stream = resp_stream_;
};

continue;
}
Err(e) => {
tracing::error!("lease keeper failed {}", e.to_string());
keep_alive_fail_tx.send(()).unwrap();
break;
continue;
}
};
}
Expand Down Expand Up @@ -224,16 +241,24 @@ impl ElectionClient for EtcdElectionClient {
},
resp = observe_stream.next() => {
match resp {
None => unreachable!(),
None => {
tracing::warn!("observe stream closed unexpected, recreating");

// try to re-create observe stream, with timeout as ttl / 2
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs((ttl / 2) as u64), election_client.observe(META_ELECTION_KEY)).await {
observe_stream = stream;
tracing::info!("recreating observe stream");
}
}
Some(Ok(leader)) => {
if let Some(kv) = leader.kv() && kv.value() != self.id.as_bytes() {
tracing::warn!("leader has been changed to {}", String::from_utf8_lossy(kv.value()).to_string());
break;
}
}
Some(Err(e)) => {
tracing::error!("error {} received from leader observe stream", e.to_string());
break;
tracing::warn!("error {} received from leader observe stream", e.to_string());
continue
}
}
}
Expand Down

0 comments on commit 8e499c7

Please sign in to comment.