Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/persist-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ tonic-build = "0.12.3"

[features]
default = ["mz-build-tools/default", "workspace-hack"]
turmoil = []

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]
25 changes: 17 additions & 8 deletions src/persist-client/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ impl IsolatedRuntime {
IsolatedRuntime::new(&MetricsRegistry::new(), Some(TEST_THREADS))
}

#[cfg(feature = "turmoil")]
/// Create a no-op shim that spawns tasks on the current tokio runtime.
///
/// This is useful for simulation tests where we don't want to spawn additional threads and/or
/// tokio runtimes.
pub fn new_disabled() -> Self {
IsolatedRuntime { inner: None }
}

/// Spawns a task onto this runtime.
///
/// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here, see the doc
Expand All @@ -86,10 +95,11 @@ impl IsolatedRuntime {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.inner
.as_ref()
.expect("exists until drop")
.spawn_named(name, fut)
if let Some(runtime) = &self.inner {
runtime.spawn_named(name, fut)
} else {
mz_ore::task::spawn(name, fut)
}
}
}

Expand All @@ -98,9 +108,8 @@ impl Drop for IsolatedRuntime {
// We don't need to worry about `shutdown_background` leaking
// blocking tasks (i.e., tasks spawned with `spawn_blocking`) because
// the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`.
self.inner
.take()
.expect("cannot drop twice")
.shutdown_background()
if let Some(runtime) = self.inner.take() {
runtime.shutdown_background();
}
}
}
33 changes: 33 additions & 0 deletions src/persist-client/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,39 @@ impl PersistClientCache {
)
}

#[cfg(feature = "turmoil")]
/// Create a [PersistClientCache] for use in turmoil tests.
///
/// Turmoil wants to run all software under test in a single thread, so we disable the
/// (multi-threaded) isolated runtime.
pub fn new_for_turmoil() -> Self {
use crate::rpc::NoopPubSubSender;

let cfg = PersistConfig::new_for_tests();
let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));

let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
let _pubsub_receiver_task = mz_ore::task::spawn(|| "noop", async {});

let state_cache = Arc::new(StateCache::new(
&cfg,
Arc::clone(&metrics),
Arc::clone(&pubsub_sender),
));
let isolated_runtime = IsolatedRuntime::new_disabled();

PersistClientCache {
cfg,
metrics,
blob_by_uri: Mutex::new(BTreeMap::new()),
consensus_by_uri: Mutex::new(BTreeMap::new()),
isolated_runtime: Arc::new(isolated_runtime),
state_cache,
pubsub_sender,
_pubsub_receiver_task,
}
}

/// Returns the [PersistConfig] being used by this cache.
pub fn cfg(&self) -> &PersistConfig {
&self.cfg
Expand Down