diff --git a/zebra-consensus/src/error.rs b/zebra-consensus/src/error.rs index 62bea06ad6e..7ec970c19e6 100644 --- a/zebra-consensus/src/error.rs +++ b/zebra-consensus/src/error.rs @@ -71,13 +71,16 @@ pub enum TransactionError { Groth16, #[error( - "joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned" + "Sprout joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned" )] Ed25519(#[from] zebra_chain::primitives::ed25519::Error), - #[error("bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")] + #[error("Sapling bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")] RedJubjub(zebra_chain::primitives::redjubjub::Error), + #[error("Orchard bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")] + RedPallas(zebra_chain::primitives::redpallas::Error), + // temporary error type until #1186 is fixed #[error("Downcast from BoxError to redjubjub::Error failed")] InternalDowncastError(String), @@ -88,6 +91,7 @@ pub enum TransactionError { impl From for TransactionError { fn from(err: BoxError) -> Self { + // TODO: handle redpallas Error? match err.downcast::() { Ok(e) => TransactionError::RedJubjub(*e), Err(e) => TransactionError::InternalDowncastError(format!( diff --git a/zebra-consensus/src/primitives.rs b/zebra-consensus/src/primitives.rs index 700b99e36c6..dd3fbab21db 100644 --- a/zebra-consensus/src/primitives.rs +++ b/zebra-consensus/src/primitives.rs @@ -3,6 +3,7 @@ pub mod ed25519; pub mod groth16; pub mod redjubjub; +pub mod redpallas; /// The maximum batch size for any of the batch verifiers. const MAX_BATCH_SIZE: usize = 64; diff --git a/zebra-consensus/src/primitives/redpallas.rs b/zebra-consensus/src/primitives/redpallas.rs new file mode 100644 index 00000000000..5ee17cf4241 --- /dev/null +++ b/zebra-consensus/src/primitives/redpallas.rs @@ -0,0 +1,129 @@ +//! Async RedPallas batch verifier service + +#[cfg(test)] +mod tests; + +use std::{ + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::future::{ready, Ready}; +use once_cell::sync::Lazy; +use rand::thread_rng; +use tokio::sync::broadcast::{channel, error::RecvError, Sender}; +use tower::{util::ServiceFn, Service}; +use tower_batch::{Batch, BatchControl}; +use tower_fallback::Fallback; + +use zebra_chain::primitives::redpallas::{batch, *}; + +/// Global batch verification context for RedPallas signatures. +/// +/// This service transparently batches contemporaneous signature verifications, +/// handling batch failures by falling back to individual verification. +/// +/// Note that making a `Service` call requires mutable access to the service, so +/// you should call `.clone()` on the global handle to create a local, mutable +/// handle. +#[allow(dead_code)] +pub static VERIFIER: Lazy< + Fallback, ServiceFn Ready>>>, +> = Lazy::new(|| { + Fallback::new( + Batch::new( + Verifier::default(), + super::MAX_BATCH_SIZE, + super::MAX_BATCH_LATENCY, + ), + // We want to fallback to individual verification if batch verification + // fails, so we need a Service to use. The obvious way to do this would + // be to write a closure that returns an async block. But because we + // have to specify the type of a static, we need to be able to write the + // type of the closure and its return value, and both closures and async + // blocks have eldritch types whose names cannot be written. So instead, + // we use a Ready to avoid an async block and cast the closure to a + // function (which is possible because it doesn't capture any state). + tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _), + ) +}); + +/// RedPallas signature verifier service +pub struct Verifier { + batch: batch::Verifier, + // This uses a "broadcast" channel, which is an mpmc channel. Tokio also + // provides a spmc channel, "watch", but it only keeps the latest value, so + // using it would require thinking through whether it was possible for + // results from one batch to be mixed with another. + tx: Sender>, +} + +impl Default for Verifier { + fn default() -> Self { + let batch = batch::Verifier::default(); + let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); + Self { batch, tx } + } +} + +/// Type alias to clarify that this batch::Item is a RedPallasItem +pub type Item = batch::Item; + +impl Service> for Verifier { + type Response = (); + type Error = Error; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: BatchControl) -> Self::Future { + match req { + BatchControl::Item(item) => { + tracing::trace!("got item"); + self.batch.queue(item); + let mut rx = self.tx.subscribe(); + Box::pin(async move { + match rx.recv().await { + Ok(result) => { + if result.is_ok() { + tracing::trace!(?result, "validated redpallas signature"); + metrics::counter!("signatures.redpallas.validated", 1); + } else { + tracing::trace!(?result, "invalid redpallas signature"); + metrics::counter!("signatures.redpallas.invalid", 1); + } + + result + } + Err(RecvError::Lagged(_)) => { + tracing::error!( + "batch verification receiver lagged and lost verification results" + ); + Err(Error::InvalidSignature) + } + Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), + } + }) + } + + BatchControl::Flush => { + tracing::trace!("got flush command"); + let batch = mem::take(&mut self.batch); + let _ = self.tx.send(batch.verify(thread_rng())); + Box::pin(async { Ok(()) }) + } + } + } +} + +impl Drop for Verifier { + fn drop(&mut self) { + // We need to flush the current batch in case there are still any pending futures. + let batch = mem::take(&mut self.batch); + let _ = self.tx.send(batch.verify(thread_rng())); + } +} diff --git a/zebra-consensus/src/primitives/redpallas/tests.rs b/zebra-consensus/src/primitives/redpallas/tests.rs new file mode 100644 index 00000000000..b2537e8a9e8 --- /dev/null +++ b/zebra-consensus/src/primitives/redpallas/tests.rs @@ -0,0 +1,74 @@ +//! Tests for redpallas signature verification + +use super::*; + +use std::time::Duration; + +use color_eyre::eyre::{eyre, Result}; +use futures::stream::{FuturesUnordered, StreamExt}; +use tower::ServiceExt; +use tower_batch::Batch; + +async fn sign_and_verify(mut verifier: V, n: usize) -> Result<(), V::Error> +where + V: Service, +{ + let mut rng = thread_rng(); + let mut results = FuturesUnordered::new(); + for i in 0..n { + let span = tracing::trace_span!("sig", i); + let msg = b"BatchVerifyTest"; + + match i % 2 { + 0 => { + let sk = SigningKey::::new(&mut rng); + let vk = VerificationKey::from(&sk); + let sig = sk.sign(&mut rng, &msg[..]); + verifier.ready_and().await?; + results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) + } + 1 => { + let sk = SigningKey::::new(&mut rng); + let vk = VerificationKey::from(&sk); + let sig = sk.sign(&mut rng, &msg[..]); + verifier.ready_and().await?; + results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) + } + _ => panic!(), + } + } + + while let Some(result) = results.next().await { + result?; + } + + Ok(()) +} + +#[tokio::test] +async fn batch_flushes_on_max_items() -> Result<()> { + use tokio::time::timeout; + + // Use a very long max_latency and a short timeout to check that + // flushing is happening based on hitting max_items. + let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000)); + timeout(Duration::from_secs(5), sign_and_verify(verifier, 100)) + .await? + .map_err(|e| eyre!(e))?; + + Ok(()) +} + +#[tokio::test] +async fn batch_flushes_on_max_latency() -> Result<()> { + use tokio::time::timeout; + + // Use a very high max_items and a short timeout to check that + // flushing is happening based on hitting max_latency. + let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500)); + timeout(Duration::from_secs(5), sign_and_verify(verifier, 10)) + .await? + .map_err(|e| eyre!(e))?; + + Ok(()) +}