diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index a8dde9ee6f435..1110be4bf3781 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1,5 +1,6 @@ pub mod indexed; mod operation; +mod persisted_storage_log; mod storage; use std::{ @@ -43,6 +44,7 @@ use crate::{ AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskDirtyCause, TaskGuard, }, + persisted_storage_log::PersistedStorageLog, storage::{get, get_many, get_mut, iter_many, remove, Storage}, }, backing_storage::BackingStorage, @@ -133,7 +135,6 @@ impl Default for BackendOptions { pub struct TurboTasksBackend(Arc>); type TaskCacheLog = Sharded, TaskId)>>; -type StorageLog = Sharded>; struct TurboTasksBackendInner { options: BackendOptions, @@ -148,8 +149,8 @@ struct TurboTasksBackendInner { task_cache: BiMap, TaskId>, transient_tasks: DashMap, BuildHasherDefault>, - persisted_storage_data_log: Option, - persisted_storage_meta_log: Option, + persisted_storage_data_log: Option, + persisted_storage_meta_log: Option, storage: Storage, /// Number of executing operations + Highest bit is set when snapshot is @@ -207,8 +208,8 @@ impl TurboTasksBackendInner { persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)), task_cache: BiMap::new(), transient_tasks: DashMap::default(), - persisted_storage_data_log: need_log.then(|| Sharded::new(shard_amount)), - persisted_storage_meta_log: need_log.then(|| Sharded::new(shard_amount)), + persisted_storage_data_log: need_log.then(|| PersistedStorageLog::new(shard_amount)), + persisted_storage_meta_log: need_log.then(|| PersistedStorageLog::new(shard_amount)), storage: Storage::new(), in_progress_operations: AtomicUsize::new(0), snapshot_request: Mutex::new(SnapshotRequest::new()), @@ -312,10 +313,7 @@ impl 
TurboTasksBackendInner { } } - fn persisted_storage_log( - &self, - category: TaskDataCategory, - ) -> Option<&Sharded>> { + fn persisted_storage_log(&self, category: TaskDataCategory) -> Option<&PersistedStorageLog> { match category { TaskDataCategory::Data => &self.persisted_storage_data_log, TaskDataCategory::Meta => &self.persisted_storage_meta_log, @@ -696,12 +694,16 @@ impl TurboTasksBackendInner { .map(|op| op.arc().clone()) .collect::>(); drop(snapshot_request); - fn take_from_log(log: &Option>) -> Vec { + fn take_from_log(log: &Option) -> Vec> { log.as_ref().map(|l| l.take()).unwrap_or_default() } let persisted_storage_meta_log = take_from_log(&self.persisted_storage_meta_log); let persisted_storage_data_log = take_from_log(&self.persisted_storage_data_log); - let persisted_task_cache_log = take_from_log(&self.persisted_task_cache_log); + let persisted_task_cache_log = self + .persisted_task_cache_log + .as_ref() + .map(|l| l.take(|i| i)) + .unwrap_or_default(); let mut snapshot_request = self.snapshot_request.lock(); snapshot_request.snapshot_requested = false; self.in_progress_operations @@ -741,7 +743,7 @@ impl TurboTasksBackendInner { persisted_storage_meta_log, persisted_storage_data_log, ) { - println!("Persisting failed: {}", err); + println!("Persisting failed: {:?}", err); return None; } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 39f399877e4e9..cf9f7c9cc83e5 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -21,10 +21,7 @@ use crate::{ TurboTasksBackend, TurboTasksBackendInner, }, backing_storage::BackingStorage, - data::{ - CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue, - CachedDataUpdate, - }, + data::{CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue}, }; pub trait Operation: 
@@ -419,13 +416,7 @@ impl TaskGuard for TaskGuardImpl<'_, B> { self.backend .persisted_storage_log(key.category()) .unwrap() - .lock(self.task_id) - .push(CachedDataUpdate { - key, - task: self.task_id, - value: Some(value), - old_value: None, - }); + .push(self.task_id, key, None, Some(value)); true } else { false @@ -451,15 +442,13 @@ impl TaskGuard for TaskGuardImpl<'_, B> { self.backend .persisted_storage_log(key.category()) .unwrap() - .lock(self.task_id) - .push(CachedDataUpdate { + .push( + self.task_id, key, - task: self.task_id, - value: Some(value), - old_value: old - .as_ref() + old.as_ref() .and_then(|old| old.is_persistent().then(|| old.clone())), - }); + Some(value), + ); old } else { let item = CachedDataItem::from_key_and_value(key.clone(), value); @@ -469,13 +458,7 @@ impl TaskGuard for TaskGuardImpl<'_, B> { self.backend .persisted_storage_log(key.category()) .unwrap() - .lock(self.task_id) - .push(CachedDataUpdate { - key, - task: self.task_id, - value: None, - old_value: Some(old.clone()), - }); + .push(self.task_id, key, Some(old.clone()), None); } Some(old) } else { @@ -510,29 +493,21 @@ impl TaskGuard for TaskGuardImpl<'_, B> { (None, false) => {} (Some(old_value), false) => { add_persisting_item = true; - backend - .persisted_storage_log(key.category()) - .unwrap() - .lock(*task_id) - .push(CachedDataUpdate { - key: key.clone(), - task: *task_id, - value: None, - old_value: Some(old_value), - }); + backend.persisted_storage_log(key.category()).unwrap().push( + *task_id, + key.clone(), + Some(old_value), + None, + ); } (old_value, true) => { add_persisting_item = true; - backend - .persisted_storage_log(key.category()) - .unwrap() - .lock(*task_id) - .push(CachedDataUpdate { - key: key.clone(), - task: *task_id, - value: new.clone(), - old_value, - }); + backend.persisted_storage_log(key.category()).unwrap().push( + *task_id, + key.clone(), + old_value, + new.clone(), + ); } } @@ -556,13 +531,12 @@ impl TaskGuard for TaskGuardImpl<'_, B> { 
self.backend .persisted_storage_log(key.category()) .unwrap() - .lock(self.task_id) - .push(CachedDataUpdate { + .push( + self.task_id, key, - task: self.task_id, - value: None, - old_value: value.is_persistent().then(|| value.clone()), - }); + value.is_persistent().then(|| value.clone()), + None, + ); } Some(value) } else { @@ -615,13 +589,7 @@ impl TaskGuard for TaskGuardImpl<'_, B> { self.backend .persisted_storage_log(key.category()) .unwrap() - .lock(self.task_id) - .push(CachedDataUpdate { - key, - task: self.task_id, - value: None, - old_value: Some(value), - }); + .push(self.task_id, key, Some(value), None); } })) } @@ -640,13 +608,7 @@ impl TaskGuard for TaskGuardImpl<'_, B> { self.backend .persisted_storage_log(key.category()) .unwrap() - .lock(self.task_id) - .push(CachedDataUpdate { - key, - task: self.task_id, - value: None, - old_value: Some(value), - }); + .push(self.task_id, key, Some(value), None); } })) } @@ -661,24 +623,18 @@ impl TaskGuard for TaskGuardImpl<'_, B> { .filter_map(|(key, value)| match (key, value) { (CachedDataItemKey::CellData { cell }, CachedDataItemValue::CellData { value }) => { count += 1; - Some(CachedDataUpdate { - task: self.task_id, - key: CachedDataItemKey::CellData { cell: *cell }, - value: Some(CachedDataItemValue::CellData { - value: value.clone(), - }), - old_value: None, + Some(CachedDataItem::CellData { + cell: *cell, + value: value.clone(), }) } _ => None, }); { - let mut guard = self - .backend + self.backend .persisted_storage_log(TaskDataCategory::Data) .unwrap() - .lock(self.task_id); - guard.extend(cell_data); + .push_batch_insert(self.task_id, cell_data); self.task .persistance_state_mut() .add_persisting_items(count); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/persisted_storage_log.rs b/turbopack/crates/turbo-tasks-backend/src/backend/persisted_storage_log.rs new file mode 100644 index 0000000000000..7754e75865d18 --- /dev/null +++ 
b/turbopack/crates/turbo-tasks-backend/src/backend/persisted_storage_log.rs @@ -0,0 +1,78 @@ +use turbo_tasks::{KeyValuePair, TaskId}; + +use crate::{ + data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate}, + utils::{chunked_vec::ChunkedVec, sharded::Sharded}, +}; + +#[derive(Default)] +struct ShardData { + last_task: Option, + data: ChunkedVec, +} + +impl ShardData { + fn set_task(&mut self, task: TaskId) { + if self.last_task != Some(task) { + self.data.push(CachedDataUpdate::Task { task }); + self.last_task = Some(task); + } + } +} + +pub struct PersistedStorageLog { + data: Sharded, +} + +impl PersistedStorageLog { + pub fn new(shard_amount: usize) -> Self { + Self { + data: Sharded::new(shard_amount), + } + } + + pub fn push( + &self, + task: TaskId, + key: CachedDataItemKey, + old_value: Option, + new_value: Option, + ) { + let mut guard = self.data.lock(task); + guard.set_task(task); + match (old_value, new_value) { + (None, None) => {} + (None, Some(new_value)) => guard.data.push(CachedDataUpdate::New { + item: CachedDataItem::from_key_and_value(key, new_value), + }), + (Some(old_value), None) => guard.data.push(CachedDataUpdate::Removed { + old_item: CachedDataItem::from_key_and_value(key, old_value), + }), + (Some(old_value), Some(new_value)) => { + guard.data.push(CachedDataUpdate::Replace1 { + old_item: CachedDataItem::from_key_and_value(key, old_value), + }); + guard + .data + .push(CachedDataUpdate::Replace2 { value: new_value }); + } + } + } + + pub fn push_batch_insert( + &self, + task: TaskId, + updates: impl IntoIterator, + ) { + let updates = updates + .into_iter() + .map(|item| CachedDataUpdate::New { item }); + let mut guard = self.data.lock(task); + guard.set_task(task); + guard.data.extend(updates); + } + + pub fn take(&self) -> Vec> { + self.data.take(|shard| shard.data) + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 
36bf68ec8fcd6..86f0ad6d9ae50 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -475,10 +475,6 @@ impl CachedDataItem { } } - pub fn is_optional(&self) -> bool { - matches!(self, CachedDataItem::CellData { .. }) - } - pub fn new_scheduled(description: impl Fn() -> String + Sync + Send + 'static) -> Self { CachedDataItem::InProgress { value: InProgressState::Scheduled { @@ -541,6 +537,10 @@ impl CachedDataItemKey { } } + pub fn is_optional(&self) -> bool { + matches!(self, CachedDataItemKey::CellData { .. }) + } + pub fn category(&self) -> TaskDataCategory { match self { CachedDataItemKey::Collectible { .. } @@ -693,10 +693,15 @@ impl CachedDataItemValue { } #[derive(Debug)] -pub struct CachedDataUpdate { - pub task: TaskId, - // TODO generate CachedDataItemUpdate to avoid repeating the variant field 3 times - pub key: CachedDataItemKey, - pub value: Option, - pub old_value: Option, +pub enum CachedDataUpdate { + /// Sets the current task id. + Task { task: TaskId }, + /// An item was added. There was no old value. + New { item: CachedDataItem }, + /// An item was removed. + Removed { old_item: CachedDataItem }, + /// An item was replaced. This is step 1 and tells about the key and the old value + Replace1 { old_item: CachedDataItem }, + /// An item was replaced. This is step 2 and tells about the new value. 
+ Replace2 { value: CachedDataItemValue }, } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs b/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs index 5196882963fa0..6e6b979dac1f4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs @@ -42,6 +42,10 @@ impl KeyValueDatabase for FreshDbOptimization { T::lower_read_transaction(tx) } + fn is_empty(&self) -> bool { + self.fresh_db.load(Ordering::Acquire) || self.database.is_empty() + } + fn begin_read_transaction(&self) -> Result> { self.database.begin_read_transaction() } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs index 901ac09080afc..eb300c00624e4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs @@ -24,6 +24,10 @@ pub trait KeyValueDatabase { fn begin_read_transaction(&self) -> Result>; + fn is_empty(&self) -> bool { + false + } + type ValueBuffer<'l>: std::borrow::Borrow<[u8]> where Self: 'l; diff --git a/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs b/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs index 7077a56ab45fc..c4ef64113de16 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs @@ -59,6 +59,10 @@ impl KeyValueDatabase for ReadTransactionCache unsafe { transmute::<&'r Self::ReadTransaction<'l>, &'r Self::ReadTransaction<'i>>(tx) } } + fn is_empty(&self) -> bool { + self.database.is_empty() + } + fn begin_read_transaction(&self) -> Result> { let guard = self.read_transactions_cache.load(); let container = guard diff --git 
a/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs b/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs index 44f6b453ef2df..62ba5a6af584d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs @@ -110,6 +110,10 @@ impl KeyValueDatabase for StartupCacheLayer { T::lower_read_transaction(tx) } + fn is_empty(&self) -> bool { + self.database.is_empty() + } + fn begin_read_transaction(&self) -> Result> { self.database.begin_read_transaction() } diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index a2656694fd5c9..009fdf5e2dc11 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -6,8 +6,9 @@ use std::{ }; use anyhow::{anyhow, Context, Result}; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rustc_hash::FxHashMap; +use serde::{ser::SerializeSeq, Serialize}; use tracing::Span; use turbo_tasks::{backend::CachedTaskType, turbo_tasks_scope, KeyValuePair, SessionId, TaskId}; @@ -178,6 +179,7 @@ impl BackingStorage .entered(); let result = task_cache_updates .into_par_iter() + .with_max_len(1) .map(|updates| { let mut max_task_id = 0; @@ -363,6 +365,11 @@ impl BackingStorage let id = TaskId::from(u32::from_le_bytes(bytes)); Ok(Some(id)) } + if self.database.is_empty() { + // Checking if the database is empty is a performance optimization + // to avoid serializing the task type. 
+ return None; + } let id = self .with_tx(tx, |tx| lookup(&self.database, tx, task_type)) .inspect_err(|err| println!("Looking up task id for {task_type:?} failed: {err:?}")) @@ -517,6 +524,8 @@ fn serialize_task_type( } type SerializedTasks = Vec)>>; +type TaskUpdates = + FxHashMap, Option)>; fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( database: &(impl KeyValueDatabase + Sync), @@ -529,19 +538,12 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( let handle = tokio::runtime::Handle::current(); updates .into_par_iter() + .with_max_len(1) .map(|updates| { let _span = span.clone().entered(); let _guard = handle.clone().enter(); turbo_tasks_scope(turbo_tasks.clone(), || { - type TaskUpdates = FxHashMap< - TaskId, - FxHashMap< - CachedDataItemKey, - (Option, Option), - >, - >; - - let mut task_updates: TaskUpdates = + let mut task_updates: FxHashMap = FxHashMap::with_capacity_and_hasher(updates.len(), Default::default()); { @@ -552,21 +554,81 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( ) .entered(); + // We store the last task data and the last value as pointers to avoid looking + // them up in the map again. Every time we modify the map the pointers are + // updated, so we never have a dangling pointer.
+ let mut current_task_data: Option<*mut TaskUpdates> = None; + let mut last_value: Option<*mut ( + Option, + Option, + )> = None; + // Organize the updates by task - for CachedDataUpdate { - task, - key, - value, - old_value, - } in updates.into_iter() - { - let data = task_updates.entry(task).or_default(); - match data.entry(key) { - Entry::Occupied(mut entry) => { - entry.get_mut().1 = value; + for update in updates.into_iter() { + match update { + CachedDataUpdate::Task { task } => { + current_task_data = Some(task_updates.entry(task).or_default()) + } + CachedDataUpdate::New { item } => { + let data = current_task_data + .expect("Task update must be before data updates"); + // Safety: task_updates are not modified while we hold this pointer. + // We update the pointer every time we update the map. + let data = unsafe { &mut *data }; + let (key, new_value) = item.into_key_and_value(); + match data.entry(key) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + entry.1 = Some(new_value); + last_value = Some(entry); + } + Entry::Vacant(entry) => { + last_value = Some(entry.insert((None, Some(new_value)))); + } + } } - Entry::Vacant(entry) => { - entry.insert((old_value, value)); + CachedDataUpdate::Removed { old_item } => { + let data = current_task_data + .expect("Task update must be before data updates"); + // Safety: task_updates are not modified while we hold this pointer. + // We update the pointer every time we update the map. 
+ let data = unsafe { &mut *data }; + let (key, old_value) = old_item.into_key_and_value(); + match data.entry(key) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + entry.1 = None; + last_value = Some(entry); + } + Entry::Vacant(entry) => { + last_value = Some(entry.insert((Some(old_value), None))); + } + } + } + CachedDataUpdate::Replace1 { old_item } => { + let data = current_task_data + .expect("Task update must be before data updates"); + // Safety: task_updates are not modified while we hold this pointer. + // We update the pointer every time we update the map. + let data = unsafe { &mut *data }; + let (key, old_value) = old_item.into_key_and_value(); + match data.entry(key) { + Entry::Occupied(mut entry) => { + last_value = Some(entry.get_mut()); + } + Entry::Vacant(entry) => { + last_value = Some(entry.insert((Some(old_value), None))); + } + } + } + CachedDataUpdate::Replace2 { value: new_value } => { + let last_value = + last_value.expect("Replace2 update must be preceded by a Replace1 update"); + // Safety: Replace2 is always logged directly after its Replace1, which + // stored this pointer. The inner map of task_updates is not modified + // in between, so the pointer is still valid. + let last_value = unsafe { &mut *last_value }; + last_value.1 = Some(new_value); } } } @@ -602,9 +664,12 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( let mut restored_tasks = 0; // Restore the old task data, apply the updates and serialize the new data - let mut tasks = Vec::with_capacity(task_updates.len()); - let mut map = FxHashMap::with_capacity_and_hasher(128, Default::default()); - for (task, updates) in task_updates { + let mut tasks = if batch.is_some() { + Vec::new() + } else { + Vec::with_capacity(task_updates.len()) + }; + for (task, mut updates) in task_updates { // Restore the old task data if let Some(old_data) = database.get(&tx, key_space, IntKey::new(*task).as_ref())? 
@@ -622,27 +687,23 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( anyhow!("Unable to deserialize old value of {task}: {old_data:?}") })?, }; - map.extend(old_data.into_iter().map(|item| item.into_key_and_value())); - restored_tasks += 1; - } - // Apply update - for (key, (_, value)) in updates { - if let Some(value) = value { - map.insert(key, value); - } else { - map.remove(&key); + // Reserve capacity to avoid rehashing later + updates.reserve(old_data.len()); + + // Apply the old data to the updates, so updates includes the whole data + for item in old_data.into_iter() { + let (key, value) = item.into_key_and_value(); + updates.entry(key).or_insert((None, Some(value))); } + restored_tasks += 1; } - // Get new data - let data = map - .drain() - .map(|(key, value)| CachedDataItem::from_key_and_value(key, value)) - .collect::>(); + // Remove all deletions + updates.retain(|_, (_, value)| value.is_some()); // Serialize new data - let value = serialize(task, data)?; + let value = serialize(task, &mut updates)?; if let Some(batch) = batch { batch.put( @@ -663,48 +724,91 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( .collect::>>() } -fn serialize(task: TaskId, mut data: Vec) -> Result> { - Ok(match POT_CONFIG.serialize(&data) { - #[cfg(not(feature = "verify_serialization"))] - Ok(value) => value, - _ => { - let mut error = Ok(()); - data.retain(|item| { - let mut buf = Vec::::new(); - let mut symbol_map = pot_ser_symbol_map(); - let mut serializer = symbol_map.serializer_for(&mut buf).unwrap(); - if let Err(err) = serde_path_to_error::serialize(item, &mut serializer) { - if item.is_optional() { - #[cfg(feature = "verify_serialization")] - println!("Skipping non-serializable optional item: {item:?}"); - } else { - error = Err(err).context({ - anyhow!("Unable to serialize data item for {task}: {item:#?}") - }); - } - false - } else { - #[cfg(feature = "verify_serialization")] - { - let deserialize: Result = - 
serde_path_to_error::deserialize( - &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(), - ); - if let Err(err) = deserialize { +fn serialize(task: TaskId, data: &mut TaskUpdates) -> Result> { + Ok( + match POT_CONFIG.serialize(&SerializeLikeVecOfCachedDataItem(data)) { + #[cfg(not(feature = "verify_serialization"))] + Ok(value) => value, + _ => { + let mut error = Ok(()); + data.retain(|key, (_, value)| { + let mut buf = Vec::::new(); + let mut symbol_map = pot_ser_symbol_map(); + let mut serializer = symbol_map.serializer_for(&mut buf).unwrap(); + if let Err(err) = serde_path_to_error::serialize( + &SerializeLikeCachedDataItem( + key, + value + .as_ref() + .expect("serialize data must not contain None values"), + ), + &mut serializer, + ) { + if key.is_optional() { + #[cfg(feature = "verify_serialization")] println!( - "Data item would not be deserializable {task}: {err:?}\n{item:#?}" + "Skipping non-serializable optional item: {key:?} = {value:?}" ); - return false; + } else { + error = Err(err).context({ + anyhow!( + "Unable to serialize data item for {task}: {key:?} = \ + {value:#?}" + ) + }); + } + false + } else { + #[cfg(feature = "verify_serialization")] + { + let deserialize: Result = + serde_path_to_error::deserialize( + &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(), + ); + if let Err(err) = deserialize { + println!( + "Data item would not be deserializable {task}: \ + {err:?}\n{key:?} = {value:#?}" + ); + return false; + } } + true } - true - } - }); - error?; + }); + error?; + + POT_CONFIG + .serialize(&SerializeLikeVecOfCachedDataItem(data)) + .with_context(|| { + anyhow!("Unable to serialize data items for {task}: {data:#?}") + })? + } + }, + ) +} - POT_CONFIG - .serialize(&data) - .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))? 
+struct SerializeLikeVecOfCachedDataItem<'l>(&'l TaskUpdates); + +impl Serialize for SerializeLikeVecOfCachedDataItem<'_> { + fn serialize(&self, serializer: S) -> Result { + let map = &self.0; + let mut seq = serializer.serialize_seq(Some(map.len()))?; + for (key, (_, value)) in map.iter() { + let value = value + .as_ref() + .expect("SerializeLikeVecOfCachedDataItem must not contain None values"); + seq.serialize_element(&SerializeLikeCachedDataItem(key, value))?; } - }) + seq.end() + } +} + +struct SerializeLikeCachedDataItem<'l>(&'l CachedDataItemKey, &'l CachedDataItemValue); + +impl Serialize for SerializeLikeCachedDataItem<'_> { + fn serialize(&self, serializer: S) -> Result { + let item = CachedDataItem::from_key_and_value(self.0.clone(), self.1.clone()); + item.serialize(serializer) + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/lib.rs b/turbopack/crates/turbo-tasks-backend/src/lib.rs index f18b8106a287b..333cf6596ab4f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lib.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lib.rs @@ -1,5 +1,6 @@ #![feature(anonymous_lifetime_in_impl_trait)] #![feature(associated_type_defaults)] +#![feature(iter_collect_into)] mod backend; mod backing_storage; diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/sharded.rs b/turbopack/crates/turbo-tasks-backend/src/utils/sharded.rs index 2c0d79d3b4440..93f1b628cc5b7 100644 --- a/turbopack/crates/turbo-tasks-backend/src/utils/sharded.rs +++ b/turbopack/crates/turbo-tasks-backend/src/utils/sharded.rs @@ -38,14 +38,14 @@ impl Sharded { self.data[shard as usize].lock() } - pub fn take(&self) -> Vec + pub fn take(&self, map: impl Fn(T) -> R) -> Vec where T: Default, { let locked = self.data.iter().map(|m| m.lock()).collect::>(); locked .into_iter() - .map(|mut m| std::mem::take(&mut *m)) + .map(|mut m| map(std::mem::take(&mut *m))) .collect() } }