Skip to content

Commit 0027940

Browse files
committed
Turbopack: add turbo_tasks::spawn
1 parent fdbe676 commit 0027940

File tree

6 files changed

+152
-60
lines changed

6 files changed

+152
-60
lines changed

turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use std::{cmp::max, path::PathBuf, sync::Arc, thread::available_parallelism};
22

3-
use anyhow::Result;
3+
use anyhow::{Ok, Result};
44
use parking_lot::Mutex;
5-
use tokio::{
6-
runtime::Handle,
7-
spawn,
8-
task::{JoinHandle, block_in_place},
9-
};
5+
use tokio::{runtime::Handle, task::block_in_place};
106
use turbo_persistence::{
117
ArcSlice, CompactConfig, KeyBase, StoreKey, TurboPersistence, ValueBuffer,
128
};
9+
use turbo_tasks::{JoinHandle, message_queue::TimingEvent, spawn, turbo_tasks};
1310

1411
use crate::database::{
1512
key_value_database::{KeySpace, KeyValueDatabase},
@@ -237,5 +234,5 @@ impl<'l> From<WriteBuffer<'l>> for ValueBuffer<'l> {
237234
}
238235

239236
fn join(handle: JoinHandle<Result<()>>) -> Result<()> {
240-
block_in_place(|| Handle::current().block_on(handle))?
237+
block_in_place(|| Handle::current().block_on(handle))
241238
}

turbopack/crates/turbo-tasks-malloc/src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod counter;
33
use std::{
44
alloc::{GlobalAlloc, Layout},
55
marker::PhantomData,
6+
ops::{Add, AddAssign},
67
};
78

89
use self::counter::{add, flush, get, remove, update};
@@ -16,12 +17,45 @@ pub struct AllocationInfo {
1617
}
1718

1819
impl AllocationInfo {
20+
pub const ZERO: Self = Self {
21+
allocations: 0,
22+
deallocations: 0,
23+
allocation_count: 0,
24+
deallocation_count: 0,
25+
};
26+
1927
pub fn is_empty(&self) -> bool {
2028
self.allocations == 0
2129
&& self.deallocations == 0
2230
&& self.allocation_count == 0
2331
&& self.deallocation_count == 0
2432
}
33+
34+
pub fn memory_usage(&self) -> usize {
35+
self.allocations.saturating_sub(self.deallocations)
36+
}
37+
}
38+
39+
impl Add<Self> for AllocationInfo {
40+
type Output = Self;
41+
42+
fn add(self, other: Self) -> Self {
43+
Self {
44+
allocations: self.allocations + other.allocations,
45+
deallocations: self.deallocations + other.deallocations,
46+
allocation_count: self.allocation_count + other.allocation_count,
47+
deallocation_count: self.deallocation_count + other.deallocation_count,
48+
}
49+
}
50+
}
51+
52+
impl AddAssign<Self> for AllocationInfo {
53+
fn add_assign(&mut self, other: Self) {
54+
self.allocations += other.allocations;
55+
self.deallocations += other.deallocations;
56+
self.allocation_count += other.allocation_count;
57+
self.deallocation_count += other.deallocation_count;
58+
}
2559
}
2660

2761
#[derive(Default, Clone, Debug)]

turbopack/crates/turbo-tasks/src/capture_future.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ pin_project! {
3131
#[pin]
3232
future: F,
3333
duration: Duration,
34-
allocations: usize,
35-
deallocations: usize,
34+
allocations: AllocationInfo,
3635
}
3736
}
3837

@@ -41,8 +40,7 @@ impl<T, F: Future<Output = T>> CaptureFuture<T, F> {
4140
Self {
4241
future,
4342
duration: Duration::ZERO,
44-
allocations: 0,
45-
deallocations: 0,
43+
allocations: AllocationInfo::ZERO,
4644
}
4745
}
4846
}
@@ -77,14 +75,25 @@ pub struct TurboTasksPanic {
7775
pub location: Option<String>,
7876
}
7977

78+
impl TurboTasksPanic {
79+
pub fn into_panic(self) -> Box<dyn std::any::Any + Send> {
80+
Box::new(format!(
81+
"{} at {}",
82+
self.message,
83+
self.location
84+
.unwrap_or_else(|| "unknown location".to_string())
85+
))
86+
}
87+
}
88+
8089
impl Display for TurboTasksPanic {
8190
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8291
write!(f, "{}", self.message)
8392
}
8493
}
8594

8695
impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
87-
type Output = (Result<T, TurboTasksPanic>, Duration, usize);
96+
type Output = (Result<T, TurboTasksPanic>, Duration, AllocationInfo);
8897

