Skip to content

Commit

Permalink
Add RedJubjub signature verifier service (#460)
Browse files Browse the repository at this point in the history
Using tower-batch-based async pattern.

Now the Verifier is agnostic of redjubjub SigTypes.  Updated tests to
generate sigs of both types and batch verifies the whole batch.

Resolves #407
  • Loading branch information
dconnolly authored Jul 8, 2020
1 parent 83f0747 commit 2cd58c8
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 23 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
name: Coverage
name: CI

on:
pull_request:
paths:
- '**.rs'
- '**.rs'
push:
branches:
- main
paths:
- '**.rs'
- '**.rs'

jobs:

codecov:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
54 changes: 48 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 15 additions & 13 deletions zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,27 @@ authors = ["Zcash Foundation <zebra@zfnd.org>"]
license = "MIT OR Apache-2.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }

chrono = "0.4.13"
futures = "0.3.5"
futures-util = "0.3.5"
tokio = { version = "0.2", features = ["sync"] }
tower = "0.3.1"
rand = "0.7"
redjubjub = { git = "https://github.com/ZcashFoundation/redjubjub.git", branch = "main"}
tokio = { version = "0.2", features = ["time", "sync", "stream"] }
tower = "0.3"
tracing = "0.1.15"
tracing-futures = "0.2.4"

[dev-dependencies]
zebra-test = { path = "../zebra-test/" }
tower-batch = { path = "../tower-batch/" }
zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }

[dev-dependencies]
color-eyre = "0.5"
futures = "0.3.5"
rand = "0.7"
spandoc = "0.1"
tokio = { version = "0.2", features = ["macros", "time"] }
tracing = "0.1.15"
tokio = { version = "0.2", features = ["full"] }
tracing-error = "0.1.2"
tracing-futures = "0.2"
tracing-subscriber = "0.2.7"

zebra-test = { path = "../zebra-test/" }
1 change: 1 addition & 0 deletions zebra-consensus/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! verification.
mod block;
pub mod redjubjub;
mod script;
mod transaction;

Expand Down
155 changes: 155 additions & 0 deletions zebra-consensus/src/verify/redjubjub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//! Async RedJubjub batch verifier service
use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};

use rand::thread_rng;
use redjubjub::{batch, *};
use tokio::sync::broadcast::{channel, RecvError, Sender};
use tower::Service;
use tower_batch::BatchControl;

/// RedJubjub signature verifier service
pub struct Verifier {
batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
}

#[allow(clippy::new_without_default)]
impl Verifier {
/// Create a new RedJubjubVerifier instance
pub fn new() -> Self {
let batch = batch::Verifier::default();
// XXX(hdevalence) what's a reasonable choice here?
let (tx, _) = channel(10);
Self { tx, batch }
}
}

/// Type alias to clarify that this batch::Item is a RedJubjubItem
pub type Item = batch::Item;

impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => result,
Err(RecvError::Lagged(_)) => {
tracing::warn!(
"missed channel updates for the correct signature batch!"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
}
})
}

BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
}
}
}
}

impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::time::Duration;

use color_eyre::eyre::Result;
use futures::stream::{FuturesUnordered, StreamExt};
use tower::ServiceExt;
use tower_batch::Batch;

async fn sign_and_verify<V>(mut verifier: V, n: usize) -> Result<(), V::Error>
where
V: Service<Item, Response = ()>,
{
let rng = thread_rng();
let mut results = FuturesUnordered::new();
for i in 0..n {
let span = tracing::trace_span!("sig", i);
let msg = b"BatchVerifyTest";

match i % 2 {
0 => {
let sk = SigningKey::<SpendAuth>::new(rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(rng, &msg[..]);
verifier.ready_and().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
1 => {
let sk = SigningKey::<Binding>::new(rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(rng, &msg[..]);
verifier.ready_and().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
_ => panic!(),
}
}

while let Some(result) = results.next().await {
result?;
}

Ok(())
}

#[tokio::test]
#[spandoc::spandoc]
async fn batch_flushes_on_max_items() -> Result<()> {
use tokio::time::timeout;

// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(Verifier::new(), 10, Duration::from_secs(1000));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100)).await?
}

#[tokio::test]
#[spandoc::spandoc]
async fn batch_flushes_on_max_latency() -> Result<()> {
use tokio::time::timeout;

// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(Verifier::new(), 100, Duration::from_millis(500));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10)).await?
}
}

0 comments on commit 2cd58c8

Please sign in to comment.