diff --git a/Cargo.lock b/Cargo.lock
index 5b15ec9ff36ed..6888c6f241d4c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -931,9 +931,9 @@ checksum = "17febce684fd15d89027105661fec94afb475cb995fbc59d2865198446ba2eea"
 
 [[package]]
 name = "byteorder"
-version = "1.4.3"
+version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
 [[package]]
 name = "bytes"
@@ -8603,6 +8603,7 @@ dependencies = [
  "async-trait",
  "auto-hash-map",
  "bincode",
+ "byteorder",
  "dashmap",
  "indexmap 1.9.3",
  "lmdb",
diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml
index 30a13b255d9a6..c4a7a0c5c7754 100644
--- a/turbopack/crates/turbo-tasks-backend/Cargo.toml
+++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -17,6 +17,7 @@ anyhow = { workspace = true }
 async-trait = { workspace = true }
 auto-hash-map = { workspace = true }
 bincode = "1.3.3"
+byteorder = "1.5.0"
 dashmap = { workspace = true }
 indexmap = { workspace = true }
 lmdb = "0.8.0"
diff --git a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
index eb06ff4d74940..e185f43f307fa 100644
--- a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
@@ -1,3 +1,5 @@
+mod extended_key;
+
 use std::{
     collections::{hash_map::Entry, HashMap},
     error::Error,
@@ -118,7 +120,8 @@ impl BackingStorage for LmdbBackingStorage {
             let task_id = **task_id;
             let task_type_bytes = bincode::serialize(&task_type)
                 .with_context(|| anyhow!("Unable to serialize task cache key {task_type:?}"))?;
-            tx.put(
+            extended_key::put(
+                &mut tx,
                 self.forward_task_cache_db,
                 &task_type_bytes,
                 &task_id.to_be_bytes(),
@@ -204,8 +207,7 @@ impl BackingStorage for LmdbBackingStorage {
     fn forward_lookup_task_cache(&self, task_type: &CachedTaskType) -> Option<TaskId> {
         let tx = self.env.begin_ro_txn().ok()?;
         let task_type = bincode::serialize(task_type).ok()?;
-        let result = tx
-            .get(self.forward_task_cache_db, &task_type)
+        let result = extended_key::get(&tx, self.forward_task_cache_db, &task_type)
             .ok()
             .and_then(|v| v.try_into().ok())
             .map(|v| TaskId::from(u32::from_be_bytes(v)));
diff --git a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage/extended_key.rs b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage/extended_key.rs
new file mode 100644
index 0000000000000..3e87669c2563c
--- /dev/null
+++ b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage/extended_key.rs
@@ -0,0 +1,119 @@
+use std::hash::{Hash, Hasher};
+
+use byteorder::ByteOrder;
+use lmdb::{Database, RoTransaction, RwTransaction, Transaction, WriteFlags};
+use rustc_hash::FxHasher;
+
+// LMDB's default maximum key size in bytes.
+const MAX_KEY_SIZE: usize = 511;
+// Number of leading key bytes stored verbatim in the hashed key after the
+// 8-byte hash prefix.
+const SHARED_KEY: usize = MAX_KEY_SIZE - 8;
+
+/// Reads `key` from `database`, transparently handling keys that exceed
+/// LMDB's maximum key size.
+pub fn get<'tx>(
+    tx: &'tx RoTransaction<'tx>,
+    database: Database,
+    key: &[u8],
+) -> lmdb::Result<&'tx [u8]> {
+    if key.len() > MAX_KEY_SIZE - 1 {
+        let hashed_key = hashed_key(key);
+        let data = tx.get(database, &hashed_key)?;
+        let mut iter = ExtendedValueIter::new(data);
+        while let Some((k, v)) = iter.next() {
+            // Only the key suffix is stored in the value; the leading bytes
+            // are covered by the hashed key itself.
+            if k == &key[SHARED_KEY..] {
+                return Ok(v);
+            }
+        }
+        Err(lmdb::Error::NotFound)
+    } else {
+        tx.get(database, &key)
+    }
+}
+
+/// Writes `key`/`value` to `database`, transparently handling keys that
+/// exceed LMDB's maximum key size.
+pub fn put(
+    tx: &mut RwTransaction<'_>,
+    database: Database,
+    key: &[u8],
+    value: &[u8],
+    flags: WriteFlags,
+) -> lmdb::Result<()> {
+    if key.len() > MAX_KEY_SIZE - 1 {
+        let hashed_key = hashed_key(key);
+
+        // New entry layout: [key suffix len][value len][key suffix][value].
+        let size = key.len() - SHARED_KEY + value.len() + 8;
+        let old = tx.get(database, &hashed_key);
+        let old_size = old.map_or(0, |v| v.len());
+        let mut data = Vec::with_capacity(old_size + size);
+        data.extend_from_slice(&((key.len() - SHARED_KEY) as u32).to_be_bytes());
+        data.extend_from_slice(&(value.len() as u32).to_be_bytes());
+        data.extend_from_slice(&key[SHARED_KEY..]);
+        data.extend_from_slice(value);
+        // Carry over entries of other keys that collided into the same
+        // hashed key.
+        if let Ok(old) = old {
+            let mut iter = ExtendedValueIter::new(old);
+            while let Some((k, v)) = iter.next() {
+                if k != &key[SHARED_KEY..] {
+                    data.extend_from_slice(&(k.len() as u32).to_be_bytes());
+                    data.extend_from_slice(&(v.len() as u32).to_be_bytes());
+                    data.extend_from_slice(k);
+                    data.extend_from_slice(v);
+                }
+            }
+        };
+
+        tx.put(database, &hashed_key, &data, flags)?;
+        Ok(())
+    } else {
+        tx.put(database, &key, &value, flags)
+    }
+}
+
+/// Maps an over-long key to a fixed-size key: an 8-byte big-endian FxHash
+/// of the full key, followed by the key's first `SHARED_KEY` bytes.
+fn hashed_key(key: &[u8]) -> [u8; MAX_KEY_SIZE] {
+    let mut result = [0; MAX_KEY_SIZE];
+    let mut hash = FxHasher::default();
+    key.hash(&mut hash);
+    byteorder::BigEndian::write_u64(&mut result, hash.finish());
+    result[8..].copy_from_slice(&key[0..SHARED_KEY]);
+    result
+}
+
+/// Iterates the `(key suffix, value)` entries stored in an extended value.
+struct ExtendedValueIter<'a> {
+    data: &'a [u8],
+    pos: usize,
+}
+
+impl<'a> Iterator for ExtendedValueIter<'a> {
+    type Item = (&'a [u8], &'a [u8]);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.pos >= self.data.len() {
+            return None;
+        }
+        let key_len = byteorder::BigEndian::read_u32(&self.data[self.pos..]) as usize;
+        self.pos += 4;
+        let value_len = byteorder::BigEndian::read_u32(&self.data[self.pos..]) as usize;
+        self.pos += 4;
+        let key = &self.data[self.pos..self.pos + key_len];
+        self.pos += key_len;
+        let value = &self.data[self.pos..self.pos + value_len];
+        self.pos += value_len;
+        Some((key, value))
+    }
+}
+
+impl<'a> ExtendedValueIter<'a> {
+    fn new(data: &'a [u8]) -> Self {
+        Self { data, pos: 0 }
+    }
+}
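
Note on the scheme: LMDB rejects keys of 511 bytes or more, which serialized `CachedTaskType` cache keys can exceed. `extended_key::put` maps such a key to a fixed 511-byte key (an 8-byte big-endian FxHash of the full key followed by the key's first 503 bytes) and stores the remainder inside the value as length-prefixed `(key suffix, value)` records, so multiple long keys that collide on the hashed key can share one LMDB entry. Below is a standalone sketch of that record encoding, using only `std`; the `encode_entry` and `decode_entries` names are illustrative, not part of this change:

```rust
/// Appends one record: [key_suffix_len: u32 BE][value_len: u32 BE][key_suffix][value].
fn encode_entry(buf: &mut Vec<u8>, key_suffix: &[u8], value: &[u8]) {
    buf.extend_from_slice(&(key_suffix.len() as u32).to_be_bytes());
    buf.extend_from_slice(&(value.len() as u32).to_be_bytes());
    buf.extend_from_slice(key_suffix);
    buf.extend_from_slice(value);
}

/// Decodes all records from a value blob, mirroring `ExtendedValueIter`.
fn decode_entries(mut data: &[u8]) -> Vec<(&[u8], &[u8])> {
    let mut entries = Vec::new();
    while data.len() >= 8 {
        let key_len = u32::from_be_bytes(data[0..4].try_into().unwrap()) as usize;
        let value_len = u32::from_be_bytes(data[4..8].try_into().unwrap()) as usize;
        let (key_suffix, rest) = data[8..].split_at(key_len);
        let (value, rest) = rest.split_at(value_len);
        entries.push((key_suffix, value));
        data = rest;
    }
    entries
}

fn main() {
    // Two long keys whose first 503 bytes and hash collide end up as two
    // records in a single LMDB value; the stored suffix disambiguates them.
    let mut blob = Vec::new();
    encode_entry(&mut blob, b"suffix-a", b"value-1");
    encode_entry(&mut blob, b"suffix-b", b"value-2");

    let entries = decode_entries(&blob);
    assert_eq!(entries[0], (b"suffix-a".as_slice(), b"value-1".as_slice()));
    assert_eq!(entries[1], (b"suffix-b".as_slice(), b"value-2".as_slice()));
}
```

On reads, `extended_key::get` recomputes the hashed key and scans these records for the matching suffix, so a hash-plus-prefix collision costs a short linear scan rather than a lost entry; on writes, `put` preserves colliding entries by re-appending them after the new record.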