Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] lmdb Backing Storage performance improvements #71250

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ verify_serialization = []

[dependencies]
anyhow = { workspace = true }
arc-swap = { version = "1.7.1" }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
byteorder = "1.5.0"
Expand All @@ -38,6 +39,7 @@ smallvec = { workspace = true }
tokio = { workspace = true }
tokio-scoped = "0.2.0"
tracing = { workspace = true }
thread_local = { version = "1.1.8" }
sokra marked this conversation as resolved.
Show resolved Hide resolved
turbo-prehash = { workspace = true }
turbo-tasks = { workspace = true }
turbo-tasks-hash = { workspace = true }
Expand Down
112 changes: 100 additions & 12 deletions turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod extended_key;

use std::{
cell::UnsafeCell,
collections::{hash_map::Entry, HashMap},
env,
error::Error,
fs::{create_dir_all, metadata, read_dir, remove_dir_all},
fs::{self, create_dir_all, metadata, read_dir, remove_dir_all},
mem::{transmute, ManuallyDrop},
path::Path,
sync::Arc,
Expand All @@ -13,10 +14,13 @@ use std::{
};

use anyhow::{anyhow, Context, Result};
use arc_swap::ArcSwap;
use lmdb::{
Database, DatabaseFlags, Environment, EnvironmentFlags, RoTransaction, Transaction, WriteFlags,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use smallvec::SmallVec;
use thread_local::ThreadLocal;
use tracing::Span;
use turbo_tasks::{backend::CachedTaskType, KeyValuePair, SessionId, TaskId};

Expand Down Expand Up @@ -56,13 +60,41 @@ fn as_u32<E: Error + Send + Sync + 'static>(result: Result<&[u8], E>) -> Result<
Ok(n)
}

/// A stack of idle read transactions that can be handed out again instead of beginning a new
/// transaction. Up to 4 transactions are stored inline in the `SmallVec`.
///
/// The transactions are stored with an erased (`'static`) lifetime. The `UnsafeCell` allows
/// `push` and `pop` to mutate the stack through a shared reference.
struct ThreadLocalReadTransactionsContainer(UnsafeCell<SmallVec<[RoTransaction<'static>; 4]>>);

impl ThreadLocalReadTransactionsContainer {
unsafe fn pop(&self) -> Option<RoTransaction<'static>> {
// Safety: Access only happens via `push` and `pop`, and
// `ThreadLocalReadTransactionsContainer` is not `Sync`, so we know this block is the only
// one currently reading or writing to the cell.
let vec = unsafe { &mut *self.0.get() };
sokra marked this conversation as resolved.
Show resolved Hide resolved
vec.pop()
}

/// Puts `tx` on top of the stack so that a later `pop` can hand it out again.
///
/// # Safety
///
/// The caller must ensure that no other access to this container is in progress. The callers
/// in this file rely on the container being stored in a `ThreadLocal`.
unsafe fn push(&self, tx: RoTransaction<'static>) {
    // Safety: Access only happens via `push` and `pop`, and
    // `ThreadLocalReadTransactionsContainer` is not `Sync`, so we know this block is the only
    // one currently reading or writing to the cell.
    let transactions = unsafe { &mut *self.0.get() };
    transactions.push(tx);
}
}

// Safety: It's safe to send `RoTransaction` between threads as we construct `Environment` with
// `EnvironmentFlags::NO_TLS`, but the types don't allow that, so `Send` is asserted manually.
// NOTE(review): the `Environment` construction is not visible next to this impl — confirm that
// `NO_TLS` is really set wherever the environment is opened, since this impl relies on it.
unsafe impl Send for ThreadLocalReadTransactionsContainer {}

/// `BackingStorage` implementation on top of an LMDB environment.
pub struct LmdbBackingStorage {
// Safety: `read_transactions_cache` needs to be dropped before `env` since dropping it will end
// the cached transactions. Struct fields are dropped in declaration order, so this field must
// stay declared before `env`.
//
// Per-thread pools of idle read transactions. The whole `ThreadLocal` is replaced with a fresh
// one after operations have been committed, so that old read transactions are eventually
// dropped.
read_transactions_cache: ArcSwap<ThreadLocal<ThreadLocalReadTransactionsContainer>>,
// The LMDB environment that all transactions are begun on.
env: Environment,
// Stores infrastructure values such as the uncompleted operations (`META_KEY_OPERATIONS`).
infra_db: Database,
data_db: Database,
meta_db: Database,
forward_task_cache_db: Database,
reverse_task_cache_db: Database,
// `true` when the database directory did not exist yet when this storage was opened. Lookups
// use it to return early without touching the database.
fresh_db: bool,
}

impl LmdbBackingStorage {
Expand Down Expand Up @@ -144,7 +176,10 @@ impl LmdbBackingStorage {
let _ = remove_dir_all(base_path);
path = base_path.join("temp");
}
create_dir_all(&path).context("Creating database directory failed")?;
let fresh_db = fs::exists(&path).map_or(false, |exists| !exists);
if fresh_db {
create_dir_all(&path).context("Creating database directory failed")?;
}

#[cfg(target_arch = "x86")]
const MAP_SIZE: usize = usize::MAX;
Expand Down Expand Up @@ -175,6 +210,8 @@ impl LmdbBackingStorage {
meta_db,
forward_task_cache_db,
reverse_task_cache_db,
fresh_db,
read_transactions_cache: ArcSwap::new(Arc::new(ThreadLocal::new())),
})
}

Expand All @@ -186,6 +223,33 @@ impl LmdbBackingStorage {
}
}

