Skip to content

Commit 7c6c06d

Browse files
authored
Turbopack: show timing event for database compaction (#82369)
### What? - show timing event for database compaction - show timing event for persistent caching in dev too
1 parent 9934b37 commit 7c6c06d

File tree

6 files changed

+85
-47
lines changed

6 files changed

+85
-47
lines changed

packages/next/src/server/dev/hot-reloader-turbopack.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ export async function createHotReloaderTurbopack(
258258
}
259259
)
260260
backgroundLogCompilationEvents(project, {
261-
eventTypes: ['StartupCacheInvalidationEvent'],
261+
eventTypes: ['StartupCacheInvalidationEvent', 'TimingEvent'],
262262
})
263263
setBundlerFindSourceMapImplementation(
264264
getSourceMapFromTurbopack.bind(null, project, projectPath)

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ impl TurboPersistence {
650650
/// files is above the given threshold. The coverage is the average number of SST files that
651651
/// need to be read to find a key. It also limits the maximum number of SST files that are
652652
/// merged at once, which is the main factor for the runtime of the compaction.
653-
pub fn compact(&self, compact_config: &CompactConfig) -> Result<()> {
653+
pub fn compact(&self, compact_config: &CompactConfig) -> Result<bool> {
654654
if self.read_only {
655655
bail!("Compaction is not allowed on a read only database");
656656
}
@@ -689,7 +689,8 @@ impl TurboPersistence {
689689
.context("Failed to compact database")?;
690690
}
691691

692-
if !new_meta_files.is_empty() {
692+
let has_changes = !new_meta_files.is_empty();
693+
if has_changes {
693694
self.commit(CommitOptions {
694695
new_meta_files,
695696
new_sst_files,
@@ -704,7 +705,7 @@ impl TurboPersistence {
704705

705706
self.active_write_operation.store(false, Ordering::Release);
706707

707-
Ok(())
708+
Ok(has_changes)
708709
}
709710

710711
/// Internal function to perform a compaction.

turbopack/crates/turbo-tasks-backend/src/database/turbo.rs

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
1-
use std::{
2-
cmp::max,
3-
path::PathBuf,
4-
sync::Arc,
5-
thread::{JoinHandle, available_parallelism, spawn},
6-
};
1+
use std::{cmp::max, path::PathBuf, sync::Arc, thread::available_parallelism, time::Instant};
72

83
use anyhow::{Ok, Result};
94
use parking_lot::Mutex;
105
use turbo_persistence::{
116
ArcSlice, CompactConfig, KeyBase, StoreKey, TurboPersistence, ValueBuffer,
127
};
8+
use turbo_tasks::{JoinHandle, message_queue::TimingEvent, spawn, turbo_tasks};
139

1410
use crate::database::{
1511
key_value_database::{KeySpace, KeyValueDatabase},
@@ -84,7 +80,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
8480
) -> Result<WriteBatch<'_, Self::SerialWriteBatch<'_>, Self::ConcurrentWriteBatch<'_>>> {
8581
// Wait for the compaction to finish
8682
if let Some(join_handle) = self.compact_join_handle.lock().take() {
87-
join_handle.join().unwrap()?;
83+
join_handle.join()?;
8884
}
8985
// Start a new write batch
9086
Ok(WriteBatch::concurrent(TurboWriteBatch {
@@ -100,23 +96,44 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
10096
fn shutdown(&self) -> Result<()> {
10197
// Wait for the compaction to finish
10298
if let Some(join_handle) = self.compact_join_handle.lock().take() {
103-
join_handle.join().unwrap()?;
99+
join_handle.join()?;
104100
}
105101
// Compact the database on shutdown
106-
self.db.compact(&CompactConfig {
107-
max_merge_segment_count: if self.is_ci {
108-
// Fully compact in CI to reduce cache size
109-
usize::MAX
110-
} else {
111-
available_parallelism().map_or(4, |c| max(4, c.get()))
112-
},
113-
..COMPACT_CONFIG
114-
})?;
102+
if self.is_ci {
103+
// Fully compact in CI to reduce cache size
104+
do_compact(&self.db, "Finished database compaction", usize::MAX)?;
105+
} else {
106+
// Compact with a reasonable limit in non-CI environments
107+
do_compact(
108+
&self.db,
109+
"Finished database compaction",
110+
available_parallelism().map_or(4, |c| max(4, c.get())),
111+
)?;
112+
}
115113
// Shutdown the database
116114
self.db.shutdown()
117115
}
118116
}
119117

118+
fn do_compact(
119+
db: &TurboPersistence,
120+
message: &'static str,
121+
max_merge_segment_count: usize,
122+
) -> Result<()> {
123+
let start = Instant::now();
124+
// Compact the database with the given max merge segment count
125+
let ran = db.compact(&CompactConfig {
126+
max_merge_segment_count,
127+
..COMPACT_CONFIG
128+
})?;
129+
if ran {
130+
let elapsed = start.elapsed();
131+
turbo_tasks()
132+
.send_compilation_event(Arc::new(TimingEvent::new(message.to_string(), elapsed)));
133+
}
134+
Ok(())
135+
}
136+
120137
pub struct TurboWriteBatch<'a> {
121138
batch: turbo_persistence::WriteBatch<WriteBuffer<'static>, 5>,
122139
db: &'a Arc<TurboPersistence>,
@@ -144,12 +161,12 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> {
144161
if let Some(compact_join_handle) = self.compact_join_handle {
145162
// Start a new compaction in the background
146163
let db = self.db.clone();
147-
let handle = spawn(move || {
148-
db.compact(&CompactConfig {
149-
max_merge_segment_count: available_parallelism()
150-
.map_or(4, |c| max(4, c.get() / 2)),
151-
..COMPACT_CONFIG
152-
})
164+
let handle = spawn(async move {
165+
do_compact(
166+
&db,
167+
"Finished database compaction",
168+
available_parallelism().map_or(4, |c| max(4, c.get() / 2)),
169+
)
153170
});
154171
compact_join_handle.lock().replace(handle);
155172
}

turbopack/crates/turbo-tasks/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ use rustc_hash::FxHasher;
118118
pub use scope::scope;
119119
pub use serialization_invalidation::SerializationInvalidator;
120120
pub use shrink_to_fit::ShrinkToFit;
121-
pub use spawn::{JoinHandle, spawn, spawn_blocking, spawn_thread};
121+
pub use spawn::{JoinHandle, block_for_future, spawn, spawn_blocking, spawn_thread};
122122
pub use state::{State, TransientState};
123123
pub use task::{SharedReference, TypedSharedReference, task_input::TaskInput};
124124
pub use task_execution_reason::TaskExecutionReason;

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,27 +1058,30 @@ impl<B: Backend + 'static> TurboTasks<B> {
10581058
}
10591059

10601060
pub async fn stop_and_wait(&self) {
1061-
self.backend.stopping(self);
1062-
self.stopped.store(true, Ordering::Release);
1063-
{
1064-
let listener = self
1065-
.event
1066-
.listen_with_note(|| || "wait for stop".to_string());
1067-
if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1068-
listener.await;
1061+
turbo_tasks_future_scope(self.pin(), async move {
1062+
self.backend.stopping(self);
1063+
self.stopped.store(true, Ordering::Release);
1064+
{
1065+
let listener = self
1066+
.event
1067+
.listen_with_note(|| || "wait for stop".to_string());
1068+
if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 {
1069+
listener.await;
1070+
}
10691071
}
1070-
}
1071-
{
1072-
let listener = self.event_background.listen();
1073-
if self
1074-
.currently_scheduled_background_jobs
1075-
.load(Ordering::Acquire)
1076-
!= 0
10771072
{
1078-
listener.await;
1073+
let listener = self.event_background.listen();
1074+
if self
1075+
.currently_scheduled_background_jobs
1076+
.load(Ordering::Acquire)
1077+
!= 0
1078+
{
1079+
listener.await;
1080+
}
10791081
}
1080-
}
1081-
self.backend.stop(self);
1082+
self.backend.stop(self);
1083+
})
1084+
.await;
10821085
}
10831086

10841087
#[track_caller]

turbopack/crates/turbo-tasks/src/spawn.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88

99
use anyhow::Result;
1010
use futures::{FutureExt, ready};
11-
use tokio::runtime::Handle;
11+
use tokio::{runtime::Handle, task::block_in_place};
1212
use tracing::{Instrument, Span, info_span};
1313
use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};
1414

@@ -23,6 +23,12 @@ pub struct JoinHandle<T> {
2323
join_handle: tokio::task::JoinHandle<(Result<T, TurboTasksPanic>, Duration, AllocationInfo)>,
2424
}
2525

26+
impl<T: Send + 'static> JoinHandle<T> {
27+
pub fn join(self) -> T {
28+
block_for_future(self)
29+
}
30+
}
31+
2632
impl<T> Future for JoinHandle<T> {
2733
type Output = T;
2834

@@ -87,3 +93,14 @@ pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
8793
})
8894
});
8995
}
96+
97+
/// Blocking waits for a future to complete. This blocks the current thread potentially staling
98+
/// other concurrent futures (but not other concurrent tasks). Try to avoid this method infavor of
99+
/// awaiting the future instead.
100+
pub fn block_for_future<T: Send>(future: impl Future<Output = T> + Send + 'static) -> T {
101+
let handle = Handle::current();
102+
block_in_place(|| {
103+
let _span = info_span!("blocking").entered();
104+
handle.block_on(future)
105+
})
106+
}

0 commit comments

Comments
 (0)