From e44fc25ca3bccdf50c4fc4b40a73b00850c66a82 Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Sun, 26 May 2024 15:50:43 +0200 Subject: [PATCH] feat: make container and exec logs `Send`able (#637) There is no need to stick logs to lifetime of container: - this way logs can't be sent to another thread/task - logs will stop with EOF if container has been dropped anyway, what's reasonable - it's step forward log followers (like in [Java implementation](https://java.testcontainers.org/features/container_logs/)) --- testcontainers/src/core/client.rs | 14 ++-- testcontainers/src/core/client/exec.rs | 12 ++-- .../src/core/containers/async_container.rs | 59 +++++++++------- .../core/containers/async_container/exec.rs | 10 +-- .../src/core/containers/sync_container.rs | 67 +++++++++++-------- .../core/containers/sync_container/exec.rs | 16 ++--- .../containers/sync_container/sync_reader.rs | 17 +++-- testcontainers/src/core/logs.rs | 12 ++-- 8 files changed, 116 insertions(+), 91 deletions(-) diff --git a/testcontainers/src/core/client.rs b/testcontainers/src/core/client.rs index 4d94a4cd..c954425b 100644 --- a/testcontainers/src/core/client.rs +++ b/testcontainers/src/core/client.rs @@ -91,12 +91,12 @@ impl Client { Ok(Client { config, bollard }) } - pub(crate) fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_> { - self.logs(id, LogSource::StdOut) + pub(crate) fn stdout_logs(&self, id: &str, follow: bool) -> LogStreamAsync { + self.logs(id, LogSource::StdOut, follow) } - pub(crate) fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_> { - self.logs(id, LogSource::StdErr) + pub(crate) fn stderr_logs(&self, id: &str, follow: bool) -> LogStreamAsync { + self.logs(id, LogSource::StdErr, follow) } pub(crate) async fn ports(&self, id: &str) -> Result { @@ -152,7 +152,7 @@ impl Client { &self, container_id: &str, cmd: Vec, - ) -> Result, ClientError> { + ) -> Result { let config = CreateExecOptions { cmd: Some(cmd), attach_stdout: Some(true), @@ -245,9 +245,9 @@ impl Client { .map_err(ClientError::InspectExec) } - fn logs(&self, container_id: &str, log_source: LogSource) -> LogStreamAsync<'_> { + fn logs(&self, container_id: &str, log_source: LogSource, follow: bool) -> LogStreamAsync { let options = LogsOptions { - follow: true, + follow, stdout: log_source.is_stdout(), stderr: log_source.is_stderr(), tail: "all".to_owned(), diff --git a/testcontainers/src/core/client/exec.rs b/testcontainers/src/core/client/exec.rs index 4f2ce851..c78d7969 100644 --- a/testcontainers/src/core/client/exec.rs +++ b/testcontainers/src/core/client/exec.rs @@ -1,21 +1,21 @@ use crate::core::logs::LogStreamAsync; -pub(crate) struct ExecResult<'a> { +pub(crate) struct ExecResult { pub(crate) id: String, - pub(crate) stdout: LogStreamAsync<'a>, - pub(crate) stderr: LogStreamAsync<'a>, + pub(crate) stdout: LogStreamAsync, + pub(crate) stderr: LogStreamAsync, } -impl<'a> ExecResult<'a> { +impl ExecResult { pub(crate) fn id(&self) -> &str { &self.id } - pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync<'a> { + pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync { &mut self.stdout } - pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync<'a> { + pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync { &mut self.stderr } } diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index b390b024..2c938a01 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -171,7 +171,7 @@ where } /// Executes a command in the container. - pub async fn exec(&self, cmd: ExecCommand) -> Result> { + pub async fn exec(&self, cmd: ExecCommand) -> Result { let ExecCommand { cmd, container_ready_conditions, @@ -262,14 +262,14 @@ where } /// Returns an asynchronous reader for stdout. - pub fn stdout(&self) -> Pin> { - let stdout = self.docker_client.stdout_logs(&self.id); + pub fn stdout(&self) -> Pin> { + let stdout = self.docker_client.stdout_logs(&self.id, true); Box::pin(tokio_util::io::StreamReader::new(stdout.into_inner())) } /// Returns an asynchronous reader for stderr. - pub fn stderr(&self) -> Pin> { - let stderr = self.docker_client.stderr_logs(&self.id); + pub fn stderr(&self) -> Pin> { + let stderr = self.docker_client.stderr_logs(&self.id, true); Box::pin(tokio_util::io::StreamReader::new(stderr.into_inner())) } @@ -281,13 +281,13 @@ where match condition { WaitFor::StdOutMessage { message } => self .docker_client - .stdout_logs(id) + .stdout_logs(id, true) .wait_for_message(message) .await .map_err(WaitContainerError::from)?, WaitFor::StdErrMessage { message } => self .docker_client - .stderr_logs(id) + .stderr_logs(id, true) .wait_for_message(message) .await .map_err(WaitContainerError::from)?, @@ -382,23 +382,34 @@ mod tests { let image = GenericImage::new("testcontainers/helloworld", "1.1.0"); let container = RunnableImage::from(image).start().await?; - let mut stderr_lines = container.stderr().lines(); - - let expected_messages = [ - "DELAY_START_MSEC: 0", - "Sleeping for 0 ms", - "Starting server on port 8080", - "Sleeping for 0 ms", - "Starting server on port 8081", - "Ready, listening on 8080 and 8081", - ]; - for expected_message in expected_messages { - let line = stderr_lines.next_line().await?.expect("line must exist"); - assert!( - line.contains(expected_message), - "Log message ('{line}') doesn't contain expected message ('{expected_message}')" - ); - } + let stderr = container.stderr(); + + // it's possible to send logs into background task + let log_follower_task = tokio::spawn(async move { + let mut stderr_lines = stderr.lines(); + let expected_messages = [ + "DELAY_START_MSEC: 0", + "Sleeping for 0 ms", + "Starting server on port 8080", + "Sleeping for 0 ms", + "Starting server on port 8081", + "Ready, listening on 8080 and 8081", + ]; + for expected_message in expected_messages { + let line = stderr_lines.next_line().await?.expect("line must exist"); + if !line.contains(expected_message) { + anyhow::bail!( + "Log message ('{}') doesn't contain expected message ('{}')", + line, + expected_message + ); + } + } + Ok(()) + }); + log_follower_task + .await + .map_err(|_| anyhow::anyhow!("failed to join log follower task"))??; // logs are accessible after container is stopped container.stop().await?; diff --git a/testcontainers/src/core/containers/async_container/exec.rs b/testcontainers/src/core/containers/async_container/exec.rs index 9175b4bf..538d01ef 100644 --- a/testcontainers/src/core/containers/async_container/exec.rs +++ b/testcontainers/src/core/containers/async_container/exec.rs @@ -7,14 +7,14 @@ use tokio::io::{AsyncBufRead, AsyncReadExt}; use crate::core::{client::Client, error::Result}; /// Represents the result of an executed command in a container. -pub struct ExecResult<'a> { +pub struct ExecResult { pub(super) client: Arc, pub(crate) id: String, - pub(super) stdout: BoxStream<'a, std::result::Result>, - pub(super) stderr: BoxStream<'a, std::result::Result>, + pub(super) stdout: BoxStream<'static, std::result::Result>, + pub(super) stderr: BoxStream<'static, std::result::Result>, } -impl<'a> ExecResult<'a> { +impl ExecResult { /// Returns the exit code of the executed command. /// If the command has not yet exited, this will return `None`. pub async fn exit_code(&self) -> Result> { @@ -49,7 +49,7 @@ impl<'a> ExecResult<'a> { } } -impl fmt::Debug for ExecResult<'_> { +impl fmt::Debug for ExecResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ExecResult").field("id", &self.id).finish() } diff --git a/testcontainers/src/core/containers/sync_container.rs b/testcontainers/src/core/containers/sync_container.rs index 7de2b1a1..3fc0ef5b 100644 --- a/testcontainers/src/core/containers/sync_container.rs +++ b/testcontainers/src/core/containers/sync_container.rs @@ -1,4 +1,4 @@ -use std::{fmt, io::BufRead, net::IpAddr}; +use std::{fmt, io::BufRead, net::IpAddr, sync::Arc}; use crate::{ core::{env, error::Result, ports::Ports, ExecCommand}, @@ -28,7 +28,7 @@ pub struct Container { /// Internal representation of a running docker container, to be able to terminate runtime correctly when `Container` is dropped. struct ActiveContainer { - runtime: tokio::runtime::Runtime, + runtime: Arc, async_impl: ContainerAsync, } @@ -50,7 +50,7 @@ impl Container { pub(crate) fn new(runtime: tokio::runtime::Runtime, async_impl: ContainerAsync) -> Self { Self { inner: Some(ActiveContainer { - runtime, + runtime: Arc::new(runtime), async_impl, }), } @@ -132,11 +132,11 @@ where } /// Executes a command in the container. - pub fn exec(&self, cmd: ExecCommand) -> Result> { + pub fn exec(&self, cmd: ExecCommand) -> Result { let async_exec = self.rt().block_on(self.async_impl().exec(cmd))?; Ok(exec::SyncExecResult { inner: async_exec, - runtime: self.rt(), + runtime: self.rt().clone(), }) } @@ -159,23 +159,23 @@ where } /// Returns a reader for stdout. - pub fn stdout(&self) -> Box { + pub fn stdout(&self) -> Box { Box::new(sync_reader::SyncReadBridge::new( self.async_impl().stdout(), - self.rt(), + self.rt().clone(), )) } /// Returns a reader for stderr. - pub fn stderr(&self) -> Box { + pub fn stderr(&self) -> Box { Box::new(sync_reader::SyncReadBridge::new( self.async_impl().stderr(), - self.rt(), + self.rt().clone(), )) } /// Returns reference to inner `Runtime`. It's safe to unwrap because it's `Some` until `Container` is dropped. - fn rt(&self) -> &tokio::runtime::Runtime { + fn rt(&self) -> &Arc { &self.inner.as_ref().unwrap().runtime } @@ -234,27 +234,38 @@ mod test { fn assert_send_and_sync() {} #[test] - fn async_logs_are_accessible() -> anyhow::Result<()> { + fn sync_logs_are_accessible() -> anyhow::Result<()> { let image = GenericImage::new("testcontainers/helloworld", "1.1.0"); let container = RunnableImage::from(image).start()?; - let mut stderr_lines = container.stderr().lines(); - - let expected_messages = [ - "DELAY_START_MSEC: 0", - "Sleeping for 0 ms", - "Starting server on port 8080", - "Sleeping for 0 ms", - "Starting server on port 8081", - "Ready, listening on 8080 and 8081", - ]; - for expected_message in expected_messages { - let line = stderr_lines.next().expect("line must exist")?; - assert!( - line.contains(expected_message), - "Log message ('{line}') doesn't contain expected message ('{expected_message}')" - ); - } + let stderr = container.stderr(); + + // it's possible to send logs to another thread + let log_follower_thread = std::thread::spawn(move || { + let mut stderr_lines = stderr.lines(); + let expected_messages = [ + "DELAY_START_MSEC: 0", + "Sleeping for 0 ms", + "Starting server on port 8080", + "Sleeping for 0 ms", + "Starting server on port 8081", + "Ready, listening on 8080 and 8081", + ]; + for expected_message in expected_messages { + let line = stderr_lines.next().expect("line must exist")?; + if !line.contains(expected_message) { + anyhow::bail!( + "Log message ('{}') doesn't contain expected message ('{}')", + line, + expected_message + ); + } + } + Ok(()) + }); + log_follower_thread + .join() + .map_err(|_| anyhow::anyhow!("failed to join log follower thread"))??; // logs are accessible after container is stopped container.stop()?; diff --git a/testcontainers/src/core/containers/sync_container/exec.rs b/testcontainers/src/core/containers/sync_container/exec.rs index 5e80b5f2..498182b1 100644 --- a/testcontainers/src/core/containers/sync_container/exec.rs +++ b/testcontainers/src/core/containers/sync_container/exec.rs @@ -1,14 +1,14 @@ -use std::{fmt, io::BufRead}; +use std::{fmt, io::BufRead, sync::Arc}; use crate::{core::sync_container::sync_reader, TestcontainersError}; /// Represents the result of an executed command in a container. -pub struct SyncExecResult<'a> { - pub(super) inner: crate::core::async_container::exec::ExecResult<'a>, - pub(super) runtime: &'a tokio::runtime::Runtime, +pub struct SyncExecResult { + pub(super) inner: crate::core::async_container::exec::ExecResult, + pub(super) runtime: Arc, } -impl<'a> SyncExecResult<'a> { +impl SyncExecResult { /// Returns the exit code of the executed command. /// If the command has not yet exited, this will return `None`. pub fn exit_code(&self) -> Result, TestcontainersError> { @@ -19,7 +19,7 @@ impl<'a> SyncExecResult<'a> { pub fn stdout<'b>(&'b mut self) -> Box { Box::new(sync_reader::SyncReadBridge::new( self.inner.stdout(), - self.runtime, + self.runtime.clone(), )) } @@ -27,7 +27,7 @@ impl<'a> SyncExecResult<'a> { pub fn stderr<'b>(&'b mut self) -> Box { Box::new(sync_reader::SyncReadBridge::new( self.inner.stderr(), - self.runtime, + self.runtime.clone(), )) } @@ -42,7 +42,7 @@ impl<'a> SyncExecResult<'a> { } } -impl fmt::Debug for SyncExecResult<'_> { +impl fmt::Debug for SyncExecResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ExecResult") .field("id", &self.inner.id) diff --git a/testcontainers/src/core/containers/sync_container/sync_reader.rs b/testcontainers/src/core/containers/sync_container/sync_reader.rs index ace0f3bd..6087a468 100644 --- a/testcontainers/src/core/containers/sync_container/sync_reader.rs +++ b/testcontainers/src/core/containers/sync_container/sync_reader.rs @@ -1,22 +1,25 @@ -use std::io::{BufRead, Read}; +use std::{ + io::{BufRead, Read}, + sync::Arc, +}; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt}; /// Allows to use [`tokio::io::AsyncRead`] synchronously as [`std::io::Read`]. /// In fact, it's almost the same as [`tokio_util::io::SyncIoBridge`], but utilizes [`tokio::runtime::Runtime`] instead of [`tokio::runtime::Handle`]. /// This is needed because [`tokio::runtime::Handle::block_on`] can't drive the IO on `current_thread` runtime. -pub(super) struct SyncReadBridge<'a, T> { +pub(super) struct SyncReadBridge { inner: T, - runtime: &'a tokio::runtime::Runtime, + runtime: Arc, } -impl<'a, T: Unpin> SyncReadBridge<'a, T> { - pub fn new(inner: T, runtime: &'a tokio::runtime::Runtime) -> Self { +impl SyncReadBridge { + pub fn new(inner: T, runtime: Arc) -> Self { Self { inner, runtime } } } -impl BufRead for SyncReadBridge<'_, T> { +impl BufRead for SyncReadBridge { fn fill_buf(&mut self) -> std::io::Result<&[u8]> { let inner = &mut self.inner; self.runtime.block_on(AsyncBufReadExt::fill_buf(inner)) @@ -39,7 +42,7 @@ impl BufRead for SyncReadBridge<'_, T> { } } -impl Read for SyncReadBridge<'_, T> { +impl Read for SyncReadBridge { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let inner = &mut self.inner; self.runtime.block_on(AsyncReadExt::read(inner, buf)) diff --git a/testcontainers/src/core/logs.rs b/testcontainers/src/core/logs.rs index 4aeb617a..60a62057 100644 --- a/testcontainers/src/core/logs.rs +++ b/testcontainers/src/core/logs.rs @@ -32,14 +32,14 @@ impl LogSource { } } -pub(crate) struct LogStreamAsync<'a> { - inner: BoxStream<'a, Result>, +pub(crate) struct LogStreamAsync { + inner: BoxStream<'static, Result>, cache: Vec>, enable_cache: bool, } -impl<'a> LogStreamAsync<'a> { - pub fn new(stream: BoxStream<'a, Result>) -> Self { +impl LogStreamAsync { + pub fn new(stream: BoxStream<'static, Result>) -> Self { Self { inner: stream, cache: vec![], @@ -78,7 +78,7 @@ impl<'a> LogStreamAsync<'a> { Err(WaitLogError::EndOfStream(messages)) } - pub(crate) fn into_inner(self) -> BoxStream<'a, Result> { + pub(crate) fn into_inner(self) -> BoxStream<'static, Result> { futures::stream::iter(self.cache).chain(self.inner).boxed() } } @@ -90,7 +90,7 @@ fn display_bytes(bytes: &[Bytes]) -> Vec> { .collect::>() } -impl<'a> fmt::Debug for LogStreamAsync<'a> { +impl fmt::Debug for LogStreamAsync { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("LogStreamAsync").finish() }