diff --git a/testcontainers/Cargo.toml b/testcontainers/Cargo.toml index 31b77533..49e1f996 100644 --- a/testcontainers/Cargo.toml +++ b/testcontainers/Cargo.toml @@ -19,20 +19,23 @@ rustdoc-args = ["--cfg", "docsrs"] async-trait = { version = "0.1" } bollard = { version = "0.16.1", features = ["ssl"] } bollard-stubs = "=1.44.0-rc.2" +bytes = "1.6.0" conquer-once = { version = "0.4", optional = true } dirs = "5.0.1" docker_credential = "1.3.1" futures = "0.3" log = "0.4" +memchr = "2.7.2" parse-display = "0.9.0" serde = { version = "1", features = ["derive"] } serde-java-properties = { version = "0.2.0", optional = true } serde_json = "1" serde_with = "3.7.0" signal-hook = { version = "0.3", optional = true } -tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] } -tokio-util = "0.7.10" thiserror = "1.0.60" +tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] } +tokio-stream = "0.1.15" +tokio-util = { version = "0.7.10", features = ["io-util"] } url = { version = "2", features = ["serde"] } [features] diff --git a/testcontainers/src/core.rs b/testcontainers/src/core.rs index 43ffb390..c4cf045f 100644 --- a/testcontainers/src/core.rs +++ b/testcontainers/src/core.rs @@ -7,6 +7,7 @@ pub use self::{ mounts::{AccessMode, Mount, MountType}, }; +pub mod errors; mod image; pub(crate) mod client; diff --git a/testcontainers/src/core/client.rs b/testcontainers/src/core/client.rs index 0d26ceff..36ee4340 100644 --- a/testcontainers/src/core/client.rs +++ b/testcontainers/src/core/client.rs @@ -1,22 +1,32 @@ -use std::{io, time::Duration}; +use std::{io, sync::Arc, time::Duration}; use bollard::{ auth::DockerCredentials, - container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions}, + container::{Config, CreateContainerOptions, LogOutput, LogsOptions, RemoveContainerOptions}, exec::{CreateExecOptions, StartExecOptions, StartExecResults}, image::CreateImageOptions, network::{CreateNetworkOptions, InspectNetworkOptions}, Docker, }; use bollard_stubs::models::{ - ContainerCreateResponse, ContainerInspectResponse, HealthStatusEnum, Network, + ContainerCreateResponse, ContainerInspectResponse, ExecInspectResponse, HealthStatusEnum, + Network, }; use futures::{StreamExt, TryStreamExt}; use tokio::sync::OnceCell; - -use crate::core::{env, logs::LogStreamAsync, ports::Ports, WaitFor}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::core::{ + client::exec::ExecResult, + env, + errors::WaitContainerError, + logs::{LogSource, LogStreamAsync}, + ports::Ports, + WaitFor, +}; mod bollard_client; +mod exec; mod factory; static IN_A_CONTAINER: OnceCell = OnceCell::const_new(); @@ -29,11 +39,6 @@ async fn is_in_container() -> bool { .await } -pub(crate) struct AttachLog { - stdout: bool, - stderr: bool, -} - /// The internal client. pub(crate) struct Client { pub(crate) config: env::Config, @@ -49,16 +54,17 @@ impl Client { } pub(crate) fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_> { - self.logs(id, AttachLog::stdout()) + self.logs(id, LogSource::StdOut) } pub(crate) fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_> { - self.logs(id, AttachLog::stderr()) + self.logs(id, LogSource::StdErr) } pub(crate) async fn ports(&self, id: &str) -> Ports { self.inspect(id) .await + .unwrap() .network_settings .unwrap_or_default() .ports @@ -66,8 +72,11 @@ impl Client { .unwrap_or_default() } - pub(crate) async fn inspect(&self, id: &str) -> ContainerInspectResponse { - self.bollard.inspect_container(id, None).await.unwrap() + pub(crate) async fn inspect( + &self, + id: &str, + ) -> Result { + self.bollard.inspect_container(id, None).await } pub(crate) async fn rm(&self, id: &str) { @@ -99,20 +108,15 @@ impl Client { &self, container_id: &str, cmd: Vec, - attach_log: AttachLog, - ) -> (String, LogStreamAsync<'_>) { + ) -> Result, bollard::errors::Error> { let config = CreateExecOptions { cmd: Some(cmd), - attach_stdout: Some(attach_log.stdout), - attach_stderr: Some(attach_log.stderr), + attach_stdout: Some(true), + attach_stderr: Some(true), ..Default::default() }; - let exec = self - .bollard - .create_exec(container_id, config) - .await - .expect("failed to create exec"); + let exec = self.bollard.create_exec(container_id, config).await?; let res = self .bollard @@ -124,43 +128,86 @@ impl Client { output_capacity: None, }), ) - .await - .expect("failed to start exec"); + .await?; match res { StartExecResults::Attached { output, .. } => { - let stream = output - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .map(|chunk| { - let bytes = chunk?.into_bytes(); - let str = std::str::from_utf8(bytes.as_ref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - Ok(str.to_string()) - }) - .boxed(); - - (exec.id, LogStreamAsync::new(stream)) + let (stdout_tx, stdout_rx) = tokio::sync::mpsc::unbounded_channel(); + let (stderr_tx, stderr_rx) = tokio::sync::mpsc::unbounded_channel(); + + tokio::spawn(async move { + macro_rules! handle_error { + ($res:expr) => { + if let Err(err) = $res { + log::debug!( + "Receiver has been dropped, stop producing messages: {}", + err + ); + break; + } + }; + } + let mut output = output; + while let Some(chunk) = output.next().await { + match chunk { + Ok(LogOutput::StdOut { message }) => { + handle_error!(stdout_tx.send(Ok(message))); + } + Ok(LogOutput::StdErr { message }) => { + handle_error!(stderr_tx.send(Ok(message))); + } + Err(err) => { + let err = Arc::new(err); + handle_error!(stdout_tx + .send(Err(io::Error::new(io::ErrorKind::Other, err.clone())))); + handle_error!( + stderr_tx.send(Err(io::Error::new(io::ErrorKind::Other, err))) + ); + } + Ok(_) => { + unreachable!("only stdout and stderr are supported") + } + } + } + }); + + let stdout = LogStreamAsync::new(UnboundedReceiverStream::new(stdout_rx).boxed()) + .enable_cache(); + let stderr = LogStreamAsync::new(UnboundedReceiverStream::new(stderr_rx).boxed()) + .enable_cache(); + + Ok(ExecResult { + id: exec.id, + stdout, + stderr, + }) } StartExecResults::Detached => unreachable!("detach is false"), } } - pub(crate) async fn block_until_ready(&self, id: &str, ready_conditions: &[WaitFor]) { + pub(crate) async fn inspect_exec( + &self, + exec_id: &str, + ) -> Result { + self.bollard.inspect_exec(exec_id).await + } + + pub(crate) async fn block_until_ready( + &self, + id: &str, + ready_conditions: &[WaitFor], + ) -> Result<(), WaitContainerError> { log::debug!("Waiting for container {id} to be ready"); for condition in ready_conditions { match condition { - WaitFor::StdOutMessage { message } => self - .stdout_logs(id) - .wait_for_message(message) - .await - .unwrap(), - WaitFor::StdErrMessage { message } => self - .stderr_logs(id) - .wait_for_message(message) - .await - .unwrap(), + WaitFor::StdOutMessage { message } => { + self.stdout_logs(id).wait_for_message(message).await? + } + WaitFor::StdErrMessage { message } => { + self.stderr_logs(id).wait_for_message(message).await? + } WaitFor::Duration { length } => { tokio::time::sleep(*length).await; } @@ -170,18 +217,20 @@ impl Client { let health_status = self .inspect(id) .await + .map_err(WaitContainerError::Inspect)? .state - .unwrap_or_else(|| panic!("Container state not available")) + .ok_or(WaitContainerError::StateUnavailable)? .health - .unwrap_or_else(|| panic!("Health state not available")) - .status; + .and_then(|health| health.status); match health_status { Some(HEALTHY) => break, None | Some(EMPTY) | Some(NONE) => { - panic!("Healthcheck not configured for container") + return Err(WaitContainerError::HealthCheckNotConfigured( + id.to_string(), + )) } - Some(UNHEALTHY) => panic!("Healthcheck reports unhealthy"), + Some(UNHEALTHY) => return Err(WaitContainerError::Unhealthy), Some(STARTING) => { tokio::time::sleep(Duration::from_millis(100)).await; } @@ -192,13 +241,14 @@ impl Client { } log::debug!("Container {id} is now ready!"); + Ok(()) } - fn logs(&self, container_id: &str, attach_log: AttachLog) -> LogStreamAsync<'_> { + fn logs(&self, container_id: &str, log_source: LogSource) -> LogStreamAsync<'_> { let options = LogsOptions { follow: true, - stdout: attach_log.stdout, - stderr: attach_log.stderr, + stdout: log_source.is_stdout(), + stderr: log_source.is_stderr(), tail: "all".to_owned(), ..Default::default() }; @@ -206,16 +256,9 @@ impl Client { let stream = self .bollard .logs(container_id, Some(options)) + .map_ok(|chunk| chunk.into_bytes()) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .map(|chunk| { - let bytes = chunk?.into_bytes(); - let str = std::str::from_utf8(bytes.as_ref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - Ok(String::from(str)) - }) .boxed(); - LogStreamAsync::new(stream) } @@ -342,33 +385,3 @@ impl Client { Some(bollard_credentials) } } - -impl AttachLog { - pub(crate) fn stdout() -> Self { - Self { - stdout: true, - stderr: false, - } - } - - pub(crate) fn stderr() -> Self { - Self { - stdout: false, - stderr: true, - } - } - - pub(crate) fn stdout_and_stderr() -> Self { - Self { - stdout: true, - stderr: true, - } - } - - pub(crate) fn nothing() -> Self { - Self { - stdout: false, - stderr: false, - } - } -} diff --git a/testcontainers/src/core/client/exec.rs b/testcontainers/src/core/client/exec.rs new file mode 100644 index 00000000..4f2ce851 --- /dev/null +++ b/testcontainers/src/core/client/exec.rs @@ -0,0 +1,21 @@ +use crate::core::logs::LogStreamAsync; + +pub(crate) struct ExecResult<'a> { + pub(crate) id: String, + pub(crate) stdout: LogStreamAsync<'a>, + pub(crate) stderr: LogStreamAsync<'a>, +} + +impl<'a> ExecResult<'a> { + pub(crate) fn id(&self) -> &str { + &self.id + } + + pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync<'a> { + &mut self.stdout + } + + pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync<'a> { + &mut self.stderr + } +} diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index fbc0696e..86dbee45 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -5,8 +5,9 @@ use tokio::runtime::RuntimeFlavor; use crate::{ core::{ - client::{AttachLog, Client}, + client::Client, env, + errors::{ExecError, WaitContainerError}, image::CmdWaitFor, macros, network::Network, @@ -16,6 +17,8 @@ use crate::{ Image, RunnableImage, }; +pub(super) mod exec; + /// Represents a running docker container that has been started using an async client. /// /// Containers have a [`custom destructor`][drop_impl] that removes them as soon as they @@ -60,7 +63,7 @@ where network, dropped: false, }; - container.block_until_ready().await; + container.block_until_ready().await.unwrap(); container } @@ -140,7 +143,7 @@ where /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress pub async fn get_bridge_ip_address(&self) -> IpAddr { - let container_settings = self.docker_client.inspect(&self.id).await; + let container_settings = self.docker_client.inspect(&self.id).await.unwrap(); let host_config = container_settings .host_config @@ -183,7 +186,8 @@ where self.docker_client.docker_hostname().await } - pub async fn exec(&self, cmd: ExecCommand) { + /// Executes a command in the container. + pub async fn exec(&self, cmd: ExecCommand) -> Result, ExecError> { let ExecCommand { cmd, container_ready_conditions, @@ -192,48 +196,48 @@ where log::debug!("Executing command {:?}", cmd); - let attach_log = match cmd_ready_condition { - CmdWaitFor::StdOutMessage { .. } => AttachLog::stdout(), - CmdWaitFor::StdErrMessage { .. } => AttachLog::stderr(), - CmdWaitFor::StdOutOrErrMessage { .. } => AttachLog::stdout_and_stderr(), - _ => AttachLog::nothing(), - }; - - let (exec_id, output) = self.docker_client.exec(&self.id, cmd, attach_log).await; + let mut exec = self.docker_client.exec(&self.id, cmd).await?; self.docker_client .block_until_ready(self.id(), &container_ready_conditions) - .await; + .await?; match cmd_ready_condition { - CmdWaitFor::StdOutOrErrMessage { message } - | CmdWaitFor::StdOutMessage { message } - | CmdWaitFor::StdErrMessage { message } => { - output.wait_for_message(&message).await.unwrap(); + CmdWaitFor::StdOutMessage { message } => { + exec.stdout().wait_for_message(&message).await.unwrap(); + } + CmdWaitFor::StdErrMessage { message } => { + exec.stderr().wait_for_message(&message).await.unwrap(); } - CmdWaitFor::ExitCode { code } => loop { - let inspect = self - .docker_client - .bollard - .inspect_exec(&exec_id) - .await - .unwrap(); - - if let Some(exit_code) = inspect.exit_code { - assert_eq!( - exit_code, code, - "expected exit code {} but got {:?}", - code, inspect.exit_code - ); - break; - } else { - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + CmdWaitFor::ExitCode { code } => { + let exec_id = exec.id().to_string(); + loop { + let inspect = self.docker_client.inspect_exec(&exec_id).await.unwrap(); + + if let Some(actual) = inspect.exit_code { + if actual != code { + return Err(ExecError::ExitCodeMismatch { + expected: code, + actual, + }); + } + break; + } else { + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } } - }, + } CmdWaitFor::Duration { length } => { tokio::time::sleep(length).await; } _ => {} } + + Ok(exec::ExecResult { + client: self.docker_client.clone(), + id: exec.id, + stdout: exec.stdout.into_inner(), + stderr: exec.stderr.into_inner(), + }) } pub async fn start(&self) { @@ -242,7 +246,7 @@ where .image .exec_after_start(ContainerState::new(self.ports().await)) { - self.exec(cmd).await; + self.exec(cmd).await.unwrap(); } } @@ -263,10 +267,10 @@ where self.dropped = true; } - async fn block_until_ready(&self) { + async fn block_until_ready(&self) -> Result<(), WaitContainerError> { self.docker_client .block_until_ready(self.id(), &self.image().ready_conditions()) - .await; + .await } } diff --git a/testcontainers/src/core/containers/async_container/exec.rs b/testcontainers/src/core/containers/async_container/exec.rs new file mode 100644 index 00000000..02abaa04 --- /dev/null +++ b/testcontainers/src/core/containers/async_container/exec.rs @@ -0,0 +1,58 @@ +use std::{fmt, io, pin::Pin, sync::Arc}; + +use bytes::Bytes; +use futures::stream::BoxStream; +use tokio::io::{AsyncRead, AsyncReadExt}; + +use crate::core::client::Client; + +/// Represents the result of an executed command in a container. +pub struct ExecResult<'a> { + pub(super) client: Arc, + pub(crate) id: String, + pub(super) stdout: BoxStream<'a, Result>, + pub(super) stderr: BoxStream<'a, Result>, +} + +impl<'a> ExecResult<'a> { + /// Returns the exit code of the executed command. + /// If the command has not yet exited, this will return `None`. + pub async fn exit_code(&self) -> Result, bollard::errors::Error> { + self.client + .inspect_exec(&self.id) + .await + .map(|exec| exec.exit_code) + } + + /// Returns stdout as a vector of bytes. + /// If you want to read stdout in asynchronous manner, use `stdout_reader` instead. + pub async fn stdout(&mut self) -> Result, io::Error> { + let mut stdout = Vec::new(); + self.stdout_reader().read_to_end(&mut stdout).await?; + Ok(stdout) + } + + /// Returns stderr as a vector of bytes. + /// If you want to read stderr in asynchronous manner, use `stderr_reader` instead. + pub async fn stderr(&mut self) -> Result, io::Error> { + let mut stderr = Vec::new(); + self.stderr_reader().read_to_end(&mut stderr).await?; + Ok(stderr) + } + + /// Returns an asynchronous reader for stdout. + pub fn stdout_reader<'b>(&'b mut self) -> Pin> { + Box::pin(tokio_util::io::StreamReader::new(&mut self.stdout)) + } + + /// Returns an asynchronous reader for stderr. + pub fn stderr_reader<'b>(&'b mut self) -> Pin> { + Box::pin(tokio_util::io::StreamReader::new(&mut self.stderr)) + } +} + +impl fmt::Debug for ExecResult<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ExecResult").field("id", &self.id).finish() + } +} diff --git a/testcontainers/src/core/containers/mod.rs b/testcontainers/src/core/containers/mod.rs index b876d039..15f75f31 100644 --- a/testcontainers/src/core/containers/mod.rs +++ b/testcontainers/src/core/containers/mod.rs @@ -2,7 +2,7 @@ pub(crate) mod async_container; #[cfg(feature = "blocking")] pub(crate) mod sync_container; -pub use async_container::ContainerAsync; +pub use async_container::{exec::ExecResult, ContainerAsync}; #[cfg(feature = "blocking")] #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] -pub use sync_container::Container; +pub use sync_container::{exec::SyncExecResult, Container}; diff --git a/testcontainers/src/core/containers/sync_container.rs b/testcontainers/src/core/containers/sync_container.rs index 600aefc9..ba034522 100644 --- a/testcontainers/src/core/containers/sync_container.rs +++ b/testcontainers/src/core/containers/sync_container.rs @@ -1,10 +1,12 @@ use std::{fmt, net::IpAddr}; use crate::{ - core::{env, ports::Ports, ExecCommand}, + core::{env, errors::ExecError, ports::Ports, ExecCommand}, ContainerAsync, Image, }; +pub(super) mod exec; + /// Represents a running docker container. /// /// Containers have a [`custom destructor`][drop_impl] that removes them as soon as they go out of scope: @@ -128,8 +130,13 @@ where self.rt().block_on(self.async_impl().get_host()) } - pub fn exec(&self, cmd: ExecCommand) { - self.rt().block_on(self.async_impl().exec(cmd)); + /// Executes a command in the container. + pub fn exec(&self, cmd: ExecCommand) -> Result, ExecError> { + let async_exec = self.rt().block_on(self.async_impl().exec(cmd))?; + Ok(exec::SyncExecResult { + inner: async_exec, + runtime: self.rt(), + }) } pub fn stop(&self) { diff --git a/testcontainers/src/core/containers/sync_container/exec.rs b/testcontainers/src/core/containers/sync_container/exec.rs new file mode 100644 index 00000000..9df170e4 --- /dev/null +++ b/testcontainers/src/core/containers/sync_container/exec.rs @@ -0,0 +1,33 @@ +use std::{fmt, io}; + +/// Represents the result of an executed command in a container. +pub struct SyncExecResult<'a> { + pub(super) inner: crate::core::async_container::exec::ExecResult<'a>, + pub(super) runtime: &'a tokio::runtime::Runtime, +} + +impl<'a> SyncExecResult<'a> { + /// Returns the exit code of the executed command. + /// If the command has not yet exited, this will return `None`. + pub fn exit_code(&self) -> Result, bollard::errors::Error> { + self.runtime.block_on(self.inner.exit_code()) + } + + /// Returns stdout as a vector of bytes. + pub fn stdout(&mut self) -> Result, io::Error> { + self.runtime.block_on(self.inner.stdout()) + } + + /// Returns stderr as a vector of bytes. + pub fn stderr(&mut self) -> Result, io::Error> { + self.runtime.block_on(self.inner.stderr()) + } +} + +impl fmt::Debug for SyncExecResult<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ExecResult") + .field("id", &self.inner.id) + .finish() + } +} diff --git a/testcontainers/src/core/errors.rs b/testcontainers/src/core/errors.rs new file mode 100644 index 00000000..1632257e --- /dev/null +++ b/testcontainers/src/core/errors.rs @@ -0,0 +1,29 @@ +use crate::core::logs::WaitLogError; + +/// Error type for exec operation. +#[derive(Debug, thiserror::Error)] +pub enum ExecError { + #[error("failed to start exec process: {0}")] + Init(#[from] bollard::errors::Error), + #[error("exec process exited with code {actual}, expected {expected}")] + ExitCodeMismatch { expected: i64, actual: i64 }, + #[error("failed to wait for exec log: {0}")] + WaitLog(#[from] WaitLogError), + #[error("container's wait conditions are not met: {0}")] + WaitContainer(#[from] WaitContainerError), +} + +/// Error type for waiting for container readiness based on [`crate::core::WaitFor`] conditions. +#[derive(Debug, thiserror::Error)] +pub enum WaitContainerError { + #[error("failed to wait for container log: {0}")] + WaitLog(#[from] WaitLogError), + #[error("failed to inspect container: {0}")] + Inspect(bollard::errors::Error), + #[error("container state is unavailable")] + StateUnavailable, + #[error("healthcheck is not configured for container: {0}")] + HealthCheckNotConfigured(String), + #[error("container is unhealthy")] + Unhealthy, +} diff --git a/testcontainers/src/core/image.rs b/testcontainers/src/core/image.rs index 7992626a..e1e3caa6 100644 --- a/testcontainers/src/core/image.rs +++ b/testcontainers/src/core/image.rs @@ -1,13 +1,13 @@ use std::fmt::Debug; -pub use exec_command::{CmdWaitFor, ExecCommand}; +pub use exec::{CmdWaitFor, ExecCommand}; pub use runnable_image::{CgroupnsMode, Host, PortMapping, RunnableImage}; pub use wait_for::WaitFor; use super::ports::Ports; use crate::core::mounts::Mount; -mod exec_command; +mod exec; mod runnable_image; mod wait_for; diff --git a/testcontainers/src/core/image/exec_command.rs b/testcontainers/src/core/image/exec.rs similarity index 81% rename from testcontainers/src/core/image/exec_command.rs rename to testcontainers/src/core/image/exec.rs index 5d3f887d..ced36a0a 100644 --- a/testcontainers/src/core/image/exec_command.rs +++ b/testcontainers/src/core/image/exec.rs @@ -1,5 +1,7 @@ use std::time::Duration; +use bytes::Bytes; + use crate::core::WaitFor; #[derive(Debug)] @@ -43,11 +45,9 @@ pub enum CmdWaitFor { /// An empty condition. Useful for default cases or fallbacks. Nothing, /// Wait for a message on the stdout stream of the command's output. - StdOutMessage { message: String }, + StdOutMessage { message: Bytes }, /// Wait for a message on the stderr stream of the command's output. - StdErrMessage { message: String }, - /// Wait for a message on the stdout or stderr stream of the command's output. - StdOutOrErrMessage { message: String }, + StdErrMessage { message: Bytes }, /// Wait for a certain amount of time. Duration { length: Duration }, /// Wait for the command's exit code to be equal to the provided one. @@ -55,21 +55,15 @@ pub enum CmdWaitFor { } impl CmdWaitFor { - pub fn message_on_stdout>(message: S) -> Self { + pub fn message_on_stdout(message: impl AsRef<[u8]>) -> Self { Self::StdOutMessage { - message: message.into(), + message: Bytes::from(message.as_ref().to_vec()), } } - pub fn message_on_stderr>(message: S) -> Self { + pub fn message_on_stderr(message: impl AsRef<[u8]>) -> Self { Self::StdErrMessage { - message: message.into(), - } - } - - pub fn message_on_stdout_or_stderr>(message: S) -> Self { - Self::StdOutOrErrMessage { - message: message.into(), + message: Bytes::from(message.as_ref().to_vec()), } } diff --git a/testcontainers/src/core/image/wait_for.rs b/testcontainers/src/core/image/wait_for.rs index a1fe2a20..77eccc75 100644 --- a/testcontainers/src/core/image/wait_for.rs +++ b/testcontainers/src/core/image/wait_for.rs @@ -1,14 +1,16 @@ use std::{env::var, time::Duration}; +use bytes::Bytes; + /// Represents a condition that needs to be met before a container is considered ready. #[derive(Debug, Eq, PartialEq, Clone)] pub enum WaitFor { /// An empty condition. Useful for default cases or fallbacks. Nothing, /// Wait for a message on the stdout stream of the container's logs. - StdOutMessage { message: String }, + StdOutMessage { message: Bytes }, /// Wait for a message on the stderr stream of the container's logs. - StdErrMessage { message: String }, + StdErrMessage { message: Bytes }, /// Wait for a certain amount of time. Duration { length: Duration }, /// Wait for the container's status to become `healthy`. @@ -16,15 +18,15 @@ pub enum WaitFor { } impl WaitFor { - pub fn message_on_stdout>(message: S) -> WaitFor { + pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor { WaitFor::StdOutMessage { - message: message.into(), + message: Bytes::from(message.as_ref().to_vec()), } } - pub fn message_on_stderr>(message: S) -> WaitFor { + pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor { WaitFor::StdErrMessage { - message: message.into(), + message: Bytes::from(message.as_ref().to_vec()), } } diff --git a/testcontainers/src/core/logs.rs b/testcontainers/src/core/logs.rs index 0c648002..4aeb617a 100644 --- a/testcontainers/src/core/logs.rs +++ b/testcontainers/src/core/logs.rs @@ -1,70 +1,98 @@ -use std::{fmt, io}; +use std::{borrow::Cow, fmt, io}; +use bytes::Bytes; use futures::{stream::BoxStream, StreamExt}; +use memchr::memmem::Finder; -pub(crate) struct LogStreamAsync<'d> { - inner: BoxStream<'d, Result>, +/// Defines error cases when waiting for a message in a stream. +#[derive(Debug, thiserror::Error)] +pub enum WaitLogError { + /// Indicates the stream ended before finding the log line you were looking for. + /// Contains all the lines that were read for debugging purposes. + #[error("End of stream reached before finding message: {:?}", display_bytes(.0))] + EndOfStream(Vec), + #[error(transparent)] + Io(#[from] io::Error), } -impl<'d> fmt::Debug for LogStreamAsync<'d> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LogStreamAsync").finish() - } +#[derive(Copy, Clone, Debug, parse_display::Display)] +#[display(style = "lowercase")] +pub(crate) enum LogSource { + StdOut, + StdErr, } -impl<'d> LogStreamAsync<'d> { - pub fn new(stream: BoxStream<'d, Result>) -> Self { - Self { inner: stream } +impl LogSource { + pub(super) fn is_stdout(&self) -> bool { + matches!(self, Self::StdOut) } - pub async fn wait_for_message(mut self, message: &str) -> Result<(), WaitError> { - let mut lines = vec![]; - - while let Some(line) = self.inner.next().await.transpose()? { - if handle_line(line, message, &mut lines) { - return Ok(()); - } - } - - Err(end_of_stream(message, lines)) + pub(super) fn is_stderr(&self) -> bool { + matches!(self, Self::StdErr) } } -fn handle_line(line: String, message: &str, lines: &mut Vec) -> bool { - if line.contains(message) { - log::info!("Found message after comparing {} lines", lines.len()); +pub(crate) struct LogStreamAsync<'a> { + inner: BoxStream<'a, Result>, + cache: Vec>, + enable_cache: bool, +} - return true; +impl<'a> LogStreamAsync<'a> { + pub fn new(stream: BoxStream<'a, Result>) -> Self { + Self { + inner: stream, + cache: vec![], + enable_cache: false, + } } - lines.push(line); + pub fn enable_cache(mut self) -> Self { + self.enable_cache = true; + self + } - false -} + pub(crate) async fn wait_for_message( + &mut self, + message: impl AsRef<[u8]>, + ) -> Result<(), WaitLogError> { + let msg_finder = Finder::new(message.as_ref()); + let mut messages = Vec::new(); + while let Some(message) = self.inner.next().await.transpose()? { + messages.push(message.clone()); + if self.enable_cache { + self.cache.push(Ok(message.clone())); + } + let match_found = msg_finder.find(message.as_ref()).is_some(); + if match_found { + log::debug!("Found message after comparing {} lines", messages.len()); + return Ok(()); + } + } -fn end_of_stream(expected_msg: &str, lines: Vec) -> WaitError { - log::error!( - "Failed to find message '{expected_msg}' in stream after comparing {} lines.", - lines.len() - ); + log::warn!( + "Failed to find message '{}' after comparing {} lines.", + String::from_utf8_lossy(message.as_ref()), + messages.len() + ); + Err(WaitLogError::EndOfStream(messages)) + } - WaitError::EndOfStream(lines) + pub(crate) fn into_inner(self) -> BoxStream<'a, Result> { + futures::stream::iter(self.cache).chain(self.inner).boxed() + } } -/// Defines error cases when waiting for a message in a stream. -#[derive(Debug, thiserror::Error)] -pub enum WaitError { - /// Indicates the stream ended before finding the log line you were looking for. - /// Contains all the lines that were read for debugging purposes. - #[error("End of stream reached: {0:?}")] - EndOfStream(Vec), - #[error(transparent)] - Io(io::Error), +fn display_bytes(bytes: &[Bytes]) -> Vec> { + bytes + .iter() + .map(|m| String::from_utf8_lossy(m.as_ref())) + .collect::>() } -impl From for WaitError { - fn from(e: io::Error) -> Self { - WaitError::Io(e) +impl<'a> fmt::Debug for LogStreamAsync<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LogStreamAsync").finish() } } @@ -74,12 +102,12 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn given_logs_when_line_contains_message_should_find_it() { - let log_stream = LogStreamAsync::new(Box::pin(futures::stream::iter([Ok(r" + let mut log_stream = LogStreamAsync::new(Box::pin(futures::stream::iter([Ok(r" Message one Message two Message three " - .to_string())]))); + .into())]))); let result = log_stream.wait_for_message("Message three").await; diff --git a/testcontainers/src/lib.rs b/testcontainers/src/lib.rs index 61e90eb7..b27edc45 100644 --- a/testcontainers/src/lib.rs +++ b/testcontainers/src/lib.rs @@ -70,7 +70,10 @@ //! [`testcontainers-modules`]: https://crates.io/crates/testcontainers-modules pub mod core; -pub use crate::core::{containers::*, Image, ImageArgs, RunnableImage}; +#[cfg(feature = "blocking")] +#[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] +pub use crate::core::Container; +pub use crate::core::{errors, ContainerAsync, Image, ImageArgs, RunnableImage}; #[cfg(feature = "watchdog")] #[cfg_attr(docsrs, doc(cfg(feature = "watchdog")))] diff --git a/testcontainers/src/runners/async_runner.rs b/testcontainers/src/runners/async_runner.rs index ab7b1354..ee0dde1a 100644 --- a/testcontainers/src/runners/async_runner.rs +++ b/testcontainers/src/runners/async_runner.rs @@ -223,7 +223,7 @@ where .image() .exec_after_start(ContainerState::new(container.ports().await)) { - container.exec(cmd).await; + container.exec(cmd).await.unwrap(); } container @@ -279,7 +279,7 @@ mod tests { .start() .await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let publish_ports = container_details .host_config .unwrap() @@ -308,7 +308,7 @@ mod tests { .start() .await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let port_bindings = container_details .host_config @@ -328,7 +328,7 @@ mod tests { .start() .await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let networks = container_details .network_settings .unwrap() @@ -350,7 +350,7 @@ mod tests { .start() .await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let container_name = container_details.name.unwrap(); assert!(container_name.ends_with("async_hello_container")); } @@ -427,7 +427,7 @@ mod tests { .start() .await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let shm_size = container_details.host_config.unwrap().shm_size.unwrap(); assert_eq!(shm_size, 1_000_000); @@ -442,7 +442,7 @@ mod tests { .await; let client = Client::lazy_client().await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let privileged = container_details.host_config.unwrap().privileged.unwrap(); assert!(privileged, "privileged must be `true`"); @@ -457,7 +457,7 @@ mod tests { .await; let client = Client::lazy_client().await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let cgroupns_mode = container_details .host_config @@ -481,7 +481,7 @@ mod tests { .await; let client = Client::lazy_client().await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let cgroupns_mode = container_details .host_config @@ -505,7 +505,7 @@ mod tests { .await; let client = Client::lazy_client().await; - let container_details = client.inspect(container.id()).await; + let container_details = client.inspect(container.id()).await.unwrap(); let userns_mode = container_details.host_config.unwrap().userns_mode.unwrap(); assert_eq!("host", userns_mode, "userns mode must be `host`"); diff --git a/testcontainers/src/runners/sync_runner.rs b/testcontainers/src/runners/sync_runner.rs index f0c3bbae..583e7597 100644 --- a/testcontainers/src/runners/sync_runner.rs +++ b/testcontainers/src/runners/sync_runner.rs @@ -80,7 +80,7 @@ mod tests { } fn inspect(id: &str) -> ContainerInspectResponse { - runtime().block_on(docker_client().inspect(id)) + runtime().block_on(docker_client().inspect(id)).unwrap() } fn network_exists(client: &Arc, name: &str) -> bool { diff --git a/testcontainers/tests/async_runner.rs b/testcontainers/tests/async_runner.rs index ff826d60..687180e3 100644 --- a/testcontainers/tests/async_runner.rs +++ b/testcontainers/tests/async_runner.rs @@ -6,6 +6,7 @@ use testcontainers::{ runners::AsyncRunner, GenericImage, *, }; +use tokio::io::AsyncReadExt; #[derive(Debug, Default)] pub struct HelloWorld; @@ -99,24 +100,47 @@ async fn async_run_exec() { let container = image.start().await; // exit code, it waits for result - container + let res = container .exec(ExecCommand::new(["sleep", "3"]).with_cmd_ready_condition(CmdWaitFor::exit_code(0))) - .await; + .await + .unwrap(); + assert_eq!(res.exit_code().await.unwrap(), Some(0)); // stdout - container + let mut res = container .exec( ExecCommand::new(["ls"]).with_cmd_ready_condition(CmdWaitFor::message_on_stdout("foo")), ) - .await; + .await + .unwrap(); + + let stdout = String::from_utf8(res.stdout().await.unwrap()).unwrap(); + assert!(stdout.contains("foo"), "stdout must contain 'foo'"); + + // stdout and stderr readers + let mut res = container + .exec(ExecCommand::new([ + "/bin/bash", + "-c", + "echo 'stdout 1' >&1 && echo 'stderr 1' >&2 \ + && echo 'stderr 2' >&2 && echo 'stdout 2' >&1", + ])) + .await + .unwrap(); - // stdout or stderr - container - .exec( - ExecCommand::new(["ls"]) - .with_cmd_ready_condition(CmdWaitFor::message_on_stdout_or_stderr("foo")), - ) - .await; + let mut stdout = String::new(); + res.stdout_reader() + .read_to_string(&mut stdout) + .await + .unwrap(); + assert_eq!(stdout, "stdout 1\nstdout 2\n"); + + let mut stderr = String::new(); + res.stderr_reader() + .read_to_string(&mut stderr) + .await + .unwrap(); + assert_eq!(stderr, "stderr 1\nstderr 2\n"); } #[tokio::test] @@ -135,5 +159,6 @@ async fn async_run_exec_fails_due_to_unexpected_code() { ExecCommand::new(vec!["ls".to_string()]) .with_cmd_ready_condition(CmdWaitFor::exit_code(-1)), ) - .await; + .await + .unwrap(); } diff --git a/testcontainers/tests/sync_runner.rs b/testcontainers/tests/sync_runner.rs index 7a3240e4..a404ea3b 100644 --- a/testcontainers/tests/sync_runner.rs +++ b/testcontainers/tests/sync_runner.rs @@ -139,20 +139,37 @@ fn sync_run_exec() { let container = image.start(); // exit code, it waits for result - container.exec( - ExecCommand::new(vec!["sleep".to_string(), "3".to_string()]) - .with_cmd_ready_condition(CmdWaitFor::exit_code(0)), - ); + let res = container + .exec( + ExecCommand::new(vec!["sleep".to_string(), "3".to_string()]) + .with_cmd_ready_condition(CmdWaitFor::exit_code(0)), + ) + .unwrap(); + assert_eq!(res.exit_code().unwrap(), Some(0)); // stdout - container.exec( - ExecCommand::new(vec!["ls".to_string()]) - .with_cmd_ready_condition(CmdWaitFor::message_on_stdout("foo")), - ); - - // stdout or stderr - container.exec( - ExecCommand::new(vec!["ls".to_string()]) - .with_cmd_ready_condition(CmdWaitFor::message_on_stdout_or_stderr("foo")), - ); + let mut res = container + .exec( + ExecCommand::new(vec!["ls".to_string()]) + .with_cmd_ready_condition(CmdWaitFor::message_on_stdout("foo")), + ) + .unwrap(); + let stdout = String::from_utf8(res.stdout().unwrap()).unwrap(); + assert!(stdout.contains("foo"), "stdout must contain 'foo'"); + + // stdout and stderr readers + let mut res = container + .exec(ExecCommand::new([ + "/bin/bash", + "-c", + "echo 'stdout 1' >&1 && echo 'stderr 1' >&2 \ + && echo 'stderr 2' >&2 && echo 'stdout 2' >&1", + ])) + .unwrap(); + + let stdout = String::from_utf8(res.stdout().unwrap()).unwrap(); + assert_eq!(stdout, "stdout 1\nstdout 2\n"); + + let stderr = String::from_utf8(res.stderr().unwrap()).unwrap(); + assert_eq!(stderr, "stderr 1\nstderr 2\n"); }