Skip to content

Commit 338ee6d

Browse files
committed
flush fixup
1 parent 5f7c36a commit 338ee6d

File tree

1 file changed

+48
-20
lines changed

1 file changed

+48
-20
lines changed

turbopack/crates/turbo-persistence/src/write_batch.rs

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
185185
let sst = self.create_sst_file(family, global_collector.sorted())?;
186186
global_collector.clear();
187187
self.new_sst_files.lock().push(sst);
188-
self.idle_collectors.lock().push(global_collector);
188+
self.dispose_collector(global_collector);
189189
}
190190
Ok(())
191191
}
@@ -197,6 +197,14 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
197197
.unwrap_or_else(|| Collector::new())
198198
}
199199

200+
fn dispose_collector(&self, collector: Collector<K>) {
201+
self.idle_collectors.lock().push(collector);
202+
}
203+
204+
fn dispose_thread_local_collector(&self, collector: Collector<K, THREAD_LOCAL_SIZE_SHIFT>) {
205+
self.idle_thread_local_collectors.lock().push(collector);
206+
}
207+
200208
/// Puts a key-value pair into the write batch.
201209
pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> {
202210
let state = self.thread_local_state();
@@ -224,7 +232,9 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
224232
///
225233
/// Safety: Caller must ensure that no concurrent put or delete operation is happening on the
226234
/// flushed family.
235+
#[tracing::instrument(level = "trace", skip(self))]
227236
pub unsafe fn flush(&self, family: u32) -> Result<()> {
237+
// Flush the thread local collectors to the global collector.
228238
let mut collectors = Vec::new();
229239
for cell in self.thread_locals.iter() {
230240
let state = unsafe { &mut *cell.get() };
@@ -235,25 +245,43 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
235245
}
236246
}
237247

238-
let shared_error = Mutex::new(Ok(()));
239-
scope(|scope| {
240-
for mut collector in collectors {
241-
let this = &self;
242-
let shared_error = &shared_error;
243-
let span = Span::current();
244-
scope.spawn(move |_| {
245-
let _span = span.entered();
246-
if let Err(err) =
247-
this.flush_thread_local_collector(family as u32, &mut collector)
248-
{
249-
*shared_error.lock() = Err(err);
248+
let span = Span::current();
249+
collectors.into_par_iter().try_for_each(|mut collector| {
250+
let _span = span.clone().entered();
251+
self.flush_thread_local_collector(family, &mut collector)?;
252+
self.dispose_thread_local_collector(collector);
253+
anyhow::Ok(())
254+
})?;
255+
256+
// Now we flush the global collector(s).
257+
let mut collector_state = self.collectors[usize_from_u32(family)].lock();
258+
match &mut *collector_state {
259+
GlobalCollectorState::Unsharded(collector) => {
260+
if !collector.is_empty() {
261+
let sst = self.create_sst_file(family, collector.sorted())?;
262+
collector.clear();
263+
self.new_sst_files.lock().push(sst);
264+
}
265+
}
266+
GlobalCollectorState::Sharded(_) => {
267+
let GlobalCollectorState::Sharded(shards) = replace(
268+
&mut *collector_state,
269+
GlobalCollectorState::Unsharded(self.get_new_collector()),
270+
) else {
271+
unreachable!();
272+
};
273+
shards.into_par_iter().try_for_each(|mut collector| {
274+
let _span = span.clone().entered();
275+
if !collector.is_empty() {
276+
let sst = self.create_sst_file(family, collector.sorted())?;
277+
collector.clear();
278+
self.new_sst_files.lock().push(sst);
279+
self.dispose_collector(collector);
250280
}
251-
this.idle_thread_local_collectors.lock().push(collector);
252-
});
281+
anyhow::Ok(())
282+
})?;
253283
}
254-
});
255-
256-
shared_error.into_inner()?;
284+
}
257285

258286
Ok(())
259287
}
@@ -288,7 +316,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
288316
{
289317
*shared_error.lock() = Err(err);
290318
}
291-
this.idle_thread_local_collectors.lock().push(collector);
319+
this.dispose_thread_local_collector(collector);
292320
});
293321
}
294322
}
@@ -322,7 +350,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
322350
if !collector.is_empty() {
323351
let sst = self.create_sst_file(family, collector.sorted())?;
324352
collector.clear();
325-
self.idle_collectors.lock().push(collector);
353+
self.dispose_collector(collector);
326354
shared_new_sst_files.lock().push(sst);
327355
}
328356
anyhow::Ok(())

0 commit comments

Comments
 (0)