This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Tell verifiers when not to parallelize accounting #125

Merged · 2 commits · Apr 13, 2018
38 changes: 37 additions & 1 deletion src/accountant.rs
@@ -155,8 +155,12 @@ impl Accountant {
     pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
         // Run all debits first to filter out any transactions that can't be processed
         // in parallel deterministically.
-        trs.into_par_iter()
+        let results: Vec<_> = trs.into_par_iter()
             .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
+            .collect(); // Calling collect() here forces all debits to complete before moving on.
+
+        results
+            .into_par_iter()
             .map(|result| {
                 result.map(|tr| {
                     self.process_verified_transaction_credits(&tr);
@@ -166,6 +170,27 @@ impl Accountant {
             .collect()
     }
 
+    fn partition_events(events: Vec<Event>) -> (Vec<Transaction>, Vec<Event>) {
+        let mut trs = vec![];
+        let mut rest = vec![];
+        for event in events {
+            match event {
+                Event::Transaction(tr) => trs.push(tr),
+                _ => rest.push(event),
+            }
+        }
+        (trs, rest)
+    }
+
+    pub fn process_verified_events(&self, events: Vec<Event>) -> Result<()> {
+        let (trs, rest) = Self::partition_events(events);
+        self.process_verified_transactions(trs);
+        for event in rest {
+            self.process_verified_event(&event)?;
+        }
+        Ok(())
+    }
+
     /// Process a Witness Signature that has already been verified.
     fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
         if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
@@ -410,6 +435,17 @@ mod tests {
         // Assert we're no longer able to use the oldest entry ID.
         assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id()));
     }
+
+    #[test]
+    fn test_debits_before_credits() {
+        let mint = Mint::new(2);
+        let acc = Accountant::new(&mint);
+        let alice = KeyPair::new();
+        let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
+        let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
+        let trs = vec![tr0, tr1];
+        assert!(acc.process_verified_transactions(trs)[1].is_err());
+    }
 }
 
 #[cfg(all(feature = "unstable", test))]
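Note on the change above: the collect() between the two passes is the synchronization point. Every debit is attempted before any credit is applied, so a transaction that spends tokens credited elsewhere in the same batch fails deterministically, which is exactly what test_debits_before_credits asserts. Below is a minimal, self-contained sketch of the same two-phase pattern with rayon; the Transfer type, the apply_all helper, and the Mutex<HashMap> ledger are illustrative stand-ins rather than the repository's types.

use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative stand-ins: the real Transaction carries plans and signatures,
// and the real ledger state lives inside Accountant.
struct Transfer {
    from: &'static str,
    to: &'static str,
    tokens: i64,
}

fn apply_all(
    balances: &Mutex<HashMap<&'static str, i64>>,
    trs: Vec<Transfer>,
) -> Vec<Result<Transfer, ()>> {
    // Phase 1: attempt every debit in parallel. The collect() is the barrier
    // that guarantees no credit runs until all debits have been attempted.
    let debited: Vec<_> = trs
        .into_par_iter()
        .map(|tr| {
            let mut bal = balances.lock().unwrap();
            let entry = bal.entry(tr.from).or_insert(0);
            if *entry < tr.tokens {
                Err(())
            } else {
                *entry -= tr.tokens;
                Ok(tr)
            }
        })
        .collect();

    // Phase 2: apply credits only for the transfers whose debit succeeded.
    debited
        .into_par_iter()
        .map(|result| {
            result.map(|tr| {
                *balances.lock().unwrap().entry(tr.to).or_insert(0) += tr.tokens;
                tr
            })
        })
        .collect()
}

fn main() {
    let balances = Mutex::new(HashMap::from([("mint", 2)]));
    let trs = vec![
        Transfer { from: "mint", to: "alice", tokens: 2 },
        // Spends tokens alice has not yet been credited within this batch,
        // so the debit fails deterministically.
        Transfer { from: "alice", to: "mint", tokens: 1 },
    ];
    let results = apply_all(&balances, trs);
    assert!(results[0].is_ok());
    assert!(results[1].is_err());
    assert_eq!(*balances.lock().unwrap().get("alice").unwrap(), 2);
}

The trade-off is that a transfer chain which would succeed if processed serially gets rejected when packed into one batch, which is why the skeleton below tells verifiers where the batch boundaries are.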
46 changes: 44 additions & 2 deletions src/accountant_skel.rs
@@ -179,6 +179,10 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
             }
         }
 
+        // Let validators know they should not attempt to process additional
+        // transactions in parallel.
+        self.historian.sender.send(Signal::Tick)?;
+
         // Process the remaining requests serially.
         let rsps = reqs.into_iter()
             .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
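The Signal::Tick sent after the parallel batch is what gives verifiers that boundary: everything the leader processed in parallel ends up in one Entry, and anything after the Tick lands in a later Entry that must be replayed afterwards. Here is a rough, self-contained sketch of that grouping behavior using a plain mpsc channel; the Signal and Entry shapes are simplified stand-ins for the real types in the recorder and entry modules.

use std::sync::mpsc::{channel, Receiver};

// Simplified stand-ins for the repository's Signal and Entry types.
enum Signal {
    Tick,
    Event(String),
}

struct Entry {
    events: Vec<String>,
}

// Group everything received between Ticks into one Entry, mirroring how a Tick
// marks the point past which transactions must not be replayed in parallel.
fn collect_entries(receiver: Receiver<Signal>) -> Vec<Entry> {
    let mut entries = vec![];
    let mut events = vec![];
    for signal in receiver {
        match signal {
            Signal::Event(ev) => events.push(ev),
            Signal::Tick => entries.push(Entry {
                events: std::mem::take(&mut events),
            }),
        }
    }
    entries
}

fn main() {
    let (sender, receiver) = channel();

    // Batch 1: transactions the leader processed in parallel with each other.
    sender.send(Signal::Event("mint -> alice: 2".to_string())).unwrap();
    sender.send(Signal::Tick).unwrap();

    // Batch 2: depends on batch 1, so it must land in its own Entry.
    sender.send(Signal::Event("alice -> mint: 1".to_string())).unwrap();
    sender.send(Signal::Tick).unwrap();
    drop(sender);

    let entries = collect_entries(receiver);
    assert_eq!(entries.len(), 2);
    assert!(entries.iter().all(|entry| entry.events.len() == 1));
}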
@@ -321,12 +325,14 @@ mod tests {
     use accountant::Accountant;
     use accountant_skel::AccountantSkel;
     use accountant_stub::AccountantStub;
+    use entry::Entry;
     use historian::Historian;
     use mint::Mint;
     use plan::Plan;
+    use recorder::Signal;
     use signature::{KeyPair, KeyPairUtil};
     use std::io::sink;
-    use std::net::UdpSocket;
+    use std::net::{SocketAddr, UdpSocket};
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::{Arc, Mutex};
     use std::thread::sleep;
@@ -359,6 +365,43 @@ mod tests {
         assert_eq!(rv[1].read().unwrap().packets.len(), 1);
     }
 
+    #[test]
+    fn test_accounting_sequential_consistency() {
+        // In this attack we'll demonstrate that a verifier can interpret the ledger
+        // differently if either the server doesn't signal the ledger to add an
+        // Entry OR if the verifier tries to parallelize across multiple Entries.
+        let mint = Mint::new(2);
+        let acc = Accountant::new(&mint);
+        let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
+        let historian = Historian::new(&mint.last_id(), None);
+        let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
+
+        // Process a batch that includes a transaction that receives two tokens.
+        let alice = KeyPair::new();
+        let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
+        let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
+        assert!(skel.process_packets(req_vers).is_ok());
+
+        // Process a second batch that spends one of those tokens.
+        let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
+        let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
+        assert!(skel.process_packets(req_vers).is_ok());
+
+        // Collect the ledger and feed it to a new accountant.
+        skel.historian.sender.send(Signal::Tick).unwrap();
+        drop(skel.historian.sender);
+        let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
+
+        // Assert the user holds one token, not two. If the server only output one
+        // entry, then the second transaction will be rejected, because it drives
+        // the account balance below zero before the credit is added.
+        let acc = Accountant::new(&mint);
+        for entry in entries {
+            acc.process_verified_events(entry.events).unwrap();
+        }
+        assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
+    }
+
     #[test]
     fn test_accountant_bad_sig() {
         let serve_port = 9002;
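At the balance level this test expects two entries: the first moves two tokens from the mint to alice, the second moves one token back, and that second debit is only valid because the first entry has already been replayed. A tiny self-contained walk-through of that arithmetic, with string labels and a HashMap standing in for accounts:

use std::collections::HashMap;

// Balance-level replay of the two entries the test expects. The labels and the
// HashMap ledger are illustrative; the real test drives Accountant.
fn main() {
    let mut balances: HashMap<&str, i64> = HashMap::from([("mint", 2)]);

    // Entry 1: the mint sends alice two tokens.
    *balances.entry("mint").or_insert(0) -= 2;
    *balances.entry("alice").or_insert(0) += 2;

    // Entry 2: alice sends one token back. This debit is only valid because the
    // credit from entry 1 has already been applied. If both transactions were
    // packed into a single entry, the debits-before-credits rule would see
    // alice's balance at 0 and reject the spend.
    *balances.entry("alice").or_insert(0) -= 1;
    *balances.entry("mint").or_insert(0) += 1;

    assert_eq!(balances["alice"], 1); // matches the test's final get_balance check
    assert_eq!(balances["mint"], 1);
}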
@@ -468,7 +511,6 @@ mod bench {
         let tps = txs as f64 / sec;
 
         // Ensure that all transactions were successfully logged.
-        skel.historian.sender.send(Signal::Tick).unwrap();
         drop(skel.historian.sender);
         let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
         assert_eq!(entries.len(), 1);
4 changes: 1 addition & 3 deletions src/bin/testnode.rs
@@ -53,9 +53,7 @@ fn main() {
     let mut last_id = entry1.id;
     for entry in entries {
         last_id = entry.id;
-        for event in entry.events {
-            acc.process_verified_event(&event).unwrap();
-        }
+        acc.process_verified_events(entry.events).unwrap();
     }
 
     let historian = Historian::new(&last_id, Some(1000));