Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2202 from holochain/cache-validation-packages
Browse files Browse the repository at this point in the history
Cache validation packages
  • Loading branch information
zippy authored Jul 13, 2020
2 parents 2c97c3b + 095455f commit 6047b04
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 40 deletions.
16 changes: 10 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion app_spec/test/files/links.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ module.exports = scenario => {
t.ok(result_bob_delete.Err)
t.notOk(result_bob_delete.Ok)
const error = JSON.parse(result_bob_delete.Err.Internal)
t.match(error.kind.ErrorGeneric, /Target for link not found: Link { base: HashString\(".*"\), target: HashString\(".*"\), link_type: "authored_posts", tag: "" }/)
let re = /Target for link not found: Link { base: HashString\(".*"\), target: HashString\(".*"\), link_type: "authored_posts", tag: "" }/;
t.equal(true, re.test(error.kind.ErrorGeneric))
t.ok(error.file)
t.ok(error.line)
})
Expand Down
3 changes: 2 additions & 1 deletion crates/conductor_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ tokio = "=0.1.22"
test_utils = { version = "=0.0.49-alpha1", path = "../../test_utils" }
tempfile = "=3.0.7"
holochain_wasm_utils = { version = "=0.0.49-alpha1", path = "../wasm_utils" }
structopt = "=0.2.15"
structopt-derive = "=0.2.18"
structopt = "=0.2.18"
pretty_assertions = "=0.6.1"
ws = "=0.8.0"
parking_lot = "=0.7.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub enum Action {
/// Updates the state to hold the response that we got for
/// our previous request for a validation package.
/// Triggered from the network handler when we get the response.
HandleGetValidationPackage((ValidationKey, Option<ValidationPackage>)),
HandleGetValidationPackage((Address, ValidationKey, Option<ValidationPackage>)),

