Skip to content

Commit b6ee316

Browse files
authored
Merge pull request #34148 from teskje/persist-turmoil-runtime
persist: provide a way to disable the isolated runtime
2 parents e598639 + a5d2cc6 commit b6ee316

File tree

3 files changed

+51
-8
lines changed

3 files changed

+51
-8
lines changed

src/persist-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ tonic-build = "0.12.3"
8383

8484
[features]
8585
default = ["mz-build-tools/default", "workspace-hack"]
86+
turmoil = []
8687

8788
[package.metadata.cargo-udeps.ignore]
8889
normal = ["workspace-hack"]

src/persist-client/src/async_runtime.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ impl IsolatedRuntime {
7575
IsolatedRuntime::new(&MetricsRegistry::new(), Some(TEST_THREADS))
7676
}
7777

78+
#[cfg(feature = "turmoil")]
79+
/// Create a no-op shim that spawns tasks on the current tokio runtime.
80+
///
81+
/// This is useful for simulation tests where we don't want to spawn additional threads and/or
82+
/// tokio runtimes.
83+
pub fn new_disabled() -> Self {
84+
IsolatedRuntime { inner: None }
85+
}
86+
7887
/// Spawns a task onto this runtime.
7988
///
8089
/// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here, see the doc
@@ -86,10 +95,11 @@ impl IsolatedRuntime {
8695
F: Future + Send + 'static,
8796
F::Output: Send + 'static,
8897
{
89-
self.inner
90-
.as_ref()
91-
.expect("exists until drop")
92-
.spawn_named(name, fut)
98+
if let Some(runtime) = &self.inner {
99+
runtime.spawn_named(name, fut)
100+
} else {
101+
mz_ore::task::spawn(name, fut)
102+
}
93103
}
94104
}
95105

@@ -98,9 +108,8 @@ impl Drop for IsolatedRuntime {
98108
// We don't need to worry about `shutdown_background` leaking
99109
// blocking tasks (i.e., tasks spawned with `spawn_blocking`) because
100110
// the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`.
101-
self.inner
102-
.take()
103-
.expect("cannot drop twice")
104-
.shutdown_background()
111+
if let Some(runtime) = self.inner.take() {
112+
runtime.shutdown_background();
113+
}
105114
}
106115
}

src/persist-client/src/cache.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,39 @@ impl PersistClientCache {
111111
)
112112
}
113113

114+
#[cfg(feature = "turmoil")]
115+
/// Create a [PersistClientCache] for use in turmoil tests.
116+
///
117+
/// Turmoil wants to run all software under test in a single thread, so we disable the
118+
/// (multi-threaded) isolated runtime.
119+
pub fn new_for_turmoil() -> Self {
120+
use crate::rpc::NoopPubSubSender;
121+
122+
let cfg = PersistConfig::new_for_tests();
123+
let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
124+
125+
let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
126+
let _pubsub_receiver_task = mz_ore::task::spawn(|| "noop", async {});
127+
128+
let state_cache = Arc::new(StateCache::new(
129+
&cfg,
130+
Arc::clone(&metrics),
131+
Arc::clone(&pubsub_sender),
132+
));
133+
let isolated_runtime = IsolatedRuntime::new_disabled();
134+
135+
PersistClientCache {
136+
cfg,
137+
metrics,
138+
blob_by_uri: Mutex::new(BTreeMap::new()),
139+
consensus_by_uri: Mutex::new(BTreeMap::new()),
140+
isolated_runtime: Arc::new(isolated_runtime),
141+
state_cache,
142+
pubsub_sender,
143+
_pubsub_receiver_task,
144+
}
145+
}
146+
114147
/// Returns the [PersistConfig] being used by this cache.
115148
pub fn cfg(&self) -> &PersistConfig {
116149
&self.cfg

0 commit comments

Comments
 (0)