Use sharding strategy to tell view what blocks to process
samdealy committed Aug 12, 2022
1 parent 24815b4 commit 7d90337
Showing 6 changed files with 93 additions and 22 deletions.
7 changes: 6 additions & 1 deletion fog/view/server/src/bin/main.rs
@@ -6,7 +6,9 @@ use mc_attest_net::{Client, RaClient};
 use mc_common::{logger::log, time::SystemTimeProvider};
 use mc_fog_sql_recovery_db::SqlRecoveryDb;
 use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE};
-use mc_fog_view_server::{config::MobileAcctViewConfig, server::ViewServer};
+use mc_fog_view_server::{
+    config::MobileAcctViewConfig, server::ViewServer, sharding_strategy::EpochShardingStrategy,
+};
 use mc_util_cli::ParserWithBuildInfo;
 use mc_util_grpc::AdminServer;
 use std::{env, sync::Arc};
@@ -64,6 +66,9 @@ fn main() {
         recovery_db,
         ias_client,
         SystemTimeProvider::default(),
+        // TODO: Change the EpochShardingStrategy to incorporate config values that specify
+        // start and end block indices. See PR #2352
+        EpochShardingStrategy::default(),
         logger.clone(),
     );
     server.start();
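The TODO above defers range configuration to a later change. As a hedged sketch (not part of this commit), resolving it might look like the following; `ShardingConfig`, its field names, and the `EpochShardingStrategy::new` / `BlockRange::new` constructors are assumed names for illustration only:

// Hypothetical follow-up, not in this commit: derive the epoch range from
// config-supplied block indices instead of using EpochShardingStrategy::default().
struct ShardingConfig {
    sharding_start_block: u64, // assumed field name
    sharding_end_block: u64,   // assumed field name
}

fn strategy_from_config(config: &ShardingConfig) -> EpochShardingStrategy {
    // Assumes constructors taking an explicit block range exist.
    EpochShardingStrategy::new(BlockRange::new(
        config.sharding_start_block,
        config.sharding_end_block,
    ))
}

The resulting strategy would then be passed to `ViewServer::new` in place of `EpochShardingStrategy::default()`.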
57 changes: 49 additions & 8 deletions fog/view/server/src/db_fetcher.rs
@@ -2,7 +2,7 @@

 //! An object for managing background data fetches from the recovery database.
-use crate::{block_tracker::BlockTracker, counters};
+use crate::{block_tracker::BlockTracker, counters, sharding_strategy::ShardingStrategy};
 use mc_common::logger::{log, Logger};
 use mc_crypto_keys::CompressedRistrettoPublic;
 use mc_fog_recovery_db_iface::{IngressPublicKeyRecord, IngressPublicKeyRecordFilters, RecoveryDb};
@@ -75,11 +75,16 @@ pub struct DbFetcher {
 }

 impl DbFetcher {
-    pub fn new<DB: RecoveryDb + Clone + Send + Sync + 'static>(
+    pub fn new<DB, SS>(
         db: DB,
         readiness_indicator: ReadinessIndicator,
+        sharding_strategy: SS,
         logger: Logger,
-    ) -> Self {
+    ) -> Self
+    where
+        DB: RecoveryDb + Clone + Send + Sync + 'static,
+        SS: ShardingStrategy + Clone + Send + Sync + 'static,
+    {
         let stop_requested = Arc::new(AtomicBool::new(false));

         let shared_state = Arc::new(Mutex::new(DbFetcherSharedState::default()));
@@ -102,6 +107,7 @@ impl DbFetcher {
                 thread_shared_state,
                 thread_num_queued_records_limiter,
                 readiness_indicator,
+                sharding_strategy,
                 logger,
             )
         })
@@ -165,25 +171,35 @@ impl Drop for DbFetcher {
     }
 }

-struct DbFetcherThread<DB: RecoveryDb + Clone + Send + Sync + 'static> {
+struct DbFetcherThread<DB, SS>
+where
+    DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
+{
     db: DB,
     stop_requested: Arc<AtomicBool>,
     shared_state: Arc<Mutex<DbFetcherSharedState>>,
     block_tracker: BlockTracker,
     num_queued_records_limiter: Arc<(Mutex<usize>, Condvar)>,
     readiness_indicator: ReadinessIndicator,
+    sharding_strategy: SS,
     logger: Logger,
 }

 /// Background worker thread implementation that takes care of periodically
 /// polling data out of the database.
-impl<DB: RecoveryDb + Clone + Send + Sync + 'static> DbFetcherThread<DB> {
+impl<DB, SS> DbFetcherThread<DB, SS>
+where
+    DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
+{
     pub fn start(
         db: DB,
         stop_requested: Arc<AtomicBool>,
         shared_state: Arc<Mutex<DbFetcherSharedState>>,
         num_queued_records_limiter: Arc<(Mutex<usize>, Condvar)>,
         readiness_indicator: ReadinessIndicator,
+        sharding_strategy: SS,
         logger: Logger,
     ) {
         let thread = Self {
@@ -193,6 +209,7 @@ impl<DB: RecoveryDb + Clone + Send + Sync + 'static> DbFetcherThread<DB> {
             block_tracker: BlockTracker::new(logger.clone()),
             num_queued_records_limiter,
             readiness_indicator,
+            sharding_strategy,
             logger,
         };
         thread.run();
@@ -305,6 +322,14 @@ impl<DB: RecoveryDb + Clone + Send + Sync + 'static> DbFetcherThread<DB> {

             // Mark that we are done fetching data for this block.
             self.block_tracker.block_processed(ingress_key, block_index);
+            if !self.sharding_strategy.should_process_block(block_index) {
+                log::trace!(
+                    self.logger,
+                    "Not adding block_index {} TxOuts because this shard is not responsible for it.",
+                    block_index,
+                );
+                continue;
+            }

             // Store the fetched records so that they could be consumed by the enclave
             // when its ready.
@@ -366,6 +391,7 @@ impl<DB: RecoveryDb + Clone + Send + Sync + 'static> DbFetcherThread<DB> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::sharding_strategy::EpochShardingStrategy;
     use mc_attest_core::VerificationReport;
     use mc_common::logger::test_with_logger;
     use mc_fog_recovery_db_iface::{IngressPublicKeyStatus, ReportData, ReportDb};
@@ -380,7 +406,12 @@ mod tests {
         let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]);
         let db_test_context = SqlRecoveryDbTestContext::new(logger.clone());
         let db = db_test_context.get_db_instance();
-        let db_fetcher = DbFetcher::new(db.clone(), Default::default(), logger);
+        let db_fetcher = DbFetcher::new(
+            db.clone(),
+            Default::default(),
+            EpochShardingStrategy::default(),
+            logger,
+        );

         // Initially, our database starts empty.
         let ingress_keys = db_fetcher.get_highest_processed_block_context();
@@ -610,7 +641,12 @@ mod tests {
         let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]);
         let db_test_context = SqlRecoveryDbTestContext::new(logger.clone());
         let db = db_test_context.get_db_instance();
-        let db_fetcher = DbFetcher::new(db.clone(), Default::default(), logger);
+        let db_fetcher = DbFetcher::new(
+            db.clone(),
+            Default::default(),
+            EpochShardingStrategy::default(),
+            logger,
+        );

         // Register two ingress keys that have some overlap:
         // key_id1 starts at block 0, key2 starts at block 5.
@@ -667,7 +703,12 @@ mod tests {
         let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]);
         let db_test_context = SqlRecoveryDbTestContext::new(logger.clone());
         let db = db_test_context.get_db_instance();
-        let db_fetcher = DbFetcher::new(db.clone(), Default::default(), logger);
+        let db_fetcher = DbFetcher::new(
+            db.clone(),
+            Default::default(),
+            EpochShardingStrategy::default(),
+            logger,
+        );

         // Register two ingress keys that have some overlap:
         // invoc_id1 starts at block 0, invoc_id2 starts at block 50.
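The behavioral core of this file's change is the gate inside the fetch loop: every block is still marked processed in the BlockTracker, so highest-processed-block accounting keeps advancing, but TxOuts are only queued for the enclave when the shard is responsible for that block. Below is a minimal, self-contained sketch of the same pattern; `RangeStrategy` is a stub invented here, not the crate's EpochShardingStrategy.

// Stand-alone illustration of the gate added above.
trait ShardingStrategy {
    fn should_process_block(&self, block_index: u64) -> bool;
}

struct RangeStrategy {
    start: u64, // inclusive
    end: u64,   // exclusive
}

impl ShardingStrategy for RangeStrategy {
    fn should_process_block(&self, block_index: u64) -> bool {
        (self.start..self.end).contains(&block_index)
    }
}

fn main() {
    let shard = RangeStrategy { start: 100, end: 200 };
    let mut queued = Vec::new();
    for block_index in 95..105u64 {
        // Block tracking would advance here for every block, as in the diff...
        if !shard.should_process_block(block_index) {
            continue; // ...but only in-range blocks are queued for the enclave.
        }
        queued.push(block_index);
    }
    assert_eq!(queued, (100..105u64).collect::<Vec<_>>());
}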
2 changes: 1 addition & 1 deletion fog/view/server/src/lib.rs
@@ -8,10 +8,10 @@ pub mod fog_view_router_server;
 pub mod fog_view_router_service;
 pub mod fog_view_service;
 pub mod server;
+pub mod sharding_strategy;

 mod block_tracker;
 mod counters;
 mod db_fetcher;
 mod router_request_handler;
 mod shard_responses_processor;
-mod sharding_strategy;
44 changes: 33 additions & 11 deletions fog/view/server/src/server.rs
@@ -10,6 +10,7 @@ use crate::{
     counters,
     db_fetcher::DbFetcher,
     fog_view_service::FogViewService,
+    sharding_strategy::ShardingStrategy,
 };
 use futures::executor::block_on;
 use mc_attest_net::RaClient;
@@ -41,26 +42,28 @@ use std::{
     time::{Duration, Instant},
 };

-pub struct ViewServer<E, RC, DB>
+pub struct ViewServer<E, RC, DB, SS>
 where
     E: ViewEnclaveProxy,
     RC: RaClient + Send + Sync + 'static,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
 {
     config: MobileAcctViewConfig,
     server: grpcio::Server,
     enclave: E,
     ra_client: RC,
     report_cache_thread: Option<ReportCacheThread>,
-    db_poll_thread: DbPollThread<E, DB>,
+    db_poll_thread: DbPollThread<E, DB, SS>,
     logger: Logger,
 }

-impl<E, RC, DB> ViewServer<E, RC, DB>
+impl<E, RC, DB, SS> ViewServer<E, RC, DB, SS>
 where
     E: ViewEnclaveProxy,
     RC: RaClient + Send + Sync + 'static,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
 {
     /// Make a new view server instance
     pub fn new(
@@ -69,14 +72,16 @@ where
         recovery_db: DB,
         ra_client: RC,
         time_provider: impl TimeProvider + 'static,
+        sharding_strategy: SS,
         logger: Logger,
-    ) -> ViewServer<E, RC, DB> {
+    ) -> ViewServer<E, RC, DB, SS> {
         let readiness_indicator = ReadinessIndicator::default();

         let db_poll_thread = DbPollThread::new(
             enclave.clone(),
             recovery_db.clone(),
             readiness_indicator.clone(),
+            sharding_strategy,
             logger.clone(),
         );

@@ -191,11 +196,12 @@ where
     }
 }

-impl<E, RC, DB> Drop for ViewServer<E, RC, DB>
+impl<E, RC, DB, SS> Drop for ViewServer<E, RC, DB, SS>
 where
     E: ViewEnclaveProxy,
     RC: RaClient + Send + Sync + 'static,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
 {
     fn drop(&mut self) {
         self.stop();
@@ -220,10 +226,11 @@ pub struct DbPollSharedState {
 }

 /// A thread that periodically pushes new tx data from db to enclave
-struct DbPollThread<E, DB>
+struct DbPollThread<E, DB, SS>
 where
     E: ViewEnclaveProxy,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
 {
     /// Enclave.
     enclave: E,
@@ -243,17 +250,21 @@ where
     /// Readiness indicator.
     readiness_indicator: ReadinessIndicator,

+    /// Sharding strategy,
+    sharding_strategy: SS,
+
     /// Logger.
     logger: Logger,
 }

 /// How long to wait between polling db
 const DB_POLL_INTERNAL: Duration = Duration::from_millis(100);

-impl<E, DB> DbPollThread<E, DB>
+impl<E, DB, SS> DbPollThread<E, DB, SS>
 where
     E: ViewEnclaveProxy,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
 {
     /// Get the shared state.
     pub fn get_shared_state(&self) -> Arc<Mutex<DbPollSharedState>> {
@@ -265,6 +276,7 @@ where
         enclave: E,
         db: DB,
         readiness_indicator: ReadinessIndicator,
+        sharding_strategy: SS,
         logger: Logger,
     ) -> Self {
         let stop_requested = Arc::new(AtomicBool::new(false));
@@ -277,6 +289,7 @@ where
             stop_requested,
             shared_state,
             readiness_indicator,
+            sharding_strategy,
             logger,
         }
     }
@@ -295,6 +308,7 @@ where
         let thread_stop_requested = self.stop_requested.clone();
         let thread_shared_state = self.shared_state.clone();
         let thread_readiness_indicator = self.readiness_indicator.clone();
+        let thread_sharding_strategy = self.sharding_strategy.clone();
         let thread_logger = self.logger.clone();

         self.join_handle = Some(
@@ -307,6 +321,7 @@ where
                     thread_stop_requested,
                     thread_shared_state,
                     thread_readiness_indicator,
+                    thread_sharding_strategy,
                     thread_logger,
                 )
             })
@@ -330,6 +345,7 @@ where
         stop_requested: Arc<AtomicBool>,
         shared_state: Arc<Mutex<DbPollSharedState>>,
         readiness_indicator: ReadinessIndicator,
+        sharding_strategy: SS,
         logger: Logger,
     ) {
         log::debug!(logger, "Db poll thread started");
@@ -340,6 +356,7 @@ where
             db,
             shared_state,
             readiness_indicator,
+            sharding_strategy,
             logger.clone(),
         );
         loop {
@@ -359,10 +376,11 @@ where
         }
     }
 }

-impl<E, DB> Drop for DbPollThread<E, DB>
+impl<E, DB, SS> Drop for DbPollThread<E, DB, SS>
 where
     E: ViewEnclaveProxy,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
+    SS: ShardingStrategy + Clone + Send + Sync + 'static,
 {
     fn drop(&mut self) {
         let _ = self.stop();
@@ -416,20 +434,24 @@ where
     E: ViewEnclaveProxy,
     DB: RecoveryDb + Clone + Send + Sync + 'static,
 {
-    pub fn new(
+    pub fn new<SS>(
         stop_requested: Arc<AtomicBool>,
         enclave: E,
         db: DB,
         shared_state: Arc<Mutex<DbPollSharedState>>,
         readiness_indicator: ReadinessIndicator,
+        sharding_strategy: SS,
         logger: Logger,
-    ) -> Self {
+    ) -> Self
+    where
+        SS: ShardingStrategy + Clone + Send + Sync + 'static,
+    {
         Self {
             stop_requested,
             enclave,
             db: db.clone(),
             shared_state,
-            db_fetcher: DbFetcher::new(db, readiness_indicator, logger.clone()),
+            db_fetcher: DbFetcher::new(db, readiness_indicator, sharding_strategy, logger.clone()),
             enclave_block_tracker: BlockTracker::new(logger.clone()),
             last_unblocked_at: Instant::now(),
             logger,
1 change: 1 addition & 0 deletions fog/view/server/src/sharding_strategy.rs
@@ -20,6 +20,7 @@ pub trait ShardingStrategy {
 ///
 /// In practice, the set of Fog View Shards will contain overlapping
 /// [epoch_block_ranges] in order to obfuscate which shard processed the TxOuts.
+#[derive(Clone)]
 pub struct EpochShardingStrategy {
     /// If a block falls within this range, then the Fog View Store should
     /// process its TxOuts.
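Only the `#[derive(Clone)]` attribute is new in this file; it lets `DbFetcher` hand a copy of the strategy to its worker thread. For orientation, here is a sketch of the surrounding interface this commit relies on. Only the `should_process_block` call is confirmed by the diff above; the `BlockRange` helper and the `epoch_block_range` field name are assumptions for illustration.

// Sketch of the interface implied by this commit, not a copy of the source.
pub type BlockIndex = u64;

#[derive(Clone)]
pub struct BlockRange {
    pub start_block: BlockIndex, // inclusive (assumed)
    pub end_block: BlockIndex,   // exclusive (assumed)
}

impl BlockRange {
    pub fn contains(&self, index: BlockIndex) -> bool {
        self.start_block <= index && index < self.end_block
    }
}

pub trait ShardingStrategy {
    /// Returns true if this shard should process the given block's TxOuts.
    fn should_process_block(&self, block_index: BlockIndex) -> bool;
}

#[derive(Clone)]
pub struct EpochShardingStrategy {
    /// If a block falls within this range, then the Fog View Store should
    /// process its TxOuts.
    epoch_block_range: BlockRange, // assumed field name
}

impl ShardingStrategy for EpochShardingStrategy {
    fn should_process_block(&self, block_index: BlockIndex) -> bool {
        self.epoch_block_range.contains(block_index)
    }
}

As the doc comment notes, deployed shards would typically be assigned overlapping epoch ranges (for example, blocks 0..120 on one shard and 100..220 on the next) so that responses do not reveal which single shard actually processed a given TxOut.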