Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Logs) Build transactions batch #189

Merged
merged 8 commits into from
Jan 30, 2024
25 changes: 25 additions & 0 deletions block-producer/src/batch_builder/batch.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::collections::BTreeMap;

use miden_crypto::hash::blake::{Blake3Digest, Blake3_256};
use miden_objects::{accounts::AccountId, notes::NoteEnvelope, Digest};
use miden_vm::crypto::SimpleSmt;
use tracing::instrument;

use super::errors::BuildBatchError;
use crate::{SharedProvenTx, CREATED_NOTES_SMT_DEPTH, MAX_NUM_CREATED_NOTES_PER_BATCH};

pub type BatchId = Blake3Digest<32>;

// TRANSACTION BATCH
// ================================================================================================

Expand All @@ -15,6 +19,7 @@ use crate::{SharedProvenTx, CREATED_NOTES_SMT_DEPTH, MAX_NUM_CREATED_NOTES_PER_B
/// Note: Until recursive proofs are available in the Miden VM, we don't include the common proof.
#[derive(Debug)]
pub struct TransactionBatch {
id: BatchId,
updated_accounts: BTreeMap<AccountId, AccountStates>,
produced_nullifiers: Vec<Digest>,
created_notes_smt: SimpleSmt<CREATED_NOTES_SMT_DEPTH>,
Expand All @@ -33,6 +38,7 @@ impl TransactionBatch {
/// - The number of created notes across all transactions exceeds 4096.
///
/// TODO: enforce limit on the number of created nullifiers.
#[instrument(target = "miden-block-producer", name = "new_batch", skip_all, err)]
pub fn new(txs: Vec<SharedProvenTx>) -> Result<Self, BuildBatchError> {
let updated_accounts = txs
.iter()
Expand Down Expand Up @@ -72,7 +78,10 @@ impl TransactionBatch {
)
};

let id = Self::compute_id(&txs);

Ok(Self {
id,
updated_accounts,
produced_nullifiers,
created_notes_smt,
Expand All @@ -83,6 +92,11 @@ impl TransactionBatch {
// PUBLIC ACCESSORS
// --------------------------------------------------------------------------------------------

/// Returns the batch ID.
pub fn id(&self) -> BatchId {
self.id
}

/// Returns an iterator over (account_id, init_state_hash) tuples for accounts that were
/// modified in this transaction batch.
pub fn account_initial_states(&self) -> impl Iterator<Item = (AccountId, Digest)> + '_ {
Expand Down Expand Up @@ -113,6 +127,17 @@ impl TransactionBatch {
pub fn created_notes_root(&self) -> Digest {
self.created_notes_smt.root()
}

// HELPER FUNCTIONS
// --------------------------------------------------------------------------------------------

fn compute_id(txs: &[SharedProvenTx]) -> BatchId {
let mut buf = Vec::with_capacity(32 * txs.len());
for tx in txs {
buf.extend_from_slice(&tx.id().as_bytes());
}
Blake3_256::hash(&buf)
}
}

/// Stores the initial state (before the transaction) and final state (after the transaction) of an
Expand Down
22 changes: 19 additions & 3 deletions block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{cmp::min, sync::Arc, time::Duration};

use async_trait::async_trait;
use tokio::{sync::RwLock, time};
use tracing::info;
use tracing::{debug, info, instrument, Span};

use self::errors::BuildBatchError;
use crate::{block_builder::BlockBuilder, SharedProvenTx, SharedRwVec, SharedTxBatch, COMPONENT};
Expand All @@ -13,6 +13,7 @@ mod tests;

mod batch;
pub use batch::TransactionBatch;
use miden_node_utils::logging::{format_array, format_blake3_digest};

// BATCH BUILDER
// ================================================================================================
Expand Down Expand Up @@ -73,6 +74,8 @@ where
pub async fn run(self: Arc<Self>) {
let mut interval = time::interval(self.options.block_frequency);

info!(target: COMPONENT, period_ms = interval.period().as_millis(), "Batch builder started");

loop {
interval.tick().await;
self.try_build_block().await;
Expand Down Expand Up @@ -111,16 +114,29 @@ impl<BB> BatchBuilder for DefaultBatchBuilder<BB>
where
BB: BlockBuilder,
{
#[allow(clippy::blocks_in_conditions)] // Workaround of `instrument` issue
#[instrument(target = "miden-block-producer", skip_all, err, fields(batch_id))]
async fn build_batch(
&self,
txs: Vec<SharedProvenTx>,
) -> Result<(), BuildBatchError> {
let num_txs = txs.len();

info!(target: COMPONENT, num_txs, "Building a transaction batch");
debug!(target: COMPONENT, txs = %format_array(txs.iter().map(|tx| tx.id().to_hex())));

let batch = Arc::new(TransactionBatch::new(txs)?);
self.ready_batches.write().await.push(batch);

info!(COMPONENT, "batch built with {num_txs} txs");
info!(target: COMPONENT, "Transaction batch built");
Span::current().record("batch_id", format_blake3_digest(batch.id()));

let num_batches = {
let mut write_guard = self.ready_batches.write().await;
write_guard.push(batch);
write_guard.len()
};

info!(target: COMPONENT, num_batches, "Transaction batch added to the batch queue");

Ok(())
}
Expand Down
18 changes: 4 additions & 14 deletions block-producer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{net::ToSocketAddrs, sync::Arc};
use anyhow::{anyhow, Result};
use miden_node_proto::{block_producer::api_server, store::api_client as store_client};
use tonic::transport::Server;
use tracing::{info, info_span, instrument, Instrument};
use tracing::{info, instrument};

use crate::{
batch_builder::{DefaultBatchBuilder, DefaultBatchBuilderOptions},
Expand All @@ -23,7 +23,7 @@ pub mod api;
// ================================================================================================

/// TODO: add comments
#[instrument(target = "miden-block-producer", skip_all)]
#[instrument(target = "miden-block-producer", name = "block_producer", skip_all)]
pub async fn serve(config: BlockProducerConfig) -> Result<()> {
info!(target: COMPONENT, %config, "Initializing server");

Expand Down Expand Up @@ -52,19 +52,9 @@ pub async fn serve(config: BlockProducerConfig) -> Result<()> {

let block_producer = api_server::ApiServer::new(api::BlockProducerApi::new(queue.clone()));

tokio::spawn(async move {
queue
.run()
.instrument(info_span!(target: COMPONENT, "transaction_queue_start"))
.await
});
tokio::spawn(async move { queue.run().await });

tokio::spawn(async move {
batch_builder
.run()
.instrument(info_span!(target: COMPONENT, "batch_builder_start"))
.await
});
tokio::spawn(async move { batch_builder.run().await });
bobbinth marked this conversation as resolved.
Show resolved Hide resolved

info!(target: COMPONENT, "Server initialized");

Expand Down
29 changes: 18 additions & 11 deletions block-producer/src/txqueue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use miden_objects::{
accounts::AccountId, notes::Nullifier, transaction::InputNotes, Digest, TransactionInputError,
};
use tokio::{sync::RwLock, time};
use tracing::{info, instrument};
use tracing::{info, info_span, instrument, Instrument};

use crate::{
batch_builder::BatchBuilder, store::TxInputsError, SharedProvenTx, SharedRwVec, COMPONENT,
Expand Down Expand Up @@ -148,16 +148,20 @@ where
}
}

#[instrument(target = "miden-block-producer", name = "block_producer" skip_all)]
pub async fn run(self: Arc<Self>) {
let mut interval = time::interval(self.options.build_batch_frequency);

info!(target: COMPONENT, period_ms = interval.period().as_millis(), "Transaction queue started");

loop {
interval.tick().await;
self.try_build_batches().await;
}
}

/// Divides the queue in groups to be batched; those that failed are appended back on the queue
#[instrument(target = "miden-block-producer", skip_all)]
async fn try_build_batches(&self) {
let txs: Vec<SharedProvenTx> = {
let mut locked_ready_queue = self.ready_queue.write().await;
Expand All @@ -175,17 +179,20 @@ where
let ready_queue = self.ready_queue.clone();
let batch_builder = self.batch_builder.clone();

tokio::spawn(async move {
match batch_builder.build_batch(txs.clone()).await {
Ok(_) => {
// batch was successfully built, do nothing
},
Err(_) => {
// batch building failed, add txs back at the end of the queue
ready_queue.write().await.append(&mut txs);
},
tokio::spawn(
async move {
match batch_builder.build_batch(txs.clone()).await {
Ok(_) => {
// batch was successfully built, do nothing
},
Err(_) => {
// batch building failed, add txs back at the end of the queue
ready_queue.write().await.append(&mut txs);
},
}
}
});
.instrument(info_span!(target: COMPONENT, "batch_builder")),
);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod api;

// RPC INITIALIZER
// ================================================================================================
#[instrument(target = "miden-rpc", skip_all)]
#[instrument(target = "miden-rpc", name = "rpc", skip_all)]
pub async fn serve(config: RpcConfig) -> Result<()> {
info!(target: COMPONENT, %config, "Initializing server");

Expand Down
2 changes: 1 addition & 1 deletion store/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct StateSyncUpdate {
impl Db {
/// Open a connection to the DB, apply any pending migrations, and ensure that the genesis block
/// is as expected and present in the database.
#[instrument(target = "miden-store", skip_all)]
#[instrument(target = "miden-store", name = "store::setup", skip_all)]
pub async fn setup(config: StoreConfig) -> Result<Self, anyhow::Error> {
info!(target: COMPONENT, %config, "Connecting to the database");

Expand Down
2 changes: 1 addition & 1 deletion store/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod api;
// STORE INITIALIZER
// ================================================================================================

#[instrument(target = "miden-store", skip_all)]
#[instrument(target = "miden-store", name = "store", skip_all)]
pub async fn serve(
config: StoreConfig,
db: Db,
Expand Down
8 changes: 8 additions & 0 deletions utils/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use std::fmt::Display;

use anyhow::Result;
use itertools::Itertools;
use miden_crypto::{
hash::{blake::Blake3Digest, Digest},
utils::bytes_to_hex_string,
};
use miden_objects::{
notes::{NoteEnvelope, Nullifier},
transaction::{InputNotes, OutputNotes},
Expand Down Expand Up @@ -72,3 +76,7 @@ pub fn format_array(list: impl IntoIterator<Item = impl Display>) -> String {
format!("[{}]", comma_separated)
}
}

pub fn format_blake3_digest(digest: Blake3Digest<32>) -> String {
bytes_to_hex_string(digest.as_bytes())
}
Loading