Skip to content

Commit 5f7c36a

Browse files
committed
flush key spaces when done with them
1 parent f2425e0 commit 5f7c36a

File tree

6 files changed

+96
-3
lines changed

6 files changed

+96
-3
lines changed

turbopack/crates/turbo-persistence/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![feature(once_cell_try)]
22
#![feature(new_zeroed_alloc)]
33
#![feature(get_mut_unchecked)]
4+
#![feature(sync_unsafe_cell)]
45

56
mod arc_slice;
67
mod collector;

turbopack/crates/turbo-persistence/src/write_batch.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
cell::UnsafeCell,
2+
cell::SyncUnsafeCell,
33
fs::File,
44
io::Write,
55
mem::{replace, take},
@@ -68,7 +68,7 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
6868
/// The current sequence number counter. Increased for every new SST file or blob file.
6969
current_sequence_number: AtomicU32,
7070
/// The thread local state.
71-
thread_locals: ThreadLocal<UnsafeCell<ThreadLocalState<K, FAMILIES>>>,
71+
thread_locals: ThreadLocal<SyncUnsafeCell<ThreadLocalState<K, FAMILIES>>>,
7272
/// Collectors in use. The thread local collectors flush into these when they are full.
7373
collectors: [Mutex<GlobalCollectorState<K>>; FAMILIES],
7474
/// The list of new SST files that have been created.
@@ -109,7 +109,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
109109
#[allow(clippy::mut_from_ref)]
110110
fn thread_local_state(&self) -> &mut ThreadLocalState<K, FAMILIES> {
111111
let cell = self.thread_locals.get_or(|| {
112-
UnsafeCell::new(ThreadLocalState {
112+
SyncUnsafeCell::new(ThreadLocalState {
113113
collectors: [const { None }; FAMILIES],
114114
new_blob_files: Vec::new(),
115115
})
@@ -219,6 +219,45 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
219219
Ok(())
220220
}
221221

222+
/// Flushes a family of the write batch, reducing the amount of buffered memory used.
223+
/// Does not commit any data persistently.
224+
///
225+
/// Safety: Caller must ensure that no concurrent put or delete operation is happening on the
226+
/// flushed family.
227+
pub unsafe fn flush(&self, family: u32) -> Result<()> {
228+
let mut collectors = Vec::new();
229+
for cell in self.thread_locals.iter() {
230+
let state = unsafe { &mut *cell.get() };
231+
if let Some(collector) = state.collectors[usize_from_u32(family)].take() {
232+
if !collector.is_empty() {
233+
collectors.push(collector);
234+
}
235+
}
236+
}
237+
238+
let shared_error = Mutex::new(Ok(()));
239+
scope(|scope| {
240+
for mut collector in collectors {
241+
let this = &self;
242+
let shared_error = &shared_error;
243+
let span = Span::current();
244+
scope.spawn(move |_| {
245+
let _span = span.entered();
246+
if let Err(err) =
247+
this.flush_thread_local_collector(family as u32, &mut collector)
248+
{
249+
*shared_error.lock() = Err(err);
250+
}
251+
this.idle_thread_local_collectors.lock().push(collector);
252+
});
253+
}
254+
});
255+
256+
shared_error.into_inner()?;
257+
258+
Ok(())
259+
}
260+
222261
/// Finishes the write batch by returning the new sequence number and the new SST files. This
223262
/// writes all outstanding thread local data to disk.
224263
pub(crate) fn finish(&mut self) -> Result<FinishResult> {

turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ impl SerialWriteBatch<'_> for NoopWriteBatch {
9090
fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
9191
Ok(())
9292
}
93+
94+
fn flush(&mut self, _key_space: KeySpace) -> Result<()> {
95+
Ok(())
96+
}
9397
}
9498

9599
impl ConcurrentWriteBatch<'_> for NoopWriteBatch {
@@ -105,4 +109,8 @@ impl ConcurrentWriteBatch<'_> for NoopWriteBatch {
105109
fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
106110
Ok(())
107111
}
112+
113+
unsafe fn flush(&self, _key_space: KeySpace) -> Result<()> {
114+
Ok(())
115+
}
108116
}

turbopack/crates/turbo-tasks-backend/src/database/turbo.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ impl<'a> ConcurrentWriteBatch<'a> for TurboWriteBatch<'a> {
160160
fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> {
161161
self.batch.delete(key_space as u32, key.into_static())
162162
}
163+
164+
unsafe fn flush(&self, key_space: KeySpace) -> Result<()> {
165+
self.batch.flush(key_space as u32)
166+
}
163167
}
164168

