From 10cf09483108fa0791eff559e4f79891ee56ce4b Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Sat, 15 Jun 2024 20:54:38 +0200 Subject: [PATCH] feat: support HTTP wait strategy (#659) Introduces HTTP wait strategy Closes #648 Additional documentation in follow-up PR --- testcontainers/Cargo.toml | 3 +- testcontainers/src/core.rs | 4 +- .../src/core/containers/async_container.rs | 7 +- testcontainers/src/core/error.rs | 4 +- testcontainers/src/core/image.rs | 9 +- testcontainers/src/core/image/exec.rs | 62 +--- testcontainers/src/core/wait/cmd_wait.rs | 47 +++ testcontainers/src/core/wait/http_strategy.rs | 277 ++++++++++++++++++ .../core/{image/wait_for.rs => wait/mod.rs} | 46 ++- testcontainers/tests/async_runner.rs | 16 +- testcontainers/tests/sync_runner.rs | 16 +- 11 files changed, 417 insertions(+), 74 deletions(-) create mode 100644 testcontainers/src/core/wait/cmd_wait.rs create mode 100644 testcontainers/src/core/wait/http_strategy.rs rename testcontainers/src/core/{image/wait_for.rs => wait/mod.rs} (51%) diff --git a/testcontainers/Cargo.toml b/testcontainers/Cargo.toml index 7e11825b..2c3e2f20 100644 --- a/testcontainers/Cargo.toml +++ b/testcontainers/Cargo.toml @@ -28,6 +28,7 @@ futures = "0.3" log = "0.4" memchr = "2.7.2" parse-display = "0.9.0" +reqwest = { version = "0.12.4", features = ["default-tls", "hickory-dns", "json", "charset", "http2"], default-features = false } serde = { version = "1", features = ["derive"] } serde-java-properties = { version = "0.2.0", optional = true } serde_json = "1" @@ -48,6 +49,6 @@ properties-config = ["serde-java-properties"] [dev-dependencies] anyhow = "1.0.86" pretty_env_logger = "0.5" -reqwest = { version = "0.12.3", features = ["blocking"] } +reqwest = { version = "0.12.4", features = ["blocking"] } testimages.workspace = true tokio = { version = "1", features = ["macros"] } diff --git a/testcontainers/src/core.rs b/testcontainers/src/core.rs index a64730ed..d35e393b 100644 --- a/testcontainers/src/core.rs +++ b/testcontainers/src/core.rs @@ -1,8 +1,9 @@ pub use self::{ containers::*, - image::{CmdWaitFor, ContainerState, ExecCommand, Image, ImageExt, WaitFor}, + image::{ContainerState, ExecCommand, Image, ImageExt}, mounts::{AccessMode, Mount, MountType}, ports::{ContainerPort, IntoContainerPort}, + wait::{cmd_wait::CmdWaitFor, WaitFor}, }; mod image; @@ -16,3 +17,4 @@ pub(crate) mod macros; pub(crate) mod mounts; pub(crate) mod network; pub(crate) mod ports; +pub mod wait; diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index 54ba7fab..be05cd08 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -10,11 +10,11 @@ use crate::{ client::Client, env, error::{ContainerMissingInfo, ExecError, Result, TestcontainersError, WaitContainerError}, - image::CmdWaitFor, macros, network::Network, ports::Ports, - ContainerPort, ContainerState, ExecCommand, WaitFor, + wait::WaitStrategy, + CmdWaitFor, ContainerPort, ContainerState, ExecCommand, WaitFor, }, ContainerRequest, Image, }; @@ -342,6 +342,9 @@ where } } }, + WaitFor::Http(http_strategy) => { + http_strategy.wait_until_ready(self).await?; + } WaitFor::Nothing => {} } } diff --git a/testcontainers/src/core/error.rs b/testcontainers/src/core/error.rs index 4ed0be4e..79340152 100644 --- a/testcontainers/src/core/error.rs +++ b/testcontainers/src/core/error.rs @@ -1,7 +1,7 @@ use std::error::Error; -use crate::core::logs::WaitLogError; pub use crate::core::{client::ClientError, env::ConfigurationError, ContainerPort}; +use crate::core::{logs::WaitLogError, wait::http_strategy::HttpWaitError}; pub type Result = std::result::Result; @@ -54,6 +54,8 @@ pub enum WaitContainerError { WaitLog(#[from] WaitLogError), #[error("container state is unavailable")] StateUnavailable, + #[error("container is not ready: {0}")] + HttpWait(#[from] HttpWaitError), #[error("healthcheck is not configured for container: {0}")] HealthCheckNotConfigured(String), #[error("container is unhealthy")] diff --git a/testcontainers/src/core/image.rs b/testcontainers/src/core/image.rs index 9af9da41..95e68d92 100644 --- a/testcontainers/src/core/image.rs +++ b/testcontainers/src/core/image.rs @@ -1,15 +1,16 @@ use std::{borrow::Cow, fmt::Debug}; -pub use exec::{CmdWaitFor, ExecCommand}; +pub use exec::ExecCommand; pub use image_ext::ImageExt; -pub use wait_for::WaitFor; use super::ports::{ContainerPort, Ports}; -use crate::{core::mounts::Mount, TestcontainersError}; +use crate::{ + core::{mounts::Mount, WaitFor}, + TestcontainersError, +}; mod exec; mod image_ext; -mod wait_for; /// Represents a docker image. /// diff --git a/testcontainers/src/core/image/exec.rs b/testcontainers/src/core/image/exec.rs index ced36a0a..42208ac7 100644 --- a/testcontainers/src/core/image/exec.rs +++ b/testcontainers/src/core/image/exec.rs @@ -1,8 +1,4 @@ -use std::time::Duration; - -use bytes::Bytes; - -use crate::core::WaitFor; +use crate::core::{CmdWaitFor, WaitFor}; #[derive(Debug)] pub struct ExecCommand { @@ -39,59 +35,3 @@ impl Default for ExecCommand { Self::new(Vec::::new()) } } - -#[derive(Debug, Eq, PartialEq, Clone)] -pub enum CmdWaitFor { - /// An empty condition. Useful for default cases or fallbacks. - Nothing, - /// Wait for a message on the stdout stream of the command's output. - StdOutMessage { message: Bytes }, - /// Wait for a message on the stderr stream of the command's output. - StdErrMessage { message: Bytes }, - /// Wait for a certain amount of time. - Duration { length: Duration }, - /// Wait for the command's exit code to be equal to the provided one. - ExitCode { code: i64 }, -} - -impl CmdWaitFor { - pub fn message_on_stdout(message: impl AsRef<[u8]>) -> Self { - Self::StdOutMessage { - message: Bytes::from(message.as_ref().to_vec()), - } - } - - pub fn message_on_stderr(message: impl AsRef<[u8]>) -> Self { - Self::StdErrMessage { - message: Bytes::from(message.as_ref().to_vec()), - } - } - - pub fn exit_code(code: i64) -> Self { - Self::ExitCode { code } - } - - pub fn seconds(length: u64) -> Self { - Self::Duration { - length: Duration::from_secs(length), - } - } - - pub fn millis(length: u64) -> Self { - Self::Duration { - length: Duration::from_millis(length), - } - } -} - -impl From for CmdWaitFor { - fn from(wait_for: WaitFor) -> Self { - match wait_for { - WaitFor::Nothing => Self::Nothing, - WaitFor::StdOutMessage { message } => Self::StdOutMessage { message }, - WaitFor::StdErrMessage { message } => Self::StdErrMessage { message }, - WaitFor::Duration { length } => Self::Duration { length }, - WaitFor::Healthcheck => Self::ExitCode { code: 0 }, - } - } -} diff --git a/testcontainers/src/core/wait/cmd_wait.rs b/testcontainers/src/core/wait/cmd_wait.rs new file mode 100644 index 00000000..df4d5de5 --- /dev/null +++ b/testcontainers/src/core/wait/cmd_wait.rs @@ -0,0 +1,47 @@ +use std::time::Duration; + +use bytes::Bytes; + +#[derive(Debug, Eq, PartialEq, Clone)] +pub enum CmdWaitFor { + /// An empty condition. Useful for default cases or fallbacks. + Nothing, + /// Wait for a message on the stdout stream of the command's output. + StdOutMessage { message: Bytes }, + /// Wait for a message on the stderr stream of the command's output. + StdErrMessage { message: Bytes }, + /// Wait for a certain amount of time. + Duration { length: Duration }, + /// Wait for the command's exit code to be equal to the provided one. + ExitCode { code: i64 }, +} + +impl CmdWaitFor { + pub fn message_on_stdout(message: impl AsRef<[u8]>) -> Self { + Self::StdOutMessage { + message: Bytes::from(message.as_ref().to_vec()), + } + } + + pub fn message_on_stderr(message: impl AsRef<[u8]>) -> Self { + Self::StdErrMessage { + message: Bytes::from(message.as_ref().to_vec()), + } + } + + pub fn exit_code(code: i64) -> Self { + Self::ExitCode { code } + } + + pub fn seconds(length: u64) -> Self { + Self::Duration { + length: Duration::from_secs(length), + } + } + + pub fn millis(length: u64) -> Self { + Self::Duration { + length: Duration::from_millis(length), + } + } +} diff --git a/testcontainers/src/core/wait/http_strategy.rs b/testcontainers/src/core/wait/http_strategy.rs new file mode 100644 index 00000000..ca95f496 --- /dev/null +++ b/testcontainers/src/core/wait/http_strategy.rs @@ -0,0 +1,277 @@ +use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; + +use bytes::Bytes; +use url::{Host, Url}; + +use crate::{ + core::{error::WaitContainerError, wait::WaitStrategy, ContainerPort}, + ContainerAsync, Image, TestcontainersError, +}; + +/// Error type for waiting for container readiness based on HTTP response. +#[derive(Debug, thiserror::Error)] +pub enum HttpWaitError { + #[error("container has no exposed ports")] + NoExposedPortsForHttpWait, + #[error("invalid URL: {0}")] + InvalidUrl(#[from] url::ParseError), +} + +/// Represents a strategy for waiting for a certain HTTP response. +#[derive(Clone)] +pub struct HttpWaitStrategy { + client: Option, + path: String, + port: Option, + method: reqwest::Method, + headers: reqwest::header::HeaderMap, + body: Option, + auth: Option, + use_tls: bool, + response_matcher: Option, + poll_interval: Duration, +} + +type ResponseMatcher = Arc< + dyn Fn(reqwest::Response) -> Pin + Send>> + Send + Sync + 'static, +>; + +#[derive(Debug, Clone)] +enum Auth { + Basic { username: String, password: String }, + Bearer(String), +} + +impl HttpWaitStrategy { + /// Create a new `HttpWaitStrategy` for the given resource path (using GET method by default). + pub fn new(path: impl Into) -> Self { + Self { + client: None, + path: path.into(), + port: None, + method: reqwest::Method::GET, + headers: Default::default(), + body: None, + auth: None, + use_tls: false, + response_matcher: None, + poll_interval: Duration::from_millis(100), + } + } + + /// Set the port to be used for the request. + /// + /// It will use mapped host port for the passed container port. By default, first exposed port is used. + pub fn with_port(mut self, port: ContainerPort) -> Self { + self.port = Some(port); + self + } + + /// Set the custom client for the request. + /// + /// Allows to customize the client, enabling features like TLS, accept_invalid_certs, proxies, etc. + pub fn with_client(mut self, client: reqwest::Client) -> Self { + self.client = Some(client); + self + } + + /// Set method for the request. + pub fn with_method(mut self, method: reqwest::Method) -> Self { + self.method = method; + self + } + + /// Add a header to the request. + pub fn with_header(mut self, key: K, value: V) -> Self + where + K: reqwest::header::IntoHeaderName, + V: Into, + { + self.headers.insert(key, value.into()); + self + } + + /// Set the body for the request. + pub fn with_body(mut self, body: impl Into) -> Self { + self.body = Some(body.into()); + self + } + + /// Set the basic auth for the request. + /// Overwrites any previously set Authorization header. + pub fn with_basic_auth( + mut self, + username: impl Into, + password: impl Into, + ) -> Self { + self.auth = Some(Auth::Basic { + username: username.into(), + password: password.into(), + }); + self + } + + /// Set the bearer token for the request. + /// Overwrites any previously set Authorization header. + pub fn with_bearer_auth(mut self, token: impl Into) -> Self { + self.auth = Some(Auth::Bearer(token.into())); + self + } + + /// Use TLS for the request. + /// + /// This will use `https` scheme for the request. TLS configuration can be customized using the [`HttpWaitStrategy::with_client`]. + pub fn with_tls(mut self) -> Self { + self.use_tls = true; + self + } + + /// Set the poll interval for the wait strategy. + /// + /// This is the time to wait between each poll for the expected condition to be met. + pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self { + self.poll_interval = poll_interval; + self + } + + /// Wait for expected status code. + /// Shortcut for `with_response_matcher(|response| response.status() == status)`. + pub fn with_expected_status_code(self, status: impl Into) -> Self { + let status = status.into(); + self.with_response_matcher(move |response| response.status().as_u16() == status) + } + + /// Wait for a response that matches the given matcher function. + /// Use [`HttpWaitStrategy::with_response_matcher_async`] for async matcher functions. + /// + /// Matcher function should return `true` if the response is expected, `false` otherwise. + pub fn with_response_matcher(self, matcher: Matcher) -> Self + where + Matcher: Fn(reqwest::Response) -> bool + Send + Sync + 'static, + { + let matcher = Arc::new(matcher); + self.with_response_matcher_async(move |response| { + let matcher = matcher.clone(); + async move { matcher(response) } + }) + } + + /// Wait for a response that matches the result of given matcher function. + /// This is an async version of [`HttpWaitStrategy::with_response_matcher`], + /// useful when the matcher function needs to perform additional async operations (e.g. body reading to check response content). + /// + /// Matcher function should return `true` if the response is expected, `false` otherwise. + pub fn with_response_matcher_async(mut self, matcher: Matcher) -> Self + where + Matcher: Fn(reqwest::Response) -> Out, + Matcher: Send + Sync + 'static, + for<'a> Out: Future + Send + 'a, + { + self.response_matcher = Some(Arc::new(move |resp| Box::pin(matcher(resp)))); + self + } + + pub(crate) fn response_matcher(&self) -> Option { + self.response_matcher.clone() + } + + pub(crate) fn into_request( + self, + base_url: &Url, + ) -> Result { + let client = self.client.unwrap_or_default(); + let url = base_url.join(&self.path).map_err(HttpWaitError::from)?; + let mut request = client.request(self.method, url).headers(self.headers); + + if let Some(body) = self.body { + request = request.body(body); + } + + if let Some(auth) = self.auth { + match auth { + Auth::Basic { username, password } => { + request = request.basic_auth(username, Some(password)); + } + Auth::Bearer(token) => { + request = request.bearer_auth(token); + } + } + } + + Ok(request) + } +} + +impl WaitStrategy for HttpWaitStrategy { + async fn wait_until_ready( + self, + container: &ContainerAsync, + ) -> crate::core::error::Result<()> { + let host = container.get_host().await?; + let container_port = self + .port + .or_else(|| container.image().expose_ports().first().copied()) + .ok_or(WaitContainerError::from( + HttpWaitError::NoExposedPortsForHttpWait, + ))?; + + let host_port = match host { + Host::Domain(ref domain) => match container.get_host_port_ipv4(container_port).await { + Ok(port) => port, + Err(_) => { + log::debug!("IPv4 port not found for domain: {domain}, checking for IPv6"); + container.get_host_port_ipv6(container_port).await? + } + }, + Host::Ipv4(_) => container.get_host_port_ipv4(container_port).await?, + Host::Ipv6(_) => container.get_host_port_ipv6(container_port).await?, + }; + + let scheme = if self.use_tls { "https" } else { "http" }; + let base_url = Url::parse(&format!("{scheme}://{host}:{host_port}")) + .map_err(HttpWaitError::from) + .map_err(WaitContainerError::from)?; + + loop { + let Some(matcher) = self.response_matcher() else { + return Err(TestcontainersError::other(format!( + "No response matcher provided for HTTP wait strategy: {self:?}" + ))); + }; + let result = self + .clone() + .into_request(&base_url) + .map_err(WaitContainerError::from)? + .send() + .await; + + match result { + Ok(response) => { + if matcher(response).await { + log::debug!("HTTP response condition met"); + break; + } else { + log::debug!("HTTP response condition not met"); + } + } + Err(err) => { + log::debug!("Error while waiting for HTTP response: {}", err); + } + } + tokio::time::sleep(self.poll_interval).await; + } + Ok(()) + } +} + +impl Debug for HttpWaitStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HttpWaitStrategy") + .field("path", &self.path) + .field("method", &self.method) + .field("headers", &self.headers) + .field("body", &self.body) + .field("auth", &self.auth) + .finish() + } +} diff --git a/testcontainers/src/core/image/wait_for.rs b/testcontainers/src/core/wait/mod.rs similarity index 51% rename from testcontainers/src/core/image/wait_for.rs rename to testcontainers/src/core/wait/mod.rs index 77eccc75..ce1f388e 100644 --- a/testcontainers/src/core/image/wait_for.rs +++ b/testcontainers/src/core/wait/mod.rs @@ -1,9 +1,22 @@ -use std::{env::var, time::Duration}; +use std::{env::var, fmt::Debug, time::Duration}; use bytes::Bytes; +pub use http_strategy::HttpWaitStrategy; + +use crate::Image; + +pub(crate) mod cmd_wait; +pub(crate) mod http_strategy; + +pub(crate) trait WaitStrategy { + async fn wait_until_ready( + self, + container: &crate::ContainerAsync, + ) -> crate::core::error::Result<()>; +} /// Represents a condition that needs to be met before a container is considered ready. -#[derive(Debug, Eq, PartialEq, Clone)] +#[derive(Debug, Clone)] pub enum WaitFor { /// An empty condition. Useful for default cases or fallbacks. Nothing, @@ -15,33 +28,56 @@ pub enum WaitFor { Duration { length: Duration }, /// Wait for the container's status to become `healthy`. Healthcheck, + /// Wait for a certain HTTP response. + Http(HttpWaitStrategy), } impl WaitFor { + /// Wait for the message to appear on the container's stdout. pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor { WaitFor::StdOutMessage { message: Bytes::from(message.as_ref().to_vec()), } } + /// Wait for the message to appear on the container's stderr. pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor { WaitFor::StdErrMessage { message: Bytes::from(message.as_ref().to_vec()), } } + /// Wait for the container to become healthy. + pub fn healthcheck() -> WaitFor { + WaitFor::Healthcheck + } + + /// Wait for a certain HTTP response. + pub fn http(http_strategy: HttpWaitStrategy) -> WaitFor { + WaitFor::Http(http_strategy) + } + + /// Wait for a certain amount of seconds. + /// + /// Generally, it's not recommended to use this method, as it's better to wait for a specific condition to be met. pub fn seconds(length: u64) -> WaitFor { WaitFor::Duration { length: Duration::from_secs(length), } } + /// Wait for a certain amount of millis. + /// + /// Generally, it's not recommended to use this method, as it's better to wait for a specific condition to be met. pub fn millis(length: u64) -> WaitFor { WaitFor::Duration { length: Duration::from_millis(length), } } + /// Wait for a certain amount of millis specified in the environment variable. + /// + /// Generally, it's not recommended to use this method, as it's better to wait for a specific condition to be met. pub fn millis_in_env_var(name: &'static str) -> WaitFor { let additional_sleep_period = var(name).map(|value| value.parse()); @@ -55,3 +91,9 @@ impl WaitFor { .unwrap_or(WaitFor::Nothing) } } + +impl From for WaitFor { + fn from(value: HttpWaitStrategy) -> Self { + Self::Http(value) + } +} diff --git a/testcontainers/tests/async_runner.rs b/testcontainers/tests/async_runner.rs index b2a70ed6..dd6a09b5 100644 --- a/testcontainers/tests/async_runner.rs +++ b/testcontainers/tests/async_runner.rs @@ -1,8 +1,9 @@ use std::time::Duration; use bollard::Docker; +use reqwest::StatusCode; use testcontainers::{ - core::{CmdWaitFor, ExecCommand, WaitFor}, + core::{wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor}, runners::AsyncRunner, GenericImage, *, }; @@ -133,6 +134,19 @@ async fn async_run_exec() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn async_wait_for_http() -> anyhow::Result<()> { + let _ = pretty_env_logger::try_init(); + + let image = GenericImage::new("simple_web_server", "latest") + .with_exposed_port(80.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new("/").with_expected_status_code(StatusCode::OK), + )); + let _container = image.start().await?; + Ok(()) +} + #[tokio::test] async fn async_run_exec_fails_due_to_unexpected_code() -> anyhow::Result<()> { let _ = pretty_env_logger::try_init(); diff --git a/testcontainers/tests/sync_runner.rs b/testcontainers/tests/sync_runner.rs index f90fe794..ae1b9f3d 100644 --- a/testcontainers/tests/sync_runner.rs +++ b/testcontainers/tests/sync_runner.rs @@ -1,7 +1,8 @@ #![cfg(feature = "blocking")] +use reqwest::StatusCode; use testcontainers::{ - core::{CmdWaitFor, ExecCommand, Host, IntoContainerPort, WaitFor}, + core::{wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, Host, IntoContainerPort, WaitFor}, runners::SyncRunner, *, }; @@ -35,6 +36,19 @@ fn sync_can_run_hello_world() -> anyhow::Result<()> { Ok(()) } +#[test] +fn sync_wait_for_http() -> anyhow::Result<()> { + let _ = pretty_env_logger::try_init(); + + let image = GenericImage::new("simple_web_server", "latest") + .with_exposed_port(80.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new("/").with_expected_status_code(StatusCode::OK), + )); + let _container = image.start()?; + Ok(()) +} + #[test] fn generic_image_with_custom_entrypoint() -> anyhow::Result<()> { let generic = get_server_container(None);