-
Notifications
You must be signed in to change notification settings - Fork 107
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2318 from ZcashFoundation/redpallas-verifier
RedPallas async verifier service
- Loading branch information
Showing
4 changed files
with
210 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
//! Async RedPallas batch verifier service | ||
#[cfg(test)] | ||
mod tests; | ||
|
||
use std::{ | ||
future::Future, | ||
mem, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use futures::future::{ready, Ready}; | ||
use once_cell::sync::Lazy; | ||
use rand::thread_rng; | ||
use tokio::sync::broadcast::{channel, error::RecvError, Sender}; | ||
use tower::{util::ServiceFn, Service}; | ||
use tower_batch::{Batch, BatchControl}; | ||
use tower_fallback::Fallback; | ||
|
||
use zebra_chain::primitives::redpallas::{batch, *}; | ||
|
||
/// Global batch verification context for RedPallas signatures. | ||
/// | ||
/// This service transparently batches contemporaneous signature verifications, | ||
/// handling batch failures by falling back to individual verification. | ||
/// | ||
/// Note that making a `Service` call requires mutable access to the service, so | ||
/// you should call `.clone()` on the global handle to create a local, mutable | ||
/// handle. | ||
#[allow(dead_code)] | ||
pub static VERIFIER: Lazy< | ||
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>, | ||
> = Lazy::new(|| { | ||
Fallback::new( | ||
Batch::new( | ||
Verifier::default(), | ||
super::MAX_BATCH_SIZE, | ||
super::MAX_BATCH_LATENCY, | ||
), | ||
// We want to fallback to individual verification if batch verification | ||
// fails, so we need a Service to use. The obvious way to do this would | ||
// be to write a closure that returns an async block. But because we | ||
// have to specify the type of a static, we need to be able to write the | ||
// type of the closure and its return value, and both closures and async | ||
// blocks have eldritch types whose names cannot be written. So instead, | ||
// we use a Ready to avoid an async block and cast the closure to a | ||
// function (which is possible because it doesn't capture any state). | ||
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _), | ||
) | ||
}); | ||
|
||
/// RedPallas signature verifier service | ||
pub struct Verifier { | ||
batch: batch::Verifier, | ||
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also | ||
// provides a spmc channel, "watch", but it only keeps the latest value, so | ||
// using it would require thinking through whether it was possible for | ||
// results from one batch to be mixed with another. | ||
tx: Sender<Result<(), Error>>, | ||
} | ||
|
||
impl Default for Verifier { | ||
fn default() -> Self { | ||
let batch = batch::Verifier::default(); | ||
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); | ||
Self { batch, tx } | ||
} | ||
} | ||
|
||
/// Type alias to clarify that this batch::Item is a RedPallasItem | ||
pub type Item = batch::Item; | ||
|
||
impl Service<BatchControl<Item>> for Verifier { | ||
type Response = (); | ||
type Error = Error; | ||
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>; | ||
|
||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
Poll::Ready(Ok(())) | ||
} | ||
|
||
fn call(&mut self, req: BatchControl<Item>) -> Self::Future { | ||
match req { | ||
BatchControl::Item(item) => { | ||
tracing::trace!("got item"); | ||
self.batch.queue(item); | ||
let mut rx = self.tx.subscribe(); | ||
Box::pin(async move { | ||
match rx.recv().await { | ||
Ok(result) => { | ||
if result.is_ok() { | ||
tracing::trace!(?result, "validated redpallas signature"); | ||
metrics::counter!("signatures.redpallas.validated", 1); | ||
} else { | ||
tracing::trace!(?result, "invalid redpallas signature"); | ||
metrics::counter!("signatures.redpallas.invalid", 1); | ||
} | ||
|
||
result | ||
} | ||
Err(RecvError::Lagged(_)) => { | ||
tracing::error!( | ||
"batch verification receiver lagged and lost verification results" | ||
); | ||
Err(Error::InvalidSignature) | ||
} | ||
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), | ||
} | ||
}) | ||
} | ||
|
||
BatchControl::Flush => { | ||
tracing::trace!("got flush command"); | ||
let batch = mem::take(&mut self.batch); | ||
let _ = self.tx.send(batch.verify(thread_rng())); | ||
Box::pin(async { Ok(()) }) | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Drop for Verifier { | ||
fn drop(&mut self) { | ||
// We need to flush the current batch in case there are still any pending futures. | ||
let batch = mem::take(&mut self.batch); | ||
let _ = self.tx.send(batch.verify(thread_rng())); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
//! Tests for redpallas signature verification | ||
use super::*; | ||
|
||
use std::time::Duration; | ||
|
||
use color_eyre::eyre::{eyre, Result}; | ||
use futures::stream::{FuturesUnordered, StreamExt}; | ||
use tower::ServiceExt; | ||
use tower_batch::Batch; | ||
|
||
async fn sign_and_verify<V>(mut verifier: V, n: usize) -> Result<(), V::Error> | ||
where | ||
V: Service<Item, Response = ()>, | ||
{ | ||
let mut rng = thread_rng(); | ||
let mut results = FuturesUnordered::new(); | ||
for i in 0..n { | ||
let span = tracing::trace_span!("sig", i); | ||
let msg = b"BatchVerifyTest"; | ||
|
||
match i % 2 { | ||
0 => { | ||
let sk = SigningKey::<SpendAuth>::new(&mut rng); | ||
let vk = VerificationKey::from(&sk); | ||
let sig = sk.sign(&mut rng, &msg[..]); | ||
verifier.ready_and().await?; | ||
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) | ||
} | ||
1 => { | ||
let sk = SigningKey::<Binding>::new(&mut rng); | ||
let vk = VerificationKey::from(&sk); | ||
let sig = sk.sign(&mut rng, &msg[..]); | ||
verifier.ready_and().await?; | ||
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) | ||
} | ||
_ => panic!(), | ||
} | ||
} | ||
|
||
while let Some(result) = results.next().await { | ||
result?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn batch_flushes_on_max_items() -> Result<()> { | ||
use tokio::time::timeout; | ||
|
||
// Use a very long max_latency and a short timeout to check that | ||
// flushing is happening based on hitting max_items. | ||
let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000)); | ||
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100)) | ||
.await? | ||
.map_err(|e| eyre!(e))?; | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn batch_flushes_on_max_latency() -> Result<()> { | ||
use tokio::time::timeout; | ||
|
||
// Use a very high max_items and a short timeout to check that | ||
// flushing is happening based on hitting max_latency. | ||
let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500)); | ||
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10)) | ||
.await? | ||
.map_err(|e| eyre!(e))?; | ||
|
||
Ok(()) | ||
} |