8998
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
9099
let this = self.project();
@@ -127,17 +136,10 @@ impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
127136
let elapsed = start.elapsed();
128137
let allocations = start_allocations.until_now();
129138
*this.duration += elapsed + data.duration;
130-
*this.allocations += allocations.allocations + data.allocations;
131-
*this.deallocations += allocations.deallocations + data.deallocations;
139+
*this.allocations += allocations;
132140
match result {
133-
Err(err) => {
134-
let memory_usage = this.allocations.saturating_sub(*this.deallocations);
135-
Poll::Ready((Err(err), *this.duration, memory_usage))
136-
}
137-
Ok(Poll::Ready(r)) => {
138-
let memory_usage = this.allocations.saturating_sub(*this.deallocations);
139-
Poll::Ready((Ok(r), *this.duration, memory_usage))
140-
}
141+
Err(err) => Poll::Ready((Err(err), *this.duration, this.allocations.clone())),
142+
Ok(Poll::Ready(r)) => Poll::Ready((Ok(r), *this.duration, this.allocations.clone())),
141143
Ok(Poll::Pending) => Poll::Pending,
142144
}
143145
}

turbopack/crates/turbo-tasks/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub mod registry;
7676
pub mod scope;
7777
mod serialization_invalidation;
7878
pub mod small_duration;
79+
mod spawn;
7980
mod state;
8081
pub mod task;
8182
mod task_execution_reason;
@@ -110,8 +111,7 @@ pub use manager::{
110111
CurrentCellRef, ReadConsistency, TaskPersistence, TurboTasks, TurboTasksApi,
111112
TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused, UpdateInfo,
112113
dynamic_call, emit, mark_finished, mark_root, mark_session_dependent, mark_stateful,
113-
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
114-
turbo_tasks, turbo_tasks_scope,
114+
prevent_gc, run_once, run_once_with_reason, trait_call, turbo_tasks, turbo_tasks_scope,
115115
};
116116
pub use output::OutputContent;
117117
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
@@ -120,6 +120,7 @@ pub use read_ref::ReadRef;
120120
use rustc_hash::FxHasher;
121121
pub use serialization_invalidation::SerializationInvalidator;
122122
pub use shrink_to_fit::ShrinkToFit;
123+
pub use spawn::{JoinHandle, spawn, spawn_blocking, spawn_thread};
123124
pub use state::{State, TransientState};
124125
pub use task::{SharedReference, TypedSharedReference, task_input::TaskInput};
125126
pub use task_execution_reason::TaskExecutionReason;

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use std::{
88
Arc, Mutex, RwLock, Weak,
99
atomic::{AtomicBool, AtomicUsize, Ordering},
1010
},
11-
thread,
1211
time::{Duration, Instant},
1312
};
1413

@@ -17,10 +16,9 @@ use auto_hash_map::AutoMap;
1716
use rustc_hash::FxHasher;
1817
use serde::{Deserialize, Serialize};
1918
use smallvec::SmallVec;
20-
use tokio::{runtime::Handle, select, sync::mpsc::Receiver, task_local};
19+
use tokio::{select, sync::mpsc::Receiver, task_local};
2120
use tokio_util::task::TaskTracker;
22-
use tracing::{Instrument, Level, Span, info_span, instrument, trace_span};
23-
use turbo_tasks_malloc::TurboMalloc;
21+
use tracing::{Instrument, Level, instrument, trace_span};
2422

2523
use crate::{
2624
Completion, InvalidationReason, InvalidationReasonSet, OutputContent, ReadCellOptions,
@@ -30,7 +28,7 @@ use crate::{
3028
Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec,
3129
TransientTaskType, TurboTasksExecutionError, TypedCellContent,
3230
},
33-
capture_future::{self, CaptureFuture},
31+
capture_future::CaptureFuture,
3432
event::{Event, EventListener},
3533
id::{BackendJobId, ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId},
3634
id_factory::IdFactoryWithReuse,
@@ -718,7 +716,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
718716
};
719717

720718
async {
721-
let (result, duration, memory_usage) = CaptureFuture::new(future).await;
719+
let (result, duration, alloc_info) = CaptureFuture::new(future).await;
722720

723721
// wait for all spawned local tasks using `local` to finish
724722
let ltt = CURRENT_TASK_STATE
@@ -742,7 +740,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
742740
let schedule_again = this.backend.task_execution_completed(
743741
task_id,
744742
duration,
745-
memory_usage,
743+
alloc_info.memory_usage(),
746744
&cell_counters,
747745
stateful,
748746
has_invalidator,
@@ -1794,35 +1792,6 @@ pub fn emit<T: VcValueTrait + ?Sized>(collectible: ResolvedVc<T>) {
17941792
})
17951793
}
17961794

