From 6e5dbebcf936c27c18cbb59e8b4457b78aba7601 Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Wed, 11 Dec 2024 13:12:50 -0800 Subject: [PATCH] save progress --- .../src/transaction_committer.rs | 35 +++++++++++++++++-- execution/executor/src/block_executor/mod.rs | 6 ++-- .../storage/commit_experiments/Cargo.toml | 0 3 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 experimental/storage/commit_experiments/Cargo.toml diff --git a/execution/executor-benchmark/src/transaction_committer.rs b/execution/executor-benchmark/src/transaction_committer.rs index d0311e27a457d..9e1a9fe425656 100644 --- a/execution/executor-benchmark/src/transaction_committer.rs +++ b/execution/executor-benchmark/src/transaction_committer.rs @@ -25,6 +25,8 @@ use std::{ time::{Duration, Instant}, }; +use aptos_db::utils::ShardedStateKvSchemaBatch; + pub(crate) fn gen_li_with_sigs( block_id: HashValue, root_hash: HashValue, @@ -47,10 +49,18 @@ pub(crate) fn gen_li_with_sigs( ) } +// TODO (bowu) +pub struct CommitBatches { + state_kv_metadata_batch: SchemaBatch, + sharded_state_kv_batches: ShardedStateKvSchemaBatch, +} + pub struct TransactionCommitter { executor: Arc>, start_version: Version, block_receiver: mpsc::Receiver, + batch_sender: mpsc::Sender, + batch_receiver: mpsc::Receiver, } impl TransactionCommitter @@ -62,16 +72,38 @@ where start_version: Version, block_receiver: mpsc::Receiver, ) -> Self { + // spawn a new thread in backgrond to do the actual commit + let (batch_sender, batch_receiver) = mpsc::channel(); + Self { executor, start_version, block_receiver, + batch_sender, + batch_receiver, } } + fn commit_batch(&self, batch: SchemaBatch) -> Result<()> { + Ok(()) + } + + fn prepare_commit(&self, block_id: u64, ledger_info_sigs: LedgerInfoWithSignatures) -> Result<()> { + self.executor.pre_commit_block(block_id)?; + self.executor.commit_ledger(ledger_info_sigs)?; + Ok(()) + } + pub fn run(&mut self) { info!("Start with version: {}", self.start_version); + // Spawn a new thread in backgrond to do the actual commit + let commit_thread = thread::spawn(move || { + while let Ok(batch) = self.batch_receiver.recv() { + self.commit_batch(batch).unwrap(); + } + }); + while let Ok(msg) = self.block_receiver.recv() { let CommitBlockMessage { block_id, @@ -93,8 +125,7 @@ where let version = output.expect_last_version(); let commit_start = Instant::now(); let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, version); - self.executor.pre_commit_block(block_id).unwrap(); - self.executor.commit_ledger(ledger_info_with_sigs).unwrap(); + self.prepare_commit(block_id, ledger_info_sigs).unwrap(); report_block( self.start_version, diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 40c90751bcec7..d24fa8455bc54 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -50,16 +50,18 @@ pub mod block_tree; pub struct BlockExecutor { pub db: DbReaderWriter, inner: RwLock>>, + sender: Option>, } impl BlockExecutor where V: VMBlockExecutor, { - pub fn new(db: DbReaderWriter) -> Self { + pub fn new(db: DbReaderWriter, sender: Option>) -> Self { Self { db, inner: RwLock::new(None), + sender, } } @@ -124,7 +126,7 @@ where .ledger_update(block_id, parent_block_id) } - fn pre_commit_block(&self, block_id: HashValue) -> ExecutorResult<()> { + fn pre_commit_block(&self, block_id: HashValue, sender:) -> ExecutorResult<()> { let _guard = CONCURRENCY_GAUGE.concurrency_with(&["block", "pre_commit_block"]); self.inner diff --git a/experimental/storage/commit_experiments/Cargo.toml b/experimental/storage/commit_experiments/Cargo.toml new file mode 100644 index 0000000000000..e69de29bb2d1d