diff --git a/fog/view/server/src/bin/main.rs b/fog/view/server/src/bin/main.rs index f5aa50c54b..4986dd62de 100644 --- a/fog/view/server/src/bin/main.rs +++ b/fog/view/server/src/bin/main.rs @@ -6,7 +6,9 @@ use mc_attest_net::{Client, RaClient}; use mc_common::{logger::log, time::SystemTimeProvider}; use mc_fog_sql_recovery_db::SqlRecoveryDb; use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE}; -use mc_fog_view_server::{config::MobileAcctViewConfig, server::ViewServer}; +use mc_fog_view_server::{ + config::MobileAcctViewConfig, server::ViewServer, sharding_strategy::EpochShardingStrategy, +}; use mc_util_cli::ParserWithBuildInfo; use mc_util_grpc::AdminServer; use std::{env, sync::Arc}; @@ -64,6 +66,9 @@ fn main() { recovery_db, ias_client, SystemTimeProvider::default(), + // TODO: Change the EpochShardingStrategy to incorporate config values that specify + // start and end block indices. See PR #2352 + EpochShardingStrategy::default(), logger.clone(), ); server.start(); diff --git a/fog/view/server/src/db_fetcher.rs b/fog/view/server/src/db_fetcher.rs index e969cf8d97..437f0f4028 100644 --- a/fog/view/server/src/db_fetcher.rs +++ b/fog/view/server/src/db_fetcher.rs @@ -2,7 +2,7 @@ //! An object for managing background data fetches from the recovery database. -use crate::{block_tracker::BlockTracker, counters}; +use crate::{block_tracker::BlockTracker, counters, sharding_strategy::ShardingStrategy}; use mc_common::logger::{log, Logger}; use mc_crypto_keys::CompressedRistrettoPublic; use mc_fog_recovery_db_iface::{IngressPublicKeyRecord, IngressPublicKeyRecordFilters, RecoveryDb}; @@ -75,11 +75,16 @@ pub struct DbFetcher { } impl DbFetcher { - pub fn new( + pub fn new( db: DB, readiness_indicator: ReadinessIndicator, + sharding_strategy: SS, logger: Logger, - ) -> Self { + ) -> Self + where + DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, + { let stop_requested = Arc::new(AtomicBool::new(false)); let shared_state = Arc::new(Mutex::new(DbFetcherSharedState::default())); @@ -102,6 +107,7 @@ impl DbFetcher { thread_shared_state, thread_num_queued_records_limiter, readiness_indicator, + sharding_strategy, logger, ) }) @@ -165,25 +171,35 @@ impl Drop for DbFetcher { } } -struct DbFetcherThread { +struct DbFetcherThread +where + DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, +{ db: DB, stop_requested: Arc, shared_state: Arc>, block_tracker: BlockTracker, num_queued_records_limiter: Arc<(Mutex, Condvar)>, readiness_indicator: ReadinessIndicator, + sharding_strategy: SS, logger: Logger, } /// Background worker thread implementation that takes care of periodically /// polling data out of the database. -impl DbFetcherThread { +impl DbFetcherThread +where + DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, +{ pub fn start( db: DB, stop_requested: Arc, shared_state: Arc>, num_queued_records_limiter: Arc<(Mutex, Condvar)>, readiness_indicator: ReadinessIndicator, + sharding_strategy: SS, logger: Logger, ) { let thread = Self { @@ -193,6 +209,7 @@ impl DbFetcherThread { block_tracker: BlockTracker::new(logger.clone()), num_queued_records_limiter, readiness_indicator, + sharding_strategy, logger, }; thread.run(); @@ -305,6 +322,14 @@ impl DbFetcherThread { // Mark that we are done fetching data for this block. self.block_tracker.block_processed(ingress_key, block_index); + if !self.sharding_strategy.should_process_block(block_index) { + log::trace!( + self.logger, + "Not adding block_index {} TxOuts because this shard is not responsible for it.", + block_index, + ); + continue; + } // Store the fetched records so that they could be consumed by the enclave // when its ready. @@ -366,6 +391,7 @@ impl DbFetcherThread { #[cfg(test)] mod tests { use super::*; + use crate::sharding_strategy::EpochShardingStrategy; use mc_attest_core::VerificationReport; use mc_common::logger::test_with_logger; use mc_fog_recovery_db_iface::{IngressPublicKeyStatus, ReportData, ReportDb}; @@ -380,7 +406,12 @@ mod tests { let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]); let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); let db = db_test_context.get_db_instance(); - let db_fetcher = DbFetcher::new(db.clone(), Default::default(), logger); + let db_fetcher = DbFetcher::new( + db.clone(), + Default::default(), + EpochShardingStrategy::default(), + logger, + ); // Initially, our database starts empty. let ingress_keys = db_fetcher.get_highest_processed_block_context(); @@ -610,7 +641,12 @@ mod tests { let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]); let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); let db = db_test_context.get_db_instance(); - let db_fetcher = DbFetcher::new(db.clone(), Default::default(), logger); + let db_fetcher = DbFetcher::new( + db.clone(), + Default::default(), + EpochShardingStrategy::default(), + logger, + ); // Register two ingress keys that have some overlap: // key_id1 starts at block 0, key2 starts at block 5. @@ -667,7 +703,12 @@ mod tests { let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]); let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); let db = db_test_context.get_db_instance(); - let db_fetcher = DbFetcher::new(db.clone(), Default::default(), logger); + let db_fetcher = DbFetcher::new( + db.clone(), + Default::default(), + EpochShardingStrategy::default(), + logger, + ); // Register two ingress keys that have some overlap: // invoc_id1 starts at block 0, invoc_id2 starts at block 50. diff --git a/fog/view/server/src/lib.rs b/fog/view/server/src/lib.rs index 7f667ae34a..c202dce361 100644 --- a/fog/view/server/src/lib.rs +++ b/fog/view/server/src/lib.rs @@ -8,10 +8,10 @@ pub mod fog_view_router_server; pub mod fog_view_router_service; pub mod fog_view_service; pub mod server; +pub mod sharding_strategy; mod block_tracker; mod counters; mod db_fetcher; mod router_request_handler; mod shard_responses_processor; -mod sharding_strategy; diff --git a/fog/view/server/src/server.rs b/fog/view/server/src/server.rs index 8def9929fe..368f2f90af 100644 --- a/fog/view/server/src/server.rs +++ b/fog/view/server/src/server.rs @@ -10,6 +10,7 @@ use crate::{ counters, db_fetcher::DbFetcher, fog_view_service::FogViewService, + sharding_strategy::ShardingStrategy, }; use futures::executor::block_on; use mc_attest_net::RaClient; @@ -41,26 +42,28 @@ use std::{ time::{Duration, Instant}, }; -pub struct ViewServer +pub struct ViewServer where E: ViewEnclaveProxy, RC: RaClient + Send + Sync + 'static, DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, { config: MobileAcctViewConfig, server: grpcio::Server, enclave: E, ra_client: RC, report_cache_thread: Option, - db_poll_thread: DbPollThread, + db_poll_thread: DbPollThread, logger: Logger, } -impl ViewServer +impl ViewServer where E: ViewEnclaveProxy, RC: RaClient + Send + Sync + 'static, DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, { /// Make a new view server instance pub fn new( @@ -69,14 +72,16 @@ where recovery_db: DB, ra_client: RC, time_provider: impl TimeProvider + 'static, + sharding_strategy: SS, logger: Logger, - ) -> ViewServer { + ) -> ViewServer { let readiness_indicator = ReadinessIndicator::default(); let db_poll_thread = DbPollThread::new( enclave.clone(), recovery_db.clone(), readiness_indicator.clone(), + sharding_strategy, logger.clone(), ); @@ -191,11 +196,12 @@ where } } -impl Drop for ViewServer +impl Drop for ViewServer where E: ViewEnclaveProxy, RC: RaClient + Send + Sync + 'static, DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, { fn drop(&mut self) { self.stop(); @@ -220,10 +226,11 @@ pub struct DbPollSharedState { } /// A thread that periodically pushes new tx data from db to enclave -struct DbPollThread +struct DbPollThread where E: ViewEnclaveProxy, DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, { /// Enclave. enclave: E, @@ -243,6 +250,9 @@ where /// Readiness indicator. readiness_indicator: ReadinessIndicator, + /// Sharding strategy, + sharding_strategy: SS, + /// Logger. logger: Logger, } @@ -250,10 +260,11 @@ where /// How long to wait between polling db const DB_POLL_INTERNAL: Duration = Duration::from_millis(100); -impl DbPollThread +impl DbPollThread where E: ViewEnclaveProxy, DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, { /// Get the shared state. pub fn get_shared_state(&self) -> Arc> { @@ -265,6 +276,7 @@ where enclave: E, db: DB, readiness_indicator: ReadinessIndicator, + sharding_strategy: SS, logger: Logger, ) -> Self { let stop_requested = Arc::new(AtomicBool::new(false)); @@ -277,6 +289,7 @@ where stop_requested, shared_state, readiness_indicator, + sharding_strategy, logger, } } @@ -295,6 +308,7 @@ where let thread_stop_requested = self.stop_requested.clone(); let thread_shared_state = self.shared_state.clone(); let thread_readiness_indicator = self.readiness_indicator.clone(); + let thread_sharding_strategy = self.sharding_strategy.clone(); let thread_logger = self.logger.clone(); self.join_handle = Some( @@ -307,6 +321,7 @@ where thread_stop_requested, thread_shared_state, thread_readiness_indicator, + thread_sharding_strategy, thread_logger, ) }) @@ -330,6 +345,7 @@ where stop_requested: Arc, shared_state: Arc>, readiness_indicator: ReadinessIndicator, + sharding_strategy: SS, logger: Logger, ) { log::debug!(logger, "Db poll thread started"); @@ -340,6 +356,7 @@ where db, shared_state, readiness_indicator, + sharding_strategy, logger.clone(), ); loop { @@ -359,10 +376,11 @@ where } } -impl Drop for DbPollThread +impl Drop for DbPollThread where E: ViewEnclaveProxy, DB: RecoveryDb + Clone + Send + Sync + 'static, + SS: ShardingStrategy + Clone + Send + Sync + 'static, { fn drop(&mut self) { let _ = self.stop(); @@ -416,20 +434,24 @@ where E: ViewEnclaveProxy, DB: RecoveryDb + Clone + Send + Sync + 'static, { - pub fn new( + pub fn new( stop_requested: Arc, enclave: E, db: DB, shared_state: Arc>, readiness_indicator: ReadinessIndicator, + sharding_strategy: SS, logger: Logger, - ) -> Self { + ) -> Self + where + SS: ShardingStrategy + Clone + Send + Sync + 'static, + { Self { stop_requested, enclave, db: db.clone(), shared_state, - db_fetcher: DbFetcher::new(db, readiness_indicator, logger.clone()), + db_fetcher: DbFetcher::new(db, readiness_indicator, sharding_strategy, logger.clone()), enclave_block_tracker: BlockTracker::new(logger.clone()), last_unblocked_at: Instant::now(), logger, diff --git a/fog/view/server/src/sharding_strategy.rs b/fog/view/server/src/sharding_strategy.rs index 31fe677c84..09439efcba 100644 --- a/fog/view/server/src/sharding_strategy.rs +++ b/fog/view/server/src/sharding_strategy.rs @@ -20,6 +20,7 @@ pub trait ShardingStrategy { /// /// In practice, the set of Fog View Shards will contain overlapping /// [epoch_block_ranges] in order to obfuscate which shard processed the TxOuts. +#[derive(Clone)] pub struct EpochShardingStrategy { /// If a block falls within this range, then the Fog View Store should /// process its TxOuts. diff --git a/fog/view/server/tests/smoke_tests.rs b/fog/view/server/tests/smoke_tests.rs index d096f986d8..cbdd572d6f 100644 --- a/fog/view/server/tests/smoke_tests.rs +++ b/fog/view/server/tests/smoke_tests.rs @@ -36,6 +36,7 @@ use mc_fog_view_protocol::FogViewConnection; use mc_fog_view_server::{ config::{ClientListenUri::ClientFacing, MobileAcctViewConfig as ViewConfig}, server::ViewServer, + sharding_strategy::EpochShardingStrategy, }; use mc_util_from_random::FromRandom; use mc_util_grpc::GrpcRetryConfig; @@ -52,7 +53,7 @@ fn get_test_environment( logger: Logger, ) -> ( SqlRecoveryDbTestContext, - ViewServer, + ViewServer, FogViewGrpcClient, ) { let db_test_context = SqlRecoveryDbTestContext::new(logger.clone()); @@ -90,6 +91,7 @@ fn get_test_environment( db, ra_client, SystemTimeProvider::default(), + EpochShardingStrategy::default(), logger.clone(), ); server.start();