Skip to content

Commit

Permalink
cache read transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Oct 14, 2024
1 parent 5425b62 commit c38f4d3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ verify_serialization = []

[dependencies]
anyhow = { workspace = true }
arc-swap = { version = "1.7.1" }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
byteorder = "1.5.0"
Expand All @@ -38,6 +39,7 @@ smallvec = { workspace = true }
tokio = { workspace = true }
tokio-scoped = "0.2.0"
tracing = { workspace = true }
thread_local = { version = "1.1.8" }
turbo-prehash = { workspace = true }
turbo-tasks = { workspace = true }
turbo-tasks-hash = { workspace = true }
Expand Down
78 changes: 68 additions & 10 deletions turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod extended_key;

use std::{
cell::UnsafeCell,
collections::{hash_map::Entry, HashMap},
env,
error::Error,
Expand All @@ -13,10 +14,13 @@ use std::{
};

use anyhow::{anyhow, Context, Result};
use arc_swap::ArcSwap;
use lmdb::{
Database, DatabaseFlags, Environment, EnvironmentFlags, RoTransaction, Transaction, WriteFlags,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use smallvec::SmallVec;
use thread_local::ThreadLocal;
use tracing::Span;
use turbo_tasks::{backend::CachedTaskType, KeyValuePair, SessionId, TaskId};

Expand Down Expand Up @@ -56,7 +60,27 @@ fn as_u32<E: Error + Send + Sync + 'static>(result: Result<&[u8], E>) -> Result<
Ok(n)
}

struct ThreadLocalReadTransactionsContainer(UnsafeCell<SmallVec<[RoTransaction<'static>; 4]>>);

impl ThreadLocalReadTransactionsContainer {
unsafe fn pop(&self) -> Option<RoTransaction<'static>> {
let vec = unsafe { &mut *self.0.get() };
vec.pop()
}

unsafe fn push(&self, tx: RoTransaction<'static>) {
let vec = unsafe { &mut *self.0.get() };
vec.push(tx)
}
}

// Safety: It's safe to send RoTransaction between threads, but the types don't allow that.
unsafe impl Send for ThreadLocalReadTransactionsContainer {}

pub struct LmdbBackingStorage {
// Safety: `read_transactions_cache` need to be dropped before `env` since it will end the
// transactions.
read_transactions_cache: ArcSwap<ThreadLocal<ThreadLocalReadTransactionsContainer>>,
env: Environment,
infra_db: Database,
data_db: Database,
Expand Down Expand Up @@ -180,6 +204,7 @@ impl LmdbBackingStorage {
forward_task_cache_db,
reverse_task_cache_db,
fresh_db,
read_transactions_cache: ArcSwap::new(Arc::new(ThreadLocal::new())),
})
}

Expand All @@ -191,6 +216,33 @@ impl LmdbBackingStorage {
}
}

fn begin_read_transaction(&self) -> Result<RoTransaction<'_>> {
let guard = self.read_transactions_cache.load();
let container = guard
.get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
// Safety: Since it's a thread local it's safe to take from the container
Ok(if let Some(tx) = unsafe { container.pop() } {
tx
} else {
self.env.begin_ro_txn()?
})
}

fn end_read_transaction(&self, tx: RoTransaction<'_>) {
let guard = self.read_transactions_cache.load();
let container = guard
.get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
// Safety: We cast it to 'static lifetime, but it will be casted back to 'env when
// taken. It's safe since this will not outlive the environment. We need to
// be careful with Drop, but `read_transactions_cache` is before `env` in the
// LmdbBackingStorage struct, so it's fine.
let tx = unsafe { transmute::<RoTransaction<'_>, RoTransaction<'static>>(tx) };
// Safety: It's safe to put it back since it's a thread local
unsafe {
container.push(tx);
}
}

fn to_tx(&self, tx: ReadTransaction) -> ManuallyDrop<RoTransaction<'_>> {
ManuallyDrop::new(unsafe { transmute::<*const (), RoTransaction<'_>>(tx.0) })
}
Expand All @@ -208,9 +260,9 @@ impl LmdbBackingStorage {
let tx = self.to_tx(tx);
f(&tx)
} else {
let tx = self.env.begin_ro_txn()?;
let tx = self.begin_read_transaction()?;
let r = f(&tx)?;
tx.commit()?;
self.end_read_transaction(tx);
Ok(r)
}
}
Expand Down Expand Up @@ -238,10 +290,11 @@ impl BackingStorage for LmdbBackingStorage {

fn uncompleted_operations(&self) -> Vec<AnyOperation> {
fn get(this: &LmdbBackingStorage) -> Result<Vec<AnyOperation>> {
let tx = this.env.begin_ro_txn()?;
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
let operations = pot::from_slice(operations)?;
Ok(operations)
this.with_tx(None, |tx| {
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
let operations = pot::from_slice(operations)?;
Ok(operations)
})
}
get(self).unwrap_or_default()
}
Expand Down Expand Up @@ -416,18 +469,23 @@ impl BackingStorage for LmdbBackingStorage {
tx.commit()
.with_context(|| anyhow!("Unable to commit operations"))?;
}
{
let _span = tracing::trace_span!("swap read transactions").entered();
// This resets the thread local storage for read transactions, read transactions are
// eventually dropped, allowing DB to free up unused storage.
self.read_transactions_cache
.store(Arc::new(ThreadLocal::new()));
}
span.record("db_operation_count", op_count);
Ok(())
}

fn start_read_transaction(&self) -> Option<ReadTransaction> {
Some(Self::from_tx(self.env.begin_ro_txn().ok()?))
Some(Self::from_tx(self.begin_read_transaction().ok()?))
}

unsafe fn end_read_transaction(&self, transaction: ReadTransaction) {
ManuallyDrop::into_inner(self.to_tx(transaction))
.commit()
.unwrap();
self.end_read_transaction(ManuallyDrop::into_inner(Self::to_tx(self, transaction)));
}

unsafe fn forward_lookup_task_cache(
Expand Down

0 comments on commit c38f4d3

Please sign in to comment.