Skip to content

Commit

Permalink
Implement James's and Andrew's suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
samdealy committed Jan 11, 2023
1 parent 478ec56 commit 18c8d9b
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 47 deletions.
6 changes: 5 additions & 1 deletion fog/types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use core::str::FromStr;
use prost::Message;
use serde::{Deserialize, Serialize};

/// The string that delimits the start and end blocks in a string that
/// represents a BlockRange.
pub const BLOCK_RANGE_DELIMITER: &str = "-";

/// A half-open [a, b) range of blocks
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Message, Serialize, Deserialize)]
pub struct BlockRange {
Expand Down Expand Up @@ -70,7 +74,7 @@ impl FromStr for BlockRange {

fn from_str(s: &str) -> Result<Self, Self::Err> {
let block_indices: Vec<u64> = s
.split(',')
.split(BLOCK_RANGE_DELIMITER)
.map(|index_str| index_str.trim().parse())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| "BlockRange index is not a number.")?;
Expand Down
20 changes: 7 additions & 13 deletions fog/view/server/src/bin/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use mc_common::{logger::log, time::SystemTimeProvider};
use mc_fog_api::view_grpc::FogViewStoreApiClient;
use mc_fog_view_enclave::{SgxViewEnclave, ENCLAVE_FILE};
use mc_fog_view_server::{
config::{FogViewRouterConfig, ShardingStrategy::Epoch},
config::FogViewRouterConfig,
fog_view_router_server::{FogViewRouterServer, Shard},
sharding_strategy::ShardingStrategy,
sharding_strategy::{EpochShardingStrategy, ShardingStrategy},
};
use mc_util_cli::ParserWithBuildInfo;
use mc_util_grpc::ConnectionUriGrpcioChannel;
Expand Down Expand Up @@ -47,23 +47,17 @@ fn main() {
.name_prefix("Main-RPC".to_string())
.build(),
);
for (i, shard_uri) in config.shard_uris.clone().into_iter().enumerate() {
for shard_uri in config.shard_uris.clone() {
let fog_view_store_grpc_client = FogViewStoreApiClient::new(
ChannelBuilder::default_channel_builder(grpc_env.clone())
.connect_to_uri(&shard_uri, &logger),
);

let sharding_strategy = config
.sharding_strategies
.get(i)
.unwrap_or_else(|| panic!("Couldn't find shard at index {}", i));
let Epoch(epoch_sharding_strategy) = sharding_strategy;
// TODO: update this logic once we introduce other types of sharding strategies.
let epoch_sharding_strategy = EpochShardingStrategy::try_from(shard_uri.clone())
.expect("Could not get sharding strategy");
let block_range = epoch_sharding_strategy.get_block_range();
let shard = Shard::new(
shard_uri.clone(),
Arc::new(fog_view_store_grpc_client),
block_range,
);
let shard = Shard::new(shard_uri, Arc::new(fog_view_store_grpc_client), block_range);
shards.push(shard);
}
let shards = Arc::new(RwLock::new(shards));
Expand Down
5 changes: 0 additions & 5 deletions fog/view/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ pub struct FogViewRouterConfig {
#[clap(long, env = "MC_CLIENT_LISTEN_URI")]
pub client_listen_uri: RouterClientListenUri,

/// Sharding Strategies for each Shard. Should be indexed the same as the
/// `shard_uris` field.
#[clap(long, env = "MC_VIEW_SHARDING_STRATEGIES")]
pub sharding_strategies: Vec<ShardingStrategy>,

/// gRPC listening URI for Fog View Stores. Should be indexed the same as
/// the `sharding_stratgies` field.
#[clap(long, env = "MC_VIEW_SHARD_URIS")]
Expand Down
11 changes: 2 additions & 9 deletions fog/view/server/src/router_admin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use crate::{
SVC_COUNTERS,
};
use grpcio::{ChannelBuilder, RpcContext, RpcStatus, UnarySink};
use mc_common::{
logger::{log, Logger},
HashSet,
};
use mc_common::logger::{log, Logger};
use mc_fog_api::{
view::AddShardRequest,
view_grpc::{FogViewRouterAdminApi, FogViewStoreApiClient},
Expand Down Expand Up @@ -46,9 +43,7 @@ impl FogViewRouterAdminService {
let mut shards = self.shards.write().expect("RwLock Poisoned");
if shards
.iter()
.map(|shard| shard.uri.clone())
.collect::<HashSet<FogViewStoreUri>>()
.contains(&view_store_uri)
.any(|shard| shard.uri.clone() == view_store_uri)
{
let error = rpc_precondition_error(
"add_shard",
Expand All @@ -66,8 +61,6 @@ impl FogViewRouterAdminService {
ChannelBuilder::default_channel_builder(grpc_env)
.connect_to_uri(&view_store_uri, logger),
);
// TODO: Add block range or sharding strategy to this...
// Check to make sure this block range isn't already covered...
let block_range = EpochShardingStrategy::default().get_block_range();
let shard = Shard::new(view_store_uri, Arc::new(view_store_client), block_range);
shards.push(shard);
Expand Down
26 changes: 25 additions & 1 deletion fog/view/server/src/sharding_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
//! TxOuts across Fog View Store instances.

use mc_blockchain_types::BlockIndex;
use mc_fog_types::{common::BlockRange, BlockCount};
use mc_fog_types::{
common::{BlockRange, BLOCK_RANGE_DELIMITER},
BlockCount,
};
use mc_fog_uri::FogViewStoreUri;
use mc_util_uri::ConnectionUri;
use serde::Serialize;
use std::str::FromStr;

Expand Down Expand Up @@ -39,6 +44,17 @@ pub struct EpochShardingStrategy {
epoch_block_range: BlockRange,
}

impl TryFrom<FogViewStoreUri> for EpochShardingStrategy {
type Error = String;

fn try_from(src: FogViewStoreUri) -> Result<Self, Self::Error> {
let sharding_strategy_string = src
.get_param("sharding_strategy")
.unwrap_or_else(|| "default".to_string());
EpochShardingStrategy::from_str(&sharding_strategy_string)
}
}

impl ShardingStrategy for EpochShardingStrategy {
fn should_process_block(&self, block_index: BlockIndex) -> bool {
self.epoch_block_range.contains(block_index)
Expand All @@ -61,6 +77,14 @@ impl Default for EpochShardingStrategy {
}
}

impl ToString for EpochShardingStrategy {
fn to_string(&self) -> String {
let start_block = self.epoch_block_range.start_block;
let end_block = self.epoch_block_range.end_block;
format!("{start_block}{BLOCK_RANGE_DELIMITER}{end_block}")
}
}

impl EpochShardingStrategy {
#[allow(dead_code)]
pub fn new(epoch_block_range: BlockRange) -> Self {
Expand Down
25 changes: 7 additions & 18 deletions fog/view/server/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use mc_fog_view_protocol::FogViewConnection;
use mc_fog_view_server::{
config::{
FogViewRouterConfig, MobileAcctViewConfig as ViewConfig, RouterClientListenUri,
ShardingStrategy, ShardingStrategy::Epoch,
ShardingStrategy::Epoch,
},
fog_view_router_server::{FogViewRouterServer, Shard},
server::ViewServer,
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct RouterTestEnvironment {
impl RouterTestEnvironment {
/// Creates a `RouterTestEnvironment` for the router integration tests.
pub fn new(omap_capacity: u64, store_block_ranges: Vec<BlockRange>, logger: Logger) -> Self {
let (db_test_context, store_servers, store_clients, shard_uris, sharding_strategies) =
let (db_test_context, store_servers, store_clients, shard_uris) =
Self::create_view_stores(omap_capacity, store_block_ranges, logger.clone());
let port = portpicker::pick_unused_port().expect("pick_unused_port");
let router_uri =
Expand All @@ -86,7 +86,6 @@ impl RouterTestEnvironment {
.responder_id()
.expect("Could not get responder id for Fog View Router."),
ias_api_key: Default::default(),
sharding_strategies,
shard_uris,
ias_spid: Default::default(),
client_listen_uri: RouterClientListenUri::Streaming(router_uri.clone()),
Expand All @@ -112,7 +111,7 @@ impl RouterTestEnvironment {
store_block_ranges: Vec<BlockRange>,
logger: Logger,
) -> Self {
let (db_test_context, store_servers, store_clients, shard_uris, sharding_strategies) =
let (db_test_context, store_servers, store_clients, shard_uris) =
Self::create_view_stores(omap_capacity, store_block_ranges, logger.clone());
let port = portpicker::pick_unused_port().expect("pick_unused_port");
let router_uri =
Expand All @@ -128,7 +127,6 @@ impl RouterTestEnvironment {
.expect("Could not get responder id for Fog View Router."),
ias_api_key: Default::default(),
ias_spid: Default::default(),
sharding_strategies,
shard_uris,
client_listen_uri: RouterClientListenUri::Unary(router_uri.clone()),
client_auth_token_max_lifetime: Default::default(),
Expand Down Expand Up @@ -218,28 +216,25 @@ impl RouterTestEnvironment {
Vec<TestViewServer>,
Arc<RwLock<Vec<Shard>>>,
Vec<FogViewStoreUri>,
Vec<ShardingStrategy>,
) {
let db_test_context = SqlRecoveryDbTestContext::new(logger.clone());
let db = db_test_context.get_db_instance();
let mut store_servers = Vec::new();
let mut shards = Vec::new();
let mut shard_uris: Vec<FogViewStoreUri> = Vec::new();
let mut sharding_strategies: Vec<ShardingStrategy> = Vec::new();

for (i, store_block_range) in store_block_ranges.into_iter().enumerate() {
let (store, store_uri) = {
let port = portpicker::pick_unused_port().expect("pick_unused_port");
let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range.clone());
let uri = FogViewStoreUri::from_str(&format!(
"insecure-fog-view-store://127.0.0.1:{}",
port
"insecure-fog-view-store://127.0.0.1:{port}?sharding_strategy={}",
epoch_sharding_strategy.to_string()
))
.unwrap();

let epoch_sharding_strategy = EpochShardingStrategy::new(store_block_range.clone());
let sharding_strategy = Epoch(epoch_sharding_strategy);
shard_uris.push(uri.clone());
sharding_strategies.push(sharding_strategy.clone());

let config = ViewConfig {
chain_id: "local".to_string(),
Expand Down Expand Up @@ -296,13 +291,7 @@ impl RouterTestEnvironment {

let store_clients = Arc::new(RwLock::new(shards));

(
db_test_context,
store_servers,
store_clients,
shard_uris,
sharding_strategies,
)
(db_test_context, store_servers, store_clients, shard_uris)
}
}

Expand Down

0 comments on commit 18c8d9b

Please sign in to comment.