Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: extend exec interface to return logs and exec code #631

Merged
merged 12 commits into from
May 18, 2024
7 changes: 5 additions & 2 deletions testcontainers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@ rustdoc-args = ["--cfg", "docsrs"]
async-trait = { version = "0.1" }
bollard = { version = "0.16.1", features = ["ssl"] }
bollard-stubs = "=1.44.0-rc.2"
bytes = "1.6.0"
conquer-once = { version = "0.4", optional = true }
dirs = "5.0.1"
docker_credential = "1.3.1"
futures = "0.3"
log = "0.4"
memchr = "2.7.2"
parse-display = "0.9.0"
serde = { version = "1", features = ["derive"] }
serde-java-properties = { version = "0.2.0", optional = true }
serde_json = "1"
serde_with = "3.7.0"
signal-hook = { version = "0.3", optional = true }
tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] }
tokio-util = "0.7.10"
thiserror = "1.0.60"
tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["io-util"] }
url = { version = "2", features = ["serde"] }

[features]
Expand Down
1 change: 1 addition & 0 deletions testcontainers/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use self::{
mounts::{AccessMode, Mount, MountType},
};

pub mod errors;
mod image;

pub(crate) mod client;
Expand Down
201 changes: 107 additions & 94 deletions testcontainers/src/core/client.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
use std::{io, time::Duration};
use std::{io, sync::Arc, time::Duration};

use bollard::{
auth::DockerCredentials,
container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions},
container::{Config, CreateContainerOptions, LogOutput, LogsOptions, RemoveContainerOptions},
exec::{CreateExecOptions, StartExecOptions, StartExecResults},
image::CreateImageOptions,
network::{CreateNetworkOptions, InspectNetworkOptions},
Docker,
};
use bollard_stubs::models::{
ContainerCreateResponse, ContainerInspectResponse, HealthStatusEnum, Network,
ContainerCreateResponse, ContainerInspectResponse, ExecInspectResponse, HealthStatusEnum,
Network,
};
use futures::{StreamExt, TryStreamExt};
use tokio::sync::OnceCell;

use crate::core::{env, logs::LogStreamAsync, ports::Ports, WaitFor};
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::core::{
client::exec::ExecResult,
env,
errors::WaitContainerError,
logs::{LogSource, LogStreamAsync},
ports::Ports,
WaitFor,
};

mod bollard_client;
mod exec;
mod factory;

static IN_A_CONTAINER: OnceCell<bool> = OnceCell::const_new();
Expand All @@ -29,11 +39,6 @@ async fn is_in_container() -> bool {
.await
}

pub(crate) struct AttachLog {
stdout: bool,
stderr: bool,
}

/// The internal client.
pub(crate) struct Client {
pub(crate) config: env::Config,
Expand All @@ -49,25 +54,29 @@ impl Client {
}

pub(crate) fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_> {
self.logs(id, AttachLog::stdout())
self.logs(id, LogSource::StdOut)
}

pub(crate) fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_> {
self.logs(id, AttachLog::stderr())
self.logs(id, LogSource::StdErr)
}

pub(crate) async fn ports(&self, id: &str) -> Ports {
self.inspect(id)
.await
.unwrap()
.network_settings
.unwrap_or_default()
.ports
.map(Ports::from)
.unwrap_or_default()
}

pub(crate) async fn inspect(&self, id: &str) -> ContainerInspectResponse {
self.bollard.inspect_container(id, None).await.unwrap()
pub(crate) async fn inspect(
&self,
id: &str,
) -> Result<ContainerInspectResponse, bollard::errors::Error> {
self.bollard.inspect_container(id, None).await
}

pub(crate) async fn rm(&self, id: &str) {
Expand Down Expand Up @@ -99,20 +108,15 @@ impl Client {
&self,
container_id: &str,
cmd: Vec<String>,
attach_log: AttachLog,
) -> (String, LogStreamAsync<'_>) {
) -> Result<ExecResult<'_>, bollard::errors::Error> {
let config = CreateExecOptions {
cmd: Some(cmd),
attach_stdout: Some(attach_log.stdout),
attach_stderr: Some(attach_log.stderr),
attach_stdout: Some(true),
attach_stderr: Some(true),
..Default::default()
};

let exec = self
.bollard
.create_exec(container_id, config)
.await
.expect("failed to create exec");
let exec = self.bollard.create_exec(container_id, config).await?;

let res = self
.bollard
Expand All @@ -124,43 +128,86 @@ impl Client {
output_capacity: None,
}),
)
.await
.expect("failed to start exec");
.await?;

match res {
StartExecResults::Attached { output, .. } => {
let stream = output
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.map(|chunk| {
let bytes = chunk?.into_bytes();
let str = std::str::from_utf8(bytes.as_ref())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(str.to_string())
})
.boxed();

(exec.id, LogStreamAsync::new(stream))
let (stdout_tx, stdout_rx) = tokio::sync::mpsc::unbounded_channel();
let (stderr_tx, stderr_rx) = tokio::sync::mpsc::unbounded_channel();

tokio::spawn(async move {
macro_rules! handle_error {
($res:expr) => {
if let Err(err) = $res {
log::debug!(
"Receiver has been dropped, stop producing messages: {}",
err
);
break;
}
};
}
let mut output = output;
while let Some(chunk) = output.next().await {
match chunk {
Ok(LogOutput::StdOut { message }) => {
handle_error!(stdout_tx.send(Ok(message)));
}
Ok(LogOutput::StdErr { message }) => {
handle_error!(stderr_tx.send(Ok(message)));
}
Err(err) => {
let err = Arc::new(err);
handle_error!(stdout_tx
.send(Err(io::Error::new(io::ErrorKind::Other, err.clone()))));
handle_error!(
stderr_tx.send(Err(io::Error::new(io::ErrorKind::Other, err)))
);
}
Ok(_) => {
unreachable!("only stdout and stderr are supported")
}
}
}
});

let stdout = LogStreamAsync::new(UnboundedReceiverStream::new(stdout_rx).boxed())
.enable_cache();
let stderr = LogStreamAsync::new(UnboundedReceiverStream::new(stderr_rx).boxed())
.enable_cache();

Ok(ExecResult {
id: exec.id,
stdout,
stderr,
})
}
StartExecResults::Detached => unreachable!("detach is false"),
}
}

