Skip to content

Commit

Permalink
perf: use separate dropper thread for current_thread flavor (#666)
Browse files Browse the repository at this point in the history
`tokio::test` run with `current_thread` flavor by default and it's
typical use-case.

However, we need to perform some async operations on drop, and it used
to spawn a new thread for each drop. It's inefficient and was a
temporary solution.

Now, it uses:
- for `multi-thread` runtime flavor: `tokio::task::block_in_place`
- for `current-thread` runtime flavor: lazily spawns a separate thread
for dropping tasks, thus it will be only 1 thread regardless of the
number of containers, networks, etc.
  • Loading branch information
DDtKey authored Jun 17, 2024
1 parent 6a9b643 commit e5217e3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 38 deletions.
2 changes: 1 addition & 1 deletion testcontainers/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ pub use self::{

mod image;

pub(crate) mod async_drop;
pub(crate) mod client;
pub(crate) mod containers;
pub(crate) mod env;
pub mod error;
pub(crate) mod logs;
pub(crate) mod macros;
pub(crate) mod mounts;
pub(crate) mod network;
pub mod ports;
Expand Down
54 changes: 54 additions & 0 deletions testcontainers/src/core/async_drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::sync::OnceLock;

use futures::future::BoxFuture;

static DROP_TASK_SENDER: OnceLock<tokio::sync::mpsc::UnboundedSender<BoxFuture<'static, ()>>> =
OnceLock::new();

/// A helper to perform async operations in `Drop` implementation.
///
/// The behavior depends on the runtime flavor used in the test:
/// - `multi-threaded` runtime: it will use `tokio::task::block_in_place` to run the provided future
/// - `current-thread` runtime: it spawns a separate tokio runtime in a dedicated thread to run the provided futures.
/// * Only 1 drop-worker for the process, regardless of number of containers and drops.
// We can consider creating `AsyncDrop` trait + `AsyncDropGuard<T: AsyncDrop>` wrapper to make it more ergonomic.
// However, we have a only couple of places where we need this functionality.
pub(crate) fn async_drop(future: impl std::future::Future<Output = ()> + Send + 'static) {
let handle = tokio::runtime::Handle::current();
match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::CurrentThread => {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
dropper_task_sender()
.send(Box::pin(async move {
future.await;
let _ = tx.send(());
}))
.expect("drop-worker must be running: failed to send drop task");
let _ = rx.recv();
}
tokio::runtime::RuntimeFlavor::MultiThread => {
tokio::task::block_in_place(move || handle.block_on(future))
}
_ => unreachable!("unsupported runtime flavor"),
}
}

fn dropper_task_sender() -> &'static tokio::sync::mpsc::UnboundedSender<BoxFuture<'static, ()>> {
DROP_TASK_SENDER.get_or_init(|| {
let (dropper_tx, mut dropper_rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.thread_name("testcontainers-drop-worker")
.enable_all()
.build()
.expect("failed to create dropper runtime")
.block_on(async move {
while let Some(future) = dropper_rx.recv().await {
future.await;
}
});
});

dropper_tx
})
}
10 changes: 4 additions & 6 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::{fmt, net::IpAddr, pin::Pin, str::FromStr, sync::Arc, time::Duration};

use tokio::{
io::{AsyncBufRead, AsyncReadExt},
runtime::RuntimeFlavor,
};
use tokio::io::{AsyncBufRead, AsyncReadExt};

use crate::{
core::{
async_drop,
client::Client,
env,
error::{ContainerMissingInfo, ExecError, Result, TestcontainersError},
macros,
network::Network,
ports::Ports,
wait::WaitStrategy,
Expand Down Expand Up @@ -352,7 +349,8 @@ where
log::debug!("Container {id} was successfully dropped");
};

macros::block_on!(drop_task, "failed to remove container on drop");
async_drop::async_drop(drop_task);
// async_drop::block_on!(drop_task, "failed to remove container on drop");
}
}
}
Expand Down
28 changes: 0 additions & 28 deletions testcontainers/src/core/macros.rs

This file was deleted.

7 changes: 4 additions & 3 deletions testcontainers/src/core/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use std::{
sync::{Arc, OnceLock, Weak},
};

use tokio::{runtime::RuntimeFlavor, sync::Mutex};
use tokio::sync::Mutex;

use crate::core::{
async_drop,
client::{Client, ClientError},
env, macros,
env,
};

pub(crate) static CREATED_NETWORKS: OnceLock<Mutex<HashMap<String, Weak<Network>>>> =
Expand Down Expand Up @@ -87,7 +88,7 @@ impl Drop for Network {
}
};

macros::block_on!(drop_task, "failed to remove network on drop");
async_drop::async_drop(drop_task);
}
}
}
Expand Down

0 comments on commit e5217e3

Please sign in to comment.