Skip to content

Commit

Permalink
Incorporate missed blocks into calculation for highest processed bloc…
Browse files Browse the repository at this point in the history
…k count
  • Loading branch information
samdealy committed Nov 15, 2022
1 parent 1d2cb29 commit a7cc0d8
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 44 deletions.
15 changes: 9 additions & 6 deletions fog/view/enclave/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod e_tx_out_store;
mod oblivious_utils;
mod types;

use crate::types::get_block_data;
use alloc::vec::Vec;
use e_tx_out_store::{ETxOutStore, StorageDataSize, StorageMetaSize};
use mc_attest_core::{IasNonce, Quote, QuoteNonce, Report, TargetInfo, VerificationReport};
Expand All @@ -34,7 +35,7 @@ use mc_fog_view_enclave_api::{
use mc_oblivious_traits::ORAMStorageCreator;
use mc_sgx_compat::sync::Mutex;
use mc_sgx_report_cache_api::{ReportableEnclave, Result as ReportableEnclaveResult};
use types::{BlockData, CommonShardData, DecryptedMultiViewStoreQueryResponse, LastKnownData};
use types::{CommonShardData, DecryptedMultiViewStoreQueryResponse, LastKnownData};

pub struct ViewEnclave<OSC>
where
Expand Down Expand Up @@ -323,17 +324,19 @@ where
result.last_known_block_cumulative_txo_count =
last_known_data.last_known_block_cumulative_txo_count;

let block_data = BlockData::from(responses.as_mut_slice());
result.highest_processed_block_count = block_data.highest_processed_block_count;
result.highest_processed_block_signature_timestamp =
block_data.highest_processed_block_signature_timestamp;

// TODO: Use missed block ranges in HPBC calculation.
let shared_data: CommonShardData = CommonShardData::from(responses.as_slice());
result.missed_block_ranges = shared_data.missed_block_ranges;
result.rng_records = shared_data.rng_records;
result.decommissioned_ingest_invocations = shared_data.decommissioned_ingest_invocations;
result.next_start_from_user_event_id = shared_data.next_start_from_user_event_id;

let block_data =
get_block_data(responses.as_mut_slice(), result.missed_block_ranges.clone());
result.highest_processed_block_count = block_data.highest_processed_block_count;
result.highest_processed_block_signature_timestamp =
block_data.highest_processed_block_signature_timestamp;

result.tx_out_search_results =
Self::get_collated_tx_out_search_results(client_query_request, &responses)?;

Expand Down
174 changes: 136 additions & 38 deletions fog/view/enclave/impl/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,38 +101,73 @@ impl CommonShardData {
}
}

