diff --git a/crates/prism/Cargo.toml b/crates/prism/Cargo.toml index 69336206..bc066892 100644 --- a/crates/prism/Cargo.toml +++ b/crates/prism/Cargo.toml @@ -56,3 +56,8 @@ sp1-sdk = { workspace = true } [dev-dependencies] serial_test = "3.1.1" criterion = "0.5.1" + +[[test]] +name = "integration_tests" +path = "tests/integration_tests.rs" +harness = true diff --git a/crates/prism/src/cfg.rs b/crates/prism/src/cfg.rs index 2b3c5d36..0683443d 100644 --- a/crates/prism/src/cfg.rs +++ b/crates/prism/src/cfg.rs @@ -2,12 +2,12 @@ use crate::{ consts::{DA_RETRY_COUNT, DA_RETRY_INTERVAL}, da::memory::InMemoryDataAvailabilityLayer, }; -use prism_errors::{DataAvailabilityError, GeneralError, PrismError}; use anyhow::{anyhow, Context, Result}; use clap::{Parser, Subcommand}; use config::{builder::DefaultState, ConfigBuilder, File}; use dirs::home_dir; use dotenvy::dotenv; +use prism_errors::{DataAvailabilityError, GeneralError, PrismError}; use serde::{Deserialize, Serialize}; use std::{fs, path::Path, sync::Arc}; diff --git a/crates/prism/src/da/celestia.rs b/crates/prism/src/da/celestia.rs index bd4e47d2..1e397273 100644 --- a/crates/prism/src/da/celestia.rs +++ b/crates/prism/src/da/celestia.rs @@ -1,22 +1,21 @@ use crate::{ cfg::CelestiaConfig, - consts::CHANNEL_BUFFER_SIZE, da::{DataAvailabilityLayer, FinalizedEpoch}, }; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use celestia_rpc::{BlobClient, Client, HeaderClient}; use celestia_types::{blob::GasPrice, nmt::Namespace, Blob}; use prism_common::operation::Operation; use prism_errors::{DataAvailabilityError, GeneralError}; -use std::{self, sync::Arc}; -use tokio::{ +use std::{ + self, sync::{ - mpsc::{channel, Receiver, Sender}, - Mutex, + atomic::{AtomicU64, Ordering}, + Arc, }, - task::spawn, }; +use tokio::{sync::broadcast, task::spawn}; use bincode; @@ -35,14 +34,12 @@ pub struct CelestiaConnection { pub snark_namespace: Namespace, pub operation_namespace: Namespace, - sync_target_tx: Arc>, - sync_target_rx: Arc>>, + height_update_tx: broadcast::Sender, + sync_target: Arc, } impl CelestiaConnection { pub async fn new(config: &CelestiaConfig, auth_token: Option<&str>) -> Result { - let (tx, rx) = channel(CHANNEL_BUFFER_SIZE); - let client = Client::new(&config.connection_string, auth_token) .await .context("Failed to initialize websocket connection") @@ -61,12 +58,14 @@ impl CelestiaConnection { None => snark_namespace, }; + let (height_update_tx, _) = broadcast::channel(100); + Ok(CelestiaConnection { client, snark_namespace, operation_namespace, - sync_target_tx: Arc::new(tx), - sync_target_rx: Arc::new(Mutex::new(rx)), + height_update_tx, + sync_target: Arc::new(AtomicU64::new(0)), }) } } @@ -86,41 +85,38 @@ fn create_namespace(namespace_hex: &str) -> Result { #[async_trait] impl DataAvailabilityLayer for CelestiaConnection { async fn get_latest_height(&self) -> Result { - match self.sync_target_rx.lock().await.recv().await { - Some(height) => Ok(height), - None => Err(anyhow!(DataAvailabilityError::ChannelReceiveError)), - } + Ok(self.sync_target.load(Ordering::Relaxed)) } async fn initialize_sync_target(&self) -> Result { - HeaderClient::header_network_head(&self.client) + let height = HeaderClient::header_network_head(&self.client) .await .context("Failed to get network head from DA layer") - .map(|extended_header| extended_header.header.height.value()) + .map(|extended_header| extended_header.header.height.value())?; + + self.sync_target.store(height, Ordering::Relaxed); + Ok(height) } - async fn get_snarks(&self, height: u64) -> Result> { + async fn get_finalized_epoch(&self, height: u64) -> Result> { trace!("searching for epoch on da layer at height {}", height); + match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await { - Ok(blobs) => { - let mut epochs = Vec::new(); - for blob in blobs.iter() { - match FinalizedEpoch::try_from(blob) { - Ok(epoch_json) => epochs.push(epoch_json), - Err(_) => { - GeneralError::ParsingError(format!( - "marshalling blob from height {} to epoch json: {:?}", - height, &blob - )); - } - } - } - Ok(epochs) - } + Ok(blobs) => blobs + .into_iter() + .next() + .map(|blob| { + FinalizedEpoch::try_from(&blob).map_err(|_| { + anyhow!(GeneralError::ParsingError(format!( + "marshalling blob from height {} to epoch json: {:?}", + height, &blob + ))) + }) + }) + .transpose(), Err(err) => { - // todo: this is a hack to handle a retarded error from cel-node that will be fixed in v0.15.0 if err.to_string().contains("blob: not found") { - Ok(vec![]) + Ok(None) } else { Err(anyhow!(DataAvailabilityError::DataRetrievalError( height, @@ -131,38 +127,22 @@ impl DataAvailabilityLayer for CelestiaConnection { } } - async fn submit_snarks(&self, epochs: Vec) -> Result { - if epochs.is_empty() { - bail!("no epochs provided for submission"); - } + async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result { + debug!("posting {}th epoch to da layer", epoch.height); - debug!("posting {} epochs to da layer", epochs.len()); + let data = bincode::serialize(&epoch).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!( + "serializing epoch {}: {}", + epoch.height, e + ))) + })?; - let blobs: Result, DataAvailabilityError> = epochs - .iter() - .map(|epoch| { - let data = bincode::serialize(epoch).map_err(|e| { - DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!( - "serializing epoch {}: {}", - epoch.height, e - ))) - })?; - Blob::new(self.snark_namespace, data).map_err(|e| { - DataAvailabilityError::GeneralError(GeneralError::BlobCreationError( - e.to_string(), - )) - }) - }) - .collect(); - - let blobs = blobs?; - - for (i, blob) in blobs.iter().enumerate() { - trace!("blob {}: {:?}", i, blob); - } + let blob = Blob::new(self.snark_namespace, data).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string())) + })?; self.client - .blob_submit(&blobs, GasPrice::from(-1.0)) + .blob_submit(&[blob], GasPrice::from(-1.0)) .await .map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string()))) } @@ -230,35 +210,29 @@ impl DataAvailabilityLayer for CelestiaConnection { .map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string()))) } + fn subscribe_to_heights(&self) -> broadcast::Receiver { + self.height_update_tx.subscribe() + } + async fn start(&self) -> Result<()> { let mut header_sub = HeaderClient::header_subscribe(&self.client) .await - .context("Failed to subscribe to headers from DA layer") - .map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))?; + .context("Failed to subscribe to headers from DA layer")?; + + let sync_target = self.sync_target.clone(); + let height_update_tx = self.height_update_tx.clone(); - let synctarget_buffer = self.sync_target_tx.clone(); spawn(async move { while let Some(extended_header_result) = header_sub.next().await { match extended_header_result { Ok(extended_header) => { let height = extended_header.header.height.value(); - match synctarget_buffer.send(height).await { - Ok(_) => { - debug!("sent sync target update for height {}", height); - } - Err(_) => { - DataAvailabilityError::SyncTargetError(format!( - "sending sync target update message for height {}", - height - )); - } - } + sync_target.store(height, Ordering::Relaxed); + let _ = height_update_tx.send(height); + debug!("updated sync target for height {}", height); } Err(e) => { - DataAvailabilityError::NetworkError(format!( - "retrieving header from da layer: {}", - e - )); + error!("Error retrieving header from DA layer: {}", e); } } } diff --git a/crates/prism/src/da/memory.rs b/crates/prism/src/da/memory.rs index cc4651ad..82f384c7 100644 --- a/crates/prism/src/da/memory.rs +++ b/crates/prism/src/da/memory.rs @@ -2,7 +2,7 @@ use crate::da::{DataAvailabilityLayer, FinalizedEpoch}; use anyhow::Result; use async_trait::async_trait; use prism_common::operation::Operation; -use std::sync::Arc; +use std::{collections::VecDeque, sync::Arc}; use tokio::{ sync::{broadcast, RwLock}, time::{interval, Duration}, @@ -12,14 +12,14 @@ use tokio::{ pub struct Block { pub height: u64, pub operations: Vec, - pub epochs: Vec, + pub epoch: Option, } #[derive(Clone)] pub struct InMemoryDataAvailabilityLayer { blocks: Arc>>, pending_operations: Arc>>, - pending_epochs: Arc>>, + pending_epochs: Arc>>, latest_height: Arc>, height_update_tx: broadcast::Sender, block_update_tx: broadcast::Sender, @@ -34,7 +34,7 @@ impl InMemoryDataAvailabilityLayer { Self { blocks: Arc::new(RwLock::new(Vec::new())), pending_operations: Arc::new(RwLock::new(Vec::new())), - pending_epochs: Arc::new(RwLock::new(Vec::new())), + pending_epochs: Arc::new(RwLock::new(VecDeque::new())), latest_height: Arc::new(RwLock::new(0)), height_update_tx: height_tx, block_update_tx: block_tx, @@ -58,13 +58,12 @@ impl InMemoryDataAvailabilityLayer { let new_block = Block { height: *latest_height, operations: std::mem::take(&mut *pending_operations), - epochs: std::mem::take(&mut *pending_epochs), + epoch: pending_epochs.pop_front(), }; debug!( - "new block produced at height {} with {} operations and {} snarks", + "new block produced at height {} with {} operations", new_block.height, new_block.operations.len(), - new_block.epochs.len() ); blocks.push(new_block.clone()); @@ -81,6 +80,10 @@ impl InMemoryDataAvailabilityLayer { #[async_trait] impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer { + fn subscribe_to_heights(&self) -> broadcast::Receiver { + self.height_update_tx.subscribe() + } + async fn get_latest_height(&self) -> Result { Ok(*self.latest_height.read().await) } @@ -89,18 +92,18 @@ impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer { self.get_latest_height().await } - async fn get_snarks(&self, height: u64) -> Result> { + async fn get_finalized_epoch(&self, height: u64) -> Result> { let blocks = self.blocks.read().await; Ok(blocks .iter() .find(|block| block.height == height) - .map(|block| block.epochs.clone()) + .map(|block| block.epoch.clone()) .unwrap_or_default()) } - async fn submit_snarks(&self, epochs: Vec) -> Result { + async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result { let mut pending_epochs = self.pending_epochs.write().await; - pending_epochs.extend(epochs); + pending_epochs.push_back(epoch); self.get_latest_height().await } diff --git a/crates/prism/src/da/mod.rs b/crates/prism/src/da/mod.rs index 7fc90c86..fb3a53d1 100644 --- a/crates/prism/src/da/mod.rs +++ b/crates/prism/src/da/mod.rs @@ -8,6 +8,7 @@ use prism_errors::GeneralError; use serde::{Deserialize, Serialize}; use sp1_sdk::SP1ProofWithPublicValues; use std::{self, str::FromStr}; +use tokio::sync::broadcast; pub mod celestia; pub mod memory; @@ -50,9 +51,10 @@ impl SignedContent for FinalizedEpoch { pub trait DataAvailabilityLayer: Send + Sync { async fn get_latest_height(&self) -> Result; async fn initialize_sync_target(&self) -> Result; - async fn get_snarks(&self, height: u64) -> Result>; - async fn submit_snarks(&self, epoch: Vec) -> Result; + async fn get_finalized_epoch(&self, height: u64) -> Result>; + async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result; async fn get_operations(&self, height: u64) -> Result>; async fn submit_operations(&self, operations: Vec) -> Result; async fn start(&self) -> Result<()>; + fn subscribe_to_heights(&self) -> broadcast::Receiver; } diff --git a/crates/prism/src/node_types/lightclient.rs b/crates/prism/src/node_types/lightclient.rs index bb2f43d6..6b4291d5 100644 --- a/crates/prism/src/node_types/lightclient.rs +++ b/crates/prism/src/node_types/lightclient.rs @@ -4,8 +4,8 @@ use async_trait::async_trait; use prism_common::tree::Digest; use prism_errors::{DataAvailabilityError, GeneralError}; use sp1_sdk::{ProverClient, SP1VerifyingKey}; -use std::{self, sync::Arc, time::Duration}; -use tokio::{task::spawn, time::interval}; +use std::{self, sync::Arc}; +use tokio::{sync::broadcast, task::spawn}; use crate::{da::DataAvailabilityLayer, node_types::NodeType, utils::verify_signature}; @@ -58,91 +58,68 @@ impl LightClient { let start_height = self.start_height; spawn(async move { let mut current_position = start_height; - let mut ticker = interval(Duration::from_secs(1)); - loop { - // target is updated when a new header is received - let target = match self.da.get_latest_height().await { - Ok(target) => target, - Err(e) => { - error!("failed to update sync target, retrying: {:?}", e); - continue; - } - }; - - debug!("updated sync target to height {}", target); - for i in current_position..target { - trace!("processing height: {}", i); - match self.da.get_snarks(i + 1).await { - Ok(epoch_json_vec) => { - if !epoch_json_vec.is_empty() { - debug!("light client: got epochs at height {}", i + 1); - } + let mut height_rx = self.da.subscribe_to_heights(); - // todo: verify adjacency to last heights, <- for this we need some sort of storage of epochs - for epoch_json in epoch_json_vec { - let _prev_commitment = &epoch_json.prev_commitment; - let _current_commitment = &epoch_json.current_commitment; + loop { + match height_rx.recv().await { + Ok(target) => { + debug!("updated sync target to height {}", target); + for i in current_position..target { + trace!("processing height: {}", i); + match self.da.get_finalized_epoch(i + 1).await { + Ok(Some(finalized_epoch)) => { + debug!("light client: got epochs at height {}", i + 1); - // if the user does not add a verifying key, we will not verify the signature, - // but only log a warning on startup - if self.sequencer_pubkey.is_some() { - match verify_signature( - &epoch_json.clone(), - self.sequencer_pubkey.clone(), - ) { - Ok(_) => trace!( - "valid signature for epoch {}", - epoch_json.height - ), - Err(e) => { - panic!("invalid signature in epoch {}: {:?}", i, e) + // Signature verification + if let Some(pubkey) = &self.sequencer_pubkey { + match verify_signature(&finalized_epoch, Some(pubkey.clone())) { + Ok(_) => trace!("valid signature for epoch {}", finalized_epoch.height), + Err(e) => panic!("invalid signature in epoch {}: {:?}", i, e), } } - } - let prev_commitment = &epoch_json.prev_commitment; - let current_commitment = &epoch_json.current_commitment; + // Commitment verification + let prev_commitment = &finalized_epoch.prev_commitment; + let current_commitment = &finalized_epoch.current_commitment; + let mut public_values = finalized_epoch.proof.public_values.clone(); + let proof_prev_commitment: Digest = public_values.read(); + let proof_current_commitment: Digest = public_values.read(); - let mut public_values = epoch_json.proof.public_values.clone(); - let proof_prev_commitment: Digest = public_values.read(); - let proof_current_commitment: Digest = public_values.read(); - - if prev_commitment != &proof_prev_commitment - || current_commitment != &proof_current_commitment - { - error!( - "Commitment mismatch: - prev_commitment: {:?}, proof_prev_commitment: {:?}, - current_commitment: {:?}, proof_current_commitment: {:?}", - prev_commitment, - proof_prev_commitment, - current_commitment, - proof_current_commitment - ); - panic!("Commitment mismatch in epoch {}", epoch_json.height); - } + if prev_commitment != &proof_prev_commitment + || current_commitment != &proof_current_commitment + { + error!( + "Commitment mismatch: + prev_commitment: {:?}, proof_prev_commitment: {:?}, + current_commitment: {:?}, proof_current_commitment: {:?}", + prev_commitment, proof_prev_commitment, + current_commitment, proof_current_commitment + ); + panic!("Commitment mismatch in epoch {}", finalized_epoch.height); + } - match self.client.verify(&epoch_json.proof, &self.verifying_key) { - Ok(_) => { - info!( - "zkSNARK for epoch {} was validated successfully", - epoch_json.height - ) + // SNARK verification + match self.client.verify(&finalized_epoch.proof, &self.verifying_key) { + Ok(_) => info!("zkSNARK for epoch {} was validated successfully", finalized_epoch.height), + Err(err) => panic!("failed to validate epoch at height {}: {:?}", finalized_epoch.height, err), } - Err(err) => panic!( - "failed to validate epoch at height {}: {:?}", - epoch_json.height, err - ), - } - } - } - Err(e) => { - debug!("light client: getting epoch: {}", e) + }, + Ok(None) => { + debug!("no finalized epoch found at height: {}", i + 1); + }, + Err(e) => debug!("light client: getting epoch: {}", e), + }; } - }; + current_position = target; + }, + Err(broadcast::error::RecvError::Closed) => { + error!("Height channel closed unexpectedly"); + break; + }, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + warn!("Lagged behind by {} messages", skipped); + }, } - ticker.tick().await; // only for testing purposes - current_position = target; // Update the current position to the latest target } }) .await diff --git a/crates/prism/src/node_types/sequencer.rs b/crates/prism/src/node_types/sequencer.rs index e81fff8f..4aad8559 100644 --- a/crates/prism/src/node_types/sequencer.rs +++ b/crates/prism/src/node_types/sequencer.rs @@ -1,27 +1,16 @@ -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use ed25519::Signature; use ed25519_dalek::{Signer, SigningKey}; use jmt::KeyHash; use prism_common::tree::{hash, Batch, Digest, Hasher, KeyDirectoryTree, Proof, SnarkableTree}; -use std::{self, str::FromStr, sync::Arc}; -use tokio::{ - sync::{ - mpsc::{channel, Receiver, Sender}, - Mutex, - }, - task::spawn, - time::interval, -}; +use std::{self, collections::VecDeque, str::FromStr, sync::Arc}; +use tokio::sync::{broadcast, Mutex}; use sp1_sdk::{ProverClient, SP1ProvingKey, SP1Stdin, SP1VerifyingKey}; -#[cfg(test)] -use prism_errors::DataAvailabilityError; - use crate::{ cfg::Config, - consts::{CHANNEL_BUFFER_SIZE, DA_RETRY_COUNT, DA_RETRY_INTERVAL}, da::{DataAvailabilityLayer, FinalizedEpoch}, node_types::NodeType, storage::Database, @@ -55,9 +44,6 @@ pub struct Sequencer { proving_key: SP1ProvingKey, verifying_key: SP1VerifyingKey, - - epoch_buffer_tx: Arc>, - epoch_buffer_rx: Arc>>, } #[async_trait] @@ -65,17 +51,15 @@ impl NodeType for Sequencer { async fn start(self: Arc) -> Result<()> { self.da.start().await.context("Failed to start DA layer")?; - let sync_loop = self.clone().sync_loop(); - let snark_loop = self.clone().post_snarks_loop(); - let operation_loop = self.clone().post_operations_loop(); + let main_loop = self.clone().main_loop(); + let batch_poster = self.clone().post_batch_loop(); let ws_self = self.clone(); let ws = ws_self.ws.start(self.clone()); tokio::select! { - res = sync_loop => Ok(res.context("sync loop failed")?), - res = snark_loop => Ok(res.context("DA loop failed")?), - res = operation_loop => Ok(res.context("Operation loop failed")?), + res = main_loop => Ok(res.context("main loop failed")?), + res = batch_poster => Ok(res.context("batch poster failed")?), res = ws => Ok(res.context("WebServer failed")?), } } @@ -88,7 +72,6 @@ impl Sequencer { cfg: Config, key: SigningKey, ) -> Result { - let (tx, rx) = channel(CHANNEL_BUFFER_SIZE); let ws = cfg.webserver.context("Missing webserver configuration")?; let start_height = cfg.celestia_config.unwrap_or_default().start_height; @@ -109,219 +92,254 @@ impl Sequencer { prover_client: Arc::new(Mutex::new(prover_client)), tree, pending_operations: Arc::new(Mutex::new(Vec::new())), - epoch_buffer_tx: Arc::new(tx), - epoch_buffer_rx: Arc::new(Mutex::new(rx)), }) } - // sync_loop is responsible for downloading operations from the DA layer - async fn sync_loop(self: Arc) -> Result<(), tokio::task::JoinError> { - let self_clone = self.clone(); - info!("starting operation sync loop"); - let epoch_buffer = self.epoch_buffer_tx.clone(); - spawn(async move { - let mut current_position = self_clone.start_height; - loop { - // target is updated when a new header is received - let target = match self_clone.da.get_latest_height().await { - Ok(target) => target, - Err(e) => { - error!("failed to update sync target, retrying: {:?}", e); - continue; - } - }; + async fn main_loop(self: Arc) -> Result<()> { + let mut height_rx = self.da.subscribe_to_heights(); + let current_height = height_rx.recv().await?; + let historical_sync_height = current_height - 1; - if current_position == target { - continue; - } + self.sync_range(self.start_height, historical_sync_height) + .await?; + self.real_time_sync(height_rx).await + } - debug!("updated sync target to height {}", target); - while current_position < target { - trace!("processing height: {}", current_position); - match self_clone.da.get_operations(current_position + 1).await { - Ok(operations) => { - if !operations.is_empty() { - debug!( - "sequencer: got operations at height {}", - current_position + 1 - ); - } - - let epoch = match self_clone.finalize_epoch(operations).await { - Ok(e) => e, - Err(e) => { - error!("sequencer_loop: finalizing epoch: {}", e); - continue; - } - }; - - info!("sequencer_loop: finalized epoch {}", epoch.height); - match epoch_buffer.send(epoch).await { - Ok(_) => { - current_position += 1; - } - Err(e) => { - error!("sequencer_loop: sending epoch to buffer: {}", e); - } - } - } - Err(e) => { - debug!("light client: getting epoch: {}", e) - } - }; - } - current_position = target; // Update the current position to the latest target + async fn sync_range(&self, start_height: u64, end_height: u64) -> Result<()> { + let saved_epoch = match self.db.get_epoch() { + Ok(epoch) => epoch, + Err(_) => { + debug!("no existing epoch state found, setting epoch to 0"); + self.db.set_epoch(&0)?; + 0 } - }) - .await + }; + let mut current_epoch: u64 = 0; + let mut buffered_operations: VecDeque> = VecDeque::new(); + let mut current_height = start_height; + + while current_height < end_height { + let height = current_height + 1; + let operations = self.da.get_operations(height).await?; + let epoch_result = self.da.get_finalized_epoch(height).await?; + + self.process_height( + height, + operations, + epoch_result, + &mut current_epoch, + &mut buffered_operations, + saved_epoch, + ) + .await?; + + current_height += 1; + } + + info!( + "finished historical sync from height {} to {}", + start_height, end_height + ); + Ok(()) } - pub async fn post_operations_loop(self: Arc) -> Result<(), tokio::task::JoinError> { - info!("Starting operation posting loop"); - let mut ticker = interval(std::time::Duration::from_secs(1)); // Adjust the interval as needed - let mut last_processed_height = 0; + async fn post_batch_loop(self: Arc) -> Result<()> { + let mut height_rx = self.da.subscribe_to_heights(); - spawn(async move { - loop { - ticker.tick().await; + loop { + let height = height_rx.recv().await?; + debug!("received height {}", height); - // Check for new block - let current_height = match self.da.get_latest_height().await { - Ok(height) => height, - Err(e) => { - error!("operation_loop: Failed to get latest height: {}", e); - continue; - } - }; - - // If there's a new block - if current_height > last_processed_height { - // Get pending operations - let pending_operations = { - let mut ops = self.pending_operations.lock().await; - std::mem::take(&mut *ops) - }; - - // If there are pending operations, submit them - if !pending_operations.is_empty() { - match self.da.submit_operations(pending_operations).await { - Ok(submitted_height) => { - info!( - "operation_loop: Submitted operations at height {}", - submitted_height - ); - last_processed_height = submitted_height; - } - Err(e) => { - error!("operation_loop: Failed to submit operations: {}", e); - // TODO: Handle error (e.g., retry logic, returning operations to pending_operations) - } - } - } else { - debug!( - "operation_loop: No pending operations to submit at height {}", - current_height + // Get pending operations + let pending_operations = { + let mut ops = self.pending_operations.lock().await; + std::mem::take(&mut *ops) + }; + + let op_count = pending_operations.len(); + + // If there are pending operations, submit them + if !pending_operations.clone().is_empty() { + match self.da.submit_operations(pending_operations).await { + Ok(submitted_height) => { + info!( + "post_batch_loop: submitted {} operations at height {}", + op_count, submitted_height ); - last_processed_height = current_height; } - } - } - }) - .await - } - - // post_snarks_loop is responsible for submitting finalized epochs to the DA layer. - async fn post_snarks_loop(self: Arc) -> Result<(), tokio::task::JoinError> { - info!("starting da submission loop"); - let mut ticker = interval(DA_RETRY_INTERVAL); - spawn(async move { - loop { - let epochs = match self.receive_finalized_epochs().await { - Ok(epochs) => epochs, Err(e) => { - error!("da_loop: getting finalized epochs: {}", e); - continue; - } - }; - - // don't post to DA layer if no epochs have been finalized - if epochs.is_empty() { - continue; - } - - let mut retry_counter = 0; - loop { - if retry_counter > DA_RETRY_COUNT { - // todo: graceful shutdown - panic!("da_loop: too many retries, giving up"); + error!("post_batch_loop: Failed to submit operations: {}", e); } - match self.da.submit_snarks(epochs.clone()).await { - Ok(height) => { - info!("da_loop: submitted epoch at height {}", height); - break; - } - Err(e) => { - // code = NotFound means the account is not funded - if e.to_string().contains("rpc error: code = NotFound") { - panic!("da_loop: celestia account not funded, causing: {}", e); - } - error!("da_loop: submitting epoch: {}", e); - retry_counter += 1; - } - }; - ticker.tick().await; } + } else { + debug!( + "post_batch_loop: No pending operations to submit at height {}", + height + ); } - }) - .await + } } - pub async fn get_commitment(&self) -> Result { - let tree = self.tree.lock().await; - tree.get_commitment().context("Failed to get commitment") + async fn real_time_sync(&self, mut height_rx: broadcast::Receiver) -> Result<()> { + let saved_epoch = self.db.get_epoch()?; + let mut current_epoch: u64 = saved_epoch; + let mut buffered_operations: VecDeque> = VecDeque::new(); + + loop { + let height = height_rx.recv().await?; + let operations = self.da.get_operations(height).await?; + let epoch_result = self.da.get_finalized_epoch(height).await?; + + self.process_height( + height, + operations, + epoch_result, + &mut current_epoch, + &mut buffered_operations, + saved_epoch, + ) + .await?; + } } - // finalize_epoch is responsible for finalizing the pending epoch and returning the epoch json to be posted on the DA layer. - pub async fn finalize_epoch(&self, operations: Vec) -> Result { - let epoch = match self.db.get_epoch() { - Ok(epoch) => epoch + 1, - Err(_) => 0, - }; + async fn process_height( + &self, + height: u64, + operations: Vec, + epoch_result: Option, + current_epoch: &mut u64, + buffered_operations: &mut VecDeque>, + saved_epoch: u64, + ) -> Result<()> { + let mut tree = self.tree.lock().await; + let prev_commitment = tree.get_commitment()?; + + debug!( + "processing height {}, saved_epoch: {}, current_epoch: {}", + height, saved_epoch, current_epoch + ); - let prev_commitment = if epoch > 0 { - let prev_epoch = epoch - 1; - let hash_string = self.db.get_commitment(&prev_epoch).context(format!( - "Failed to get commitment for previous epoch {}", - prev_epoch - ))?; - Digest::from_hex(&hash_string).context("Failed to parse commitment")? + if !operations.is_empty() { + buffered_operations.push_back(operations); + } + + if !buffered_operations.is_empty() && height > saved_epoch { + let all_ops: Vec = buffered_operations.drain(..).flatten().collect(); + *current_epoch = height; + self.finalize_new_epoch(*current_epoch, all_ops, &mut tree) + .await?; + } else if let Some(epoch) = epoch_result { + self.process_existing_epoch( + epoch, + current_epoch, + buffered_operations, + &mut tree, + prev_commitment, + height, + ) + .await?; } else { - self.get_commitment().await? - }; + debug!("No operations to process at height {}", height); + } + + Ok(()) + } + + async fn process_existing_epoch( + &self, + epoch: FinalizedEpoch, + current_epoch: &mut u64, + buffered_operations: &mut VecDeque>, + tree: &mut KeyDirectoryTree>, + prev_commitment: Digest, + height: u64, + ) -> Result<()> { + if epoch.height != *current_epoch { + return Err(anyhow!( + "Epoch height mismatch: expected {}, got {}", + current_epoch, + epoch.height + )); + } + if epoch.prev_commitment != prev_commitment { + return Err(anyhow!("Commitment mismatch at epoch {}", current_epoch)); + } + + while let Some(buffered_ops) = buffered_operations.pop_front() { + self.execute_block(buffered_ops, tree).await?; + } + + let new_commitment = tree.get_commitment()?; + if epoch.current_commitment != new_commitment { + return Err(anyhow!("Commitment mismatch at epoch {}", current_epoch)); + } + + debug!( + "Processed height {}. New commitment: {:?}", + height, new_commitment + ); + *current_epoch += 1; + Ok(()) + } + + async fn execute_block( + &self, + operations: Vec, + tree: &mut KeyDirectoryTree>, + ) -> Result> { + debug!("executing block with {} operations", operations.len()); let mut proofs = Vec::new(); - for entry in operations.iter() { - let proof = self.process_operation(entry).await?; - proofs.push(proof); + + for operation in operations { + match self.process_operation(&operation, tree).await { + Ok(proof) => proofs.push(proof), + Err(e) => { + // Log the error and continue with the next operation + warn!("Failed to process operation: {:?}. Error: {}", operation, e); + } + } } - let current_commitment = { - let tree = self.tree.lock().await; - tree.get_commitment() - .context("Failed to get current commitment")? - }; + Ok(proofs) + } + + async fn finalize_new_epoch( + &self, + height: u64, + operations: Vec, + tree: &mut KeyDirectoryTree>, + ) -> Result<()> { + let prev_commitment = tree.get_commitment()?; + + let proofs = self.execute_block(operations, tree).await?; + + let new_commitment = tree.get_commitment()?; + + let finalized_epoch = self + .prove_epoch(height, prev_commitment, new_commitment, proofs) + .await?; + + self.da.submit_finalized_epoch(finalized_epoch).await?; + + self.db.set_commitment(&height, &new_commitment)?; + self.db.set_epoch(&height)?; + + info!("Finalized new epoch at height {}", height); - self.db - .set_epoch(&epoch) - .context("Failed to set new epoch")?; - // add the commitment for the operations ran since the last epoch - self.db - .set_commitment(&epoch, ¤t_commitment) - .context("Failed to add commitment for new epoch")?; + Ok(()) + } + async fn prove_epoch( + &self, + height: u64, + prev_commitment: Digest, + new_commitment: Digest, + proofs: Vec, + ) -> Result { let batch = Batch { prev_root: prev_commitment, - new_root: current_commitment, + new_root: new_commitment, proofs, }; @@ -330,21 +348,21 @@ impl Sequencer { let client = self.prover_client.lock().await; - info!("generating proof for epoch height {}", epoch); + info!("generating proof for epoch height {}", height); #[cfg(not(feature = "plonk"))] let proof = client.prove(&self.proving_key, stdin).run()?; #[cfg(feature = "plonk")] let proof = client.prove(&self.proving_key, stdin).plonk().run()?; - info!("successfully generated proof for epoch height {}", epoch); + info!("successfully generated proof for epoch height {}", height); client.verify(&proof, &self.verifying_key)?; - info!("verified proof for epoch height {}", epoch); + info!("verified proof for epoch height {}", height); let epoch_json = FinalizedEpoch { - height: epoch, + height, prev_commitment, - current_commitment, + current_commitment: new_commitment, proof, signature: None, }; @@ -360,28 +378,17 @@ impl Sequencer { Ok(epoch_json_with_signature) } - // receive_finalized_epochs empties the epoch buffer into a vector and returns it. - async fn receive_finalized_epochs(&self) -> Result> { - let mut epochs = Vec::new(); - let mut receiver = self.epoch_buffer_rx.lock().await; - - while let Ok(epoch) = receiver.try_recv() { - epochs.push(epoch); - } - - Ok(epochs) - } - - #[cfg(test)] - pub async fn send_finalized_epoch(&self, epoch: &FinalizedEpoch) -> Result<()> { - self.epoch_buffer_tx - .send(epoch.clone()) - .await - .map_err(|_| DataAvailabilityError::ChannelClosed.into()) + pub async fn get_commitment(&self) -> Result { + let tree = self.tree.lock().await; + tree.get_commitment().context("Failed to get commitment") } /// Updates the state from an already verified pending operation. - async fn process_operation(&self, operation: &Operation) -> Result { + async fn process_operation( + &self, + operation: &Operation, + tree: &mut KeyDirectoryTree>, + ) -> Result { match operation { Operation::Add { id, .. } | Operation::Revoke { id, .. } => { // verify that the hashchain already exists @@ -390,7 +397,6 @@ impl Sequencer { .get_hashchain(id) .context(format!("Failed to get hashchain for ID {}", id))?; - let mut tree = self.tree.lock().await; let hashed_id = hash(id.as_bytes()); let previous_hash = current_chain.last().context("Hashchain is empty")?.hash; @@ -441,7 +447,6 @@ impl Sequencer { operation ))?; - let mut tree = self.tree.lock().await; let hashed_id = hash(id.as_bytes()); Ok(Proof::Insert( @@ -463,7 +468,6 @@ impl Sequencer { Ok(()) } } - #[cfg(test)] mod tests { use super::*; @@ -476,16 +480,16 @@ mod tests { use keystore_rs::create_signing_key; use serial_test::serial; - // set up redis connection and flush database before each test + // Helper function to set up redis connection and flush database before each test fn setup_db() -> RedisConnection { let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap(); redis_connection.flush_database().unwrap(); redis_connection } - // flush database after each test - fn teardown_db(redis_connections: Arc>) { - redis_connections.flush_database().unwrap(); + // Helper function to flush database after each test + fn teardown_db(redis_connection: Arc) { + redis_connection.flush_database().unwrap(); } // Helper function to create a test Sequencer instance @@ -499,7 +503,7 @@ mod tests { ) } - fn create_new_account_operation(id: String, value: String, key: SigningKey) -> OperationInput { + fn create_new_account_operation(id: String, value: String, key: &SigningKey) -> OperationInput { let incoming = Operation::CreateAccount { id: id.clone(), value: value.clone(), @@ -533,321 +537,131 @@ mod tests { #[tokio::test] #[serial] async fn test_validate_and_queue_update() { - let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); - let da_layer = Arc::new(da_layer); - let db: Arc> = Arc::new(Box::new(setup_db())); - let sequencer = Arc::new( - Sequencer::new( - db.clone(), - da_layer, - Config::default(), - create_signing_key(), - ) - .unwrap(), - ); + let sequencer = create_test_sequencer().await; let update_entry = - create_update_operation("test@deltadevs.xyz".to_string(), "test".to_string()); + create_update_operation("test@example.com".to_string(), "test".to_string()); sequencer + .clone() .validate_and_queue_update(&update_entry) .await .unwrap(); - teardown_db(db); + + let pending_ops = sequencer.pending_operations.lock().await; + assert_eq!(pending_ops.len(), 1); + + teardown_db(sequencer.db.clone()); } #[tokio::test] #[serial] - async fn test_queued_update_gets_finalized() { - let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); - let da_layer = Arc::new(da_layer); - let db: Arc> = Arc::new(Box::new(setup_db())); - let signing_key = create_signing_key(); - let sequencer = Arc::new( - Sequencer::new(db.clone(), da_layer, Config::default(), signing_key.clone()).unwrap(), - ); - - let id = "test@deltadevs.xyz".to_string(); - let update_entry = - create_new_account_operation(id.clone(), "test".to_string(), signing_key.clone()); + async fn test_process_operation() { + let sequencer = create_test_sequencer().await; + let mut tree = sequencer.tree.lock().await; - sequencer - .clone() - .validate_and_queue_update(&update_entry) + // Test CreateAccount operation + let create_op = create_new_account_operation( + "user@example.com".to_string(), + "initial".to_string(), + &sequencer.key, + ) + .operation; + let proof = sequencer + .process_operation(&create_op, &mut tree) .await .unwrap(); + assert!(matches!(proof, Proof::Insert(_))); - // hashchain doesn't exist yet, because operation is only queued - let hashchain = sequencer.db.get_hashchain(id.as_str()); - assert!(hashchain.is_err()); - - let pending_operations = sequencer.pending_operations.lock().await.clone(); - let prev_commitment = sequencer.get_commitment().await.unwrap(); - sequencer.finalize_epoch(pending_operations).await.unwrap(); - let new_commitment = sequencer.get_commitment().await.unwrap(); - assert_ne!(prev_commitment, new_commitment); - - let hashchain = sequencer.db.get_hashchain(id.as_str()); - let value = hashchain.unwrap().get(0).operation.value(); - assert_eq!(value, "test"); - - teardown_db(db); - } - - #[tokio::test] - #[serial] - async fn test_validate_invalid_update_fails() { - let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); - let da_layer = Arc::new(da_layer); - let db: Arc> = Arc::new(Box::new(setup_db())); - let sequencer = Arc::new( - Sequencer::new( - db.clone(), - da_layer, - Config::default(), - create_signing_key(), - ) - .unwrap(), - ); + // Test Add operation + let add_op = Operation::Add { + id: "user@example.com".to_string(), + value: "new_value".to_string(), + }; + let proof = sequencer + .process_operation(&add_op, &mut tree) + .await + .unwrap(); + assert!(matches!(proof, Proof::Update(_))); - let mut update_entry = - create_update_operation("test@deltadevs.xyz".to_string(), "test".to_string()); - let second_signer = - create_update_operation("abcd".to_string(), "test".to_string()).public_key; - update_entry.public_key = second_signer; + // Test Revoke operation + let revoke_op = Operation::Revoke { + id: "user@example.com".to_string(), + value: "initial".to_string(), + }; + let proof = sequencer + .process_operation(&revoke_op, &mut tree) + .await + .unwrap(); + assert!(matches!(proof, Proof::Update(_))); - let res = sequencer.validate_and_queue_update(&update_entry).await; - assert!(res.is_err()); - teardown_db(db); + teardown_db(sequencer.db.clone()); } #[tokio::test] #[serial] - async fn test_finalize_epoch_first_epoch() { + async fn test_execute_block() { let sequencer = create_test_sequencer().await; + let mut tree = sequencer.tree.lock().await; + let operations = vec![ create_new_account_operation( "user1@example.com".to_string(), "value1".to_string(), - sequencer.key.clone(), + &sequencer.key, ) .operation, create_new_account_operation( "user2@example.com".to_string(), "value2".to_string(), - sequencer.key.clone(), + &sequencer.key, ) .operation, + Operation::Add { + id: "user1@example.com".to_string(), + value: "new_value1".to_string(), + }, ]; - let prev_commitment = sequencer.get_commitment().await.unwrap(); - let epoch = sequencer.finalize_epoch(operations).await.unwrap(); - assert_eq!(epoch.height, 0); - assert_eq!(epoch.prev_commitment, prev_commitment); - assert_eq!( - epoch.current_commitment, - sequencer.get_commitment().await.unwrap() - ); + let proofs = sequencer + .execute_block(operations, &mut tree) + .await + .unwrap(); + assert_eq!(proofs.len(), 3); + + teardown_db(sequencer.db.clone()); } #[tokio::test] #[serial] - async fn test_finalize_epoch_multiple_epochs() { + async fn test_finalize_new_epoch() { let sequencer = create_test_sequencer().await; + let mut tree = sequencer.tree.lock().await; - // First epoch - let operations1 = vec![ + let operations = vec![ create_new_account_operation( "user1@example.com".to_string(), "value1".to_string(), - sequencer.key.clone(), + &sequencer.key, ) .operation, - ]; - let epoch1 = sequencer.finalize_epoch(operations1).await.unwrap(); - - // Second epoch - let operations2 = vec![ create_new_account_operation( "user2@example.com".to_string(), "value2".to_string(), - sequencer.key.clone(), + &sequencer.key, ) .operation, ]; - let epoch2 = sequencer.finalize_epoch(operations2).await.unwrap(); - assert_eq!(epoch2.height, 1); - assert_eq!(epoch2.prev_commitment, epoch1.current_commitment); - } - - #[tokio::test] - #[serial] - async fn test_commitment_verification() { - let sequencer = create_test_sequencer().await; - - // First epoch - let operations1 = vec![ - create_new_account_operation( - "user1@example.com".to_string(), - "value1".to_string(), - sequencer.key.clone(), - ) - .operation, - ]; - let epoch1 = sequencer.finalize_epoch(operations1).await.unwrap(); - - let mut public_values = epoch1.proof.public_values.clone(); - let proof_prev_commitment: Digest = public_values.read(); - let proof_current_commitment: Digest = public_values.read(); - - assert_eq!( - &epoch1.prev_commitment, &proof_prev_commitment, - "Previous commitment mismatch" - ); - assert_eq!( - &epoch1.current_commitment, &proof_current_commitment, - "Current commitment mismatch" - ); - } - - #[tokio::test] - #[serial] - async fn test_process_operation_add() { - let sequencer = create_test_sequencer().await; - - // First, create an account - let create_op = create_new_account_operation( - "user@example.com".to_string(), - "initial".to_string(), - sequencer.key.clone(), - ) - .operation; - sequencer.process_operation(&create_op).await.unwrap(); - - // Then, add a new value - let add_op = Operation::Add { - id: "user@example.com".to_string(), - value: "new_value".to_string(), - }; - let proof = sequencer.process_operation(&add_op).await.unwrap(); - - assert!(matches!(proof, Proof::Update(_))); - - let hashchain = sequencer.db.get_hashchain("user@example.com").unwrap(); - assert_eq!(hashchain.len(), 2); - assert_eq!(hashchain.get(1).operation.value(), "new_value"); - } - - #[tokio::test] - #[serial] - async fn test_process_operation_revoke() { - let sequencer = create_test_sequencer().await; - - // First, create an account - let create_op = create_new_account_operation( - "user@example.com".to_string(), - "initial".to_string(), - sequencer.key.clone(), - ) - .operation; - sequencer.process_operation(&create_op).await.unwrap(); - - // Then, revoke a value - let revoke_op = Operation::Revoke { - id: "user@example.com".to_string(), - value: "initial".to_string(), - }; - let proof = sequencer.process_operation(&revoke_op).await.unwrap(); - - assert!(matches!(proof, Proof::Update(_))); - - let hashchain = sequencer.db.get_hashchain("user@example.com").unwrap(); - assert_eq!(hashchain.len(), 2); - assert!(matches!( - hashchain.get(1).operation, - Operation::Revoke { .. } - )); - } - - #[tokio::test] - #[serial] - async fn test_process_operation_create_account_duplicate() { - let sequencer = create_test_sequencer().await; - - // Create an account - let create_op = create_new_account_operation( - "user@example.com".to_string(), - "initial".to_string(), - sequencer.key.clone(), - ) - .operation; - sequencer.process_operation(&create_op).await.unwrap(); - - // Try to create the same account again - let result = sequencer.process_operation(&create_op).await; - assert!(result.is_err()); - } - - #[tokio::test] - #[serial] - async fn test_receive_finalized_epochs() { - let sequencer = create_test_sequencer().await; - - // Create some realistic operations - let op1 = create_new_account_operation( - "user1@example.com".to_string(), - "value1".to_string(), - sequencer.key.clone(), - ) - .operation; - let op2 = create_new_account_operation( - "user2@example.com".to_string(), - "value2".to_string(), - sequencer.key.clone(), - ) - .operation; - let op3 = Operation::Add { - id: "user1@example.com".to_string(), - value: "new_value1".to_string(), - }; - - // Create FinalizedEpoch instances - let epoch1 = sequencer.finalize_epoch(vec![op1]).await.unwrap(); - let epoch2 = sequencer.finalize_epoch(vec![op2, op3]).await.unwrap(); - - // Send the epochs to the sequencer - sequencer.send_finalized_epoch(&epoch1).await.unwrap(); - sequencer.send_finalized_epoch(&epoch2).await.unwrap(); - - // Receive and verify the epochs - let received_epochs = sequencer.receive_finalized_epochs().await.unwrap(); - assert_eq!(received_epochs.len(), 2); - - // Verify first epoch - assert_eq!(received_epochs[0].height, epoch1.height); - assert_eq!(received_epochs[0].prev_commitment, epoch1.prev_commitment); - assert_eq!( - received_epochs[0].current_commitment, - epoch1.current_commitment - ); - - // Verify second epoch - assert_eq!(received_epochs[1].height, epoch2.height); - assert_eq!(received_epochs[1].prev_commitment, epoch2.prev_commitment); - assert_eq!( - received_epochs[1].current_commitment, - epoch2.current_commitment - ); + let prev_commitment = tree.get_commitment().unwrap(); + sequencer + .finalize_new_epoch(0, operations, &mut tree) + .await + .unwrap(); - // Verify that the epochs are connected - assert_eq!( - received_epochs[1].prev_commitment, - received_epochs[0].current_commitment - ); + let new_commitment = tree.get_commitment().unwrap(); + assert_ne!(prev_commitment, new_commitment); - // Verify that the buffer is now empty - let empty_epochs = sequencer.receive_finalized_epochs().await.unwrap(); - assert!(empty_epochs.is_empty()); + teardown_db(sequencer.db.clone()); } } diff --git a/crates/prism/src/webserver.rs b/crates/prism/src/webserver.rs index 19bc41cb..3d661a56 100644 --- a/crates/prism/src/webserver.rs +++ b/crates/prism/src/webserver.rs @@ -3,7 +3,6 @@ use crate::{ node_types::sequencer::Sequencer, utils::{verify_signature, SignedContent}, }; -use prism_errors::GeneralError; use anyhow::{Context, Result}; use axum::{ extract::State, @@ -18,6 +17,7 @@ use indexed_merkle_tree::{ Hash as TreeHash, }; use prism_common::{hashchain::Hashchain, operation::Operation}; +use prism_errors::GeneralError; use serde::{Deserialize, Serialize}; use std::{self, str::FromStr, sync::Arc}; use tower_http::cors::CorsLayer; diff --git a/crates/prism/tests/integration_tests.rs b/crates/prism/tests/integration_tests.rs new file mode 100644 index 00000000..ff755eb7 --- /dev/null +++ b/crates/prism/tests/integration_tests.rs @@ -0,0 +1,141 @@ +#![cfg(test)] + +use anyhow::Result; +use ed25519_dalek::{Signer, SigningKey}; +use keystore_rs::create_signing_key; +use prism_common::operation::{AccountSource, Operation}; +use prism_main::{ + cfg::{Config, RedisConfig}, + da::memory::InMemoryDataAvailabilityLayer, + node_types::{lightclient::LightClient, sequencer::Sequencer, NodeType}, + storage::{Database, RedisConnection}, + webserver::OperationInput, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use std::sync::Arc; +use tokio::{spawn, time::Duration}; + +// Assuming 'engine' is a global or comes from a crate import +use base64::{engine::general_purpose::STANDARD as engine, Engine as _}; + +fn create_new_account_operation(id: String, value: String, key: &SigningKey) -> OperationInput { + let incoming = Operation::CreateAccount { + id: id.clone(), + value: value.clone(), + source: AccountSource::SignedBySequencer { + signature: key.sign(format!("{}{}", id, value).as_bytes()).to_string(), + }, + }; + let content = serde_json::to_string(&incoming).unwrap(); + let sig = key.sign(content.as_bytes()); + OperationInput { + operation: incoming, + signed_operation: sig.to_string(), + public_key: engine.encode(key.verifying_key().to_bytes()), + } +} + +fn create_update_operation(id: String, value: String) -> OperationInput { + let key = create_signing_key(); + let incoming = Operation::Add { id, value }; + let content = serde_json::to_string(&incoming).unwrap(); + let sig = key.sign(content.as_bytes()); + OperationInput { + operation: incoming, + signed_operation: sig.to_string(), + public_key: engine.encode(key.verifying_key().to_bytes()), + } +} + +fn setup_db() -> Arc> { + let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap(); + Arc::new(Box::new(redis_connection) as Box) +} + +#[tokio::test] +async fn test_light_client_sequencer_talking() -> Result<()> { + std::env::set_var("RUST_LOG", "DEBUG"); + pretty_env_logger::init(); + + let (da_layer, mut height_rx, mut _block_rx) = InMemoryDataAvailabilityLayer::new(30); + let da_layer = Arc::new(da_layer); + let db = setup_db(); + let cfg = Config::default(); + let signing_key = create_signing_key(); + let pubkey = engine.encode(signing_key.verifying_key().to_bytes()); + + let sequencer = Arc::new(Sequencer::new( + db.clone(), + da_layer.clone(), + cfg.clone(), + signing_key.clone(), + )?); + + let lightclient = Arc::new(LightClient::new( + da_layer, + cfg.celestia_config.unwrap(), + Some(pubkey), + )); + + let seq_clone = sequencer.clone(); + spawn(async move { + seq_clone.start().await.unwrap(); + }); + + let lc_clone = lightclient.clone(); + spawn(async move { + lc_clone.start().await.unwrap(); + }); + + spawn(async move { + let mut rng = StdRng::from_entropy(); + let mut accounts = Vec::new(); + let mut i = 0; + + loop { + // Create 1 to 10 new accounts + let num_new_accounts = rng.gen_range(1..=10); + for _ in 0..num_new_accounts { + let new_acc = create_new_account_operation( + format!("{}@gmail.com", i), + format!("key_{}", i), + &signing_key, + ); + sequencer + .clone() + .validate_and_queue_update(&new_acc) + .await + .unwrap(); + accounts.push(format!("{}@gmail.com", i)); + i += 1; + } + + // Update 5 random existing accounts (if we have at least 5) + if accounts.len() >= 5 { + for _ in 0..5 { + let account_index = rng.gen_range(0..accounts.len()); + let account_id = accounts[account_index].clone(); + let update_op = create_update_operation( + account_id, + format!("updated_key_{}", rng.gen::()), + ); + sequencer + .clone() + .validate_and_queue_update(&update_op) + .await + .unwrap(); + } + } + + tokio::time::sleep(Duration::from_millis(5000)).await; + } + }); + + while let Ok(height) = height_rx.recv().await { + if height == 5 { + break; + } + } + + Ok(()) +} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs deleted file mode 100644 index 363cc416..00000000 --- a/tests/integration_tests.rs +++ /dev/null @@ -1,127 +0,0 @@ - -// fn create_new_account_operation(id: String, value: String, key: &SigningKey) -> OperationInput { -// let incoming = Operation::CreateAccount { -// id: id.clone(), -// value: value.clone(), -// source: AccountSource::SignedBySequencer { -// signature: key.sign(format!("{}{}", id, value).as_bytes()).to_string(), -// }, -// }; -// let content = serde_json::to_string(&incoming).unwrap(); -// let sig = key.sign(content.clone().as_bytes()); -// OperationInput { -// operation: incoming, -// signed_operation: sig.to_string(), -// public_key: engine.encode(key.verifying_key().to_bytes()), -// } -// } - -// fn create_update_operation(id: String, value: String) -> OperationInput { -// let key = create_signing_key(); -// let incoming = Operation::Add { id, value }; -// let content = serde_json::to_string(&incoming).unwrap(); -// let sig = key.sign(content.clone().as_bytes()); -// OperationInput { -// operation: incoming, -// signed_operation: sig.to_string(), -// public_key: engine.encode(key.verifying_key().to_bytes()), -// } -// } - -// #[tokio::test] -// async fn test_light_client_sequencer_talking() { -// std::env::set_var("RUST_LOG", "DEBUG"); -// pretty_env_logger::init(); - -// let (da_layer, mut height_rx, mut _block_rx) = InMemoryDataAvailabilityLayer::new(1); -// let da_layer = Arc::new(da_layer); -// let db = Arc::new(setup_db()); -// let cfg = Config::default(); -// let signing_key = create_signing_key(); -// let pubkey = engine.encode(signing_key.verifying_key().to_bytes()); - -// let sequencer = Arc::new( -// Sequencer::new( -// db.clone(), -// da_layer.clone(), -// cfg.clone(), -// signing_key.clone(), -// ) -// .unwrap(), -// ); - -// let lightclient = Arc::new(LightClient::new( -// da_layer, -// cfg.celestia_config.unwrap(), -// Some(pubkey), -// )); - -// let seq_1 = sequencer.clone(); -// tokio::spawn(async move { -// seq_1.start().await.unwrap(); -// }); - -// tokio::spawn(async move { -// lightclient.clone().start().await.unwrap(); -// }); - -// let seq = sequencer.clone(); -// tokio::spawn(async move { -// let mut rng = StdRng::from_entropy(); -// let mut accounts = Vec::new(); -// let mut i = 0; - -// loop { -// let seq_clone = seq.clone(); -// // Create 1 or 2 new accounts -// let num_new_accounts = rng.gen_range(1..=10); -// for _ in 0..num_new_accounts { -// let seq_i = seq_clone.clone(); -// let new_acc = create_new_account_operation( -// format!("{}@gmail.com", i), -// format!("key_{}", i), -// &signing_key, -// ); -// seq_i.validate_and_queue_update(&new_acc).await.unwrap(); -// accounts.push(format!("{}@gmail.com", i)); -// i += 1; -// } - -// // Update 5 random existing accounts (if we have at least 5) -// if accounts.len() >= 5 { -// for _ in 0..5 { -// let seq_i = seq_clone.clone(); -// let account_index = rng.gen_range(0..accounts.len()); -// let account_id = accounts[account_index].clone(); -// let update_op = create_update_operation( -// account_id, -// format!("updated_key_{}", rng.gen::()), -// ); -// seq_i.validate_and_queue_update(&update_op).await.unwrap(); -// } -// } - -// tokio::time::sleep(Duration::from_millis(500)).await; -// } -// }); - -// while let Ok(height) = height_rx.recv().await { -// if height == 60 { -// break; -// } -// } - -// teardown_db(db.clone()) -// } - -// // set up redis connection and flush database before each test -// fn setup_db() -> RedisConnection { -// let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap(); -// redis_connection.flush_database().unwrap(); -// redis_connection -// } - -// // flush database after each test -// fn teardown_db(redis_connections: Arc) { -// redis_connections.flush_database().unwrap(); -// }