1797-
pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
1798-
let turbo_tasks = turbo_tasks();
1799-
let span = Span::current();
1800-
let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
1801-
let _guard = span.entered();
1802-
let start = Instant::now();
1803-
let start_allocations = TurboMalloc::allocation_counters();
1804-
let r = turbo_tasks_scope(turbo_tasks, func);
1805-
(r, start.elapsed(), start_allocations.until_now())
1806-
})
1807-
.await
1808-
.unwrap();
1809-
capture_future::add_duration(duration);
1810-
capture_future::add_allocation_info(alloc_info);
1811-
result
1812-
}
1813-
1814-
pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
1815-
let handle = Handle::current();
1816-
let span = info_span!("thread").or_current();
1817-
thread::spawn(move || {
1818-
let span = span.entered();
1819-
let guard = handle.enter();
1820-
func();
1821-
drop(guard);
1822-
drop(span);
1823-
});
1824-
}
1825-
18261795
pub(crate) async fn read_task_output(
18271796
this: &dyn TurboTasksApi,
18281797
id: TaskId,
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use std::{
2+
panic::resume_unwind,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
thread,
6+
time::{Duration, Instant},
7+
};
8+
9+
use anyhow::Result;
10+
use futures::{FutureExt, ready};
11+
use tokio::runtime::Handle;
12+
use tracing::{Instrument, Span, info_span};
13+
use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};
14+
15+
use crate::{
16+
TurboTasksPanic,
17+
capture_future::{self, CaptureFuture},
18+
manager::turbo_tasks_future_scope,
19+
turbo_tasks, turbo_tasks_scope,
20+
};
21+
22+
pub struct JoinHandle<T> {
23+
join_handle: tokio::task::JoinHandle<(Result<T, TurboTasksPanic>, Duration, AllocationInfo)>,
24+
}
25+
26+
impl<T> Future for JoinHandle<T> {
27+
type Output = T;
28+
29+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
30+
let this = self.get_mut();
31+
match ready!(this.join_handle.poll_unpin(cx)) {
32+
Ok((res, duration, alloc_info)) => {
33+
capture_future::add_duration(duration);
34+
capture_future::add_allocation_info(alloc_info);
35+
match res {
36+
Ok(res) => Poll::Ready(res),
37+
Err(e) => resume_unwind(e.into_panic()),
38+
}
39+
}
40+
Err(e) => resume_unwind(e.into_panic()),
41+
}
42+
}
43+
}
44+
45+
/// Spawns a future as separate task and returns a JoinHandle which can be used to await the result.
46+
/// The future has access to the current TurboTasks context and runs in the same tracing span.
47+
/// Allocations and cpu time is accounted to the current turbo-tasks function.
48+
pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
49+
let turbo_tasks = turbo_tasks();
50+
let span = Span::current();
51+
let join_handle = tokio::task::spawn(
52+
turbo_tasks_future_scope(turbo_tasks, CaptureFuture::new(future)).instrument(span),
53+
);
54+
JoinHandle { join_handle }
55+
}
56+
57+
/// Spawns a blocking function in a separate task using the blocking pool and returns a JoinHandle
58+
/// which can be used to await the result. The function has access to the current TurboTasks context
59+
/// and runs in the same tracing span.
60+
/// Allocations and cpu time is accounted to the current turbo-tasks function.
61+
pub fn spawn_blocking<T: Send + 'static>(
62+
func: impl FnOnce() -> T + Send + 'static,
63+
) -> JoinHandle<T> {
64+
let turbo_tasks = turbo_tasks();
65+
let span = Span::current();
66+
let join_handle = tokio::task::spawn_blocking(|| {
67+
let _guard = span.entered();
68+
let start = Instant::now();
69+
let start_allocations = TurboMalloc::allocation_counters();
70+
let r = turbo_tasks_scope(turbo_tasks, func);
71+
(Ok(r), start.elapsed(), start_allocations.until_now())
72+
});
73+
JoinHandle { join_handle }
74+
}
75+
76+
/// Spawns a thread which runs in background. It has access to the current TurboTasks context, but
77+
/// is not accounted towards the current turbo-tasks function.
78+
pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
79+
let handle = Handle::current();
80+
let span = info_span!("thread").or_current();
81+
let turbo_tasks = turbo_tasks();
82+
thread::spawn(move || {
83+
let _span = span.entered();
84+
turbo_tasks_scope(turbo_tasks, || {
85+
let _guard = handle.enter();
86+
func();
87+
})
88+
});
89+
}

0 commit comments

Comments
 (0)