/// Clean up the validation package result so the state doesn't grow indefinitely.
ClearValidationPackageResult(ValidationKey),
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/network/handler/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub fn handle_send_message_result(message_data: DirectMessageData, context: Arc<
let address = unwrap_to!(initial_message => DirectMessage::RequestValidationPackage);

let action_wrapper = ActionWrapper::new(Action::HandleGetValidationPackage((
message_data.from_agent_id.into(),
address.clone(),
maybe_validation_package,
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ pub fn reduce_handle_get_validation_package(
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let (address, maybe_validation_package) =
let (responder, address, maybe_validation_package) =
unwrap_to!(action => crate::action::Action::HandleGetValidationPackage);

network_state
.get_validation_package_results
.insert(address.clone(), Some(Ok(maybe_validation_package.clone())));

if let Some(validation_package) = maybe_validation_package {
network_state.cache_validation(responder.clone(), validation_package);
}
}
117 changes: 116 additions & 1 deletion crates/core/src/network/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ use crate::{
network::{actions::Response, direct_message::DirectMessage, query::NetworkQueryResult},
};
use boolinator::*;
use holochain_core_types::{error::HolochainError, validation::ValidationPackage};
use holochain_core_types::{
chain_header::ChainHeader,
entry::Entry,
error::HolochainError,
validation::{ValidationPackage, ValidationPackageDefinition},
};
use holochain_json_api::{error::JsonError, json::JsonString};
use holochain_net::p2p_network::P2pNetwork;
use holochain_persistence_api::cas::content::Address;
use im::HashMap;
Expand All @@ -21,6 +27,21 @@ type GetValidationPackageResult = Option<Result<Option<ValidationPackage>, Holoc

type GetResults = Option<Result<NetworkQueryResult, HolochainError>>;

/// Cached source chain data for validation
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, DefaultJson)]
pub struct ValidationCacheData {
pub entries: Option<Vec<Entry>>,
pub headers: Vec<ChainHeader>,
pub cached_at: SystemTime,
}

impl ValidationCacheData {
fn latest_header(&self) -> ChainHeader {
// we should never add an empty headers array to the cache
self.headers.first().unwrap().clone()
}
}

#[derive(Clone, Debug)]
pub struct NetworkState {
/// every action and the result of that action
Expand All @@ -46,6 +67,7 @@ pub struct NetworkState {
pub direct_message_timeouts: HashMap<String, (SystemTime, Duration)>,

pub custom_direct_message_replys: HashMap<String, Result<String, HolochainError>>,
pub validation_cache: HashMap<Address, ValidationCacheData>,

id: String,
}
Expand All @@ -71,6 +93,7 @@ impl NetworkState {
direct_message_connections: HashMap::new(),
direct_message_timeouts: HashMap::new(),
custom_direct_message_replys: HashMap::new(),
validation_cache: HashMap::new(),

id: nanoid::simple(),
}
Expand All @@ -85,4 +108,96 @@ impl NetworkState {
HolochainError::ErrorGeneric("Network not initialized".to_string()),
)
}

pub(crate) fn cache_validation(&mut self, agent: Address, validation: &ValidationPackage) {
if validation.source_chain_headers.is_none() {
return;
}
if self.validation_cache.contains_key(&agent) {
let mut cache_entry = self.validation_cache.get_mut(&agent).unwrap();
if validation.chain_header.timestamp() > cache_entry.latest_header().timestamp() {
debug!("Vcache: updating {}:{:?}", &agent, validation.chain_header);
debug!(
"header count: {}",
validation.source_chain_headers.as_ref().unwrap().len()
);
cache_entry.headers = validation.source_chain_headers.clone().unwrap();
if validation.source_chain_entries.is_some() {
debug!(
"entry count: {}",
validation.source_chain_entries.as_ref().unwrap().len()
);
cache_entry.entries = validation.source_chain_entries.clone();
}
} else {
debug!("Vcache: already {}:{:?}", &agent, validation.chain_header);
}
} else {
debug!(
"Vcache: initial insert {}:{:?} {:#?}",
&agent, validation.chain_header, &validation
);
self.validation_cache.insert(
agent,
ValidationCacheData {
entries: validation.source_chain_entries.clone(),
headers: validation.source_chain_headers.as_ref().unwrap().clone(),
cached_at: SystemTime::now(),
},
);
}
}

// tries to build a validation package of the given type from the cache for a
pub fn get_validation_package_from_cache(
&self,
agent: Address,
definition: &ValidationPackageDefinition,
header: &ChainHeader,
) -> Option<ValidationPackage> {
let cache_entry = self.validation_cache.get(&agent)?;

// check all the cases where we know we can't calculate the validation package and
// return, so the rest of the cases we can just unwrap.
match definition {
ValidationPackageDefinition::Entry
| ValidationPackageDefinition::ChainEntries
| ValidationPackageDefinition::Custom(_) => return None,
ValidationPackageDefinition::ChainHeaders => {
// if cache_entry.headers.is_none() {
return None;
// };
}
ValidationPackageDefinition::ChainFull => {
if cache_entry.headers.len() == 0 {
return None;
};
if cache_entry.entries.is_none() {
return None;
};
}
}

// if we are looking for a header that was after the latest cached
// we can't build it
if header.timestamp() >= cache_entry.latest_header().timestamp() {
return None;
};

match definition {
ValidationPackageDefinition::ChainHeaders => Some(ValidationPackage {
chain_header: header.clone(),
source_chain_headers: Some(cache_entry.headers.clone()),
source_chain_entries: None,
custom: None,
}),
ValidationPackageDefinition::ChainFull => Some(ValidationPackage {
chain_header: header.clone(),
source_chain_headers: Some(cache_entry.headers.clone()),
source_chain_entries: cache_entry.entries.clone(),
custom: None,
}),
_ => None,
}
}
}
25 changes: 17 additions & 8 deletions crates/core/src/nucleus/actions/build_validation_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ pub fn build_validation_package<'a>(

let entry = entry.clone();
let context = context;
let maybe_entry_header = find_chain_header(
&entry.clone(),
&context
.state()
.expect("No state in build_validation_package"),
);
let state = &context
.state()
.expect("No state in build_validation_package");
let maybe_entry_header = find_chain_header(&entry.clone(), &state);
let entry_header = match maybe_entry_header {
None => {
// TODO: make sure that we don't run into race conditions with respect to the chain
Expand Down Expand Up @@ -134,7 +132,7 @@ pub fn build_validation_package<'a>(
}
ChainFull => {
let mut package = ValidationPackage::only_header(entry_header);
let headers = all_chain_headers_before_header(&context, &package.chain_header);
let headers = all_chain_headers(&context);
package.source_chain_entries =
Some(public_chain_entries_from_headers(&context, &headers));
package.source_chain_headers = Some(headers);
Expand Down Expand Up @@ -171,6 +169,17 @@ fn public_chain_entries_from_headers(
.collect::<Vec<_>>()
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
fn all_chain_headers(context: &Arc<Context>) -> Vec<ChainHeader> {
let state = &context
.state()
.expect("No state in build_validation_package")
.agent();
let top = state.top_chain_header().expect("there has to be a top");
let chain = state.chain_store();
chain.iter(&Some(top)).collect()
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
fn all_chain_headers_before_header(
context: &Arc<Context>,
Expand Down Expand Up @@ -293,7 +302,7 @@ mod tests {
build_validation_package(&test_entry_package_chain_full(), context.clone(), &vec![]);
assert!(maybe_validation_package.is_ok());

let headers = all_chain_headers_before_header(&context, &chain_header);
let headers = all_chain_headers(&context);

let expected = ValidationPackage {
chain_header,
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/nucleus/validation/build_from_dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub mod tests {
.source_chain_headers
.expect("chain headers not in locally generated packagae")
.len(),
2
3
);

assert_eq!(
Expand All @@ -243,6 +243,9 @@ pub mod tests {
2
);

assert_eq!(local_validation_package, dht_validation_package,)
// these are no longer the same because the validation package from the author
// will include the entire chain, not just the items below the given header so that
// they can be cached. TODO: cache the dht validation packages too.
//assert_eq!(local_validation_package, dht_validation_package,)
}
}
9 changes: 8 additions & 1 deletion crates/core/src/nucleus/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,18 @@ impl From<ValidationError> for HolochainError {
pub async fn validate_entry(
entry: Entry,
link: Option<Address>,
validation_data: ValidationData,
mut validation_data: ValidationData,
context: &Arc<Context>,
) -> ValidationResult {
log_debug!(context, "workflow/validate_entry: {:?}", entry);
//check_entry_type(entry.entry_type(), context)?;

// cleanup validation data which may include too much
if let Some(ref mut headers) = validation_data.package.source_chain_headers {
let t = validation_data.package.chain_header.timestamp();
headers.retain(|header| header.timestamp() < t);
}

header_address::validate_header_address(&entry, &validation_data.package.chain_header)?;
provenances::validate_provenances(&validation_data)?;

Expand Down
Loading

0 comments on commit 6047b04

Please sign in to comment.