165169
impl KeyBase for WriteBuffer<'_> {

turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,18 @@ pub trait SerialWriteBatch<'a>: BaseWriteBatch<'a> {
6666
value: WriteBuffer<'_>,
6767
) -> Result<()>;
6868
fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>;
69+
fn flush(&mut self, key_space: KeySpace) -> Result<()>;
6970
}
7071

7172
pub trait ConcurrentWriteBatch<'a>: BaseWriteBatch<'a> + Sync + Send {
7273
fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()>;
7374
fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>;
75+
/// Flushes a key space of the write batch, reducing the amount of buffered memory used.
76+
/// Does not commit any data persistently.
77+
///
78+
/// Safety: Caller must ensure that no concurrent put or delete operation is happening on the
79+
/// flushed key space.
80+
unsafe fn flush(&self, key_space: KeySpace) -> Result<()>;
7481
}
7582

7683
pub enum WriteBatch<'a, S, C>
@@ -164,6 +171,16 @@ where
164171
WriteBatch::Concurrent(c, _) => c.delete(key_space, key),
165172
}
166173
}
174+
175+
fn flush(&mut self, key_space: KeySpace) -> Result<()> {
176+
match self {
177+
WriteBatch::Serial(s) => s.flush(key_space),
178+
WriteBatch::Concurrent(c, _) => {
179+
// Safety: the &mut self ensures that no concurrent operation is happening
180+
unsafe { c.flush(key_space) }
181+
}
182+
}
183+
}
167184
}
168185

169186
pub enum WriteBatchRef<'r, 'a, S, C>
@@ -241,6 +258,16 @@ where
241258
WriteBatchRef::Concurrent(c, _) => c.delete(key_space, key),
242259
}
243260
}
261+
262+
fn flush(&mut self, key_space: KeySpace) -> Result<()> {
263+
match self {
264+
WriteBatchRef::Serial(s) => s.flush(key_space),
265+
WriteBatchRef::Concurrent(c, _) => {
266+
// Safety: the &mut self ensures that no concurrent operation is happening
267+
unsafe { c.flush(key_space) }
268+
}
269+
}
270+
}
244271
}
245272

246273
pub struct UnimplementedWriteBatch;
@@ -275,6 +302,9 @@ impl SerialWriteBatch<'_> for UnimplementedWriteBatch {
275302
fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
276303
todo!()
277304
}
305+
fn flush(&mut self, _key_space: KeySpace) -> Result<()> {
306+
todo!()
307+
}
278308
}
279309

280310
impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch {
@@ -289,4 +319,7 @@ impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch {
289319
fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
290320
todo!()
291321
}
322+
unsafe fn flush(&self, _key_space: KeySpace) -> Result<()> {
323+
todo!()
324+
}
292325
}

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
184184
{
185185
let _span = tracing::trace_span!("update task data").entered();
186186
process_task_data(snapshots, Some(batch))?;
187+
[KeySpace::TaskMeta, KeySpace::TaskData]
188+
.into_par_iter()
189+
.try_for_each(|key_space| {
190+
// Safety: We already finished all processing of the task data and task
191+
// meta
192+
unsafe { batch.flush(key_space) }
193+
})?;
187194
}
188195

189196
let mut next_task_id = get_next_free_task_id::<
@@ -500,6 +507,7 @@ where
500507
)
501508
.with_context(|| anyhow!("Unable to write operations"))?;
502509
}
510+
batch.flush(KeySpace::Infra)?;
503511
Ok(())
504512
}
505513

0 commit comments

Comments
 (0)