diff --git a/Cargo.lock b/Cargo.lock index bad745bd46e..8693274a813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5250,6 +5250,7 @@ dependencies = [ "futures", "gear-core", "gprimitives", + "indexmap 2.2.6", "log", "parity-scale-codec", "tokio", @@ -5268,6 +5269,7 @@ dependencies = [ "parity-scale-codec", "secp256k1 0.29.0", "sha3", + "tempfile", ] [[package]] @@ -5290,7 +5292,7 @@ version = "1.5.0" dependencies = [ "anyhow", "ethexe-common", - "ethexe-network", + "ethexe-db", "ethexe-sequencer", "ethexe-signer", "futures", @@ -5298,7 +5300,6 @@ dependencies = [ "log", "parity-scale-codec", "static_init", - "tempfile", ] [[package]] diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index d2edd57115e..8bd5d63cbd0 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -32,12 +32,19 @@ use ethexe_ethereum::router::RouterQuery; use ethexe_network::NetworkReceiverEvent; use ethexe_observer::{BlockData, Event as ObserverEvent}; use ethexe_processor::LocalOutcome; -use ethexe_signer::{PublicKey, Signer}; -use ethexe_validator::Commitment; +use ethexe_sequencer::agro::AggregatedCommitments; +use ethexe_signer::{Digest, PublicKey, Signature, Signer}; +use ethexe_validator::BlockCommitmentValidationRequest; use futures::{future, stream::StreamExt, FutureExt}; use gprimitives::H256; -use parity_scale_codec::Decode; -use std::{future::Future, sync::Arc, time::Duration}; +use parity_scale_codec::{Decode, Encode}; +use std::{ + future::Future, + ops::Not, + sync::Arc, + time::{Duration, Instant}, +}; +use utils::*; /// ethexe service. pub struct Service { @@ -56,6 +63,23 @@ pub struct Service { rpc: Option, } +// TODO: consider to move this to another module #4176 +#[derive(Debug, Clone, Encode, Decode)] +pub enum NetworkMessage { + PublishCommitments { + codes: Option>, + blocks: Option>, + }, + RequestCommitmentsValidation { + codes: Vec, + blocks: Vec, + }, + ApproveCommitments { + codes: Option<(Digest, Signature)>, + blocks: Option<(Digest, Signature)>, + }, +} + impl Service { pub async fn new(config: &Config) -> Result { let blob_reader = Arc::new( @@ -79,8 +103,15 @@ impl Service { .await?; let router_query = RouterQuery::new(&config.ethereum_rpc, ethereum_router_address).await?; + let genesis_block_hash = router_query.genesis_block_hash().await?; - log::info!("👶 Genesis block hash: {genesis_block_hash}"); + log::info!("👶 Genesis block hash: {genesis_block_hash:?}"); + + let validators = router_query.validators().await?; + log::info!("👥 Validators set: {validators:?}"); + + let threshold = router_query.threshold().await?; + log::info!("🔒 Multisig threshold: {threshold} / {}", validators.len()); let query = ethexe_observer::Query::new( Arc::new(db.clone()), @@ -103,6 +134,8 @@ impl Service { ethereum_rpc: config.ethereum_rpc.clone(), sign_tx_public: key, router_address: config.ethereum_router_address, + validators, + threshold, }, signer.clone(), ) @@ -226,7 +259,7 @@ impl Service { let blob_tx_hash = db .code_blob_tx(code_id) - .ok_or(anyhow!("Blob tx hash not found"))?; + .ok_or_else(|| anyhow!("Blob tx hash not found"))?; let code = query.download_code(code_id, blob_tx_hash).await?; @@ -274,7 +307,7 @@ impl Service { // so append it to the `wait for commitment` queue. let mut queue = db .block_commitment_queue(block_hash) - .ok_or(anyhow!("Commitment queue is not found for block"))?; + .ok_or_else(|| anyhow!("Commitment queue is not found for block"))?; queue.push_back(block_hash); db.set_block_commitment_queue(block_hash, queue); } @@ -323,7 +356,7 @@ impl Service { pred_block_hash: block_data.block_hash, prev_commitment_hash: db .block_prev_commitment(block_hash) - .ok_or(anyhow!("Prev commitment not found"))?, + .ok_or_else(|| anyhow!("Prev commitment not found"))?, transitions, }); } @@ -337,12 +370,12 @@ impl Service { processor: &mut ethexe_processor::Processor, maybe_sequencer: &mut Option, observer_event: ethexe_observer::Event, - ) -> Result> { + ) -> Result<(Vec, Vec)> { if let Some(sequencer) = maybe_sequencer { sequencer.process_observer_event(&observer_event)?; } - let commitments = match observer_event { + match observer_event { ObserverEvent::Block(block_data) => { log::info!( "📦 receive a new block {}, hash {}, parent hash {}", @@ -354,24 +387,20 @@ impl Service { let commitments = Self::process_block_event(db, query, processor, block_data).await?; - commitments.into_iter().map(Commitment::Block).collect() + Ok((Vec::new(), commitments)) } ethexe_observer::Event::CodeLoaded { code_id, code } => { let outcomes = processor.process_upload_code(code_id, code.as_slice())?; - - outcomes + let commitments: Vec<_> = outcomes .into_iter() .map(|outcome| match outcome { - LocalOutcome::CodeValidated { id, valid } => { - Commitment::Code(CodeCommitment { id, valid }) - } + LocalOutcome::CodeValidated { id, valid } => CodeCommitment { id, valid }, _ => unreachable!("Only code outcomes are expected here"), }) - .collect() + .collect(); + Ok((commitments, Vec::new())) } - }; - - Ok(commitments) + } } async fn run_inner(self) -> Result<()> { @@ -418,8 +447,6 @@ impl Service { None }; - let mut delay: Option<_> = None; - let mut roles = "Observer".to_string(); if let Some(seq) = sequencer.as_ref() { roles.push_str(&format!(", Sequencer ({})", seq.address())); @@ -429,6 +456,9 @@ impl Service { } log::info!("⚙️ Node service starting, roles: [{}]", roles); + let mut collection_round_timer = StoppableTimer::new(block_time / 4); + let mut validation_round_timer = StoppableTimer::new(block_time / 4); + loop { tokio::select! { observer_event = observer_events.next() => { @@ -437,7 +467,9 @@ impl Service { break; }; - let commitments = Self::process_observer_event( + let is_block_event = matches!(observer_event, ObserverEvent::Block(_)); + + let (code_commitments, block_commitments) = Self::process_observer_event( &db, &mut query, &mut processor, @@ -445,57 +477,58 @@ impl Service { observer_event, ).await?; - if let Some(ref mut validator) = validator { - log::debug!("Pushing commitments to local validator..."); - validator.push_commitments(commitments)?; - - if let Some(ref mut network_sender) = network_sender { - log::debug!("Publishing commitments to network..."); - validator.publish_commitments(network_sender)?; - } - - if let Some(ref mut sequencer) = sequencer { - let origin = validator.pub_key().to_address(); - if validator.has_codes_commit() { - let aggregated_codes_commitments = validator.codes_aggregation()?; - log::debug!("Received ({}) signed code commitments from local validator...", aggregated_codes_commitments.len()); - sequencer.receive_codes_commitment(origin, aggregated_codes_commitments)?; - } - if validator.has_transitions_commit() { - let aggregated_transitions_commitments = validator.blocks_aggregation()?; - log::debug!("Received ({}) signed transition commitments from local validator...", aggregated_transitions_commitments.len()); - sequencer.receive_block_commitment(origin, aggregated_transitions_commitments)?; - } else { - log::debug!("No commitments from local validator..."); - } - } + Self::post_process_commitments( + code_commitments, + block_commitments, + validator.as_mut(), + sequencer.as_mut(), + network_sender.as_mut(), + ).await?; + + if is_block_event { + collection_round_timer.start(); + validation_round_timer.stop(); } + } + _ = collection_round_timer.wait() => { + log::debug!("Collection round timeout, process collected commitments..."); + + Self::process_collected_commitments( + &db, + validator.as_mut(), + sequencer.as_mut(), + network_sender.as_mut() + )?; - log::trace!("Sending timeout after observer event..."); - delay = Some(tokio::time::sleep(block_time / 4)); + collection_round_timer.stop(); + validation_round_timer.start(); } - _ = maybe_await(delay.take()) => { - log::debug!("Sending timeout after block event..."); + _ = validation_round_timer.wait() => { + log::debug!("Validation round timeout, process validated commitments..."); - if let Some(sequencer) = sequencer.as_mut() { - sequencer.process_block_timeout().await?; - } + Self::process_approved_commitments(sequencer.as_mut()).await?; - if let Some(ref mut validator) = validator { - // clean validator state - validator.clear(); - }; + validation_round_timer.stop(); } event = maybe_await(network_receiver.as_mut().map(|rx| rx.recv())) => { - if let Some(NetworkReceiverEvent::Commitments { source, data }) = event { - if let Some(sequencer) = sequencer.as_mut() { - log::debug!("Received p2p commitments from: {:?}", source); + let Some(NetworkReceiverEvent::Message { source, data }) = event else { + continue; + }; - let (origin, (codes_aggregated_commitment, transitions_aggregated_commitment)) = Decode::decode(&mut data.as_slice())?; + log::debug!("Received a network message from peer {source:?}"); - sequencer.receive_codes_commitment(origin, codes_aggregated_commitment)?; - sequencer.receive_block_commitment(origin, transitions_aggregated_commitment)?; - } + let result = Self::process_network_message( + data.as_slice(), + &db, + validator.as_mut(), + sequencer.as_mut(), + network_sender.as_mut(), + ); + + if let Err(err) = result { + // TODO: slash peer/validator in case of error #4175 + // TODO: consider error log as temporary solution #4175 + log::warn!("Failed to process network message: {err}"); } } _ = maybe_await(network_handle.as_mut()) => { @@ -518,13 +551,285 @@ impl Service { err }) } + + async fn post_process_commitments( + code_commitments: Vec, + block_commitments: Vec, + maybe_validator: Option<&mut ethexe_validator::Validator>, + maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, + maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, + ) -> Result<()> { + let Some(validator) = maybe_validator else { + return Ok(()); + }; + + if maybe_network_sender.is_none() && maybe_sequencer.is_none() { + return Ok(()); + } + + let aggregated_codes = code_commitments + .is_empty() + .not() + .then(|| validator.aggregate(code_commitments)) + .transpose()?; + let aggregated_blocks = block_commitments + .is_empty() + .not() + .then(|| validator.aggregate(block_commitments)) + .transpose()?; + + if aggregated_codes.is_none() && aggregated_blocks.is_none() { + return Ok(()); + } + + if let Some(network_sender) = maybe_network_sender { + log::debug!("Publishing commitments to network..."); + network_sender.publish_message( + NetworkMessage::PublishCommitments { + codes: aggregated_codes.clone(), + blocks: aggregated_blocks.clone(), + } + .encode(), + ); + } + + if let Some(sequencer) = maybe_sequencer { + if let Some(aggregated) = aggregated_codes { + log::debug!( + "Received ({}) signed code commitments from local validator...", + aggregated.len() + ); + sequencer.receive_code_commitments(aggregated)?; + } + if let Some(aggregated) = aggregated_blocks { + log::debug!( + "Received ({}) signed block commitments from local validator...", + aggregated.len() + ); + sequencer.receive_block_commitments(aggregated)?; + } + } + + Ok(()) + } + + fn process_collected_commitments( + db: &Database, + maybe_validator: Option<&mut ethexe_validator::Validator>, + maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, + maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, + ) -> Result<()> { + let Some(sequencer) = maybe_sequencer else { + return Ok(()); + }; + + let Some(chain_head) = sequencer.chain_head() else { + return Err(anyhow!("Chain head is not set in sequencer")); + }; + + // If chain head is not yet processed by this node, this is normal situation, + // so we just skip this round for sequencer. + + let Some(block_is_empty) = db.block_is_empty(chain_head) else { + log::warn!("Failed to get block emptiness status for {chain_head}"); + return Ok(()); + }; + + let last_not_empty_block = match block_is_empty { + true => match db.block_prev_commitment(chain_head) { + Some(prev_commitment) => prev_commitment, + None => { + log::warn!("Failed to get previous commitment for {chain_head}"); + return Ok(()); + } + }, + false => chain_head, + }; + + sequencer.process_collected_commitments(last_not_empty_block)?; + + if maybe_validator.is_none() && maybe_network_sender.is_none() { + return Ok(()); + } + + let code_requests: Vec<_> = sequencer + .get_candidate_code_commitments() + .cloned() + .collect(); + + let block_requests: Vec<_> = sequencer + .get_candidate_block_commitments() + .map(BlockCommitmentValidationRequest::from) + .collect(); + + if let Some(network_sender) = maybe_network_sender { + log::debug!("Request validation of aggregated commitments..."); + + let message = NetworkMessage::RequestCommitmentsValidation { + codes: code_requests.clone(), + blocks: block_requests.clone(), + }; + network_sender.publish_message(message.encode()); + } + + if let Some(validator) = maybe_validator { + log::debug!( + "Validate collected ({}) code commitments and ({}) block commitments...", + code_requests.len(), + block_requests.len() + ); + + // Because sequencer can collect commitments from different sources, + // it's possible that some of collected commitments validation will fail + // on local validator. So we just print warning in this case. + + if code_requests.is_empty().not() { + match validator.validate_code_commitments(db, code_requests) { + Result::Ok((digest, signature)) => { + sequencer.receive_codes_signature(digest, signature)? + } + Result::Err(err) => { + log::warn!("Collected code commitments validation failed: {err}") + } + } + } + + if block_requests.is_empty().not() { + match validator.validate_block_commitments(db, block_requests) { + Result::Ok((digest, signature)) => { + sequencer.receive_blocks_signature(digest, signature)? + } + Result::Err(err) => { + log::warn!("Collected block commitments validation failed: {err}") + } + } + } + } + + Ok(()) + } + + async fn process_approved_commitments( + maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, + ) -> Result<()> { + let Some(sequencer) = maybe_sequencer else { + return Ok(()); + }; + + sequencer.submit_multisigned_commitments().await + } + + fn process_network_message( + mut data: &[u8], + db: &Database, + maybe_validator: Option<&mut ethexe_validator::Validator>, + maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, + maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, + ) -> Result<()> { + let message = NetworkMessage::decode(&mut data)?; + match message { + NetworkMessage::PublishCommitments { codes, blocks } => { + let Some(sequencer) = maybe_sequencer else { + return Ok(()); + }; + if let Some(aggregated) = codes { + sequencer.receive_code_commitments(aggregated)?; + } + if let Some(aggregated) = blocks { + sequencer.receive_block_commitments(aggregated)?; + } + Ok(()) + } + NetworkMessage::RequestCommitmentsValidation { codes, blocks } => { + let Some(validator) = maybe_validator else { + return Ok(()); + }; + let Some(network_sender) = maybe_network_sender else { + return Ok(()); + }; + + let codes = codes + .is_empty() + .not() + .then(|| validator.validate_code_commitments(db, codes)) + .transpose()?; + + let blocks = blocks + .is_empty() + .not() + .then(|| validator.validate_block_commitments(db, blocks)) + .transpose()?; + + let message = NetworkMessage::ApproveCommitments { codes, blocks }; + network_sender.publish_message(message.encode()); + + Ok(()) + } + NetworkMessage::ApproveCommitments { codes, blocks } => { + let Some(sequencer) = maybe_sequencer else { + return Ok(()); + }; + + if let Some((digest, signature)) = codes { + sequencer.receive_codes_signature(digest, signature)?; + } + + if let Some((digest, signature)) = blocks { + sequencer.receive_blocks_signature(digest, signature)?; + } + + Ok(()) + } + } + } } -pub async fn maybe_await(f: Option) -> F::Output { - if let Some(f) = f { - f.await - } else { - future::pending().await +mod utils { + use super::*; + + pub(crate) struct StoppableTimer { + round_duration: Duration, + started: Option, + } + + impl StoppableTimer { + pub fn new(round_duration: Duration) -> Self { + Self { + round_duration, + started: None, + } + } + + pub fn start(&mut self) { + self.started = Some(Instant::now()); + } + + pub fn stop(&mut self) { + self.started = None; + } + + pub async fn wait(&self) { + maybe_await(self.remaining().map(|rem| tokio::time::sleep(rem))).await; + } + + fn remaining(&self) -> Option { + self.started.map(|started| { + let elapsed = started.elapsed(); + if elapsed < self.round_duration { + self.round_duration - elapsed + } else { + Duration::ZERO + } + }) + } + } + + pub(crate) async fn maybe_await(f: Option) -> F::Output { + if let Some(f) = f { + f.await + } else { + future::pending().await + } } } @@ -540,6 +845,8 @@ mod tests { #[tokio::test] async fn basics() { + gear_utils::init_default_logger(); + let tmp_dir = tempdir().unwrap(); let tmp_dir = tmp_dir.path().to_path_buf(); @@ -550,7 +857,7 @@ mod tests { node_name: "test".to_string(), ethereum_rpc: "ws://54.67.75.1:8546".into(), ethereum_beacon_rpc: "http://localhost:5052".into(), - ethereum_router_address: "0x05069E9045Ca0D2B72840c6A21C7bE588E02089A" + ethereum_router_address: "0xa9e7B594e18e28b1Cc0FA4000D92ded887CB356F" .parse() .expect("infallible"), max_commitment_depth: 1000, @@ -575,7 +882,7 @@ mod tests { node_name: "test".to_string(), ethereum_rpc: "wss://ethereum-holesky-rpc.publicnode.com".into(), ethereum_beacon_rpc: "http://localhost:5052".into(), - ethereum_router_address: "0x05069E9045Ca0D2B72840c6A21C7bE588E02089A" + ethereum_router_address: "0xa9e7B594e18e28b1Cc0FA4000D92ded887CB356F" .parse() .expect("infallible"), max_commitment_depth: 1000, diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 50f45093a1f..e850d2e7a00 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -73,7 +73,10 @@ impl Listener { } pub async fn next_event(&mut self) -> Result { - self.receiver.recv().await.ok_or(anyhow!("No more events")) + self.receiver + .recv() + .await + .ok_or_else(|| anyhow!("No more events")) } pub async fn apply_until( @@ -258,6 +261,8 @@ impl TestEnv { ethereum_rpc: self.rpc_url.clone(), sign_tx_public: self.sequencer_public_key, router_address: self.router_address, + validators: vec![self.validator_public_key.to_address()], + threshold: 1, }, self.signer.clone(), ) diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 160132b7268..c0b48b95e40 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -88,4 +88,7 @@ pub trait CodesStorage: Send + Sync { fn code_blob_tx(&self, code_id: CodeId) -> Option; fn set_code_blob_tx(&self, code_id: CodeId, blob_tx_hash: H256); + + fn code_valid(&self, code_id: CodeId) -> Option; + fn set_code_valid(&self, code_id: CodeId, valid: bool); } diff --git a/ethexe/common/src/lib.rs b/ethexe/common/src/lib.rs index 54b27fb982b..d0ed6d1f2b9 100644 --- a/ethexe/common/src/lib.rs +++ b/ethexe/common/src/lib.rs @@ -22,14 +22,17 @@ extern crate alloc; -use gprimitives::ActorId; -use parity_scale_codec::{Decode, Encode}; - pub mod db; pub mod mirror; pub mod router; pub mod wvara; +pub use gear_core; +pub use gprimitives; + +use gprimitives::ActorId; +use parity_scale_codec::{Decode, Encode}; + #[derive(Clone, Debug, Encode, Decode)] pub enum BlockEvent { Router(router::Event), diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index b4d76730aa7..ea53ccf06e9 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -52,6 +52,7 @@ enum KeyPrefix { CodeUpload = 7, LatestValidBlock = 8, BlockHeader = 9, + CodeValid = 10, } impl KeyPrefix { @@ -298,7 +299,7 @@ impl CodesStorage for Database { self.kv .iter_prefix(&key_prefix) - .map(|(key, code_id)| { + .map(|#[allow(unused_variables)] (key, code_id)| { let (splitted_key_prefix, program_id) = key.split_at(key_prefix.len()); debug_assert_eq!(splitted_key_prefix, key_prefix); let program_id = @@ -348,6 +349,17 @@ impl CodesStorage for Database { self.kv .put(&KeyPrefix::CodeUpload.one(code_id), blob_tx_hash.encode()); } + + fn code_valid(&self, code_id: CodeId) -> Option { + self.kv.get(&KeyPrefix::CodeValid.one(code_id)).map(|data| { + bool::decode(&mut data.as_slice()).expect("Failed to decode data into `bool`") + }) + } + + fn set_code_valid(&self, code_id: CodeId, approved: bool) { + self.kv + .put(&KeyPrefix::CodeValid.one(code_id), approved.encode()); + } } impl Database { diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index 0f84723d8c1..654fdc19c87 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -312,9 +312,9 @@ impl SignerSync for Sender { fn sign_hash_sync(&self, hash: &B256) -> SignerResult { let signature = self .signer - .raw_sign_digest(self.sender, hash.0) + .raw_sign_digest(self.sender, hash.0.into()) .map_err(|err| SignerError::Other(err.into()))?; - Ok(Signature::try_from(&signature.0[..])?) + Ok(Signature::try_from(signature.as_ref())?) } fn chain_id_sync(&self) -> Option { diff --git a/ethexe/ethereum/src/mirror/mod.rs b/ethexe/ethereum/src/mirror/mod.rs index dc9b5e60c42..a94fae47d6e 100644 --- a/ethexe/ethereum/src/mirror/mod.rs +++ b/ethexe/ethereum/src/mirror/mod.rs @@ -79,7 +79,7 @@ impl Mirror { } let message_id = - message_id.ok_or(anyhow!("Couldn't find `MessageQueueingRequested` log"))?; + message_id.ok_or_else(|| anyhow!("Couldn't find `MessageQueueingRequested` log"))?; Ok((tx_hash, message_id)) } diff --git a/ethexe/ethereum/src/router/events.rs b/ethexe/ethereum/src/router/events.rs index 441b7d4c08b..83b3c50b535 100644 --- a/ethexe/ethereum/src/router/events.rs +++ b/ethexe/ethereum/src/router/events.rs @@ -62,7 +62,9 @@ pub fn try_extract_event(log: &Log) -> Result> { b if b == BLOCK_COMMITTED => decode_log::(log)?.into(), b if b == CODE_GOT_VALIDATED => decode_log::(log)?.into(), b if b == CODE_VALIDATION_REQUESTED => { - let tx_hash = log.transaction_hash.ok_or(anyhow!("Tx hash not found"))?; + let tx_hash = log + .transaction_hash + .ok_or_else(|| anyhow!("Tx hash not found"))?; let mut event = decode_log::(log)?; diff --git a/ethexe/ethereum/src/router/mod.rs b/ethexe/ethereum/src/router/mod.rs index c6941db6112..6c89fb4ea50 100644 --- a/ethexe/ethereum/src/router/mod.rs +++ b/ethexe/ethereum/src/router/mod.rs @@ -178,7 +178,7 @@ impl Router { } } - let actor_id = actor_id.ok_or(anyhow!("Couldn't find `ProgramCreated` log"))?; + let actor_id = actor_id.ok_or_else(|| anyhow!("Couldn't find `ProgramCreated` log"))?; Ok((tx_hash, actor_id)) } @@ -192,7 +192,7 @@ impl Router { commitments.into_iter().map(Into::into).collect(), signatures .into_iter() - .map(|signature| Bytes::copy_from_slice(&signature.0)) + .map(|signature| Bytes::copy_from_slice(signature.as_ref())) .collect(), ); let tx = builder.send().await?; @@ -211,7 +211,7 @@ impl Router { commitments.into_iter().map(Into::into).collect(), signatures .into_iter() - .map(|signature| Bytes::copy_from_slice(&signature.0)) + .map(|signature| Bytes::copy_from_slice(signature.as_ref())) .collect(), ) .gas(10_000_000); @@ -269,4 +269,22 @@ impl RouterQuery { .map(|res| H256(*res._0)) .map_err(Into::into) } + + pub async fn validators(&self) -> Result> { + self.instance + .validators() + .call() + .await + .map(|res| res._0.into_iter().map(|v| LocalAddress(v.into())).collect()) + .map_err(Into::into) + } + + pub async fn threshold(&self) -> Result { + self.instance + .validatorsThreshold() + .call() + .await + .map(|res| res._0.to()) + .map_err(Into::into) + } } diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index 429ed29d9ec..fcb2855c227 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -117,7 +117,7 @@ impl NetworkService { #[derive(Debug)] enum NetworkSenderEvent { - PublishCommitments { data: Vec }, + PublishMessage { data: Vec }, RequestDbData(db_sync::Request), } @@ -132,10 +132,10 @@ impl NetworkSender { (Self { tx }, rx) } - pub fn publish_commitments(&self, data: impl Into>) { + pub fn publish_message(&self, data: impl Into>) { let _res = self .tx - .send(NetworkSenderEvent::PublishCommitments { data: data.into() }); + .send(NetworkSenderEvent::PublishMessage { data: data.into() }); } pub fn request_db_data(&self, request: db_sync::Request) { @@ -145,7 +145,7 @@ impl NetworkSender { #[derive(Debug, Eq, PartialEq)] pub enum NetworkReceiverEvent { - Commitments { + Message { source: Option, data: Vec, }, @@ -392,7 +392,7 @@ impl NetworkEventLoop { }) if gpu_commitments_topic().hash() == topic => { let _res = self .external_tx - .send(NetworkReceiverEvent::Commitments { source, data }); + .send(NetworkReceiverEvent::Message { source, data }); } BehaviourEvent::Gossipsub(gossipsub::Event::GossipsubNotSupported { peer_id }) => { log::debug!("`gossipsub` protocol is not supported. Disconnecting..."); @@ -422,7 +422,7 @@ impl NetworkEventLoop { fn handle_network_rx_event(&mut self, event: NetworkSenderEvent) { match event { - NetworkSenderEvent::PublishCommitments { data } => { + NetworkSenderEvent::PublishMessage { data } => { if let Err(e) = self .swarm .behaviour_mut() @@ -582,13 +582,13 @@ mod tests { // Send a commitment from service1 let commitment_data = b"test commitment".to_vec(); - sender.publish_commitments(commitment_data.clone()); + sender.publish_message(commitment_data.clone()); let mut receiver = service2.receiver; // Wait for the commitment to be received by service2 let received_commitment = timeout(Duration::from_secs(5), async { - while let Some(NetworkReceiverEvent::Commitments { source: _, data }) = + while let Some(NetworkReceiverEvent::Message { source: _, data }) = receiver.recv().await { if data == commitment_data { diff --git a/ethexe/observer/src/blobs.rs b/ethexe/observer/src/blobs.rs index a1e0e548bf2..b9ff6d5d93f 100644 --- a/ethexe/observer/src/blobs.rs +++ b/ethexe/observer/src/blobs.rs @@ -113,7 +113,7 @@ impl BlobReader for ConsensusLayerBlobReader { let mut coder = SimpleCoder::default(); let data = coder .decode_all(&blobs) - .ok_or(anyhow!("failed to decode blobs"))? + .ok_or_else(|| anyhow!("failed to decode blobs"))? .concat(); Ok(data) diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index e0ac96532e0..a2b24e21103 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -255,7 +255,7 @@ async fn read_events_impl( let block_hash_of = |log: &alloy::rpc::types::Log| -> Result { log.block_hash .map(|v| v.0.into()) - .ok_or(anyhow!("Block hash is missing")) + .ok_or_else(|| anyhow!("Block hash is missing")) }; let mut res: HashMap<_, Vec<_>> = HashMap::new(); diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index 0b19f922978..f7f7f916bcf 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -228,9 +228,11 @@ impl Query { let mut actual_commitment_queue: VecDeque = self .database .block_commitment_queue(hash) - .ok_or(anyhow!( - "Commitment queue not found for block {hash}, possible database inconsistency." - ))? + .ok_or_else(|| { + anyhow!( + "Commitment queue not found for block {hash}, possible database inconsistency." + ) + })? .into_iter() .filter(|hash| !committed_blocks.contains(hash)) .collect(); @@ -268,7 +270,7 @@ impl Query { let program_state_hashes = self .database .block_end_program_states(parent) - .ok_or(anyhow!("parent block end states not found"))?; + .ok_or_else(|| anyhow!("parent block end states not found"))?; self.database .set_block_start_program_states(block_hash, program_state_hashes); @@ -276,7 +278,7 @@ impl Query { let queue = self .database .block_commitment_queue(parent) - .ok_or(anyhow!("parent block commitment queue not found"))?; + .ok_or_else(|| anyhow!("parent block commitment queue not found"))?; let committed_blocks = self.get_committed_blocks(block_hash).await?; let current_queue = queue .into_iter() @@ -289,12 +291,12 @@ impl Query { if self .database .block_is_empty(parent) - .ok_or(anyhow!("Cannot identify whether parent is empty"))? + .ok_or_else(|| anyhow!("Cannot identify whether parent is empty"))? { let parent_prev_commitment = self .database .block_prev_commitment(parent) - .ok_or(anyhow!("parent block prev commitment not found"))?; + .ok_or_else(|| anyhow!("parent block prev commitment not found"))?; self.database .set_block_prev_commitment(block_hash, parent_prev_commitment); } else { @@ -312,13 +314,13 @@ impl Query { .provider .get_block_by_hash(block_hash.0.into(), BlockTransactionsKind::Hashes) .await? - .ok_or(anyhow!("Block not found"))?; + .ok_or_else(|| anyhow!("Block not found"))?; let height = u32::try_from( block .header .number - .ok_or(anyhow!("Block number not found"))?, + .ok_or_else(|| anyhow!("Block number not found"))?, ) .unwrap_or_else(|err| unreachable!("Ethereum block number not fit in u32: {err}")); let timestamp = block.header.timestamp; diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 141555e7993..88a67444b41 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -218,8 +218,9 @@ impl Processor { ) -> Result> { log::debug!("Processing upload code {code_id:?}"); - let valid = !(code_id != CodeId::generate(code) || self.handle_new_code(code)?.is_none()); + let valid = code_id == CodeId::generate(code) && self.handle_new_code(code)?.is_some(); + self.db.set_code_valid(code_id, valid); Ok(vec![LocalOutcome::CodeValidated { id: code_id, valid }]) } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 0fa3282a04e..6f9987287f4 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -42,11 +42,9 @@ impl RpcModule { impl RpcApiServer for RpcModule { async fn block_header(&self, hash: H256) -> RpcResult { // let db = db.lock().await; - self.db.block_header(hash).ok_or(ErrorObject::borrowed( - ErrorCode::InvalidParams.code(), - "Block not found", - None, - )) + self.db.block_header(hash).ok_or_else(|| { + ErrorObject::borrowed(ErrorCode::InvalidParams.code(), "Block not found", None) + }) } } diff --git a/ethexe/sequencer/Cargo.toml b/ethexe/sequencer/Cargo.toml index 2a1bdd6d6a4..4e0c9a18285 100644 --- a/ethexe/sequencer/Cargo.toml +++ b/ethexe/sequencer/Cargo.toml @@ -23,3 +23,4 @@ log.workspace = true anyhow.workspace = true futures.workspace = true tokio.workspace = true +indexmap.workspace = true diff --git a/ethexe/sequencer/src/agro.rs b/ethexe/sequencer/src/agro.rs index 927f3d4527c..4d510ac0638 100644 --- a/ethexe/sequencer/src/agro.rs +++ b/ethexe/sequencer/src/agro.rs @@ -18,207 +18,40 @@ //! Abstract commitment aggregator. -use anyhow::Result; -use ethexe_common::router::{ - BlockCommitment, CodeCommitment, OutgoingMessage, StateTransition, ValueClaim, -}; -use ethexe_signer::{hash, Address, PublicKey, Signature, Signer}; -use gprimitives::{MessageId, H256}; +use anyhow::{anyhow, Result}; +use ethexe_signer::{Address, Digest, PublicKey, Signature, Signer, ToDigest}; +use indexmap::IndexSet; use parity_scale_codec::{Decode, Encode}; -use std::{ - collections::{HashMap, HashSet}, - fmt, -}; +use std::collections::BTreeMap; -pub trait SeqHash { - fn hash(&self) -> H256; -} - -#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, Hash)] -pub struct AggregatedCommitments { +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] +pub struct AggregatedCommitments { pub commitments: Vec, pub signature: Signature, } -#[derive(Debug)] -pub struct LinkedAggregation { - pub aggregated: AggregatedCommitments, - pub previous: Option, -} - -#[derive(Debug)] -pub struct AggregatedQueue { - all_commitments: HashMap>, - last: H256, -} - -// TODO: REMOVE THIS IMPL. SeqHash makes sense only for `ethexe_ethereum` types. -// identity hashing -impl SeqHash for CodeCommitment { - fn hash(&self) -> H256 { - hash(&self.encode()) - } -} - -// TODO: REMOVE THIS IMPL. SeqHash makes sense only for `ethexe_ethereum` types. -impl SeqHash for StateTransition { - fn hash(&self) -> H256 { - // State transition basic fields. - - let state_transition_size = // concat of fields: - // actorId - size_of::
() - // newStateHash - + size_of::() - // valueToReceive - + size_of::() - // hash(valueClaimsBytes) - + size_of::() - // hash(messagesHashesBytes) - + size_of::(); - - let mut state_transition_bytes = Vec::with_capacity(state_transition_size); - - state_transition_bytes.extend_from_slice(self.actor_id.to_address_lossy().as_bytes()); - state_transition_bytes.extend_from_slice(self.new_state_hash.as_bytes()); - state_transition_bytes.extend_from_slice(self.value_to_receive.to_be_bytes().as_slice()); - - // TODO (breathx): consider SeqHash for ValueClaim, so hashing of inner fields. - // Value claims field. - - let value_claim_size = // concat of fields: - // messageId - size_of::() - // destination - + size_of::
() - // value - + size_of::(); - - let mut value_claims_bytes = Vec::with_capacity(self.value_claims.len() * value_claim_size); - - for ValueClaim { - message_id, - destination, - value, - } in &self.value_claims - { - value_claims_bytes.extend_from_slice(message_id.as_ref()); - value_claims_bytes.extend_from_slice(destination.to_address_lossy().as_bytes()); - // TODO (breathx): double check if we should use BIG endian. - value_claims_bytes.extend_from_slice(value.to_be_bytes().as_slice()) - } - - let value_claims_hash = hash(&value_claims_bytes); - state_transition_bytes.extend_from_slice(value_claims_hash.as_bytes()); - - // Messages field. - - let messages_hashes_hash = self.messages.hash(); - state_transition_bytes.extend_from_slice(messages_hashes_hash.as_bytes()); - - hash(&state_transition_bytes) - } -} - -impl SeqHash for OutgoingMessage { - fn hash(&self) -> H256 { - let message_size = // concat of fields: - // id - size_of::() - // destination - + size_of::
() - // payload - + self.payload.len() - // value - + size_of::() - // replyDetails.to - + size_of::() - // replyDetails.code - + size_of::<[u8; 4]>(); - - let mut message = Vec::with_capacity(message_size); - - message.extend_from_slice(self.id.as_ref()); - message.extend_from_slice(self.destination.to_address_lossy().as_bytes()); - message.extend_from_slice(&self.payload); - // TODO (breathx): double check big endian. - message.extend_from_slice(self.value.to_be_bytes().as_slice()); - - let (reply_details_to, reply_details_code) = - self.reply_details.unwrap_or_default().into_parts(); - - message.extend_from_slice(reply_details_to.as_ref()); - message.extend_from_slice(reply_details_code.to_bytes().as_slice()); - - hash(&message) - } -} - -impl SeqHash for BlockCommitment { - fn hash(&self) -> H256 { - let block_commitment_size = // concat of fields: - // blockHash - size_of::() - // prevCommitmentHash - + size_of::() - // predBlockHash - + size_of::() - // hash(transitionsHashesBytes) - + size_of::(); - - let mut block_commitment_bytes = Vec::with_capacity(block_commitment_size); - - block_commitment_bytes.extend_from_slice(self.block_hash.as_bytes()); - block_commitment_bytes.extend_from_slice(self.prev_commitment_hash.as_bytes()); - block_commitment_bytes.extend_from_slice(self.pred_block_hash.as_bytes()); - block_commitment_bytes.extend_from_slice(self.transitions.hash().as_bytes()); - - hash(&block_commitment_bytes) - } -} - -impl AggregatedCommitments { - pub fn hash(&self) -> H256 { - let mut array = Vec::new(); - for commitment in &self.commitments { - array.extend_from_slice(commitment.hash().as_ref()); - } - hash(&array) - } -} - -impl SeqHash for Vec { - fn hash(&self) -> H256 { - let mut array = Vec::new(); - for commitment in self { - array.extend_from_slice(commitment.hash().as_ref()); - } - hash(&array) - } -} - -impl AggregatedCommitments { +impl AggregatedCommitments { pub fn aggregate_commitments( commitments: Vec, signer: &Signer, pub_key: PublicKey, router_address: Address, ) -> Result> { - let mut aggregated = AggregatedCommitments { - commitments, - signature: Signature::default(), - }; - - let buffer = [ - [0x19, 0x00].as_ref(), - router_address.0.as_ref(), - aggregated.commitments.hash().as_ref(), - ] - .concat(); + let signature = + sign_commitments_digest(commitments.to_digest(), signer, pub_key, router_address)?; - aggregated.signature = signer.sign_digest(pub_key, hash(&buffer).to_fixed_bytes())?; + Ok(AggregatedCommitments { + commitments, + signature, + }) + } - Ok(aggregated) + pub fn recover(&self, router_address: Address) -> Result
{ + recover_from_commitments_digest( + self.commitments.to_digest(), + &self.signature, + router_address, + ) } pub fn len(&self) -> usize { @@ -230,278 +63,245 @@ impl AggregatedCommitments { } } -impl AggregatedQueue { - pub fn new(initial: AggregatedCommitments) -> Self { - let hash = initial.hash(); - let mut all_commitments = HashMap::new(); - all_commitments.insert( - hash, - LinkedAggregation { - aggregated: initial, - previous: None, - }, - ); - Self { - all_commitments, - last: hash, +pub(crate) struct MultisignedCommitmentDigests { + digest: Digest, + digests: IndexSet, + signatures: BTreeMap, +} + +impl MultisignedCommitmentDigests { + pub fn new(digests: IndexSet) -> Result { + if digests.is_empty() { + return Err(anyhow!("Empty commitments digests")); } + + Ok(Self { + digest: digests.iter().collect(), + digests, + signatures: BTreeMap::new(), + }) } - pub fn push(&mut self, commitment: AggregatedCommitments) { - let hash = commitment.hash(); + pub fn append_signature_with_check( + &mut self, + digest: Digest, + signature: Signature, + router_address: Address, + check_origin: impl FnOnce(Address) -> Result<()>, + ) -> Result<()> { + if self.digest != digest { + return Err(anyhow!("Aggregated commitments digest mismatch")); + } - let new_queue = LinkedAggregation { - aggregated: commitment, - previous: Some(self.last), - }; - self.last = hash; + let origin = recover_from_commitments_digest(digest, &signature, router_address)?; + check_origin(origin)?; - self.all_commitments.insert(hash, new_queue); - } + self.signatures.insert(origin, signature); - pub fn get_signature(&self, commitment: H256) -> Option { - self.all_commitments - .get(&commitment) - .map(|c| c.aggregated.signature.clone()) + Ok(()) } - pub fn previous(&self, commitment: H256) -> Option { - self.all_commitments - .get(&commitment) - .and_then(|c| c.previous) + pub fn digests(&self) -> &IndexSet { + &self.digests } -} - -#[derive(Clone)] -pub struct MultisignedCommitments { - pub commitments: Vec, - pub sources: Vec
, - pub signatures: Vec, -} -impl fmt::Debug for MultisignedCommitments { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "MultisignedCommitments {{ commitments: {:?}, sources: {:?}, signatures: {:?} }}", - self.commitments, self.sources, self.signatures - ) + pub fn signatures(&self) -> &BTreeMap { + &self.signatures } } -pub struct Aggregator { - threshold: usize, - - aggregated: HashMap>, - plain_commitments: HashMap>, - - rolling: Option, +pub(crate) struct MultisignedCommitments { + commitments: Vec, + signatures: BTreeMap, } -impl Aggregator { - pub fn new(threshold: usize) -> Self { +impl MultisignedCommitments { + pub fn from_multisigned_digests( + multisigned: MultisignedCommitmentDigests, + get_commitment: impl FnMut(Digest) -> C, + ) -> Self { + let MultisignedCommitmentDigests { + digests, + signatures, + .. + } = multisigned; + Self { - threshold, - aggregated: HashMap::new(), - plain_commitments: HashMap::new(), - rolling: None, + commitments: digests.into_iter().map(get_commitment).collect(), + signatures, } } - pub fn push(&mut self, origin: Address, aggregated: AggregatedCommitments) { - let hash = aggregated.hash(); - - self.plain_commitments - .insert(hash, aggregated.commitments.clone()); - - self.aggregated - .entry(origin) - .and_modify(|q| { - q.push(aggregated.clone()); - }) - .or_insert_with(move || AggregatedQueue::new(aggregated)); - - self.rolling = Some(hash); + pub fn commitments(&self) -> &[C] { + &self.commitments } - pub fn len(&self) -> usize { - self.aggregated.len() + pub fn into_parts(self) -> (Vec, BTreeMap) { + (self.commitments, self.signatures) } +} - pub fn find_root(self) -> Option> { - use std::collections::VecDeque; - - // Start only with the root - let mut candidates = VecDeque::new(); - let mut checked = HashSet::new(); - candidates.push_back(self.rolling?); - checked.insert(self.rolling?); - - while let Some(candidate_hash) = candidates.pop_front() { - // check if we can find `threshold` amount of this `candidate_hash` - let mut sources = vec![]; - let mut signatures = vec![]; - - for (source, queue) in self.aggregated.iter() { - if let Some(signature) = queue.get_signature(candidate_hash) { - sources.push(*source); - signatures.push(signature.clone()); - } - } - - if signatures.len() >= self.threshold { - // found our candidate - let plain_commitments = self.plain_commitments.get(&candidate_hash) - .expect("Plain commitments should be present, as they are always updated when Aggregator::push is invoked; qed"); - - let multi_signed = MultisignedCommitments { - commitments: plain_commitments.clone(), - sources, - signatures, - }; - - return Some(multi_signed); - } - - // else we try to find as many candidates as possible - for queue in self.aggregated.values() { - if let Some(previous) = queue.previous(candidate_hash) { - if checked.contains(&previous) { - continue; - } - candidates.push_back(previous); - checked.insert(previous); - } - } - } +pub fn sign_commitments_digest( + commitments_digest: Digest, + signer: &Signer, + pub_key: PublicKey, + router_address: Address, +) -> Result { + let digest = to_router_digest(commitments_digest, router_address); + signer.sign_digest(pub_key, digest) +} - None - } +fn recover_from_commitments_digest( + commitments_digest: Digest, + signature: &Signature, + router_address: Address, +) -> Result
{ + signature + .recover_from_digest(to_router_digest(commitments_digest, router_address)) + .map(|k| k.to_address()) +} + +fn to_router_digest(commitments_digest: Digest, router_address: Address) -> Digest { + // See explanation: https://eips.ethereum.org/EIPS/eip-191 + [ + [0x19, 0x00].as_ref(), + router_address.0.as_ref(), + commitments_digest.as_ref(), + ] + .concat() + .to_digest() } #[cfg(test)] mod tests { - use super::*; - use ethexe_signer::{Address, Signature}; - use gear_core::ids::ActorId; + use ethexe_signer::{ + sha3::{Digest as _, Keccak256}, + PrivateKey, + }; + use std::str::FromStr; - #[derive(Clone, Debug)] + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct MyComm([u8; 2]); - impl SeqHash for MyComm { - fn hash(&self) -> H256 { - hash(&self.0[..]) + impl ToDigest for MyComm { + fn update_hasher(&self, hasher: &mut Keccak256) { + hasher.update(self.0); } } - fn signer(id: u8) -> Address { - let mut array = [0; 20]; - array[0] = id; - Address(array) - } + #[test] + fn test_sign_digest() { + let signer = Signer::tmp(); - fn signature(id: u8) -> Signature { - let mut array = [0; 65]; - array[0] = id; - Signature::from(array) - } + let private_key = PrivateKey::from_str( + "4c0883a69102937d6231471b5dbb6204fe51296170827936ea5cce4b76994b0f", + ) + .unwrap(); + let pub_key = signer.add_key(private_key).unwrap(); - #[allow(unused)] - fn block_hash(id: u8) -> H256 { - let mut array = [0; 32]; - array[0] = id; - H256::from(array) - } + let router_address = Address([0x01; 20]); + let commitments = [MyComm([1, 2]), MyComm([3, 4])]; - #[allow(unused)] - fn pid(id: u8) -> ActorId { - let mut array = [0; 32]; - array[0] = id; - ActorId::from(array) - } + let commitments_digest = commitments.to_digest(); + let signature = + sign_commitments_digest(commitments_digest, &signer, pub_key, router_address).unwrap(); + let recovered = + recover_from_commitments_digest(commitments_digest, &signature, router_address) + .unwrap(); - #[allow(unused)] - fn state_id(id: u8) -> H256 { - let mut array = [0; 32]; - array[0] = id; - H256::from(array) + assert_eq!(recovered, pub_key.to_address()); } - fn gen_commitment( - signature_id: u8, - commitments: Vec<(u8, u8)>, - ) -> AggregatedCommitments { - let commitments = commitments - .into_iter() - .map(|v| MyComm([v.0, v.1])) - .collect(); + #[test] + fn test_aggregated_commitments() { + let signer = Signer::tmp(); - AggregatedCommitments { - commitments, - signature: signature(signature_id), - } - } + let private_key = PrivateKey::from_str( + "4c0883a69102937d6231471b5dbb6204fe51296170827936ea5cce4b76994b0f", + ) + .unwrap(); + let pub_key = signer.add_key(private_key).unwrap(); - #[test] - fn simple() { - // aggregator with threshold 1 - let mut aggregator = Aggregator::new(1); + let router_address = Address([0x01; 20]); + let commitments = vec![MyComm([1, 2]), MyComm([3, 4])]; - aggregator.push(signer(1), gen_commitment(0, vec![(1, 1)])); + let agg = AggregatedCommitments::aggregate_commitments( + commitments, + &signer, + pub_key, + router_address, + ) + .unwrap(); + let recovered = agg.recover(router_address).unwrap(); - let root = aggregator - .find_root() - .expect("Failed to generate root commitment"); + assert_eq!(recovered, pub_key.to_address()); + } - assert_eq!(root.signatures.len(), 1); - assert_eq!(root.commitments.len(), 1); + #[test] + fn test_multisigned_commitment_digests() { + let signer = Signer::tmp(); - // aggregator with threshold 1 - let mut aggregator = Aggregator::new(1); + let private_key = PrivateKey([1; 32]); + let pub_key = signer.add_key(private_key).unwrap(); - aggregator.push(signer(1), gen_commitment(0, vec![(1, 1)])); - aggregator.push(signer(1), gen_commitment(1, vec![(1, 1), (2, 2)])); + let router_address = Address([0x01; 20]); + let commitments = [MyComm([1, 2]), MyComm([3, 4])]; + let digests: IndexSet<_> = commitments.map(|c| c.to_digest()).into_iter().collect(); - let root = aggregator - .find_root() - .expect("Failed to generate root commitment"); + let mut multisigned = MultisignedCommitmentDigests::new(digests.clone()).unwrap(); + assert_eq!(multisigned.digests(), &digests); + assert_eq!(multisigned.signatures().len(), 0); - assert_eq!(root.signatures.len(), 1); + let commitments_digest = commitments.to_digest(); + let signature = + sign_commitments_digest(commitments_digest, &signer, pub_key, router_address).unwrap(); - // should be latest commitment - assert_eq!(root.commitments.len(), 2); + multisigned + .append_signature_with_check(commitments_digest, signature, router_address, |_| Ok(())) + .unwrap(); + assert_eq!(multisigned.digests(), &digests); + assert_eq!(multisigned.signatures().len(), 1); } #[test] - fn more_threshold() { - // aggregator with threshold 2 - let mut aggregator = Aggregator::new(2); - - aggregator.push(signer(1), gen_commitment(0, vec![(1, 1)])); - aggregator.push(signer(2), gen_commitment(0, vec![(1, 1)])); - aggregator.push(signer(2), gen_commitment(0, vec![(1, 1), (2, 2)])); + fn test_multisigned_commitments() { + let signer = Signer::tmp(); - let root = aggregator - .find_root() - .expect("Failed to generate root commitment"); + let private_key = PrivateKey([1; 32]); + let pub_key = signer.add_key(private_key).unwrap(); - assert_eq!(root.signatures.len(), 2); - assert_eq!(root.commitments.len(), 1); // only (1, 1) is committed by both aggregators - - // aggregator with threshold 2 - let mut aggregator = Aggregator::new(2); - - aggregator.push(signer(1), gen_commitment(0, vec![(1, 1)])); - aggregator.push(signer(2), gen_commitment(0, vec![(1, 1)])); - aggregator.push(signer(2), gen_commitment(0, vec![(1, 1), (2, 2)])); - aggregator.push(signer(1), gen_commitment(0, vec![(1, 1), (2, 2)])); - - let root = aggregator - .find_root() - .expect("Failed to generate root commitment"); + let router_address = Address([1; 20]); + let commitments = [MyComm([1, 2]), MyComm([3, 4])]; + let digests = commitments.map(|c| c.to_digest()); + let mut commitments_map: BTreeMap<_, _> = commitments + .into_iter() + .map(|c| (c.to_digest(), c)) + .collect(); - assert_eq!(root.signatures.len(), 2); - assert_eq!(root.commitments.len(), 2); // both (1, 1) and (2, 2) is committed by both aggregators + let mut multisigned = + MultisignedCommitmentDigests::new(digests.into_iter().collect()).unwrap(); + let commitments_digest = commitments.to_digest(); + let signature = + sign_commitments_digest(commitments_digest, &signer, pub_key, router_address).unwrap(); + + multisigned + .append_signature_with_check(commitments_digest, signature, router_address, |_| Ok(())) + .unwrap(); + + let multisigned_commitments = + MultisignedCommitments::from_multisigned_digests(multisigned, |d| { + commitments_map.remove(&d).unwrap() + }); + + assert_eq!(multisigned_commitments.commitments(), commitments); + + let parts = multisigned_commitments.into_parts(); + assert_eq!(parts.0.as_slice(), commitments.as_slice()); + assert_eq!(parts.1.len(), 1); + parts.1.into_iter().for_each(|(k, v)| { + assert_eq!(k, pub_key.to_address()); + assert_eq!(v, signature); + }); } } diff --git a/ethexe/sequencer/src/lib.rs b/ethexe/sequencer/src/lib.rs index 72b1ee578c4..a3e726c3d3f 100644 --- a/ethexe/sequencer/src/lib.rs +++ b/ethexe/sequencer/src/lib.rs @@ -18,52 +18,68 @@ //! Sequencer for ethexe. -mod agro; +pub mod agro; -use agro::{Aggregator, MultisignedCommitments}; -use anyhow::Result; +use agro::{AggregatedCommitments, MultisignedCommitmentDigests, MultisignedCommitments}; +use anyhow::{anyhow, Result}; use ethexe_common::router::{BlockCommitment, CodeCommitment}; use ethexe_ethereum::Ethereum; use ethexe_observer::Event; -use ethexe_signer::{Address, PublicKey, Signer}; -use std::mem; +use ethexe_signer::{Address, Digest, PublicKey, Signature, Signer, ToDigest}; +use gprimitives::H256; +use indexmap::IndexSet; +use std::{ + collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, + ops::Not, +}; use tokio::sync::watch; -pub use agro::AggregatedCommitments; +pub struct Sequencer { + key: PublicKey, + ethereum: Ethereum, + + validators: HashSet
, + threshold: u64, + + code_commitments: CommitmentsMap, + block_commitments: CommitmentsMap, + + codes_candidate: Option, + blocks_candidate: Option, + chain_head: Option, + + status: SequencerStatus, + status_sender: watch::Sender, +} pub struct Config { pub ethereum_rpc: String, pub sign_tx_public: PublicKey, pub router_address: Address, + pub validators: Vec
, + pub threshold: u64, } #[derive(Debug, Clone, Copy, Default)] pub struct SequencerStatus { + // TODO: change this to code and blocks commitments in the commitments map #4177 pub aggregated_commitments: u64, pub submitted_code_commitments: u64, pub submitted_block_commitments: u64, } -#[allow(unused)] -pub struct Sequencer { - signer: Signer, - ethereum_rpc: String, - key: PublicKey, - codes_aggregation: Aggregator, - blocks_aggregation: Aggregator, - ethereum: Ethereum, - status: SequencerStatus, - status_sender: watch::Sender, +#[derive(Debug, Clone, PartialEq, Eq)] +struct CommitmentAndOrigins { + commitment: C, + origins: BTreeSet
, } +type CommitmentsMap = BTreeMap>; + impl Sequencer { pub async fn new(config: &Config, signer: Signer) -> Result { let (status_sender, _status_receiver) = watch::channel(SequencerStatus::default()); - Ok(Self { - signer: signer.clone(), - ethereum_rpc: config.ethereum_rpc.clone(), - codes_aggregation: Aggregator::new(1), - blocks_aggregation: Aggregator::new(1), + Ok(Sequencer { key: config.sign_tx_public, ethereum: Ethereum::new( &config.ethereum_rpc, @@ -72,79 +88,88 @@ impl Sequencer { config.sign_tx_public.to_address(), ) .await?, - status: SequencerStatus::default(), + validators: config.validators.iter().cloned().collect(), + threshold: config.threshold, + code_commitments: Default::default(), + block_commitments: Default::default(), + codes_candidate: Default::default(), + blocks_candidate: Default::default(), + chain_head: Default::default(), + status: Default::default(), status_sender, }) } + pub fn chain_head(&self) -> Option { + self.chain_head + } + // This function should never block. pub fn process_observer_event(&mut self, event: &Event) -> Result<()> { - self.update_status(|status| { - *status = SequencerStatus::default(); - }); - match event { - Event::Block(data) => { - log::debug!( - "Processing events for {:?} (parent: {:?})", - data.block_hash, - data.parent_hash - ); + if let Event::Block(block_data) = event { + // Reset status, candidates and chain-head each block event - if self.codes_aggregation.len() > 0 { - log::debug!( - "Building on top of existing aggregation of {} commitments", - self.codes_aggregation.len() - ); - } - } - Event::CodeLoaded { code_id, .. } => { - log::debug!("Observed code_hash#{code_id:?}. Waiting for inclusion...") - } + self.update_status(|status| { + *status = SequencerStatus::default(); + }); + + self.codes_candidate = None; + self.blocks_candidate = None; + self.chain_head = Some(block_data.block_hash); } Ok(()) } - pub async fn process_block_timeout(&mut self) -> Result<()> { - log::debug!("Block timeout reached. Submitting aggregated commitments"); + /// Process collected by sequencer commitments and prepare them for submission. + /// + /// `from_block` is the block hash, + /// from which the sequencer should start collecting block commitments list. + /// If `from_block` is not collected yet by the sequencer, then nothing will be done. + pub fn process_collected_commitments(&mut self, from_block: H256) -> Result<()> { + if self.codes_candidate.is_some() || self.blocks_candidate.is_some() { + return Err(anyhow!("Previous commitments candidate are not submitted")); + } + + self.codes_candidate = + Self::code_commitments_candidate(&self.code_commitments, self.threshold); + + self.blocks_candidate = + Self::block_commitments_candidate(&self.block_commitments, from_block, self.threshold); + + Ok(()) + } + pub async fn submit_multisigned_commitments(&mut self) -> Result<()> { let mut codes_future = None; let mut blocks_future = None; let mut code_commitments_len = 0; let mut block_commitments_len = 0; - let codes_aggregation = mem::replace(&mut self.codes_aggregation, Aggregator::new(1)); - let blocks_aggregation = mem::replace(&mut self.blocks_aggregation, Aggregator::new(1)); + let codes_candidate = Self::process_multisigned_candidate( + &mut self.codes_candidate, + &mut self.code_commitments, + self.threshold, + ); - if codes_aggregation.len() > 0 { - log::debug!( - "Collected some {0} code commitments. Trying to submit...", - codes_aggregation.len() - ); + let blocks_candidate = Self::process_multisigned_candidate( + &mut self.blocks_candidate, + &mut self.block_commitments, + self.threshold, + ); - if let Some(code_commitments) = codes_aggregation.find_root() { - log::debug!("Achieved consensus on code commitments. Submitting..."); + if let Some(candidate) = codes_candidate { + code_commitments_len = candidate.commitments().len() as u64; + log::debug!("Collected {code_commitments_len} code commitments. Trying to submit..."); - code_commitments_len = code_commitments.commitments.len() as u64; - codes_future = Some(self.submit_codes_commitment(code_commitments)); - } else { - log::debug!("No consensus on code commitments found. Discarding..."); - } + codes_future = Some(self.submit_codes_commitment(candidate)); }; - if blocks_aggregation.len() > 0 { - log::debug!( - "Collected some {0} transition commitments. Trying to submit...", - blocks_aggregation.len() - ); - - if let Some(block_commitments) = blocks_aggregation.find_root() { - log::debug!("Achieved consensus on transition commitments. Submitting..."); - block_commitments_len = block_commitments.commitments.len() as u64; - blocks_future = Some(self.submit_block_commitments(block_commitments)); - } else { - log::debug!("No consensus on code commitments found. Discarding..."); - } + if let Some(candidate) = blocks_candidate { + block_commitments_len = candidate.commitments().len() as u64; + log::debug!("Collected {block_commitments_len} block commitments. Trying to submit...",); + + blocks_future = Some(self.submit_block_commitments(candidate)); }; match (codes_future, blocks_future) { @@ -166,18 +191,176 @@ impl Sequencer { Ok(()) } + pub fn receive_code_commitments( + &mut self, + aggregated: AggregatedCommitments, + ) -> Result<()> { + Self::receive_commitments( + aggregated, + &self.validators, + self.ethereum.router().address(), + &mut self.code_commitments, + ) + } + + pub fn receive_block_commitments( + &mut self, + aggregated: AggregatedCommitments, + ) -> Result<()> { + Self::receive_commitments( + aggregated, + &self.validators, + self.ethereum.router().address(), + &mut self.block_commitments, + ) + } + + pub fn receive_codes_signature(&mut self, digest: Digest, signature: Signature) -> Result<()> { + Self::receive_signature( + digest, + signature, + &self.validators, + self.ethereum.router().address(), + self.codes_candidate.as_mut(), + ) + } + + pub fn receive_blocks_signature(&mut self, digest: Digest, signature: Signature) -> Result<()> { + Self::receive_signature( + digest, + signature, + &self.validators, + self.ethereum.router().address(), + self.blocks_candidate.as_mut(), + ) + } + + pub fn address(&self) -> Address { + self.key.to_address() + } + + pub fn get_status_receiver(&self) -> watch::Receiver { + self.status_sender.subscribe() + } + + pub fn get_candidate_code_commitments(&self) -> impl Iterator + '_ { + Self::get_candidate_commitments(&self.codes_candidate, &self.code_commitments) + } + + pub fn get_candidate_block_commitments(&self) -> impl Iterator + '_ { + Self::get_candidate_commitments(&self.blocks_candidate, &self.block_commitments) + } + + fn get_candidate_commitments<'a, C>( + candidate: &'a Option, + commitments: &'a CommitmentsMap, + ) -> impl Iterator + 'a { + candidate + .iter() + .flat_map(|candidate| candidate.digests().iter()) + .map(|digest| { + commitments + .get(digest) + .map(|c| &c.commitment) + .unwrap_or_else(|| { + unreachable!("Guarantied by `Sequencer` implementation to be in the map") + }) + }) + } + + fn block_commitments_candidate( + commitments: &CommitmentsMap, + from_block: H256, + threshold: u64, + ) -> Option { + let suitable_commitments: BTreeMap<_, _> = commitments + .iter() + .filter_map(|(digest, c)| { + (c.origins.len() as u64 >= threshold) + .then_some((c.commitment.block_hash, (digest, &c.commitment))) + }) + .collect(); + + let mut candidate = VecDeque::new(); + let mut block_hash = from_block; + loop { + let Some((digest, commitment)) = suitable_commitments.get(&block_hash) else { + break; + }; + + candidate.push_front(**digest); + + block_hash = commitment.prev_commitment_hash; + } + + if candidate.is_empty() { + return None; + } + + let candidate = MultisignedCommitmentDigests::new(candidate.into_iter().collect()) + .unwrap_or_else(|err| { + unreachable!( + "Guarantied by impl to be non-empty and without duplicates, but get: {err}" + ); + }); + + Some(candidate) + } + + fn code_commitments_candidate( + commitments: &CommitmentsMap, + threshold: u64, + ) -> Option { + let suitable_commitment_digests: IndexSet<_> = commitments + .iter() + .filter_map(|(&digest, c)| (c.origins.len() as u64 >= threshold).then_some(digest)) + .collect(); + + if suitable_commitment_digests.is_empty() { + return None; + } + + Some( + MultisignedCommitmentDigests::new(suitable_commitment_digests).unwrap_or_else(|err| { + unreachable!("Guarantied by impl to be non-empty, but get: {err}"); + }), + ) + } + + fn process_multisigned_candidate( + candidate: &mut Option, + commitments: &mut CommitmentsMap, + threshold: u64, + ) -> Option> { + if candidate + .as_ref() + .map(|c| threshold > c.signatures().len() as u64) + .unwrap_or(true) + { + return None; + } + + let candidate = candidate.take()?; + let multisigned = MultisignedCommitments::from_multisigned_digests(candidate, |digest| { + commitments + .remove(&digest) + .map(|c| c.commitment) + .unwrap_or_else(|| { + unreachable!("Guarantied by `Sequencer` implementation to be in the map"); + }) + }); + + Some(multisigned) + } + async fn submit_codes_commitment( &self, - signed_commitments: MultisignedCommitments, + multisigned: MultisignedCommitments, ) -> Result<()> { - log::debug!("Code commitment to submit: {signed_commitments:?}"); + let (codes, signatures) = multisigned.into_parts(); + let (origins, signatures): (Vec<_>, _) = signatures.into_iter().unzip(); - let codes = signed_commitments - .commitments - .into_iter() - .map(Into::into) - .collect::>(); - let signatures = signed_commitments.signatures; + log::debug!("Code commitments to submit: {codes:?}, signed by: {origins:?}",); let router = self.ethereum.router(); if let Err(e) = router.commit_codes(codes, signatures).await { @@ -190,19 +373,15 @@ impl Sequencer { async fn submit_block_commitments( &self, - signed_commitments: MultisignedCommitments, + multisigned: MultisignedCommitments, ) -> Result<()> { - log::debug!("Transition commitment to submit: {signed_commitments:?}"); + let (blocks, signatures) = multisigned.into_parts(); + let (origins, signatures): (Vec<_>, _) = signatures.into_iter().unzip(); - let block_commitments = signed_commitments - .commitments - .into_iter() - .map(Into::into) - .collect::>(); - let signatures = signed_commitments.signatures; + log::debug!("Block commitments to submit: {blocks:?}, signed by: {origins:?}",); let router = self.ethereum.router(); - match router.commit_blocks(block_commitments, signatures).await { + match router.commit_blocks(blocks, signatures).await { Err(e) => { // TODO: return error? log::error!("Failed to commit transitions: {e}"); @@ -217,35 +396,54 @@ impl Sequencer { Ok(()) } - pub fn receive_codes_commitment( - &mut self, - origin: Address, - commitments: AggregatedCommitments, + fn receive_commitments( + aggregated: AggregatedCommitments, + validators: &HashSet
, + router_address: Address, + commitments_storage: &mut CommitmentsMap, ) -> Result<()> { - self.update_status(|status| { - status.aggregated_commitments += 1; - }); - log::debug!("Received codes commitment from {}", origin); - self.codes_aggregation.push(origin, commitments); - Ok(()) - } + let origin = aggregated.recover(router_address)?; + + if validators.contains(&origin).not() { + return Err(anyhow!("Unknown validator {origin} or invalid signature")); + } + + for commitment in aggregated.commitments { + commitments_storage + .entry(commitment.to_digest()) + .or_insert_with(|| CommitmentAndOrigins { + commitment, + origins: Default::default(), + }) + .origins + .insert(origin); + } - pub fn receive_block_commitment( - &mut self, - origin: Address, - commitments: AggregatedCommitments, - ) -> Result<()> { - log::debug!("Received transition commitment from {}", origin); - self.blocks_aggregation.push(origin, commitments); Ok(()) } - pub fn address(&self) -> Address { - self.key.to_address() - } + fn receive_signature( + commitments_digest: Digest, + signature: Signature, + validators: &HashSet
, + router_address: Address, + candidate: Option<&mut MultisignedCommitmentDigests>, + ) -> Result<()> { + let Some(candidate) = candidate else { + return Err(anyhow!("No candidate found")); + }; - pub fn get_status_receiver(&self) -> watch::Receiver { - self.status_sender.subscribe() + candidate.append_signature_with_check( + commitments_digest, + signature, + router_address, + |origin| { + validators + .contains(&origin) + .then_some(()) + .ok_or_else(|| anyhow!("Unknown validator {origin} or invalid signature")) + }, + ) } fn update_status(&mut self, update_fn: F) @@ -257,3 +455,479 @@ impl Sequencer { let _ = self.status_sender.send_replace(status); } } + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Ok; + use ethexe_signer::{sha3, PrivateKey}; + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + struct TestComm([u8; 2]); + + impl ToDigest for TestComm { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + sha3::Digest::update(hasher, self.0); + } + } + + #[test] + fn test_receive_signature() { + let signer = Signer::tmp(); + + let router_address = Address([1; 20]); + + let validators_private_keys = [PrivateKey([1; 32]), PrivateKey([2; 32])]; + let validators: HashSet<_> = validators_private_keys + .iter() + .cloned() + .map(|key| signer.add_key(key).unwrap().to_address()) + .collect(); + + let validator1_private_key = validators_private_keys[0]; + let validator1_pub_key = PublicKey::from(validator1_private_key); + let validator1 = validator1_pub_key.to_address(); + + let commitments = [TestComm([0, 1]), TestComm([2, 3])]; + let commitments_digest = commitments.to_digest(); + let signature = agro::sign_commitments_digest( + commitments_digest, + &signer, + validator1_pub_key, + router_address, + ) + .unwrap(); + + Sequencer::receive_signature( + commitments_digest, + signature, + &validators, + router_address, + None, + ) + .expect_err("No candidate is provided"); + + let mut signatures: BTreeMap<_, _> = Default::default(); + let digests: IndexSet<_> = commitments.iter().map(ToDigest::to_digest).collect(); + let mut candidate = MultisignedCommitmentDigests::new(digests.clone()).unwrap(); + + Sequencer::receive_signature( + Digest::from([1; 32]), + signature, + &validators, + router_address, + Some(&mut candidate), + ) + .expect_err("Incorrect digest has been provided"); + + Sequencer::receive_signature( + commitments_digest, + Signature::create_for_digest(validator1_private_key, Digest::from([1; 32])).unwrap(), + &validators, + router_address, + Some(&mut candidate), + ) + .expect_err("Signature verification must fail"); + + Sequencer::receive_signature( + commitments_digest, + signature, + &validators, + router_address, + Some(&mut candidate), + ) + .unwrap(); + + signatures.insert(validator1, signature); + assert_eq!(candidate.digests(), &digests); + assert_eq!(candidate.signatures(), &signatures); + + let validator2_private_key = validators_private_keys[1]; + let validator2_pub_key = PublicKey::from(validator2_private_key); + let validator2 = validator2_pub_key.to_address(); + + let signature = agro::sign_commitments_digest( + commitments_digest, + &signer, + validator2_pub_key, + router_address, + ) + .unwrap(); + + Sequencer::receive_signature( + commitments_digest, + signature, + &validators, + router_address, + Some(&mut candidate), + ) + .unwrap(); + + signatures.insert(validator2, signature); + assert_eq!(candidate.digests(), &digests); + assert_eq!(candidate.signatures(), &signatures); + } + + #[test] + fn test_receive_commitments() { + let signer = Signer::tmp(); + + let router_address = Address([1; 20]); + + let validators_private_keys = [PrivateKey([1; 32]), PrivateKey([2; 32])]; + let validators: HashSet<_> = validators_private_keys + .iter() + .cloned() + .map(|key| signer.add_key(key).unwrap().to_address()) + .collect(); + + let validator1_private_key = validators_private_keys[0]; + let validator1_pub_key = PublicKey::from(validator1_private_key); + let validator1 = validator1_pub_key.to_address(); + + let commitments = [TestComm([0, 1]), TestComm([2, 3])]; + let aggregated = AggregatedCommitments::aggregate_commitments( + commitments.to_vec(), + &signer, + validator1_pub_key, + router_address, + ) + .unwrap(); + + let mut expected_commitments_storage = CommitmentsMap::new(); + let mut commitments_storage = CommitmentsMap::new(); + + let private_key = PrivateKey([3; 32]); + let pub_key = signer.add_key(private_key).unwrap(); + let incorrect_aggregated = AggregatedCommitments::aggregate_commitments( + commitments.to_vec(), + &signer, + pub_key, + router_address, + ) + .unwrap(); + Sequencer::receive_commitments( + incorrect_aggregated, + &validators, + router_address, + &mut commitments_storage, + ) + .expect_err("Signature verification must fail"); + + Sequencer::receive_commitments( + aggregated.clone(), + &validators, + router_address, + &mut commitments_storage, + ) + .unwrap(); + + expected_commitments_storage.insert( + commitments[0].to_digest(), + CommitmentAndOrigins { + commitment: commitments[0], + origins: [validator1].iter().cloned().collect(), + }, + ); + expected_commitments_storage.insert( + commitments[1].to_digest(), + CommitmentAndOrigins { + commitment: commitments[1], + origins: [validator1].iter().cloned().collect(), + }, + ); + assert_eq!(expected_commitments_storage, commitments_storage); + + let validator2_private_key = validators_private_keys[1]; + let validator2_pub_key = PublicKey::from(validator2_private_key); + let validator2 = validator2_pub_key.to_address(); + + let aggregated = AggregatedCommitments::aggregate_commitments( + commitments.to_vec(), + &signer, + validator2_pub_key, + router_address, + ) + .unwrap(); + + Sequencer::receive_commitments( + aggregated, + &validators, + router_address, + &mut commitments_storage, + ) + .unwrap(); + + expected_commitments_storage + .get_mut(&commitments[0].to_digest()) + .unwrap() + .origins + .insert(validator2); + expected_commitments_storage + .get_mut(&commitments[1].to_digest()) + .unwrap() + .origins + .insert(validator2); + assert_eq!(expected_commitments_storage, commitments_storage); + } + + #[test] + fn test_block_commitments_candidate() { + let threshold = 2; + + let mut commitments = BTreeMap::new(); + + let commitment1 = BlockCommitment { + block_hash: H256::random(), + prev_commitment_hash: H256::random(), + pred_block_hash: H256::random(), + transitions: Default::default(), + }; + let commitment2 = BlockCommitment { + block_hash: H256::random(), + prev_commitment_hash: commitment1.block_hash, + pred_block_hash: H256::random(), + transitions: Default::default(), + }; + let commitment3 = BlockCommitment { + block_hash: H256::random(), + prev_commitment_hash: commitment1.block_hash, + pred_block_hash: H256::random(), + transitions: Default::default(), + }; + + let mut expected_digests = IndexSet::new(); + + let candidate = + Sequencer::block_commitments_candidate(&commitments, commitment1.block_hash, threshold); + assert!(candidate.is_none()); + + commitments.insert( + commitment1.to_digest(), + CommitmentAndOrigins { + commitment: commitment1.clone(), + origins: Default::default(), + }, + ); + let candidate = + Sequencer::block_commitments_candidate(&commitments, H256::random(), threshold); + assert!(candidate.is_none()); + + let candidate = + Sequencer::block_commitments_candidate(&commitments, commitment1.block_hash, 0) + .expect("Must have candidate"); + expected_digests.insert(commitment1.to_digest()); + assert_eq!(candidate.digests(), &expected_digests); + + commitments + .get_mut(&commitment1.to_digest()) + .unwrap() + .origins + .extend([Address([1; 20]), Address([2; 20])]); + commitments.insert( + commitment2.to_digest(), + CommitmentAndOrigins { + commitment: commitment2.clone(), + origins: [[1; 20], [2; 20]].map(Address).iter().cloned().collect(), + }, + ); + commitments.insert( + commitment3.to_digest(), + CommitmentAndOrigins { + commitment: commitment3.clone(), + origins: [[1; 20], [2; 20]].map(Address).iter().cloned().collect(), + }, + ); + + let candidate = + Sequencer::block_commitments_candidate(&commitments, commitment1.block_hash, threshold) + .expect("Must have candidate"); + assert_eq!(candidate.digests(), &expected_digests); + + let candidate = + Sequencer::block_commitments_candidate(&commitments, commitment2.block_hash, threshold) + .expect("Must have candidate"); + expected_digests.insert(commitment2.to_digest()); + assert_eq!(candidate.digests(), &expected_digests); + + let candidate = + Sequencer::block_commitments_candidate(&commitments, commitment3.block_hash, threshold) + .expect("Must have candidate"); + expected_digests.pop(); + expected_digests.insert(commitment3.to_digest()); + assert_eq!(candidate.digests(), &expected_digests); + } + + #[test] + fn test_code_commitments_candidate() { + let threshold = 2; + + let mut commitments = BTreeMap::new(); + + let commitment1 = CodeCommitment { + id: H256::random().0.into(), + valid: true, + }; + let commitment2 = CodeCommitment { + id: H256::random().0.into(), + valid: true, + }; + let commitment3 = CodeCommitment { + id: H256::random().0.into(), + valid: false, + }; + + let candidate = Sequencer::code_commitments_candidate(&commitments, threshold); + assert!(candidate.is_none()); + + commitments.insert( + commitment1.to_digest(), + CommitmentAndOrigins { + commitment: commitment1.clone(), + origins: Default::default(), + }, + ); + let candidate = Sequencer::code_commitments_candidate(&commitments, threshold); + assert!(candidate.is_none()); + + commitments + .get_mut(&commitment1.to_digest()) + .unwrap() + .origins + .insert(Address([1; 20])); + let candidate = Sequencer::code_commitments_candidate(&commitments, threshold); + assert!(candidate.is_none()); + + commitments + .get_mut(&commitment1.to_digest()) + .unwrap() + .origins + .insert(Address([2; 20])); + let candidate = Sequencer::code_commitments_candidate(&commitments, threshold) + .expect("Must have candidate"); + let expected_digests: IndexSet<_> = [commitment1.to_digest()].into_iter().collect(); + assert_eq!(candidate.digests(), &expected_digests); + assert!(candidate.signatures().is_empty()); + + commitments.insert( + commitment2.to_digest(), + CommitmentAndOrigins { + commitment: commitment2.clone(), + origins: [Address([3; 20]), Address([4; 20])] + .iter() + .cloned() + .collect(), + }, + ); + let candidate = Sequencer::code_commitments_candidate(&commitments, threshold) + .expect("Must have candidate"); + let mut expected_digests: IndexSet<_> = [commitment1.to_digest(), commitment2.to_digest()] + .into_iter() + .collect(); + expected_digests.sort(); + assert_eq!(candidate.digests(), &expected_digests); + assert!(candidate.signatures().is_empty()); + + commitments.insert( + commitment3.to_digest(), + CommitmentAndOrigins { + commitment: commitment3, + origins: [Address([5; 20])].iter().cloned().collect(), + }, + ); + let candidate = Sequencer::code_commitments_candidate(&commitments, threshold) + .expect("Must have candidate"); + assert_eq!(candidate.digests(), &expected_digests); + assert!(candidate.signatures().is_empty()); + } + + #[test] + #[should_panic(expected = "Guarantied by `Sequencer` implementation to be in the map")] + fn test_process_multisigned_candidate_empty_map() { + let candidate = + MultisignedCommitmentDigests::new([[2; 32]].map(Into::into).into_iter().collect()) + .unwrap(); + Sequencer::process_multisigned_candidate::( + &mut Some(candidate), + &mut Default::default(), + 0, + ); + } + + #[test] + fn test_process_multisigned_candidate() { + let signer = Signer::tmp(); + + // Test candidate is None + assert!(Sequencer::process_multisigned_candidate::( + &mut None, + &mut Default::default(), + 0 + ) + .is_none()); + + // Test not enough signatures + let mut candidate = Some( + MultisignedCommitmentDigests::new([b"gear".to_digest()].into_iter().collect()).unwrap(), + ); + assert!(Sequencer::process_multisigned_candidate( + &mut candidate, + &mut CommitmentsMap::::new(), + 2 + ) + .is_none()); + + let validators_private_keys = [PrivateKey([1; 32]), PrivateKey([2; 32])]; + let validators_pub_keys = validators_private_keys.map(|key| signer.add_key(key).unwrap()); + let origins: BTreeSet<_> = validators_pub_keys + .map(|k| k.to_address()) + .into_iter() + .collect(); + + let commitments = [TestComm([0, 1]), TestComm([2, 3]), TestComm([4, 5])]; + let mut commitments_map = commitments + .iter() + .map(|commitment| { + ( + commitment.to_digest(), + CommitmentAndOrigins { + commitment: *commitment, + origins: origins.clone(), + }, + ) + }) + .collect(); + + let mut candidate = + MultisignedCommitmentDigests::new(commitments.iter().map(|c| c.to_digest()).collect()) + .unwrap(); + + let router_address = Address([1; 20]); + validators_pub_keys.iter().for_each(|pub_key| { + let commitments_digest = commitments.to_digest(); + candidate + .append_signature_with_check( + commitments_digest, + agro::sign_commitments_digest( + commitments_digest, + &signer, + *pub_key, + router_address, + ) + .unwrap(), + router_address, + |_| Ok(()), + ) + .unwrap(); + }); + + let mut candidate = Some(candidate); + + assert!( + Sequencer::process_multisigned_candidate(&mut candidate, &mut commitments_map, 2) + .is_some(), + ); + assert!(commitments_map.is_empty()); + assert!(candidate.is_none()); + } +} diff --git a/ethexe/signer/Cargo.toml b/ethexe/signer/Cargo.toml index 5de814f625a..c2833494d12 100644 --- a/ethexe/signer/Cargo.toml +++ b/ethexe/signer/Cargo.toml @@ -17,6 +17,7 @@ anyhow.workspace = true hex = { workspace = true, features = ["alloc"] } parity-scale-codec = { workspace = true, features = ["std", "derive"] } derive_more.workspace = true +tempfile.workspace = true secp256k1 = { version = "0.29", features = ["rand", "global-context", "hashes-std", "rand-std", "recovery"] } sha3 = { version = "0.10.0", default-features = false } diff --git a/ethexe/signer/src/digest.rs b/ethexe/signer/src/digest.rs new file mode 100644 index 00000000000..1f34f8ab7cd --- /dev/null +++ b/ethexe/signer/src/digest.rs @@ -0,0 +1,197 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Keccak256 digest type. Implements AsDigest hashing for ethexe common types. + +use core::fmt; +use ethexe_common::router::{BlockCommitment, CodeCommitment, OutgoingMessage, StateTransition}; +use parity_scale_codec::{Decode, Encode}; +use sha3::Digest as _; + +#[derive( + Clone, + Copy, + PartialOrd, + Ord, + PartialEq, + Eq, + Hash, + Encode, + Decode, + derive_more::From, + derive_more::Into, + derive_more::AsRef, +)] +pub struct Digest([u8; 32]); + +impl fmt::Debug for Digest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "0x{}", hex::encode(self.0)) + } +} + +impl fmt::Display for Digest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "0x{}", hex::encode(self.0)) + } +} + +impl<'a> FromIterator<&'a Digest> for Digest { + fn from_iter>(iter: I) -> Self { + let mut hasher = sha3::Keccak256::new(); + for digest in iter { + hasher.update(digest.as_ref()); + } + Digest(hasher.finalize().into()) + } +} + +impl FromIterator for Digest { + fn from_iter>(iter: I) -> Self { + let mut hasher = sha3::Keccak256::new(); + for digest in iter { + hasher.update(digest.as_ref()); + } + Digest(hasher.finalize().into()) + } +} + +/// Trait for hashing types into a Digest using Keccak256. +pub trait ToDigest { + fn to_digest(&self) -> Digest { + let mut hasher = sha3::Keccak256::new(); + self.update_hasher(&mut hasher); + Digest(hasher.finalize().into()) + } + + fn update_hasher(&self, hasher: &mut sha3::Keccak256); +} + +impl<'a, T: ToDigest> FromIterator<&'a T> for Digest { + fn from_iter>(iter: I) -> Self { + iter.into_iter().map(|item| item.to_digest()).collect() + } +} + +impl ToDigest for [T] { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + for item in self { + hasher.update(item.to_digest().as_ref()); + } + } +} + +impl ToDigest for Vec { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + self.as_slice().update_hasher(hasher); + } +} + +impl ToDigest for [u8] { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + hasher.update(self); + } +} + +impl ToDigest for CodeCommitment { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + hasher.update(self.encode().as_slice()); + } +} + +impl ToDigest for StateTransition { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + hasher.update(self.actor_id.to_address_lossy().as_bytes()); + hasher.update(self.new_state_hash.as_bytes()); + hasher.update(self.value_to_receive.to_be_bytes().as_slice()); + + let mut value_hasher = sha3::Keccak256::new(); + for value_claim in &self.value_claims { + value_hasher.update(value_claim.message_id.as_ref()); + value_hasher.update(value_claim.destination.to_address_lossy().as_bytes()); + value_hasher.update(value_claim.value.to_be_bytes().as_slice()); + } + hasher.update(value_hasher.finalize().as_slice()); + + hasher.update(self.messages.to_digest().as_ref()); + } +} + +impl ToDigest for OutgoingMessage { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + let (reply_details_to, reply_details_code) = + self.reply_details.unwrap_or_default().into_parts(); + + hasher.update(self.id.as_ref()); + hasher.update(self.destination.to_address_lossy().as_bytes()); + hasher.update(self.payload.as_slice()); + hasher.update(self.value.to_be_bytes().as_slice()); + hasher.update(reply_details_to.as_ref()); + hasher.update(reply_details_code.to_bytes().as_slice()); + } +} + +impl ToDigest for BlockCommitment { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + hasher.update(self.block_hash.as_bytes()); + hasher.update(self.prev_commitment_hash.as_bytes()); + hasher.update(self.pred_block_hash.as_bytes()); + hasher.update(self.transitions.to_digest().as_ref()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use gprimitives::{ActorId, CodeId, MessageId, H256}; + use std::vec; + + #[test] + fn as_digest() { + let _digest = CodeCommitment { + id: CodeId::from(0), + valid: true, + } + .to_digest(); + + let state_transition = StateTransition { + actor_id: ActorId::from(0), + new_state_hash: H256::from([1; 32]), + value_to_receive: 0, + value_claims: vec![], + messages: vec![OutgoingMessage { + id: MessageId::from(0), + destination: ActorId::from(0), + payload: b"Hello, World!".to_vec(), + value: 0, + reply_details: None, + }], + }; + let _digest = state_transition.to_digest(); + + let transitions = vec![state_transition.clone(), state_transition]; + + let block_commitment = BlockCommitment { + block_hash: H256::from([0; 32]), + pred_block_hash: H256::from([1; 32]), + prev_commitment_hash: H256::from([2; 32]), + transitions: transitions.clone(), + }; + let _digest = block_commitment.to_digest(); + } +} diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index 7dd0505cf32..c7ab71c26e8 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -18,11 +18,18 @@ //! Signer library for ethexe. -use anyhow::{anyhow, Context as _, Result}; +mod digest; +mod signature; + +pub use digest::{Digest, ToDigest}; +pub use sha3; +pub use signature::Signature; + +use anyhow::{anyhow, Result}; use gprimitives::ActorId; use parity_scale_codec::{Decode, Encode}; -use secp256k1::Message; use sha3::Digest as _; +use signature::RawSignature; use std::{fmt, fs, path::PathBuf, str::FromStr}; #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -41,7 +48,7 @@ impl From for PublicKey { } } -#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Address(pub [u8; 20]); impl TryFrom for Address { @@ -53,27 +60,10 @@ impl TryFrom for Address { .take(12) .all(|&byte| byte == 0) .then_some(Address(id.to_address_lossy().0)) - .ok_or(anyhow!( - "First 12 bytes are not 0, it is not ethereum address" - )) - } -} - -#[derive(Clone, Encode, Decode, PartialEq, Eq, Hash)] -pub struct Signature(pub [u8; 65]); - -pub struct Hash([u8; 32]); - -impl From for gprimitives::H256 { - fn from(source: Hash) -> gprimitives::H256 { - gprimitives::H256::from_slice(&source.0) + .ok_or_else(|| anyhow!("First 12 bytes are not 0, it is not ethereum address")) } } -pub fn hash(data: &[u8]) -> gprimitives::H256 { - Hash(<[u8; 32]>::from(sha3::Keccak256::digest(data))).into() -} - fn strip_prefix(s: &str) -> &str { if let Some(s) = s.strip_prefix("0x") { s @@ -147,24 +137,6 @@ impl fmt::Debug for Address { } } -impl fmt::Debug for Signature { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "0x{}", self.to_hex()) - } -} - -impl Signature { - pub fn to_hex(&self) -> String { - hex::encode(self.0) - } -} - -impl From<[u8; 65]> for Signature { - fn from(bytes: [u8; 65]) -> Self { - Self(bytes) - } -} - impl fmt::Display for PublicKey { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.to_hex()) @@ -177,18 +149,6 @@ impl fmt::Display for Address { } } -impl fmt::Display for Signature { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.to_hex()) - } -} - -impl Default for Signature { - fn default() -> Self { - Signature([0u8; 65]) - } -} - #[derive(Debug, Clone)] pub struct Signer { key_store: PathBuf, @@ -201,34 +161,27 @@ impl Signer { Ok(Self { key_store }) } - pub fn raw_sign_digest(&self, public_key: PublicKey, digest: [u8; 32]) -> Result { - let secret_key = self.get_private_key(public_key)?; - - let secp_secret_key = secp256k1::SecretKey::from_slice(&secret_key.0) - .with_context(|| "Invalid secret key format for {:?}")?; - - let message = Message::from_digest(digest); - - let recsig = - secp256k1::global::SECP256K1.sign_ecdsa_recoverable(&message, &secp_secret_key); + pub fn tmp() -> Self { + let temp_dir = tempfile::tempdir().expect("Cannot create temp dir for keys"); + Self { + key_store: temp_dir.into_path(), + } + } - let mut r = Signature::default(); - let (recid, sig) = recsig.serialize_compact(); - r.0[..64].copy_from_slice(&sig); - r.0[64] = recid.to_i32() as u8; + pub fn raw_sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { + let private_key = self.get_private_key(public_key)?; - Ok(r) + RawSignature::create_for_digest(private_key, digest) } - pub fn sign_digest(&self, public_key: PublicKey, digest: [u8; 32]) -> Result { - let mut r = self.raw_sign_digest(public_key, digest)?; - r.0[64] += 27; + pub fn sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { + let private_key = self.get_private_key(public_key)?; - Ok(r) + Signature::create_for_digest(private_key, digest) } pub fn sign(&self, public_key: PublicKey, data: &[u8]) -> Result { - self.sign_digest(public_key, sha3::Keccak256::digest(data).into()) + self.sign_digest(public_key, data.to_digest()) } pub fn sign_with_addr(&self, address: Address, data: &[u8]) -> Result { @@ -325,6 +278,8 @@ impl Signer { #[cfg(test)] mod tests { + use std::env::temp_dir; + use super::*; use ethers::utils::keccak256; @@ -356,7 +311,7 @@ mod tests { let hash = keccak256(message); // Recover the address using the signature - let ethers_sig = ethers::core::types::Signature::try_from(&signature.0[..]) + let ethers_sig = ethers::core::types::Signature::try_from(signature.as_ref()) .expect("failed to parse sig"); let recovered_address = ethers_sig.recover(hash).expect("Failed to recover address"); @@ -393,7 +348,8 @@ mod tests { let hash = keccak256(message); // Recover the address using the signature - let ethers_sig = ethers::core::types::Signature::try_from(&signature.0[..]) + // TODO: remove the deprecated ethers crate in favor of alloy #4197 + let ethers_sig = ethers::core::types::Signature::try_from(signature.as_ref()) .expect("failed to parse sig"); let recovered_address = ethers_sig.recover(hash).expect("Failed to recover address"); @@ -420,4 +376,28 @@ mod tests { .unwrap(); Address::try_from(id).expect_err("Must be incorrect ethereum address"); } + + #[test] + fn recover_digest() { + let private_key_hex = "4c0883a69102937d6231471b5dbb6204fe51296170827936ea5cce4b76994b0f"; + let message = b"hello world"; + + let key_store = temp_dir().join("signer-tests"); + let signer = Signer::new(key_store).expect("Failed to create signer"); + + let private_key = PrivateKey::from_str(private_key_hex).expect("Invalid private key hex"); + let public_key = signer.add_key(private_key).expect("Failed to add key"); + + let signature = signer + .sign(public_key, message) + .expect("Failed to sign message"); + + let hash = keccak256(message); + + let recovered_public_key = signature + .recover_from_digest(hash.into()) + .expect("Failed to recover public key"); + + assert_eq!(recovered_public_key, public_key); + } } diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs new file mode 100644 index 00000000000..1b7c53867cf --- /dev/null +++ b/ethexe/signer/src/signature.rs @@ -0,0 +1,132 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Secp256k1 signature types and utilities. + +use crate::{Digest, PrivateKey, PublicKey}; +use anyhow::{Context, Result}; +use parity_scale_codec::{Decode, Encode}; +use secp256k1::{ + ecdsa::{RecoverableSignature, RecoveryId}, + Message, +}; +use std::fmt; + +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct RawSignature([u8; 65]); + +impl RawSignature { + pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { + let secp_secret_key = secp256k1::SecretKey::from_slice(&private_key.0) + .with_context(|| "Invalid secret key format for {:?}")?; + + let message = Message::from_digest(digest.into()); + + let recoverable = + secp256k1::global::SECP256K1.sign_ecdsa_recoverable(&message, &secp_secret_key); + + let (id, signature) = recoverable.serialize_compact(); + let mut bytes = [0u8; 65]; + bytes[..64].copy_from_slice(signature.as_ref()); + bytes[64] = id.to_i32() as u8; + Ok(RawSignature(bytes)) + } +} + +impl From for [u8; 65] { + fn from(sig: RawSignature) -> [u8; 65] { + sig.0 + } +} + +impl AsRef<[u8]> for RawSignature { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl From for RawSignature { + fn from(mut sig: Signature) -> RawSignature { + sig.0[64] -= 27; + RawSignature(sig.0) + } +} + +#[derive(Clone, Copy, Encode, Decode, PartialEq, Eq)] +pub struct Signature([u8; 65]); + +impl Signature { + pub fn to_hex(&self) -> String { + hex::encode(self.0) + } + + pub fn recover_from_digest(&self, digest: Digest) -> Result { + let sig = (*self).try_into()?; + let public_key = secp256k1::global::SECP256K1 + .recover_ecdsa(&Message::from_digest(digest.into()), &sig)?; + Ok(PublicKey::from_bytes(public_key.serialize())) + } + + pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { + let raw_signature = RawSignature::create_for_digest(private_key, digest)?; + Ok(raw_signature.into()) + } +} + +impl From for Signature { + fn from(mut sig: RawSignature) -> Self { + sig.0[64] += 27; + Signature(sig.0) + } +} + +impl From for [u8; 65] { + fn from(sig: Signature) -> [u8; 65] { + sig.0 + } +} + +impl AsRef<[u8]> for Signature { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl fmt::Debug for Signature { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "0x{}", self.to_hex()) + } +} + +impl fmt::Display for Signature { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.to_hex()) + } +} + +impl TryFrom for RecoverableSignature { + type Error = anyhow::Error; + + fn try_from(sig: Signature) -> Result { + RecoverableSignature::from_compact( + sig.0[..64].as_ref(), + RecoveryId::from_i32((sig.0[64] - 27) as i32)?, + ) + .map_err(Into::into) + } +} diff --git a/ethexe/validator/Cargo.toml b/ethexe/validator/Cargo.toml index d541bc536de..d6b62de06dc 100644 --- a/ethexe/validator/Cargo.toml +++ b/ethexe/validator/Cargo.toml @@ -10,7 +10,6 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ethexe-network.workspace = true ethexe-signer.workspace = true ethexe-sequencer.workspace = true ethexe-common.workspace = true @@ -20,6 +19,8 @@ log.workspace = true anyhow.workspace = true futures.workspace = true parity-scale-codec = { workspace = true, features = ["std", "derive"] } -tempfile.workspace = true static_init = "1.0.3" + +[dev-dependencies] +ethexe-db.workspace = true diff --git a/ethexe/validator/src/lib.rs b/ethexe/validator/src/lib.rs index b937e776314..8c033ff712f 100644 --- a/ethexe/validator/src/lib.rs +++ b/ethexe/validator/src/lib.rs @@ -16,16 +16,20 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use anyhow::Result; -use ethexe_common::router::{BlockCommitment, CodeCommitment}; -use ethexe_network::NetworkSender; -use ethexe_sequencer::AggregatedCommitments; -use ethexe_signer::{Address, PublicKey, Signer}; -use parity_scale_codec::Encode; - -pub enum Commitment { - Code(CodeCommitment), - Block(BlockCommitment), +use anyhow::{anyhow, Result}; +use ethexe_common::{ + db::{BlockMetaStorage, CodesStorage}, + router::{BlockCommitment, CodeCommitment}, +}; +use ethexe_sequencer::agro::{self, AggregatedCommitments}; +use ethexe_signer::{sha3, Address, Digest, PublicKey, Signature, Signer, ToDigest}; +use gprimitives::H256; +use parity_scale_codec::{Decode, Encode}; + +pub struct Validator { + pub_key: PublicKey, + signer: Signer, + router_address: Address, } pub struct Config { @@ -33,12 +37,32 @@ pub struct Config { pub router_address: Address, } -pub struct Validator { - pub_key: PublicKey, - signer: Signer, - current_codes: Vec, - current_blocks: Vec, - router_address: Address, +#[derive(Debug, Clone, Encode, Decode)] +pub struct BlockCommitmentValidationRequest { + pub block_hash: H256, + pub prev_commitment_hash: H256, + pub pred_block_hash: H256, + pub transitions_digest: Digest, +} + +impl From<&BlockCommitment> for BlockCommitmentValidationRequest { + fn from(commitment: &BlockCommitment) -> Self { + Self { + block_hash: commitment.block_hash, + prev_commitment_hash: commitment.prev_commitment_hash, + pred_block_hash: commitment.pred_block_hash, + transitions_digest: commitment.transitions.to_digest(), + } + } +} + +impl ToDigest for BlockCommitmentValidationRequest { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + sha3::Digest::update(hasher, self.block_hash.as_bytes()); + sha3::Digest::update(hasher, self.prev_commitment_hash.as_bytes()); + sha3::Digest::update(hasher, self.pred_block_hash.as_bytes()); + sha3::Digest::update(hasher, self.transitions_digest.as_ref()); + } } impl Validator { @@ -46,69 +70,370 @@ impl Validator { Self { signer, pub_key: config.pub_key, - current_codes: vec![], - current_blocks: vec![], router_address: config.router_address, } } - pub fn has_codes_commit(&self) -> bool { - !self.current_codes.is_empty() - } - - pub fn has_transitions_commit(&self) -> bool { - !self.current_blocks.is_empty() - } - pub fn pub_key(&self) -> PublicKey { self.pub_key } - pub fn codes_aggregation(&mut self) -> Result> { + pub fn address(&self) -> Address { + self.pub_key.to_address() + } + + pub fn aggregate(&self, commitments: Vec) -> Result> { AggregatedCommitments::aggregate_commitments( - self.current_codes.clone(), + commitments, &self.signer, self.pub_key, self.router_address, ) } - pub fn blocks_aggregation(&mut self) -> Result> { - AggregatedCommitments::aggregate_commitments( - self.current_blocks.clone(), + pub fn validate_code_commitments( + &mut self, + db: &impl CodesStorage, + requests: impl IntoIterator, + ) -> Result<(Digest, Signature)> { + let mut commitment_digests = Vec::new(); + for request in requests { + commitment_digests.push(request.to_digest()); + Self::validate_code_commitment(db, request)?; + } + + let commitments_digest = commitment_digests.iter().collect(); + agro::sign_commitments_digest( + commitments_digest, &self.signer, self.pub_key, self.router_address, ) + .map(|signature| (commitments_digest, signature)) } - pub fn push_commitments(&mut self, commitments: Vec) -> Result<()> { - for commitment in commitments { - match commitment { - Commitment::Code(code_commitment) => self.current_codes.push(code_commitment), - Commitment::Block(block_commitment) => self.current_blocks.push(block_commitment), - } + pub fn validate_block_commitments( + &mut self, + db: &impl BlockMetaStorage, + requests: impl IntoIterator, + ) -> Result<(Digest, Signature)> { + let mut commitment_digests = Vec::new(); + for request in requests.into_iter() { + commitment_digests.push(request.to_digest()); + Self::validate_block_commitment(db, request)?; } + let commitments_digest = commitment_digests.iter().collect(); + agro::sign_commitments_digest( + commitments_digest, + &self.signer, + self.pub_key, + self.router_address, + ) + .map(|signature| (commitments_digest, signature)) + } + + fn validate_code_commitment(db: &impl CodesStorage, request: CodeCommitment) -> Result<()> { + let CodeCommitment { id: code_id, valid } = request; + if db + .code_valid(code_id) + .ok_or_else(|| anyhow!("Code {code_id} is not validated by this node"))? + .ne(&valid) + { + return Err(anyhow!( + "Requested and local code validation results mismatch" + )); + } Ok(()) } - pub fn publish_commitments(&mut self, sender: &mut NetworkSender) -> Result<()> { - let origin = self.pub_key.to_address(); + fn validate_block_commitment( + db: &impl BlockMetaStorage, + request: BlockCommitmentValidationRequest, + ) -> Result<()> { + let BlockCommitmentValidationRequest { + block_hash, + pred_block_hash: allowed_pred_block_hash, + prev_commitment_hash: allowed_prev_commitment_hash, + transitions_digest, + } = request; + + if !db.block_end_state_is_valid(block_hash).unwrap_or(false) { + return Err(anyhow!( + "Requested block {block_hash} is not processed by this node" + )); + } + + if db + .block_outcome(block_hash) + .ok_or_else(|| anyhow!("Cannot get from db outcome for block {block_hash}"))? + .iter() + .collect::() + != transitions_digest + { + return Err(anyhow!("Requested and local transitions digest mismatch")); + } - // broadcast (aggregated_code_commitments, aggregated_transitions_commitments) to the network peers - let commitments = (self.codes_aggregation()?, self.blocks_aggregation()?); - sender.publish_commitments((origin, commitments).encode()); + if db.block_prev_commitment(block_hash).ok_or_else(|| { + anyhow!("Cannot get from db previous commitment for block {block_hash}") + })? != allowed_prev_commitment_hash + { + return Err(anyhow!( + "Requested and local previous commitment block hash mismatch" + )); + } + + if !Self::verify_is_predecessor(db, allowed_pred_block_hash, block_hash, None)? { + return Err(anyhow!( + "{block_hash} is not a predecessor of {allowed_pred_block_hash}" + )); + } Ok(()) } - pub fn clear(&mut self) { - self.current_codes.clear(); - self.current_blocks.clear(); + /// Verify whether `pred_hash` is a predecessor of `block_hash` in the chain. + fn verify_is_predecessor( + db: &impl BlockMetaStorage, + block_hash: H256, + pred_hash: H256, + max_distance: Option, + ) -> Result { + if block_hash == pred_hash { + return Ok(true); + } + + let block_header = db + .block_header(block_hash) + .ok_or_else(|| anyhow!("header not found for block: {block_hash}"))?; + + if block_header.parent_hash == pred_hash { + return Ok(true); + } + + let pred_height = db + .block_header(pred_hash) + .ok_or_else(|| anyhow!("header not found for pred block: {pred_hash}"))? + .height; + + let distance = block_header.height.saturating_sub(pred_height); + if max_distance.map(|d| d < distance).unwrap_or(false) { + return Err(anyhow!("distance is too large: {distance}")); + } + + let mut block_hash = block_hash; + for _ in 0..=distance { + if block_hash == pred_hash { + return Ok(true); + } + block_hash = db + .block_header(block_hash) + .ok_or_else(|| anyhow!("header not found for block: {block_hash}"))? + .parent_hash; + } + + Ok(false) } +} - pub fn address(&self) -> Address { - self.pub_key.to_address() +#[cfg(test)] +mod tests { + use super::*; + use ethexe_common::router::StateTransition; + use ethexe_db::BlockHeader; + use gprimitives::CodeId; + + #[test] + fn block_validation_request_digest() { + let transition = StateTransition { + actor_id: H256::random().0.into(), + new_state_hash: H256::random(), + value_to_receive: 123, + value_claims: vec![], + messages: vec![], + }; + + let commitment = BlockCommitment { + block_hash: H256::random(), + prev_commitment_hash: H256::random(), + pred_block_hash: H256::random(), + transitions: vec![transition.clone(), transition], + }; + + assert_eq!( + commitment.to_digest(), + BlockCommitmentValidationRequest::from(&commitment).to_digest() + ); + } + + #[test] + fn test_validate_code_commitments() { + let db = ethexe_db::Database::from_one(ðexe_db::MemDb::default(), [0; 20]); + + let code_id = CodeId::from(H256::random()); + + Validator::validate_code_commitment( + &db, + CodeCommitment { + id: code_id, + valid: true, + }, + ) + .expect_err("Code is not in db"); + + db.set_code_valid(code_id, true); + Validator::validate_code_commitment( + &db, + CodeCommitment { + id: code_id, + valid: false, + }, + ) + .expect_err("Code validation result mismatch"); + + Validator::validate_code_commitment( + &db, + CodeCommitment { + id: code_id, + valid: true, + }, + ) + .unwrap(); + } + + #[test] + fn test_validate_block_commitment() { + let db = ethexe_db::Database::from_one(ðexe_db::MemDb::default(), [0; 20]); + + let block_hash = H256::random(); + let pred_block_hash = H256::random(); + let prev_commitment_hash = H256::random(); + let transitions = vec![]; + let transitions_digest = transitions.to_digest(); + + db.set_block_end_state_is_valid(block_hash, true); + db.set_block_outcome(block_hash, transitions); + db.set_block_prev_commitment(block_hash, prev_commitment_hash); + db.set_block_header( + block_hash, + BlockHeader { + height: 100, + timestamp: 100, + parent_hash: pred_block_hash, + }, + ); + + Validator::validate_block_commitment( + &db, + BlockCommitmentValidationRequest { + block_hash, + pred_block_hash: block_hash, + prev_commitment_hash, + transitions_digest, + }, + ) + .unwrap(); + + Validator::validate_block_commitment( + &db, + BlockCommitmentValidationRequest { + block_hash, + pred_block_hash: H256::random(), + prev_commitment_hash, + transitions_digest, + }, + ) + .expect_err("Unknown pred block is provided"); + + Validator::validate_block_commitment( + &db, + BlockCommitmentValidationRequest { + block_hash, + pred_block_hash: block_hash, + prev_commitment_hash: H256::random(), + transitions_digest, + }, + ) + .expect_err("Unknown prev commitment is provided"); + + Validator::validate_block_commitment( + &db, + BlockCommitmentValidationRequest { + block_hash, + pred_block_hash: block_hash, + prev_commitment_hash, + transitions_digest: Digest::from([2; 32]), + }, + ) + .expect_err("Transitions digest mismatch"); + + Validator::validate_block_commitment( + &db, + BlockCommitmentValidationRequest { + block_hash: H256::random(), + pred_block_hash: block_hash, + prev_commitment_hash, + transitions_digest, + }, + ) + .expect_err("Block is not processed by this node"); + } + + #[test] + fn test_verify_is_predecessor() { + let db = ethexe_db::Database::from_one(ðexe_db::MemDb::default(), [0; 20]); + + let blocks = [H256::random(), H256::random(), H256::random()]; + db.set_block_header( + blocks[0], + BlockHeader { + height: 100, + timestamp: 100, + parent_hash: H256::zero(), + }, + ); + db.set_block_header( + blocks[1], + BlockHeader { + height: 101, + timestamp: 101, + parent_hash: blocks[0], + }, + ); + db.set_block_header( + blocks[2], + BlockHeader { + height: 102, + timestamp: 102, + parent_hash: blocks[1], + }, + ); + + Validator::verify_is_predecessor(&db, blocks[1], H256::random(), None) + .expect_err("Unknown pred block is provided"); + + Validator::verify_is_predecessor(&db, H256::random(), blocks[0], None) + .expect_err("Unknown block is provided"); + + Validator::verify_is_predecessor(&db, blocks[2], blocks[0], Some(1)) + .expect_err("Distance is too large"); + + // Another chain block + let block3 = H256::random(); + db.set_block_header( + block3, + BlockHeader { + height: 1, + timestamp: 1, + parent_hash: blocks[0], + }, + ); + Validator::verify_is_predecessor(&db, blocks[2], block3, None) + .expect_err("Block is from other chain with incorrect height"); + + assert!(Validator::verify_is_predecessor(&db, blocks[2], blocks[0], None).unwrap()); + assert!(Validator::verify_is_predecessor(&db, blocks[2], blocks[0], Some(2)).unwrap()); + assert!(!Validator::verify_is_predecessor(&db, blocks[1], blocks[2], Some(1)).unwrap()); + assert!(Validator::verify_is_predecessor(&db, blocks[1], blocks[1], None).unwrap()); } }