Skip to content

Commit

Permalink
fix: add timeout to keep-alive response to prevent potential split-brain. (#9342)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Apr 23, 2023
1 parent 8cd9dbb commit 8a494e5
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 28
timeout_in_minutes: 35

- label: "end-to-end iceberg sink test (release mode)"
command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-release"
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
}

fn validate_config(config: &RwConfig) {
if config.meta.meta_leader_lease_secs <= 1 {
let error_msg = "meta leader lease secs should be larger than 1";
if config.meta.meta_leader_lease_secs <= 2 {
let error_msg = "meta leader lease secs should be larger than 2";
tracing::error!(error_msg);
panic!("{}", error_msg);
}
Expand Down
21 changes: 14 additions & 7 deletions src/meta/src/rpc/election_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ impl ElectionClient for EtcdElectionClient {

// timeout controller: keep alive has to keep failing for more than a certain
// period of time before it is considered a complete failure
let mut timeout = time::interval(Duration::from_secs((ttl / 2) as u64));
let mut timeout = time::interval(Duration::from_secs_f64(ttl as f64 / 2.0));
timeout.reset();

let mut keep_alive_sending = false;

loop {
tokio::select! {
biased;
Expand All @@ -158,16 +160,21 @@ impl ElectionClient for EtcdElectionClient {
break;
}

_ = ticker.tick() => {
_ = ticker.tick(), if !keep_alive_sending => {
if let Err(err) = keeper.keep_alive().await {
tracing::debug!("keep alive for lease {} failed {}", lease_id, err);
continue
}

match resp_stream.message().await {
keep_alive_sending = true;
}

resp = resp_stream.message() => {
keep_alive_sending = false;
match resp {
Ok(Some(resp)) => {
if resp.ttl() <= 0 {
tracing::warn!("lease expired or revoked {}", lease_id);
tracing::error!("lease expired or revoked {}", lease_id);
keep_alive_fail_tx.send(()).unwrap();
break;
}
Expand All @@ -178,7 +185,7 @@ impl ElectionClient for EtcdElectionClient {
tracing::debug!("lease keeper for lease {} response stream closed unexpected", lease_id);

// try to re-create lease keeper, with timeout as ttl / 2
if let Ok(Ok((keeper_, resp_stream_))) = time::timeout(Duration::from_secs((ttl / 2) as u64), lease_client.keep_alive(lease_id)).await {
if let Ok(Ok((keeper_, resp_stream_))) = time::timeout(Duration::from_secs_f64(ttl as f64 / 2.0), lease_client.keep_alive(lease_id)).await {
keeper = keeper_;
resp_stream = resp_stream_;
};
Expand Down Expand Up @@ -247,7 +254,7 @@ impl ElectionClient for EtcdElectionClient {
tracing::debug!("observe stream closed unexpected, recreating");

// try to re-create observe stream, with timeout as ttl / 2
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs((ttl / 2) as u64), self.client.observe(META_ELECTION_KEY)).await {
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs_f64(ttl as f64 / 2.0), self.client.observe(META_ELECTION_KEY)).await {
observe_stream = stream;
tracing::debug!("recreating observe stream");
}
Expand Down Expand Up @@ -386,7 +393,7 @@ mod tests {
let mut ticker = time::interval(Duration::from_secs(1));
loop {
ticker.tick().await;
if let Ok(_) = client_.run_once(3, stop.clone()).await {
if let Ok(_) = client_.run_once(5, stop.clone()).await {
break;
}
}
Expand Down

0 comments on commit 8a494e5

Please sign in to comment.