Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Cleanup last_id access in stub and skel #171

Merged
merged 5 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl Accountant {
acc
}

/// Return the last entry ID registered
pub fn last_id(&self) -> Hash {
let last_ids = self.last_ids.read().unwrap();
let last_item = last_ids.iter().last().expect("empty last_ids list");
last_item.0
}

fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
if signatures.read().unwrap().contains(sig) {
return false;
Expand Down Expand Up @@ -327,6 +334,8 @@ mod tests {
let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey();
let acc = Accountant::new(&alice);
assert_eq!(acc.last_id(), alice.last_id());

acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
Expand Down
30 changes: 6 additions & 24 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use transaction::Transaction;

pub struct AccountantSkel {
acc: Mutex<Accountant>,
last_id: Mutex<Hash>,
historian_input: Mutex<SyncSender<Signal>>,
historian: Historian,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
Expand All @@ -44,7 +43,6 @@ pub struct AccountantSkel {
pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
GetLastId,
Subscribe { subscriptions: Vec<Subscription> },
}

Expand Down Expand Up @@ -76,20 +74,13 @@ type SharedSkel = Arc<AccountantSkel>;
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
EntryInfo(EntryInfo),
LastId { id: Hash },
}

impl AccountantSkel {
/// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(
acc: Accountant,
last_id: Hash,
historian_input: SyncSender<Signal>,
historian: Historian,
) -> Self {
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
AccountantSkel {
acc: Mutex::new(acc),
last_id: Mutex::new(last_id),
entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input),
historian,
Expand All @@ -116,10 +107,7 @@ impl AccountantSkel {

fn update_entry<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>, entry: &Entry) {
trace!("update_entry entry");
let mut last_id_l = obj.last_id.lock().unwrap();
*last_id_l = entry.id;
obj.acc.lock().unwrap().register_entry_id(&last_id_l);
drop(last_id_l);
obj.acc.lock().unwrap().register_entry_id(&entry.id);
writeln!(
writer.lock().unwrap(),
"{}",
Expand Down Expand Up @@ -226,12 +214,6 @@ impl AccountantSkel {
let val = self.acc.lock().unwrap().get_balance(&key);
Some((Response::Balance { key, val }, rsp_addr))
}
Request::GetLastId => Some((
Response::LastId {
id: *self.last_id.lock().unwrap(),
},
rsp_addr,
)),
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
Expand Down Expand Up @@ -699,7 +681,7 @@ mod tests {
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let skel = AccountantSkel::new(acc, mint.last_id(), input, historian);
let skel = AccountantSkel::new(acc, input, historian);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
Expand Down Expand Up @@ -740,7 +722,7 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc_skel = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = AccountantSkel::serve(
&acc_skel,
Expand Down Expand Up @@ -858,7 +840,7 @@ mod tests {
let acc = Accountant::new(&alice);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
let replicate_addr = target1_data.replicate_addr;
let threads = AccountantSkel::replicate(
&acc,
Expand Down Expand Up @@ -1007,7 +989,7 @@ mod bench {

let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let skel = AccountantSkel::new(acc, mint.last_id(), input, historian);
let skel = AccountantSkel::new(acc, input, historian);

let now = Instant::now();
assert!(skel.process_packets(req_vers).is_ok());
Expand Down
23 changes: 3 additions & 20 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ impl AccountantStub {
Response::Balance { key, val } => {
self.balances.insert(key, val);
}
Response::LastId { id } => {
self.last_id = Some(id);
}
Response::EntryInfo(entry_info) => {
self.last_id = Some(entry_info.id);
self.num_events += entry_info.num_events;
Expand Down Expand Up @@ -109,23 +106,9 @@ impl AccountantStub {
}

/// Request the last Entry ID from the server. This method blocks
/// until the server sends a response. At the time of this writing,
/// it also has the side-effect of causing the server to log any
/// entries that have been published by the Historian.
/// until the server sends a response.
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetId");
self.socket
.send_to(&data, &self.addr)
.expect("buffer error");
let mut done = false;
while !done {
let resp = self.recv_response().expect("recv response");
if let &Response::LastId { .. } = &resp {
done = true;
}
self.process_response(resp);
}
self.transaction_count();
ok(self.last_id.unwrap_or(Hash::default()))
}

Expand Down Expand Up @@ -192,7 +175,7 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));

Expand Down
2 changes: 1 addition & 1 deletion src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn main() {
let (input, event_receiver) = sync_channel(10_000);
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(AccountantSkel::new(acc, last_id, input, historian));
let skel = Arc::new(AccountantSkel::new(acc, input, historian));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,8 @@ mod test {
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
use streamer::{BlobReceiver, PacketReceiver};

fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
Expand Down