pub(crate) async fn block_until_ready(&self, id: &str, ready_conditions: &[WaitFor]) {
pub(crate) async fn inspect_exec(
&self,
exec_id: &str,
) -> Result<ExecInspectResponse, bollard::errors::Error> {
self.bollard.inspect_exec(exec_id).await
}

pub(crate) async fn block_until_ready(
&self,
id: &str,
ready_conditions: &[WaitFor],
) -> Result<(), WaitContainerError> {
log::debug!("Waiting for container {id} to be ready");

for condition in ready_conditions {
match condition {
WaitFor::StdOutMessage { message } => self
.stdout_logs(id)
.wait_for_message(message)
.await
.unwrap(),
WaitFor::StdErrMessage { message } => self
.stderr_logs(id)
.wait_for_message(message)
.await
.unwrap(),
WaitFor::StdOutMessage { message } => {
self.stdout_logs(id).wait_for_message(message).await?
}
WaitFor::StdErrMessage { message } => {
self.stderr_logs(id).wait_for_message(message).await?
}
WaitFor::Duration { length } => {
tokio::time::sleep(*length).await;
}
Expand All @@ -170,18 +217,20 @@ impl Client {
let health_status = self
.inspect(id)
.await
.map_err(WaitContainerError::Inspect)?
.state
.unwrap_or_else(|| panic!("Container state not available"))
.ok_or(WaitContainerError::StateUnavailable)?
.health
.unwrap_or_else(|| panic!("Health state not available"))
.status;
.and_then(|health| health.status);

match health_status {
Some(HEALTHY) => break,
None | Some(EMPTY) | Some(NONE) => {
panic!("Healthcheck not configured for container")
return Err(WaitContainerError::HealthCheckNotConfigured(
id.to_string(),
))
}
Some(UNHEALTHY) => panic!("Healthcheck reports unhealthy"),
Some(UNHEALTHY) => return Err(WaitContainerError::Unhealthy),
Some(STARTING) => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Expand All @@ -192,30 +241,24 @@ impl Client {
}

log::debug!("Container {id} is now ready!");
Ok(())
}

fn logs(&self, container_id: &str, attach_log: AttachLog) -> LogStreamAsync<'_> {
fn logs(&self, container_id: &str, log_source: LogSource) -> LogStreamAsync<'_> {
let options = LogsOptions {
follow: true,
stdout: attach_log.stdout,
stderr: attach_log.stderr,
stdout: log_source.is_stdout(),
stderr: log_source.is_stderr(),
tail: "all".to_owned(),
..Default::default()
};

let stream = self
.bollard
.logs(container_id, Some(options))
.map_ok(|chunk| chunk.into_bytes())
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.map(|chunk| {
let bytes = chunk?.into_bytes();
let str = std::str::from_utf8(bytes.as_ref())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(String::from(str))
})
.boxed();

LogStreamAsync::new(stream)
}

Expand Down Expand Up @@ -342,33 +385,3 @@ impl Client {
Some(bollard_credentials)
}
}

impl AttachLog {
pub(crate) fn stdout() -> Self {
Self {
stdout: true,
stderr: false,
}
}

pub(crate) fn stderr() -> Self {
Self {
stdout: false,
stderr: true,
}
}

pub(crate) fn stdout_and_stderr() -> Self {
Self {
stdout: true,
stderr: true,
}
}

pub(crate) fn nothing() -> Self {
Self {
stdout: false,
stderr: false,
}
}
}
21 changes: 21 additions & 0 deletions testcontainers/src/core/client/exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::core::logs::LogStreamAsync;

pub(crate) struct ExecResult<'a> {
pub(crate) id: String,
pub(crate) stdout: LogStreamAsync<'a>,
pub(crate) stderr: LogStreamAsync<'a>,
}

impl<'a> ExecResult<'a> {
pub(crate) fn id(&self) -> &str {
&self.id
}

pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync<'a> {
&mut self.stdout
}

pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync<'a> {
&mut self.stderr
}
}
Loading
Loading