diff --git a/changelog.d/24063_retry_docker_logs_client.feature.md b/changelog.d/24063_retry_docker_logs_client.feature.md
new file mode 100644
index 0000000000000..6b61f1e9e1fb0
--- /dev/null
+++ b/changelog.d/24063_retry_docker_logs_client.feature.md
@@ -0,0 +1,3 @@
+The `docker_logs` source now includes exponential backoff retry logic for Docker daemon communication failures, with indefinite retry capability. This improves reliability when working with slow or temporarily unresponsive Docker daemons by retrying with increasing delays instead of immediately stopping.
+
+authors: titaneric
diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs
index de361f40e29e9..44f9d289c9565 100644
--- a/src/sources/docker_logs/mod.rs
+++ b/src/sources/docker_logs/mod.rs
@@ -41,6 +41,7 @@ use vrl::{
 use super::util::MultilineConfig;
 use crate::{
     SourceSender,
+    common::backoff::ExponentialBackoff,
     config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
     docker::{DockerTlsConfig, docker},
     event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
@@ -468,6 +469,8 @@ struct DockerLogsSource {
     /// It may contain shortened container id.
     hostname: Option<String>,
     backoff_duration: Duration,
+    /// Backoff strategy for events stream retries
+    events_backoff: ExponentialBackoff,
 }
 
 impl DockerLogsSource {
@@ -521,6 +524,9 @@ impl DockerLogsSource {
             main_recv,
             hostname,
             backoff_duration: backoff_secs,
+            events_backoff: ExponentialBackoff::from_millis(2)
+                .factor(1000)
+                .max_delay(Duration::from_secs(300)),
         })
     }
 
@@ -620,6 +626,9 @@ impl DockerLogsSource {
                 value = self.events.next() => {
                     match value {
                         Some(Ok(mut event)) => {
+                            // Reset backoff on successful event
+                            self.events_backoff.reset();
+
                             let action = event.action.unwrap();
                             let actor = event.actor.take().unwrap();
                             let id = actor.id.unwrap();
@@ -662,13 +671,18 @@ impl DockerLogsSource {
                                 error,
                                 container_id: None,
                             });
-                            return;
+                            // Retry events stream with exponential backoff
+                            if !self.retry_events_stream_with_backoff("Docker events stream failed").await {
+                                error!("Docker events stream failed and retry exhausted, shutting down.");
+                                return;
+                            }
                         },
                         None => {
-                            // TODO: this could be fixed, but should be tried with some timeoff and exponential backoff
-                            error!(message = "Docker log event stream has ended unexpectedly.", internal_log_rate_limit = false);
-                            info!(message = "Shutting down docker_logs source.");
-                            return;
+                            // Retry events stream with exponential backoff
+                            if !self.retry_events_stream_with_backoff("Docker events stream ended").await {
+                                error!("Docker events stream ended and retry exhausted, shutting down.");
+                                return;
+                            }
                         }
                     };
                 }
@@ -676,6 +690,31 @@ impl DockerLogsSource {
         }
     }
 
+    /// Retry events stream with exponential backoff
+    /// Returns true if retry was attempted, false if exhausted or shutdown
+    async fn retry_events_stream_with_backoff(&mut self, reason: &str) -> bool {
+        if let Some(delay) = self.events_backoff.next() {
+            warn!(
+                message = reason,
+                action = "retrying with backoff",
+                delay_ms = delay.as_millis()
+            );
+            tokio::select! {
+                _ = tokio::time::sleep(delay) => {
+                    self.events = Box::pin(self.esb.core.docker_logs_event_stream());
+                    true
+                }
+                _ = self.esb.shutdown.clone() => {
+                    info!(message = "Shutdown signal received during retry backoff.");
+                    false
+                }
+            }
+        } else {
+            error!(message = "Events stream retry exhausted.", reason = reason);
+            false
+        }
+    }
+
     fn exclude_self(&self, id: &str) -> bool {
         self.hostname
             .as_ref()
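
For reviewers who want to sanity-check the chosen configuration, below is a minimal, self-contained sketch of the delay schedule that `ExponentialBackoff::from_millis(2).factor(1000).max_delay(Duration::from_secs(300))` should produce, assuming the builder behaves like the tokio-retry-style backoff it mirrors (delays of base^n milliseconds scaled by `factor`, capped at `max_delay`). The `Backoff` struct here is a hypothetical stand-in for illustration, not Vector's actual `common::backoff` implementation.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the `ExponentialBackoff` builder used in the patch.
/// Assumption: delays are base^n milliseconds, scaled by `factor`, capped at `max_delay`.
struct Backoff {
    base: u64,
    current: u64,
    factor: u64,
    max_delay: Option<Duration>,
}

impl Backoff {
    fn from_millis(base: u64) -> Self {
        Backoff { base, current: base, factor: 1, max_delay: None }
    }
    fn factor(mut self, factor: u64) -> Self {
        self.factor = factor;
        self
    }
    fn max_delay(mut self, max: Duration) -> Self {
        self.max_delay = Some(max);
        self
    }
    /// Return to the initial delay, as the source does after a successful event.
    fn reset(&mut self) {
        self.current = self.base;
    }
}

impl Iterator for Backoff {
    type Item = Duration;

    fn next(&mut self) -> Option<Duration> {
        // Scale the current exponent by the factor, then cap at max_delay.
        let delay = Duration::from_millis(self.current.saturating_mul(self.factor));
        let delay = match self.max_delay {
            Some(max) if delay > max => max,
            _ => delay,
        };
        self.current = self.current.saturating_mul(self.base);
        Some(delay)
    }
}

fn main() {
    // Same configuration as the patch: 2ms base, factor 1000, 300s cap.
    let mut backoff = Backoff::from_millis(2).factor(1000).max_delay(Duration::from_secs(300));
    // Expected progression: 2s, 4s, 8s, 16s, ... plateauing at 300s.
    for _ in 0..10 {
        println!("{:?}", backoff.next().unwrap());
    }
    // After a successful event the source calls `reset()`, returning to the 2s floor.
    backoff.reset();
    println!("after reset: {:?}", backoff.next().unwrap());
}
```

If that assumption about the builder holds, the source waits 2s, 4s, 8s, and so on between reconnect attempts, never exceeding five minutes, and a successful event resets the schedule, which is consistent with the changelog's "indefinite retry" wording.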