feat(core): batch heavy write operations (#1358)
**Motivation**

This PR batches the three heaviest db write operations currently performed by the node (`add_block`, `store_receipts`, and `commit_node`) so that each of them runs in a single db transaction instead of one transaction per item. This reduces the time spent flushing to disk with `msync` (on `libmdbx`) or `fsync` (on `redb`) from roughly 40-50% of the profiled time to about 0.35%.

Non-batched version:

![flamegraph_not_batched](https://github.com/user-attachments/assets/7d698997-d8d4-4cac-9e6d-1fb1e94824bd)

Batched version (the `msync` on this flamegraph is one of the tiny red
bars to the left):

![flamegraph_batched](https://github.com/user-attachments/assets/48ad899d-51ce-4b48-a9f3-7254567c54fe)
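
The pattern behind the change is to group every key-value write of an operation into one write transaction, so the backend flushes to disk once per operation instead of once per item. Below is a minimal sketch of that idea against `redb` directly; the table definition and function names are illustrative only, not part of this PR:

```rust
use redb::{Database, TableDefinition};

// Illustrative table: receipt index -> RLP-encoded receipt bytes.
const RECEIPTS: TableDefinition<u64, &[u8]> = TableDefinition::new("receipts");

/// Non-batched: one write transaction, and therefore one flush, per receipt.
fn store_one_by_one(
    db: &Database,
    receipts: &[(u64, Vec<u8>)],
) -> Result<(), Box<dyn std::error::Error>> {
    for (index, rlp) in receipts {
        let txn = db.begin_write()?;
        {
            let mut table = txn.open_table(RECEIPTS)?;
            table.insert(index, rlp.as_slice())?;
        }
        txn.commit()?; // flush happens here, once per receipt
    }
    Ok(())
}

/// Batched: a single write transaction for the whole block's receipts.
fn store_batched(
    db: &Database,
    receipts: &[(u64, Vec<u8>)],
) -> Result<(), Box<dyn std::error::Error>> {
    let txn = db.begin_write()?;
    {
        let mut table = txn.open_table(RECEIPTS)?;
        for (index, rlp) in receipts {
            table.insert(index, rlp.as_slice())?;
        }
    }
    txn.commit()?; // flush happens here, once for the whole batch
    Ok(())
}
```

The `libmdbx` backend follows the same shape with a single read-write transaction and repeated `upsert` calls, as the `write_batch` helper in `crates/storage/store/engines/libmdbx.rs` below shows.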

**Description**

Adds batched write methods to the storage layer: `add_receipts` and `add_transaction_locations` on the `StoreEngine` trait (implemented for the in-memory, libmdbx, and redb engines) and `put_batch` on the `TrieDB` trait, so that each of these heavy write paths goes through a single db transaction on the persistent backends.
jrchatruc authored Dec 10, 2024
1 parent a0a4b5f commit c9d0e48
Showing 16 changed files with 300 additions and 26 deletions.
4 changes: 1 addition & 3 deletions crates/blockchain/blockchain.rs
@@ -118,9 +118,7 @@ pub fn store_receipts(
receipts: Vec<Receipt>,
block_hash: BlockHash,
) -> Result<(), ChainError> {
for (index, receipt) in receipts.into_iter().enumerate() {
storage.add_receipt(block_hash, index as u64, receipt)?;
}
storage.add_receipts(block_hash, receipts)?;
Ok(())
}

10 changes: 10 additions & 0 deletions crates/storage/store/engines/api.rs
@@ -79,6 +79,12 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
index: Index,
) -> Result<(), StoreError>;

/// Store transaction locations in batch (one db transaction for all)
fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> Result<(), StoreError>;

/// Obtain transaction location (block hash and index)
fn get_transaction_location(
&self,
@@ -93,6 +99,10 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
receipt: Receipt,
) -> Result<(), StoreError>;

/// Store receipts in batch (one db transaction for all)
fn add_receipts(&self, block_hash: BlockHash, receipts: Vec<Receipt>)
-> Result<(), StoreError>;

/// Obtain receipt for a canonical block represented by the block number.
fn get_receipt(
&self,
27 changes: 27 additions & 0 deletions crates/storage/store/engines/in_memory.rs
@@ -355,6 +355,33 @@ impl StoreEngine for Store {
Ok(self.inner().payloads.get(&payload_id).cloned())
}

fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
let mut store = self.inner();
let entry = store.receipts.entry(block_hash).or_default();
for (index, receipt) in receipts.into_iter().enumerate() {
entry.insert(index as u64, receipt);
}
Ok(())
}

fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> Result<(), StoreError> {
for (transaction_hash, block_number, block_hash, index) in locations {
self.inner()
.transaction_locations
.entry(transaction_hash)
.or_default()
.push((block_number, block_hash, index));
}

Ok(())
}

fn update_payload(
&self,
payload_id: u64,
52 changes: 52 additions & 0 deletions crates/storage/store/engines/libmdbx.rs
@@ -48,6 +48,21 @@ impl Store {
txn.commit().map_err(StoreError::LibmdbxError)
}

// Helper method to write into a libmdbx table in batch
fn write_batch<T: Table>(&self, key_values: Vec<(T::Key, T::Value)>) -> Result<(), StoreError> {
let txn = self
.db
.begin_readwrite()
.map_err(StoreError::LibmdbxError)?;

for (key, value) in key_values {
txn.upsert::<T>(key, value)
.map_err(StoreError::LibmdbxError)?;
}

txn.commit().map_err(StoreError::LibmdbxError)
}

// Helper method to read from a libmdbx table
fn read<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, StoreError> {
let txn = self.db.begin_read().map_err(StoreError::LibmdbxError)?;
@@ -442,6 +457,43 @@ impl StoreEngine for Store {
.read::<PendingBlocks>(block_hash.into())?
.map(|b| b.to()))
}

fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> std::result::Result<(), StoreError> {
#[allow(clippy::type_complexity)]
let key_values: Vec<(TransactionHashRLP, Rlp<(BlockNumber, BlockHash, Index)>)> = locations
.into_iter()
.map(|(tx_hash, block_number, block_hash, index)| {
(tx_hash.into(), (block_number, block_hash, index).into())
})
.collect();

self.write_batch::<TransactionLocations>(key_values)
}

fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> std::result::Result<(), StoreError> {
let key_values = receipts
.into_iter()
.enumerate()
.map(|(index, receipt)| {
(
<(H256, u64) as Into<TupleRLP<BlockHash, Index>>>::into((
block_hash,
index as u64,
)),
<Receipt as Into<ReceiptRLP>>::into(receipt),
)
})
.collect();

self.write_batch::<Receipts>(key_values)
}
}

impl Debug for Store {
87 changes: 87 additions & 0 deletions crates/storage/store/engines/redb.rs
@@ -102,6 +102,50 @@ impl RedBStore {
Ok(())
}

// Helper method to write into a redb table in batch (one db transaction for all)
fn write_batch<'k, 'v, 'a, K, V>(
&self,
table: TableDefinition<'a, K, V>,
key_values: Vec<(impl Borrow<K::SelfType<'k>>, impl Borrow<V::SelfType<'v>>)>,
) -> Result<(), StoreError>
where
K: Key + 'static,
V: Value + 'static,
{
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_table(table)?;
for (key, value) in key_values {
table.insert(key, value)?;
}
}
write_txn.commit()?;

Ok(())
}

// Helper method to write into a redb multimap table in batch (one db transaction for all)
fn write_to_multi_batch<'k, 'v, 'a, K, V>(
&self,
table: MultimapTableDefinition<'a, K, V>,
key_values: Vec<(impl Borrow<K::SelfType<'k>>, impl Borrow<V::SelfType<'v>>)>,
) -> Result<(), StoreError>
where
K: Key + 'static,
V: Key + 'static,
{
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_multimap_table(table)?;
for (key, value) in key_values {
table.insert(key, value)?;
}
}
write_txn.commit()?;

Ok(())
}

// Helper method to read from a redb table
fn read<'k, 'a, K, V>(
&self,
@@ -547,6 +591,49 @@ impl StoreEngine for RedBStore {
.map(|b| b.value().to()))
}

fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
let key_values = receipts
.into_iter()
.enumerate()
.map(|(index, receipt)| {
(
<(H256, u64) as Into<TupleRLP<BlockHash, Index>>>::into((
block_hash,
index as u64,
)),
<Receipt as Into<ReceiptRLP>>::into(receipt),
)
})
.collect();
self.write_batch(RECEIPTS_TABLE, key_values)
}

fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> Result<(), StoreError> {
let key_values = locations
.into_iter()
.map(|(tx_hash, block_number, block_hash, index)| {
(
<H256 as Into<TransactionHashRLP>>::into(tx_hash),
<(u64, H256, u64) as Into<Rlp<(BlockNumber, BlockHash, Index)>>>::into((
block_number,
block_hash,
index,
)),
)
})
.collect();

self.write_to_multi_batch(TRANSACTION_LOCATIONS_TABLE, key_values)?;

Ok(())
}

fn update_payload(
&self,
payload_id: u64,
45 changes: 28 additions & 17 deletions crates/storage/store/storage.rs
@@ -232,6 +232,26 @@ impl Store {
.add_transaction_location(transaction_hash, block_number, block_hash, index)
}

pub fn add_transaction_locations(
&self,
transactions: &[Transaction],
block_number: BlockNumber,
block_hash: BlockHash,
) -> Result<(), StoreError> {
let mut locations = vec![];

for (index, transaction) in transactions.iter().enumerate() {
locations.push((
transaction.compute_hash(),
block_number,
block_hash,
index as Index,
));
}

self.engine.add_transaction_locations(locations)
}

pub fn get_transaction_location(
&self,
transaction_hash: H256,
@@ -484,6 +504,14 @@ impl Store {
self.engine.add_receipt(block_hash, index, receipt)
}

pub fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
self.engine.add_receipts(block_hash, receipts)
}

pub fn get_receipt(
&self,
block_number: BlockNumber,
@@ -508,23 +536,6 @@ impl Store {
self.update_latest_total_difficulty(block_total_difficulty)
}

fn add_transaction_locations(
&self,
transactions: &[Transaction],
block_number: BlockNumber,
block_hash: BlockHash,
) -> Result<(), StoreError> {
for (index, transaction) in transactions.iter().enumerate() {
self.add_transaction_location(
transaction.compute_hash(),
block_number,
block_hash,
index as Index,
)?;
}
Ok(())
}

pub fn add_initial_state(&self, genesis: Genesis) -> Result<(), StoreError> {
info!("Storing initial state from genesis");

2 changes: 2 additions & 0 deletions crates/storage/trie/db.rs
@@ -14,4 +14,6 @@ use crate::error::TrieError;
pub trait TrieDB {
fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>, TrieError>;
fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), TrieError>;
fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError>;
}
22 changes: 20 additions & 2 deletions crates/storage/trie/db/in_memory.rs
@@ -18,11 +18,29 @@ impl InMemoryTrieDB {

impl TrieDB for InMemoryTrieDB {
fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>, TrieError> {
Ok(self.inner.lock().unwrap().get(&key).cloned())
Ok(self
.inner
.lock()
.map_err(|_| TrieError::LockError)?
.get(&key)
.cloned())
}

fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), TrieError> {
self.inner.lock().unwrap().insert(key, value);
self.inner
.lock()
.map_err(|_| TrieError::LockError)?
.insert(key, value);
Ok(())
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError> {
let mut db = self.inner.lock().map_err(|_| TrieError::LockError)?;

for (key, value) in key_values {
db.insert(key, value);
}

Ok(())
}
}
9 changes: 9 additions & 0 deletions crates/storage/trie/db/libmdbx.rs
@@ -38,6 +38,15 @@ where
.map_err(TrieError::LibmdbxError)?;
txn.commit().map_err(TrieError::LibmdbxError)
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError> {
let txn = self.db.begin_readwrite().map_err(TrieError::LibmdbxError)?;
for (key, value) in key_values {
txn.upsert::<T>(key, value)
.map_err(TrieError::LibmdbxError)?;
}
txn.commit().map_err(TrieError::LibmdbxError)
}
}

#[cfg(test)]
12 changes: 12 additions & 0 deletions crates/storage/trie/db/libmdbx_dupsort.rs
@@ -52,6 +52,18 @@ where
.map_err(TrieError::LibmdbxError)?;
txn.commit().map_err(TrieError::LibmdbxError)
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError> {
let txn = self.db.begin_readwrite().map_err(TrieError::LibmdbxError)?;
for (key, value) in key_values {
txn.upsert::<T>(
(self.fixed_key.clone(), node_hash_to_fixed_size(key)),
value,
)
.map_err(TrieError::LibmdbxError)?;
}
txn.commit().map_err(TrieError::LibmdbxError)
}
}

#[cfg(test)]
13 changes: 13 additions & 0 deletions crates/storage/trie/db/redb.rs
@@ -32,4 +32,17 @@ impl TrieDB for RedBTrie {

Ok(())
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), crate::TrieError> {
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_table(TABLE)?;
for (key, value) in key_values {
table.insert(&*key, &*value)?;
}
}
write_txn.commit()?;

Ok(())
}
}