diff --git a/teos/src/dbm.rs b/teos/src/dbm.rs index 3725dca9..ca6dabff 100644 --- a/teos/src/dbm.rs +++ b/teos/src/dbm.rs @@ -9,10 +9,10 @@ use std::str::FromStr; use rusqlite::limits::Limit; use rusqlite::{params, params_from_iter, Connection, Error as SqliteError}; +use bitcoin::consensus; use bitcoin::hashes::Hash; use bitcoin::secp256k1::SecretKey; use bitcoin::BlockHash; -use bitcoin::{consensus, Txid}; use teos_common::appointment::{Appointment, Locator}; use teos_common::dbm::{DatabaseConnection, DatabaseManager, Error}; @@ -20,7 +20,7 @@ use teos_common::UserId; use crate::extended_appointment::{ExtendedAppointment, UUID}; use crate::gatekeeper::UserInfo; -use crate::responder::{ConfirmationStatus, TransactionTracker}; +use crate::responder::{ConfirmationStatus, PenaltySummary, TransactionTracker}; const TABLES: [&str; 6] = [ "CREATE TABLE IF NOT EXISTS users ( @@ -668,7 +668,7 @@ impl DBM { } /// Loads the transaction IDs of all the penalties and their status from the database. - pub(crate) fn load_penalties_summaries(&self) -> HashMap { + pub(crate) fn load_penalties_summaries(&self) -> HashMap { let mut summaries = HashMap::new(); let mut stmt = self @@ -692,7 +692,7 @@ impl DBM { .txid(); summaries.insert( UUID::from_slice(&raw_uuid).unwrap(), - ( + PenaltySummary::new( penalty_txid, ConfirmationStatus::from_db_data(height, confirmed), ), @@ -758,6 +758,7 @@ mod tests { use teos_common::cryptography::{get_random_bytes, get_random_keypair}; use teos_common::test_utils::{get_random_locator, get_random_user_id}; + use crate::rpc_errors; use crate::test_utils::{ generate_dummy_appointment, generate_dummy_appointment_with_user, generate_uuid, get_random_tracker, get_random_tx, AVAILABLE_SLOTS, SUBSCRIPTION_EXPIRY, @@ -1370,11 +1371,8 @@ mod tests { // Store all the `n_app` appointments. for appointment in appointments.iter() { - dbm.store_appointment( - UUID::new(appointment.locator(), appointment.user_id), - appointment, - ) - .unwrap(); + dbm.store_appointment(appointment.uuid(), appointment) + .unwrap(); } // Select `n_app / 5` locators as if they appeared in a new block. @@ -1477,7 +1475,10 @@ mod tests { // Rejected status doesn't have a persistent DB representation. assert!(matches!( - dbm.update_tracker_status(uuid, &ConfirmationStatus::Rejected(100)), + dbm.update_tracker_status( + uuid, + &ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_REJECTED) + ), Err(Error::MissingField) )); } @@ -1553,7 +1554,7 @@ mod tests { } #[test] - fn test_load_trackers_with_confirmation_status() { + fn test_load_trackers_with_confirmation_status_in_mempool() { let dbm = DBM::in_memory().unwrap(); let n_trackers = 100; let mut tracker_statuses = HashMap::new(); @@ -1603,6 +1604,45 @@ mod tests { ), in_mempool_since_i, ); + } + } + + #[test] + fn test_load_trackers_with_confirmation_status_confirmed() { + let dbm = DBM::in_memory().unwrap(); + let n_blocks = 100; + let n_trackers = 30; + let mut tracker_statuses = HashMap::new(); + + // Loop over a bunch of blocks. + for i in 0..n_blocks { + // Store a bunch of trackers in each block. + for j in 0..n_trackers { + let user_id = get_random_user_id(); + let user = UserInfo::new( + AVAILABLE_SLOTS + i, + SUBSCRIPTION_START + i, + SUBSCRIPTION_EXPIRY + i, + ); + dbm.store_user(user_id, &user).unwrap(); + + let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None); + dbm.store_appointment(uuid, &appointment).unwrap(); + + // Some trackers confirmed and some aren't. + let status = if j % 2 == 0 { + ConfirmationStatus::InMempoolSince(i) + } else { + ConfirmationStatus::ConfirmedIn(i) + }; + + let tracker = get_random_tracker(user_id, status); + dbm.store_tracker(uuid, &tracker).unwrap(); + tracker_statuses.insert(uuid, status); + } + } + + for i in 0..n_blocks + 10 { let confirmed_in_i: HashSet = tracker_statuses .iter() .filter_map(|(&uuid, &status)| { @@ -1620,9 +1660,21 @@ mod tests { confirmed_in_i, ); } + } + + #[test] + fn test_load_trackers_with_confirmation_status_bad_status() { + let dbm = DBM::in_memory().unwrap(); + + assert!(matches!( + dbm.load_trackers_with_confirmation_status(ConfirmationStatus::Rejected( + rpc_errors::RPC_VERIFY_REJECTED + )), + Err(Error::MissingField) + )); assert!(matches!( - dbm.load_trackers_with_confirmation_status(ConfirmationStatus::Rejected(100)), + dbm.load_trackers_with_confirmation_status(ConfirmationStatus::IrrevocablyResolved), Err(Error::MissingField) )); } @@ -1654,7 +1706,8 @@ mod tests { let tracker = get_random_tracker(user_id, status); dbm.store_tracker(uuid, &tracker).unwrap(); - penalties_summaries.insert(uuid, (tracker.penalty_tx.txid(), status)); + penalties_summaries + .insert(uuid, PenaltySummary::new(tracker.penalty_tx.txid(), status)); } assert_eq!(dbm.load_penalties_summaries(), penalties_summaries); diff --git a/teos/src/extended_appointment.rs b/teos/src/extended_appointment.rs index b5c63da9..e7ba8f45 100644 --- a/teos/src/extended_appointment.rs +++ b/teos/src/extended_appointment.rs @@ -88,6 +88,10 @@ impl ExtendedAppointment { pub fn to_self_delay(&self) -> u32 { self.inner.to_self_delay } + + pub fn uuid(&self) -> UUID { + UUID::new(self.inner.locator, self.user_id) + } } #[cfg(test)] @@ -96,12 +100,6 @@ mod tests { use crate::test_utils::generate_uuid; - impl ExtendedAppointment { - pub fn uuid(&self) -> UUID { - UUID::new(self.inner.locator, self.user_id) - } - } - #[test] fn test_uuid_ser_deser() { let original_uuid = generate_uuid(); diff --git a/teos/src/gatekeeper.rs b/teos/src/gatekeeper.rs index a65e6893..fe67c156 100644 --- a/teos/src/gatekeeper.rs +++ b/teos/src/gatekeeper.rs @@ -259,7 +259,7 @@ impl Gatekeeper { /// If `refund` is set, the appointments owners will get their slots refunded back. /// /// DISCUSS: When `refund` is `false` we don't give back the slots to the user for the deleted appointments. - /// This is to disincentivise misbehavior (sending bad appointments, either non-decryptable or rejected by the network). + /// This is to discourage misbehavior (sending bad appointments, either non-decryptable or rejected by the network). pub(crate) fn delete_appointments(&self, appointments: Vec, refund: bool) { let mut dbm = self.dbm.lock().unwrap(); diff --git a/teos/src/responder.rs b/teos/src/responder.rs index 22374f65..e8d00fc5 100644 --- a/teos/src/responder.rs +++ b/teos/src/responder.rs @@ -100,6 +100,22 @@ impl From for common_msgs::Tracker { } } +/// A struct that packages the summary of a tracker's penalty transaction. +#[derive(Debug, PartialEq)] +pub(crate) struct PenaltySummary { + pub penalty_txid: Txid, + pub status: ConfirmationStatus, +} + +impl PenaltySummary { + pub fn new(penalty_txid: Txid, status: ConfirmationStatus) -> Self { + PenaltySummary { + penalty_txid, + status, + } + } +} + /// Component in charge of keeping track of triggered appointments. /// /// The [Responder] receives data from the [Watcher](crate::watcher::Watcher) in form of a [Breach]. @@ -227,8 +243,8 @@ impl Responder { let mut reorged_trackers = self.reorged_trackers.lock().unwrap(); let dbm = self.dbm.lock().unwrap(); - for (uuid, (penalty_txid, status)) in dbm.load_penalties_summaries() { - if txids.contains(&penalty_txid) { + for (uuid, penalty_summary) in dbm.load_penalties_summaries() { + if txids.contains(&penalty_summary.penalty_txid) { // First confirmation was received dbm.update_tracker_status(uuid, &ConfirmationStatus::ConfirmedIn(current_height)) .unwrap(); @@ -237,7 +253,7 @@ impl Responder { } else if reorged_trackers.contains(&uuid) { // Don't consider reorged trackers since they have wrong DB status. continue; - } else if let ConfirmationStatus::ConfirmedIn(h) = status { + } else if let ConfirmationStatus::ConfirmedIn(h) = penalty_summary.status { let confirmations = current_height - h; if confirmations == constants::IRREVOCABLY_RESOLVED { // Tracker is deep enough in the chain, it can be deleted @@ -245,11 +261,11 @@ impl Responder { } else { log::info!("{uuid} received a confirmation (count={confirmations})"); } - } else if let ConfirmationStatus::InMempoolSince(h) = status { + } else if let ConfirmationStatus::InMempoolSince(h) = penalty_summary.status { // Log all transactions that have missed confirmations log::info!( "Transaction missed a confirmation: {} (missed conf count: {})", - penalty_txid, + penalty_summary.penalty_txid, current_height - h ); } @@ -292,7 +308,6 @@ impl Responder { ); true } - // NOTE: We wil still rebroadcast the penalty nonetheless. ConfirmationStatus::Rejected(e) => { log::error!( "Reorged dispute tx (txid={}) rejected during rebroadcast (reason: {e:?})", @@ -1000,6 +1015,45 @@ mod tests { } } + #[tokio::test] + async fn test_handle_reorged_txs_rejected() { + let (responder, _s) = init_responder(MockedServerQuery::Error( + rpc_errors::RPC_VERIFY_REJECTED as i64, + )) + .await; + let n_trackers = 10; + let mut trackers = HashSet::new(); + + for _ in 0..n_trackers { + let uuid = responder + .add_random_tracker(ConfirmationStatus::ConfirmedIn(42)) + .uuid(); + responder.reorged_trackers.lock().unwrap().insert(uuid); + trackers.insert(uuid); + } + + let height = 100; + let rejected = HashSet::from_iter(responder.handle_reorged_txs(height).unwrap()); + // All the trackers should be returned as rejected. + assert_eq!(trackers, rejected); + // The reorged trackers buffer should be empty after this. + assert!(responder.reorged_trackers.lock().unwrap().is_empty()); + + // And all the reorged trackers statuses should be untouched. + for uuid in trackers { + assert_eq!( + responder + .dbm + .lock() + .unwrap() + .load_tracker(uuid) + .unwrap() + .status, + ConfirmationStatus::ConfirmedIn(42) + ); + } + } + #[tokio::test] async fn test_rebroadcast_stale_txs_accepted() { let (responder, _s) = init_responder(MockedServerQuery::InMempoool).await; @@ -1014,7 +1068,6 @@ mod tests { }; let uuid = responder.add_random_tracker(status).uuid(); - responder.reorged_trackers.lock().unwrap().insert(uuid); statues.insert(uuid, status); } @@ -1061,10 +1114,11 @@ mod tests { }; let uuid = responder.add_random_tracker(status).uuid(); - responder.reorged_trackers.lock().unwrap().insert(uuid); statues.insert(uuid, status); } + // `rebroadcast_stale_txs` will broadcast txs which has been in mempool since `CONFIRMATIONS_BEFORE_RETRY` or more + // blocks. Since our backend rejects all the txs, all these broadcasted txs should be returned from this method (rejected). let rejected = HashSet::from_iter(responder.rebroadcast_stale_txs(height).unwrap()); let should_reject: HashSet<_> = statues .iter() diff --git a/teos/src/test_utils.rs b/teos/src/test_utils.rs index f408b014..4dba0a93 100644 --- a/teos/src/test_utils.rs +++ b/teos/src/test_utils.rs @@ -323,7 +323,7 @@ pub(crate) fn generate_dummy_appointment_with_user( let mut app = generate_dummy_appointment(dispute_txid); app.user_id = user_id; - (UUID::new(app.locator(), user_id), app) + (app.uuid(), app) } pub(crate) fn get_random_breach() -> Breach { diff --git a/teos/src/watcher.rs b/teos/src/watcher.rs index acd7ffed..90606fcf 100644 --- a/teos/src/watcher.rs +++ b/teos/src/watcher.rs @@ -183,7 +183,7 @@ impl Watcher { self.last_known_block_height.load(Ordering::Acquire), ); - let uuid = UUID::new(extended_appointment.locator(), user_id); + let uuid = extended_appointment.uuid(); if self.responder.has_tracker(uuid) { log::info!("Tracker for {uuid} already found in Responder"); @@ -1140,7 +1140,7 @@ mod tests { } #[tokio::test] - async fn test_handle_breaches_rejected_by_responder() { + async fn test_handle_breaches_rejected_by_responder_backend() { let mut chain = Blockchain::default().with_height_and_txs(START_HEIGHT, 10); let (watcher, _s) = init_watcher(&mut chain).await; @@ -1163,21 +1163,53 @@ mod tests { let mut uuids = HashSet::new(); // Let the watcher track these breaches. + for (_, (_, tx)) in breaches.iter().enumerate() { + let (uuid, appointment) = + generate_dummy_appointment_with_user(user_id, Some(&tx.txid())); + let appointment = appointment.inner; + let signature = cryptography::sign(&appointment.to_vec(), &user_sk).unwrap(); + watcher.add_appointment(appointment, signature).unwrap(); + uuids.insert(uuid); + } + + assert_eq!( + uuids, + HashSet::from_iter(watcher.handle_breaches(breaches).unwrap()) + ); + } + + #[tokio::test] + async fn test_handle_breaches_rejected_by_responder_malformed() { + let mut chain = Blockchain::default().with_height_and_txs(START_HEIGHT, 10); + let (watcher, _s) = init_watcher(&mut chain).await; + + // Let's create some locators based on the transactions in the last block + let breaches: HashMap<_, _> = (0..10) + .map(|_| get_random_tx()) + .map(|tx| (Locator::new(tx.txid()), tx)) + .collect(); + + let (user_sk, user_pk) = get_random_keypair(); + let user_id = UserId(user_pk); + watcher.register(user_id).unwrap(); + + let mut rejected_breaches = HashSet::new(); + // Let the watcher track these breaches. for (i, (_, tx)) in breaches.iter().enumerate() { let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, Some(&tx.txid())); let mut appointment = appointment.inner; if i % 2 == 0 { - // Mal-format some appointments, they should still be returned as rejected. + // Mal-format some appointments, they should be returned as rejected. appointment.encrypted_blob.reverse(); + rejected_breaches.insert(uuid); }; let signature = cryptography::sign(&appointment.to_vec(), &user_sk).unwrap(); watcher.add_appointment(appointment, signature).unwrap(); - uuids.insert(uuid); } assert_eq!( - uuids, + rejected_breaches, HashSet::from_iter(watcher.handle_breaches(breaches).unwrap()) ); } @@ -1219,19 +1251,16 @@ mod tests { watcher.register(user_id).unwrap(); watcher.register(user2_id).unwrap(); - let appointment = generate_dummy_appointment(None); - let locator = appointment.locator(); - let uuid1 = UUID::new(locator, user_id); - let uuid2 = UUID::new(locator, user2_id); + let appointment = generate_dummy_appointment(None).inner; + let uuid1 = UUID::new(appointment.locator, user_id); + let uuid2 = UUID::new(appointment.locator, user2_id); - let user_sig = cryptography::sign(&appointment.inner.to_vec(), &user_sk).unwrap(); - watcher - .add_appointment(appointment.inner.clone(), user_sig) - .unwrap(); - let user2_sig = cryptography::sign(&appointment.inner.to_vec(), &user2_sk).unwrap(); + let user_sig = cryptography::sign(&appointment.to_vec(), &user_sk).unwrap(); watcher - .add_appointment(appointment.inner, user2_sig) + .add_appointment(appointment.clone(), user_sig) .unwrap(); + let user2_sig = cryptography::sign(&appointment.to_vec(), &user2_sk).unwrap(); + watcher.add_appointment(appointment, user2_sig).unwrap(); // Outdate the first user's registration. watcher