/// Hands out a read transaction for the calling thread.
///
/// A transaction previously returned via `end_read_transaction` on this thread is reused when
/// one is available; otherwise a new read-only transaction is begun on `env`.
///
/// NOTE(review): a reused transaction was begun earlier, so presumably it still observes the
/// database as of that point — confirm this is acceptable for all callers.
///
/// # Errors
///
/// Returns the error from `begin_ro_txn` when no pooled transaction is available and beginning
/// a new one fails.
fn begin_read_transaction(&self) -> Result<RoTransaction<'_>> {
    let cache = self.read_transactions_cache.load();
    let pool = cache
        .get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
    // Safety: Since it's a thread local it's safe to take from the container
    let pooled = unsafe { pool.pop() };
    match pooled {
        Some(tx) => Ok(tx),
        None => Ok(self.env.begin_ro_txn()?),
    }
}

/// Returns a read transaction to the calling thread's pool instead of ending it, so that a
/// later `begin_read_transaction` on the same thread can reuse it.
fn end_read_transaction(&self, tx: RoTransaction<'_>) {
    // Safety: We cast it to 'static lifetime, but it will be cast back to 'env when
    // taken. It's safe since this will not outlive the environment. We need to
    // be careful with Drop, but `read_transactions_cache` is before `env` in the
    // LmdbBackingStorage struct, so it's fine.
    let pooled = unsafe { transmute::<RoTransaction<'_>, RoTransaction<'static>>(tx) };
    let cache = self.read_transactions_cache.load();
    let pool = cache
        .get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
    // Safety: It's safe to put it back since it's a thread local
    unsafe { pool.push(pooled) };
}

/// Reinterprets the opaque pointer stored in a `ReadTransaction` handle as the `RoTransaction`
/// it stands for.
///
/// The result is wrapped in `ManuallyDrop`, so the transaction is not ended when the returned
/// value goes out of scope.
fn to_tx(&self, tx: ReadTransaction) -> ManuallyDrop<RoTransaction<'_>> {
    let raw: *const () = tx.0;
    // NOTE(review): presumably `raw` was produced from a `RoTransaction` by `from_tx`, which is
    // what would make this reinterpretation valid — confirm against `from_tx`.
    let restored = unsafe { transmute::<*const (), RoTransaction<'_>>(raw) };
    ManuallyDrop::new(restored)
}
Expand All @@ -203,9 +267,9 @@ impl LmdbBackingStorage {
let tx = self.to_tx(tx);
f(&tx)
} else {
let tx = self.env.begin_ro_txn()?;
let tx = self.begin_read_transaction()?;
let r = f(&tx)?;
tx.commit()?;
self.end_read_transaction(tx);
Ok(r)
}
}
Expand Down Expand Up @@ -233,10 +297,11 @@ impl BackingStorage for LmdbBackingStorage {

fn uncompleted_operations(&self) -> Vec<AnyOperation> {
fn get(this: &LmdbBackingStorage) -> Result<Vec<AnyOperation>> {
let tx = this.env.begin_ro_txn()?;
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
let operations = pot::from_slice(operations)?;
Ok(operations)
this.with_tx(None, |tx| {
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
let operations = pot::from_slice(operations)?;
Ok(operations)
})
}
get(self).unwrap_or_default()
}
Expand Down Expand Up @@ -411,25 +476,36 @@ impl BackingStorage for LmdbBackingStorage {
tx.commit()
.with_context(|| anyhow!("Unable to commit operations"))?;
}
{
let _span = tracing::trace_span!("swap read transactions").entered();
// This resets the thread-local storage for read transactions. The old read transactions are
// eventually dropped, allowing the DB to free up unused storage.
self.read_transactions_cache
.store(Arc::new(ThreadLocal::new()));
}
span.record("db_operation_count", op_count);
Ok(())
}

fn start_read_transaction(&self) -> Option<ReadTransaction> {
Some(Self::from_tx(self.env.begin_ro_txn().ok()?))
Some(Self::from_tx(self.begin_read_transaction().ok()?))
}

unsafe fn end_read_transaction(&self, transaction: ReadTransaction) {
ManuallyDrop::into_inner(self.to_tx(transaction))
.commit()
.unwrap();
self.end_read_transaction(ManuallyDrop::into_inner(Self::to_tx(self, transaction)));
}

unsafe fn forward_lookup_task_cache(
&self,
tx: Option<ReadTransaction>,
task_type: &CachedTaskType,
) -> Option<TaskId> {
// Performance optimization for when the database was empty.
// It's assumed that no cache entries are removed from the memory cache, but we might change
// that in the future.
sokra marked this conversation as resolved.
Show resolved Hide resolved
if self.fresh_db {
return None;
}
fn lookup(
this: &LmdbBackingStorage,
tx: &RoTransaction<'_>,
Expand Down Expand Up @@ -462,6 +538,12 @@ impl BackingStorage for LmdbBackingStorage {
tx: Option<ReadTransaction>,
task_id: TaskId,
) -> Option<Arc<CachedTaskType>> {
// Performance optimization for when the database was empty.
// It's assumed that no cache entries are removed from the memory cache, but we might change
// that in the future.
if self.fresh_db {
return None;
}
fn lookup(
this: &LmdbBackingStorage,
tx: &RoTransaction<'_>,
Expand Down Expand Up @@ -492,6 +574,12 @@ impl BackingStorage for LmdbBackingStorage {
task_id: TaskId,
category: TaskDataCategory,
) -> Vec<CachedDataItem> {
// Performance optimization for when the database was empty.
// It's assumed that no cache entries are removed from the memory cache, but we might change
// that in the future.
if self.fresh_db {
return Vec::new();
}
fn lookup(
this: &LmdbBackingStorage,
tx: &RoTransaction<'_>,
Expand Down
Loading