Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): batch heavy write operations #1358

Merged
merged 14 commits into from
Dec 10, 2024
4 changes: 1 addition & 3 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ pub fn store_receipts(
receipts: Vec<Receipt>,
block_hash: BlockHash,
) -> Result<(), ChainError> {
for (index, receipt) in receipts.into_iter().enumerate() {
storage.add_receipt(block_hash, index as u64, receipt)?;
}
storage.add_receipts(block_hash, receipts)?;
Ok(())
}

Expand Down
10 changes: 10 additions & 0 deletions crates/storage/store/engines/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
index: Index,
) -> Result<(), StoreError>;

/// Store transaction locations in batch (one db transaction for all)
fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> Result<(), StoreError>;

/// Obtain transaction location (block hash and index)
fn get_transaction_location(
&self,
Expand All @@ -93,6 +99,10 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
receipt: Receipt,
) -> Result<(), StoreError>;

/// Add receipt
fn add_receipts(&self, block_hash: BlockHash, receipts: Vec<Receipt>)
-> Result<(), StoreError>;

/// Obtain receipt for a canonical block represented by the block number.
fn get_receipt(
&self,
Expand Down
27 changes: 27 additions & 0 deletions crates/storage/store/engines/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,33 @@ impl StoreEngine for Store {
Ok(self.inner().payloads.get(&payload_id).cloned())
}

fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
let mut store = self.inner();
let entry = store.receipts.entry(block_hash).or_default();
for (index, receipt) in receipts.into_iter().enumerate() {
entry.insert(index as u64, receipt);
}
Ok(())
}

fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> Result<(), StoreError> {
for (transaction_hash, block_number, block_hash, index) in locations {
self.inner()
.transaction_locations
.entry(transaction_hash)
.or_default()
.push((block_number, block_hash, index));
}

Ok(())
}
fn update_payload(
&self,
payload_id: u64,
Expand Down
52 changes: 52 additions & 0 deletions crates/storage/store/engines/libmdbx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ impl Store {
txn.commit().map_err(StoreError::LibmdbxError)
}

// Helper method to write into a libmdbx table in batch
fn write_batch<T: Table>(&self, key_values: Vec<(T::Key, T::Value)>) -> Result<(), StoreError> {
let txn = self
.db
.begin_readwrite()
.map_err(StoreError::LibmdbxError)?;

for (key, value) in key_values {
txn.upsert::<T>(key, value)
.map_err(StoreError::LibmdbxError)?;
}

txn.commit().map_err(StoreError::LibmdbxError)
}

// Helper method to read from a libmdbx table
fn read<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, StoreError> {
let txn = self.db.begin_read().map_err(StoreError::LibmdbxError)?;
Expand Down Expand Up @@ -442,6 +457,43 @@ impl StoreEngine for Store {
.read::<PendingBlocks>(block_hash.into())?
.map(|b| b.to()))
}

fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> std::result::Result<(), StoreError> {
#[allow(clippy::type_complexity)]
let key_values: Vec<(TransactionHashRLP, Rlp<(BlockNumber, BlockHash, Index)>)> = locations
.into_iter()
.map(|(tx_hash, block_number, block_hash, index)| {
(tx_hash.into(), (block_number, block_hash, index).into())
})
.collect();

self.write_batch::<TransactionLocations>(key_values)
}

fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> std::result::Result<(), StoreError> {
let key_values = receipts
.into_iter()
.enumerate()
.map(|(index, receipt)| {
(
<(H256, u64) as Into<TupleRLP<BlockHash, Index>>>::into((
block_hash,
index as u64,
)),
<Receipt as Into<ReceiptRLP>>::into(receipt),
)
})
.collect();

self.write_batch::<Receipts>(key_values)
}
}

impl Debug for Store {
Expand Down
87 changes: 87 additions & 0 deletions crates/storage/store/engines/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,50 @@ impl RedBStore {
Ok(())
}

// Helper method to write into a redb table
fn write_batch<'k, 'v, 'a, K, V>(
&self,
table: TableDefinition<'a, K, V>,
key_values: Vec<(impl Borrow<K::SelfType<'k>>, impl Borrow<V::SelfType<'v>>)>,
) -> Result<(), StoreError>
where
K: Key + 'static,
V: Value + 'static,
{
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_table(table)?;
for (key, value) in key_values {
table.insert(key, value)?;
}
}
write_txn.commit()?;

Ok(())
}

// Helper method to write into a redb table
fn write_to_multi_batch<'k, 'v, 'a, K, V>(
&self,
table: MultimapTableDefinition<'a, K, V>,
key_values: Vec<(impl Borrow<K::SelfType<'k>>, impl Borrow<V::SelfType<'v>>)>,
) -> Result<(), StoreError>
where
K: Key + 'static,
V: Key + 'static,
{
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_multimap_table(table)?;
for (key, value) in key_values {
table.insert(key, value)?;
}
}
write_txn.commit()?;

Ok(())
}

// Helper method to read from a redb table
fn read<'k, 'a, K, V>(
&self,
Expand Down Expand Up @@ -547,6 +591,49 @@ impl StoreEngine for RedBStore {
.map(|b| b.value().to()))
}

fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
let key_values = receipts
.into_iter()
.enumerate()
.map(|(index, receipt)| {
(
<(H256, u64) as Into<TupleRLP<BlockHash, Index>>>::into((
block_hash,
index as u64,
)),
<Receipt as Into<ReceiptRLP>>::into(receipt),
)
})
.collect();
self.write_batch(RECEIPTS_TABLE, key_values)
}

fn add_transaction_locations(
&self,
locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
) -> Result<(), StoreError> {
let key_values = locations
.into_iter()
.map(|(tx_hash, block_number, block_hash, index)| {
(
<H256 as Into<TransactionHashRLP>>::into(tx_hash),
<(u64, H256, u64) as Into<Rlp<(BlockNumber, BlockHash, Index)>>>::into((
block_number,
block_hash,
index,
)),
)
})
.collect();

self.write_to_multi_batch(TRANSACTION_LOCATIONS_TABLE, key_values)?;

Ok(())
}
fn update_payload(
&self,
payload_id: u64,
Expand Down
45 changes: 28 additions & 17 deletions crates/storage/store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,26 @@ impl Store {
.add_transaction_location(transaction_hash, block_number, block_hash, index)
}

pub fn add_transaction_locations(
&self,
transactions: &[Transaction],
block_number: BlockNumber,
block_hash: BlockHash,
) -> Result<(), StoreError> {
let mut locations = vec![];

for (index, transaction) in transactions.iter().enumerate() {
locations.push((
transaction.compute_hash(),
block_number,
block_hash,
index as Index,
));
}

self.engine.add_transaction_locations(locations)
}

pub fn get_transaction_location(
&self,
transaction_hash: H256,
Expand Down Expand Up @@ -484,6 +504,14 @@ impl Store {
self.engine.add_receipt(block_hash, index, receipt)
}

pub fn add_receipts(
&self,
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
self.engine.add_receipts(block_hash, receipts)
}

pub fn get_receipt(
&self,
block_number: BlockNumber,
Expand All @@ -508,23 +536,6 @@ impl Store {
self.update_latest_total_difficulty(block_total_difficulty)
}

fn add_transaction_locations(
&self,
transactions: &[Transaction],
block_number: BlockNumber,
block_hash: BlockHash,
) -> Result<(), StoreError> {
for (index, transaction) in transactions.iter().enumerate() {
self.add_transaction_location(
transaction.compute_hash(),
block_number,
block_hash,
index as Index,
)?;
}
Ok(())
}

pub fn add_initial_state(&self, genesis: Genesis) -> Result<(), StoreError> {
info!("Storing initial state from genesis");

Expand Down
2 changes: 2 additions & 0 deletions crates/storage/trie/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ use crate::error::TrieError;
pub trait TrieDB {
fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>, TrieError>;
fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), TrieError>;
// fn put_batch(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), TrieError>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe remove this ?

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError>;
}
22 changes: 20 additions & 2 deletions crates/storage/trie/db/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,29 @@ impl InMemoryTrieDB {

impl TrieDB for InMemoryTrieDB {
fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>, TrieError> {
Ok(self.inner.lock().unwrap().get(&key).cloned())
Ok(self
.inner
.lock()
.map_err(|_| TrieError::LockError)?
.get(&key)
.cloned())
}

fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), TrieError> {
self.inner.lock().unwrap().insert(key, value);
self.inner
.lock()
.map_err(|_| TrieError::LockError)?
.insert(key, value);
Ok(())
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError> {
let mut db = self.inner.lock().map_err(|_| TrieError::LockError)?;

for (key, value) in key_values {
db.insert(key, value);
}

Ok(())
}
}
9 changes: 9 additions & 0 deletions crates/storage/trie/db/libmdbx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ where
.map_err(TrieError::LibmdbxError)?;
txn.commit().map_err(TrieError::LibmdbxError)
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError> {
let txn = self.db.begin_readwrite().map_err(TrieError::LibmdbxError)?;
for (key, value) in key_values {
txn.upsert::<T>(key, value)
.map_err(TrieError::LibmdbxError)?;
}
txn.commit().map_err(TrieError::LibmdbxError)
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions crates/storage/trie/db/libmdbx_dupsort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ where
.map_err(TrieError::LibmdbxError)?;
txn.commit().map_err(TrieError::LibmdbxError)
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), TrieError> {
let txn = self.db.begin_readwrite().map_err(TrieError::LibmdbxError)?;
for (key, value) in key_values {
txn.upsert::<T>(
(self.fixed_key.clone(), node_hash_to_fixed_size(key)),
value,
)
.map_err(TrieError::LibmdbxError)?;
}
txn.commit().map_err(TrieError::LibmdbxError)
}
}

#[cfg(test)]
Expand Down
13 changes: 13 additions & 0 deletions crates/storage/trie/db/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,17 @@ impl TrieDB for RedBTrie {

Ok(())
}

fn put_batch(&self, key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), crate::TrieError> {
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_table(TABLE)?;
for (key, value) in key_values {
table.insert(&*key, &*value)?;
}
}
write_txn.commit()?;

Ok(())
}
}
Loading
Loading