From 7501a616415b0d34e3cdcdb6378af6378d190187 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Fri, 18 Oct 2024 12:00:54 -0500 Subject: [PATCH 1/2] agent: apply a timeout when dialing a proxy connector --- crates/agent/src/proxy_connectors.rs | 25 +++++++++++++++++++++++++ crates/gazette/src/lib.rs | 5 +++++ 2 files changed, 30 insertions(+) diff --git a/crates/agent/src/proxy_connectors.rs b/crates/agent/src/proxy_connectors.rs index ea963f8845..10c8e506c4 100644 --- a/crates/agent/src/proxy_connectors.rs +++ b/crates/agent/src/proxy_connectors.rs @@ -144,6 +144,30 @@ impl ProxyConnectors { futures::channel::oneshot::Sender<()>, impl Future> + 'a, ), + )> { + Ok( + tokio::time::timeout(DIAL_PROXY_TIMEOUT, self.dial_proxy_inner(data_plane, task)) + .await + .with_context(|| { + format!( + "timed out starting proxy connector in data-plane {}", + data_plane.reactor_address + ) + })??, + ) + } + + async fn dial_proxy_inner<'a>( + &'a self, + data_plane: &tables::DataPlane, + task: ops::ShardRef, + ) -> anyhow::Result<( + tonic::transport::Channel, + gazette::Metadata, + ( + futures::channel::oneshot::Sender<()>, + impl Future> + 'a, + ), )> { let tables::DataPlane { reactor_address, @@ -247,6 +271,7 @@ impl ProxyConnectors { } } +const DIAL_PROXY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); // One minute. const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes. /* diff --git a/crates/gazette/src/lib.rs b/crates/gazette/src/lib.rs index 5ed060f0b4..47d9757561 100644 --- a/crates/gazette/src/lib.rs +++ b/crates/gazette/src/lib.rs @@ -93,6 +93,11 @@ pub async fn dial_channel(endpoint: &str) -> Result { .assume_http2(true), )?; + // Note that this await can block for *longer* than connect_timeout, + // because that timeout only accounts for TCP connection time and does + // not apply to time required to start the HTTP/2 transport. + // This manifests if the server has bound its port but is not serving it. + let channel = match ep.uri().scheme_str() { Some("unix") => { ep.connect_with_connector(tower::util::service_fn(|uri: tonic::transport::Uri| { From eef1422bdf8aaa3bec8a5ee3c30074acca0c5c58 Mon Sep 17 00:00:00 2001 From: Phil Date: Fri, 18 Oct 2024 14:04:03 -0400 Subject: [PATCH 2/2] activate: timeout activate requests Applies an application-level timeout to activations, to prevent indefinite blocking in the agent. --- crates/activate/src/lib.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/activate/src/lib.rs b/crates/activate/src/lib.rs index fe51c93cdc..73253b6990 100644 --- a/crates/activate/src/lib.rs +++ b/crates/activate/src/lib.rs @@ -173,6 +173,17 @@ async fn apply_changes( journal_client: &gazette::journal::Client, shard_client: &gazette::shard::Client, changes: impl IntoIterator, +) -> anyhow::Result<()> { + tokio::time::timeout(std::time::Duration::from_secs(60), async { + try_apply_changes(journal_client, shard_client, changes).await + }) + .await? +} + +async fn try_apply_changes( + journal_client: &gazette::journal::Client, + shard_client: &gazette::shard::Client, + changes: impl IntoIterator, ) -> anyhow::Result<()> { let mut journal_deletes = Vec::new(); let mut journal_upserts = Vec::new(); @@ -295,6 +306,32 @@ async fn converge_task_changes<'a>( ops_logs_template: Option<&broker::JournalSpec>, ops_stats_template: Option<&broker::JournalSpec>, initial_splits: usize, +) -> anyhow::Result> { + tokio::time::timeout(std::time::Duration::from_secs(60), async { + try_converge_task_changes( + journal_client, + shard_client, + task_type, + task_name, + template, + ops_logs_template, + ops_stats_template, + initial_splits, + ) + .await + }) + .await? +} + +async fn try_converge_task_changes<'a>( + journal_client: &gazette::journal::Client, + shard_client: &gazette::shard::Client, + task_type: ops::TaskType, + task_name: &str, + template: TaskTemplate<'a>, + ops_logs_template: Option<&broker::JournalSpec>, + ops_stats_template: Option<&broker::JournalSpec>, + initial_splits: usize, ) -> anyhow::Result> { let (list_shards, list_recovery) = list_task_request(task_type, task_name);