impl From<&mut [DecryptedMultiViewStoreQueryResponse]> for BlockData {
fn from(responses: &mut [DecryptedMultiViewStoreQueryResponse]) -> Self {
responses.sort_unstable_by_key(|response| response.block_range.start_block);

// Find the first time in which a highest processed block count does not equate
// to the final block that the shard is responsible for.
let mut result = BlockData::default();
for response in responses.iter() {
let response_highest_processed_block_count =
response.query_response.highest_processed_block_count;
if response_highest_processed_block_count > result.highest_processed_block_count {
result = BlockData::new(
response_highest_processed_block_count,
response
.query_response
.highest_processed_block_signature_timestamp,
);
}

// In this case, the shard hasn't processed all the blocks it's responsible for,
// and, as such, those blocks might not be processed so we should return this
// number.
// TODO: Consider implementing logic that accounts for overlapping block ranges.
// If ranges overlap, then the next server might have processed those blocks
// that this shard did not process (but is responsible for).
if response_highest_processed_block_count < response.block_range.end_block {
return result;
}
pub(crate) fn get_block_data(
responses: &mut [DecryptedMultiViewStoreQueryResponse],
mut missed_block_ranges: Vec<BlockRange>,
) -> BlockData {
responses.sort_unstable_by_key(|response| response.block_range.start_block);
missed_block_ranges.sort_unstable_by_key(|block_range| block_range.start_block);

// Find the first time in which a highest processed block count does not equate
// to the final block that the shard is responsible for.
let mut result = BlockData::default();
for response in responses.iter() {
let shard_highest_processed_block_count =
response.query_response.highest_processed_block_count;
if shard_highest_processed_block_count > result.highest_processed_block_count {
result = BlockData::new(
shard_highest_processed_block_count,
response
.query_response
.highest_processed_block_signature_timestamp,
);
}

result
// In this case, the shard hasn't processed all the blocks it's responsible for,
// and, as such, those blocks might not be processed so we should return this
// number.
// TODO: Consider implementing logic that accounts for overlapping block ranges.
// If ranges overlap, then the next server might have processed those blocks
// that this shard did not process (but is responsible for).
let shard_highest_processed_block_index = shard_highest_processed_block_count + 1;
if !is_shard_complete(
shard_highest_processed_block_index,
response.block_range.end_block,
missed_block_ranges.as_slice(),
) {
return result;
}
}

result
}

fn is_shard_complete(
highest_processed_block_index: u64,
shard_end_block_index: u64,
missed_block_ranges: &[BlockRange],
) -> bool {
if highest_processed_block_index == shard_end_block_index {
return true;
}

let remaining_block_indices =
BlockRange::new(highest_processed_block_index + 1, shard_end_block_index);
are_all_remaining_blocks_missing(remaining_block_indices, missed_block_ranges)
}

/// Given a set of remaining blocks, return true if all of them are missing.
fn are_all_remaining_blocks_missing(
remaining_block_indices: BlockRange,
missed_block_ranges: &[BlockRange],
) -> bool {
(remaining_block_indices.start_block..remaining_block_indices.end_block)
.into_iter()
.all(|block_index| {
missed_block_ranges
.iter()
.any(|missed_block_range| missed_block_range.contains(block_index))
})
}

impl From<&[DecryptedMultiViewStoreQueryResponse]> for LastKnownData {
Expand Down Expand Up @@ -472,7 +507,7 @@ mod shared_data_tests {

#[cfg(test)]
mod get_block_data_tests {
use crate::{BlockData, DecryptedMultiViewStoreQueryResponse};
use crate::{types::get_block_data, DecryptedMultiViewStoreQueryResponse};
use alloc::{vec, vec::Vec};
use mc_fog_types::{common::BlockRange, view::QueryResponse};

Expand Down Expand Up @@ -507,7 +542,7 @@ mod get_block_data_tests {
decrypted_query_responses.push(decrypted_query_response);
}

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

let last_response = decrypted_query_responses.last().unwrap();
assert_eq!(
Expand Down Expand Up @@ -561,7 +596,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(
result.highest_processed_block_count,
Expand Down Expand Up @@ -614,7 +649,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(
result.highest_processed_block_count,
Expand Down Expand Up @@ -668,7 +703,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(result.highest_processed_block_count, incomplete_block_count);
assert_eq!(
Expand Down Expand Up @@ -717,7 +752,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(result.highest_processed_block_count, incomplete_block_count);
assert_eq!(
Expand Down Expand Up @@ -769,7 +804,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(
result.highest_processed_block_count,
Expand Down Expand Up @@ -821,7 +856,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(result.highest_processed_block_count, incomplete_block_count);
assert_eq!(
Expand Down Expand Up @@ -873,7 +908,7 @@ mod get_block_data_tests {
block_range,
});

let result = BlockData::from(decrypted_query_responses.as_mut());
let result = get_block_data(decrypted_query_responses.as_mut(), vec![]);

assert_eq!(
result.highest_processed_block_count,
Expand All @@ -884,4 +919,67 @@ mod get_block_data_tests {
last_fully_processed_timestamp
);
}

#[test]
fn second_response_incomplete_missing_blocks_cover_it() {
const STORE_COUNT: usize = 4;
let mut decrypted_query_responses = Vec::with_capacity(STORE_COUNT);

// Make the first response fully processed.
let query_response = create_query_response(3, 3);
let block_range = BlockRange::new(0, 3);
let decrypted_query_response = DecryptedMultiViewStoreQueryResponse {
query_response,
block_range,
};
decrypted_query_responses.push(decrypted_query_response);

// Make the second response "incomplete"- i.e. it hasn't processed all of its
// blocks.
// Make the missed_block_range include block index 5. Since this ingest is not
// responsible for it, it should no factor into the highest processed
// block count calculation.
let incomplete_block_count = 4;
let incomplete_timestamp = 4;
let missed_block_range = BlockRange::new(5, 6);
let incomplete_query_response =
create_query_response(incomplete_block_count, incomplete_timestamp);
let block_range = BlockRange::new(3, 6);
decrypted_query_responses.push(DecryptedMultiViewStoreQueryResponse {
query_response: incomplete_query_response,
block_range,
});

// Make the third response fully processed.
let query_response = create_query_response(9, 9);
let block_range = BlockRange::new(6, 9);
decrypted_query_responses.push(DecryptedMultiViewStoreQueryResponse {
query_response,
block_range,
});

// Make the fourth response incomplete.
let highest_processed_block_count = 10;
let highest_processed_block_signature_timestamp = 10;
let query_response = create_query_response(
highest_processed_block_count,
highest_processed_block_signature_timestamp,
);
let block_range = BlockRange::new(9, 12);
decrypted_query_responses.push(DecryptedMultiViewStoreQueryResponse {
query_response,
block_range,
});

let result = get_block_data(decrypted_query_responses.as_mut(), vec![missed_block_range]);

assert_eq!(
result.highest_processed_block_count,
highest_processed_block_count
);
assert_eq!(
result.highest_processed_block_signature_timestamp,
highest_processed_block_signature_timestamp
);
}
}

0 comments on commit a7cc0d8

Please sign in to comment.