diff --git a/container_api/src/holochain.rs b/container_api/src/holochain.rs index dc9ec7487c..f91df5ef12 100644 --- a/container_api/src/holochain.rs +++ b/container_api/src/holochain.rs @@ -94,7 +94,7 @@ impl Holochain { ); match result { Ok(_) => { - context.log(&format!("{} instantiated", name))?; + context.log(format!("{} instantiated", name)); let hc = Holochain { instance, context, diff --git a/core/src/action.rs b/core/src/action.rs index 611c09a50b..dcccd0dbc8 100644 --- a/core/src/action.rs +++ b/core/src/action.rs @@ -1,7 +1,7 @@ use crate::{ agent::state::AgentState, context::Context, - network::state::NetworkState, + network::{direct_message::DirectMessage, state::NetworkState}, nucleus::{ state::{NucleusState, ValidationResult}, ExecuteZomeFnResponse, ZomeFnCall, @@ -9,6 +9,7 @@ use crate::{ }; use holochain_core_types::{ cas::content::Address, + chain_header::ChainHeader, dna::Dna, entry::{Entry, EntryWithMeta}, error::HolochainError, @@ -77,9 +78,36 @@ impl Hash for ActionWrapper { /// All Actions for the Holochain Instance Store, according to Redux pattern. #[derive(Clone, PartialEq, Debug)] pub enum Action { - /// entry to Commit - /// MUST already have passed all callback checks + // ---------------- + // Agent actions: + // ---------------- + /// Writes an entry to the source chain. + /// Does not validate, assumes entry is valid. Commit((Entry, Option
)), + + // ------------- + // DHT actions: + // ------------- + /// Adds an entry to the local DHT shard. + /// Does not validate, assumes entry is valid. + Hold(Entry), + + /// Adds a link to the local DHT shard's meta/EAV storage + /// Does not validate, assumes link is valid. + AddLink(Link), + + // ---------------- + // Network actions: + // ---------------- + /// Create a network proxy instance from the given [NetworkSettings](struct.NetworkSettings.html) + InitNetwork(NetworkSettings), + + /// Makes the network PUT the given entry to the DHT. + /// Distinguishes between different entry types and does + /// the right thing respectively. + /// (only publish for AppEntryType, publish and publish_meta for links etc) + Publish(Address), + /// GetEntry by address GetEntry(Address), /// @@ -88,16 +116,41 @@ pub enum Action { RemoveEntry((Address, Address)), /// GetEntryTimeout(Address), - /// link to add - AddLink(Link), + + /// Lets the network module respond to a GET request. + /// Triggered from the corresponding workflow after retrieving the + /// requested entry from our local DHT shard. + RespondGet((GetDhtData, Option)), + /// get links from entry address and attribute-name //GetLinks(GetLinksArgs), - /// execute a function in a zome WASM - ExecuteZomeFunction(ZomeFnCall), - /// return the result of a zome WASM function call - ReturnZomeFunctionResult(ExecuteZomeFnResponse), + /// We got a response for our GET request which needs to be + /// added to the state. + /// Triggered from the network handler. + HandleGetResult(DhtData), + + /// Makes the network module send a direct (node-to-node) message + /// to the address given in [DirectMessageData](struct.DirectMessageData.html) + SendDirectMessage(DirectMessageData), + + /// Makes the network module forget about the direct message + /// connection with the given ID. + /// Triggered when we got an answer to our initial DM. + ResolveDirectConnection(String), + + /// Makes the network module DM the source of the given entry + /// and prepare for receiveing an answer + GetValidationPackage(ChainHeader), + + /// Updates the state to hold the response that we got for + /// our previous request for a validation package. + /// Triggered from the network handler when we got the response. + HandleGetValidationPackage((Address, Option)), + // ---------------- + // Nucleus actions: + // ---------------- /// initialize an application from a Dna /// not the same as genesis /// may call genesis internally @@ -106,26 +159,28 @@ pub enum Action { /// the result is Some arbitrary string ReturnInitializationResult(Option), + /// execute a function in a zome WASM + ExecuteZomeFunction(ZomeFnCall), + + /// return the result of a zome WASM function call + ReturnZomeFunctionResult(ExecuteZomeFnResponse), + /// Execute a zome function call called by another zome function Call(ZomeFnCall), - /// A validation result that should be stored + /// A validation result is returned from a local callback execution /// Key is an unique id of the calling context /// and the hash of the entry that was validated ReturnValidationResult(((snowflake::ProcessUniqueId, Address), ValidationResult)), + /// A validation package was created locally and is reported back + /// to be added to the state ReturnValidationPackage( ( snowflake::ProcessUniqueId, Result, ), ), - - InitNetwork((JsonString, String, String)), - Publish(Address), - Hold(Entry), - RespondGet((GetDhtData, Option)), - HandleGetResult(DhtData), } /// function signature for action handler functions @@ -136,6 +191,41 @@ pub type NetworkReduceFn = ReduceFn; pub type NucleusReduceFn = ReduceFn; pub type ReduceFn = fn(Arc, &mut S, &ActionWrapper); +/// Everything the network module needs to know in order to send a +/// direct message. +#[derive(Clone, PartialEq, Debug)] +pub struct DirectMessageData { + /// The address of the node to send a message to + pub address: Address, + + /// The message itself + pub message: DirectMessage, + + /// A unique message ID that is used to identify the response and attribute + /// it to the right context + pub msg_id: String, + + /// Should be true if we are responding to a previous message with this message. + /// msg_id should then be the same as the in the message that we received. + pub is_response: bool, +} + +/// Everything the network needs to initialize +#[derive(Clone, PartialEq, Debug)] +pub struct NetworkSettings { + /// JSON config that gets passed to [P2pNetwork](struct.P2pNetwork.html) + /// determines how to connect to the network module. + pub config: JsonString, + + /// DNA hash is needed so the network module knows which network to + /// connect us to. + pub dna_hash: String, + + /// The network module needs to know who we are. + /// This is this agent's address. + pub agent_id: String, +} + #[cfg(test)] pub mod tests { diff --git a/core/src/context.rs b/core/src/context.rs index 5751a72187..b67007beac 100644 --- a/core/src/context.rs +++ b/core/src/context.rs @@ -85,11 +85,15 @@ impl Context { network_config, }) } + // helper function to make it easier to call the logger - pub fn log(&self, msg: &str) -> Result<(), HolochainError> { - let mut logger = self.logger.lock().or(Err(HolochainError::LoggingError))?; - logger.log(msg.to_string()); - Ok(()) + pub fn log>(&self, msg: T) { + let mut logger = self + .logger + .lock() + .or(Err(HolochainError::LoggingError)) + .expect("Logger should work");; + logger.log(msg.into()); } pub fn set_state(&mut self, state: Arc>) { diff --git a/core/src/network/actions/get_entry.rs b/core/src/network/actions/get_entry.rs index c105fefe8b..a469a82dc9 100644 --- a/core/src/network/actions/get_entry.rs +++ b/core/src/network/actions/get_entry.rs @@ -8,11 +8,7 @@ use futures::{ future::Future, task::{LocalWaker, Poll}, }; -use holochain_core_types::{ - cas::content::Address, - entry::EntryWithMeta, - error::{HcResult, HolochainError}, -}; +use holochain_core_types::{cas::content::Address, entry::EntryWithMeta, error::HcResult}; use std::{ pin::{Pin, Unpin}, sync::Arc, @@ -56,10 +52,8 @@ impl Future for GetEntryFuture { fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { let state = self.context.state().unwrap().network(); - if state.network.is_none() || state.dna_hash.is_none() || state.agent_id.is_none() { - return Poll::Ready(Err(HolochainError::IoError( - "Network not initialized".to_string(), - ))); + if let Err(error) = state.initialized() { + return Poll::Ready(Err(error)); } // // TODO: connect the waker to state updates for performance reasons diff --git a/core/src/network/actions/get_validation_package.rs b/core/src/network/actions/get_validation_package.rs new file mode 100644 index 0000000000..4408cdf02c --- /dev/null +++ b/core/src/network/actions/get_validation_package.rs @@ -0,0 +1,67 @@ +extern crate futures; +use crate::{ + action::{Action, ActionWrapper}, + context::Context, + instance::dispatch_action, +}; +use futures::{ + future::Future, + task::{LocalWaker, Poll}, +}; +use holochain_core_types::{ + cas::content::Address, chain_header::ChainHeader, error::HcResult, + validation::ValidationPackage, +}; +use std::{ + pin::{Pin, Unpin}, + sync::Arc, +}; + +/// GetValidationPackage Action Creator +/// This triggers the network module to retrieve the validation package for the +/// entry given by the header. +/// +/// Returns a future that resolves to Option (or HolochainError). +/// If that is None this means that we couldn't get a validation package from the source. +pub async fn get_validation_package( + header: ChainHeader, + context: &Arc, +) -> HcResult> { + let entry_address = header.entry_address().clone(); + let action_wrapper = ActionWrapper::new(Action::GetValidationPackage(header)); + dispatch_action(&context.action_channel, action_wrapper.clone()); + await!(GetValidationPackageFuture { + context: context.clone(), + address: entry_address, + }) +} + +/// GetValidationPackageFuture resolves to an Option +/// which would be None if the source responded with None, indicating that it +/// is not the source. +pub struct GetValidationPackageFuture { + context: Arc, + address: Address, +} + +impl Unpin for GetValidationPackageFuture {} + +impl Future for GetValidationPackageFuture { + type Output = HcResult>; + + fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + let state = self.context.state().unwrap().network(); + if let Err(error) = state.initialized() { + return Poll::Ready(Err(error)); + } + // + // TODO: connect the waker to state updates for performance reasons + // See: https://github.com/holochain/holochain-rust/issues/314 + // + lw.wake(); + match state.get_validation_package_results.get(&self.address) { + Some(Some(result)) => Poll::Ready(result.clone()), + _ => Poll::Pending, + } + } +} diff --git a/core/src/network/actions/initialize_network.rs b/core/src/network/actions/initialize_network.rs index a609d5415b..b8e26840d9 100644 --- a/core/src/network/actions/initialize_network.rs +++ b/core/src/network/actions/initialize_network.rs @@ -1,7 +1,7 @@ extern crate futures; extern crate serde_json; use crate::{ - action::{Action, ActionWrapper}, + action::{Action, ActionWrapper, NetworkSettings}, context::Context, instance::dispatch_action, }; @@ -31,14 +31,15 @@ async fn get_dna_and_agent(context: &Arc) -> Result<(String, String), H let dna_hash = base64::encode(&dna.multihash()?); Ok((dna_hash, agent_id)) } -/// InitNetwork Action Creator +/// Creates a network proxy object and stores DNA and agent hash in the network state. pub async fn initialize_network(context: &Arc) -> Result<(), HolochainError> { let (dna_hash, agent_id) = await!(get_dna_and_agent(context))?; - let action_wrapper = ActionWrapper::new(Action::InitNetwork(( - context.network_config.clone(), + let network_settings = NetworkSettings { + config: context.network_config.clone(), dna_hash, agent_id, - ))); + }; + let action_wrapper = ActionWrapper::new(Action::InitNetwork(network_settings)); dispatch_action(&context.action_channel, action_wrapper.clone()); await!(InitNetworkFuture { diff --git a/core/src/network/actions/mod.rs b/core/src/network/actions/mod.rs index a4b36d9e11..34d1ac147a 100644 --- a/core/src/network/actions/mod.rs +++ b/core/src/network/actions/mod.rs @@ -1,4 +1,5 @@ pub mod get_entry; +pub mod get_validation_package; pub mod initialize_network; pub mod publish; diff --git a/core/src/network/actions/publish.rs b/core/src/network/actions/publish.rs index 572ed252eb..7ae73c6c08 100644 --- a/core/src/network/actions/publish.rs +++ b/core/src/network/actions/publish.rs @@ -9,10 +9,7 @@ use futures::{ future::Future, task::{LocalWaker, Poll}, }; -use holochain_core_types::{ - cas::content::Address, - error::{HcResult, HolochainError}, -}; +use holochain_core_types::{cas::content::Address, error::HcResult}; use std::{ pin::{Pin, Unpin}, sync::Arc, @@ -46,10 +43,8 @@ impl Future for PublishFuture { fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { let state = self.context.state().unwrap().network(); - if state.network.is_none() || state.dna_hash.is_none() || state.agent_id.is_none() { - return Poll::Ready(Err(HolochainError::IoError( - "Network not initialized".to_string(), - ))); + if let Err(error) = state.initialized() { + return Poll::Ready(Err(error)); } // // TODO: connect the waker to state updates for performance reasons diff --git a/core/src/network/direct_message.rs b/core/src/network/direct_message.rs new file mode 100644 index 0000000000..265c994bfd --- /dev/null +++ b/core/src/network/direct_message.rs @@ -0,0 +1,21 @@ +use holochain_core_types::{cas::content::Address, validation::ValidationPackage}; + +/// These are the different kinds of (low-level, i.e. non-app) +/// node-to-node messages that can be send between Holochain nodes. +#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] +pub enum DirectMessage { + /// A custom direct message is something that gets triggered + /// from zome code, i.e. from the app. + /// Receiving such a messages triggers a WASM callback + Custom(String), + + /// This message is used to ask another node (which needs to + /// be the author) for the validation package of a given entry. + RequestValidationPackage(Address), + + /// With this message an author is responding to a + /// RequestValidationPackage message. + /// Option<> since there has to be a way to respond saying + /// "I can't" + ValidationPackage(Option), +} diff --git a/core/src/network/handler.rs b/core/src/network/handler.rs deleted file mode 100644 index c958b0f924..0000000000 --- a/core/src/network/handler.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::{ - action::{Action, ActionWrapper}, - context::Context, - dht::actions::{add_link::add_link, hold::hold_entry}, - instance::dispatch_action, - network::entry_with_header::EntryWithHeader, - nucleus, -}; -use futures::executor::block_on; -use holochain_core_types::{ - cas::content::Address, - crud_status::{CrudStatus, LINK_NAME, STATUS_NAME}, - entry::Entry, -}; -use holochain_net_connection::{net_connection::NetHandler, protocol_wrapper::ProtocolWrapper}; -use std::{convert::TryFrom, sync::Arc}; - -pub fn create_handler(c: &Arc) -> NetHandler { - let context = c.clone(); - Box::new(move |message| { - let message = message.unwrap(); - let protocol_wrapper = ProtocolWrapper::try_from(message); - match protocol_wrapper { - Ok(ProtocolWrapper::StoreDht(dht_data)) => { - let entry_with_header: EntryWithHeader = - serde_json::from_str(&serde_json::to_string(&dht_data.content).unwrap()) - .unwrap(); - let _ = block_on(hold_entry(&entry_with_header.entry_body, &context.clone())); - } - Ok(ProtocolWrapper::StoreDhtMeta(dht_meta_data)) => { - match dht_meta_data.attribute.as_ref() { - "link" => { - let entry_with_header: EntryWithHeader = serde_json::from_str( - &serde_json::to_string(&dht_meta_data.content) - .expect("dht_meta_data should be EntryWithHader"), - ) - .expect("dht_meta_data should be EntryWithHader"); - let link_add = match entry_with_header.entry_body { - Entry::LinkAdd(link_add) => link_add, - _ => unreachable!(), - }; - let link = link_add.link().clone(); - let _ = block_on(add_link(&link, &context.clone())); - } - STATUS_NAME => { - let _crud_status: CrudStatus = serde_json::from_str( - &serde_json::to_string(&dht_meta_data.content) - .expect("dht_meta_data should be crud_status"), - ) - .expect("dht_meta_data should be crud_status"); - // FIXME: block_on hold crud_status metadata in DHT? - } - LINK_NAME => { - let _crud_link: Address = serde_json::from_str( - &serde_json::to_string(&dht_meta_data.content) - .expect("dht_meta_data should be crud_link"), - ) - .expect("dht_meta_data should be crud_link"); - // FIXME: block_on hold crud_link metadata in DHT? - } - _ => {} - } - } - Ok(ProtocolWrapper::GetDht(get_dht_data)) => { - let _ = nucleus::actions::get_entry::get_entry_with_meta( - &context, - Address::from(get_dht_data.address.clone()), - ) - .map(|maybe_entry_with_meta| { - let action_wrapper = ActionWrapper::new(Action::RespondGet(( - get_dht_data, - maybe_entry_with_meta, - ))); - dispatch_action(&context.action_channel, action_wrapper.clone()); - }); - } - Ok(ProtocolWrapper::GetDhtResult(dht_data)) => { - let action_wrapper = ActionWrapper::new(Action::HandleGetResult(dht_data)); - dispatch_action(&context.action_channel, action_wrapper.clone()); - } - _ => {} - } - Ok(()) - }) -} diff --git a/core/src/network/handler/get.rs b/core/src/network/handler/get.rs new file mode 100644 index 0000000000..f5c611af2d --- /dev/null +++ b/core/src/network/handler/get.rs @@ -0,0 +1,33 @@ +use crate::{ + action::{Action, ActionWrapper}, + context::Context, + instance::dispatch_action, + nucleus, +}; +use holochain_core_types::cas::content::Address; +use std::sync::Arc; + +use holochain_net_connection::protocol_wrapper::{DhtData, GetDhtData}; + +/// The network has requested a DHT entry from us. +/// Lets try to get it and trigger a response. +pub fn handle_get_dht(get_dht_data: GetDhtData, context: Arc) { + let maybe_entry_with_meta = nucleus::actions::get_entry::get_entry_with_meta( + &context, + Address::from(get_dht_data.address.clone()), + ) + .unwrap_or_else(|error| { + context.log(format!("Error trying to find entry {:?}", error)); + None + }); + + let action_wrapper = + ActionWrapper::new(Action::RespondGet((get_dht_data, maybe_entry_with_meta))); + dispatch_action(&context.action_channel, action_wrapper.clone()); +} + +/// The network comes back with a result to our previous GET request. +pub fn handle_get_dht_result(dht_data: DhtData, context: Arc) { + let action_wrapper = ActionWrapper::new(Action::HandleGetResult(dht_data)); + dispatch_action(&context.action_channel, action_wrapper.clone()); +} diff --git a/core/src/network/handler/mod.rs b/core/src/network/handler/mod.rs new file mode 100644 index 0000000000..53aa726555 --- /dev/null +++ b/core/src/network/handler/mod.rs @@ -0,0 +1,41 @@ +pub mod get; +pub mod send; +pub mod store; + +use crate::{ + context::Context, + network::handler::{get::*, send::*, store::*}, +}; +use holochain_net_connection::{net_connection::NetHandler, protocol_wrapper::ProtocolWrapper}; +use std::{convert::TryFrom, sync::Arc}; + +/// Creates the network handler. +/// The returned closure is called by the network thread for every network event that core +/// has to handle. +pub fn create_handler(c: &Arc) -> NetHandler { + let context = c.clone(); + Box::new(move |message| { + let message = message.unwrap(); + let protocol_wrapper = ProtocolWrapper::try_from(message); + match protocol_wrapper { + Ok(ProtocolWrapper::StoreDht(dht_data)) => handle_store_dht(dht_data, context.clone()), + Ok(ProtocolWrapper::StoreDhtMeta(dht_meta_data)) => { + handle_store_dht_meta(dht_meta_data, context.clone()) + } + Ok(ProtocolWrapper::GetDht(get_dht_data)) => { + handle_get_dht(get_dht_data, context.clone()) + } + Ok(ProtocolWrapper::GetDhtResult(dht_data)) => { + handle_get_dht_result(dht_data, context.clone()) + } + Ok(ProtocolWrapper::HandleSend(message_data)) => { + handle_send(message_data, context.clone()) + } + Ok(ProtocolWrapper::SendResult(message_data)) => { + handle_send_result(message_data, context.clone()) + } + _ => {} + } + Ok(()) + }) +} diff --git a/core/src/network/handler/send.rs b/core/src/network/handler/send.rs new file mode 100644 index 0000000000..24b8529304 --- /dev/null +++ b/core/src/network/handler/send.rs @@ -0,0 +1,82 @@ +use crate::{ + action::{Action, ActionWrapper}, + context::Context, + instance::dispatch_action, + network::direct_message::DirectMessage, + workflows::respond_validation_package_request::respond_validation_package_request, +}; +use futures::executor::block_on; +use holochain_core_types::cas::content::Address; +use std::{sync::Arc, thread}; + +use holochain_net_connection::protocol_wrapper::MessageData; + +/// We got a ProtocolWrapper::SendMessage, this means somebody initiates message roundtrip +/// -> we are being called +pub fn handle_send(message_data: MessageData, context: Arc) { + let message: DirectMessage = + serde_json::from_str(&serde_json::to_string(&message_data.data).unwrap()).unwrap(); + + match message { + DirectMessage::Custom(_) => context.log("DirectMessage::Custom not implemented"), + DirectMessage::RequestValidationPackage(address) => { + // Async functions only get executed when they are polled. + // I don't want to wait for this workflow to finish here as it would block the + // network thread, so I use block_on to poll the async function but do that in + // another thread: + thread::spawn(move || { + block_on(respond_validation_package_request( + Address::from(message_data.from_agent_id), + message_data.msg_id, + address, + context.clone(), + )); + }); + } + DirectMessage::ValidationPackage(_) => context.log( + "Got DirectMessage::ValidationPackage as initial message. This should not happen.", + ), + }; +} + +/// We got a ProtocolWrapper::SendResult, this means somebody has responded to our message +/// -> we called and this is the answer +pub fn handle_send_result(message_data: MessageData, context: Arc) { + let response: DirectMessage = + serde_json::from_str(&serde_json::to_string(&message_data.data).unwrap()).unwrap(); + + let initial_message = context + .state() + .unwrap() + .network() + .as_ref() + .direct_message_connections + .get(&message_data.msg_id) + .cloned(); + + match response { + DirectMessage::Custom(_) => context.log("DirectMessage::Custom not implemented"), + DirectMessage::RequestValidationPackage(_) => context.log( + "Got DirectMessage::RequestValidationPackage as a response. This should not happen.", + ), + DirectMessage::ValidationPackage(maybe_validation_package) => { + if initial_message.is_none() { + context.log("Received a validation package but could not find message ID in history. Not able to process."); + return; + } + + let initial_message = initial_message.unwrap(); + let address = unwrap_to!(initial_message => DirectMessage::RequestValidationPackage); + + let action_wrapper = ActionWrapper::new(Action::HandleGetValidationPackage(( + address.clone(), + maybe_validation_package.clone(), + ))); + dispatch_action(&context.action_channel, action_wrapper.clone()); + + let action_wrapper = + ActionWrapper::new(Action::ResolveDirectConnection(message_data.msg_id)); + dispatch_action(&context.action_channel, action_wrapper.clone()); + } + }; +} diff --git a/core/src/network/handler/store.rs b/core/src/network/handler/store.rs new file mode 100644 index 0000000000..955f7991fe --- /dev/null +++ b/core/src/network/handler/store.rs @@ -0,0 +1,57 @@ +use crate::{ + context::Context, + dht::actions::{add_link::add_link, hold::hold_entry}, + network::entry_with_header::EntryWithHeader, +}; +use futures::executor::block_on; +use holochain_core_types::{ + cas::content::Address, + crud_status::{CrudStatus, LINK_NAME, STATUS_NAME}, + entry::Entry, +}; +use holochain_net_connection::protocol_wrapper::{DhtData, DhtMetaData}; +use std::sync::Arc; + +/// The network requests us to store (i.e. hold) the given entry. +pub fn handle_store_dht(dht_data: DhtData, context: Arc) { + let entry_with_header: EntryWithHeader = + serde_json::from_str(&serde_json::to_string(&dht_data.content).unwrap()).unwrap(); + let _ = block_on(hold_entry(&entry_with_header.entry_body, &context.clone())); +} + +/// The network requests us to store meta information (links/CRUD/etc) for an +/// entry that we hold. +pub fn handle_store_dht_meta(dht_meta_data: DhtMetaData, context: Arc) { + match dht_meta_data.attribute.as_ref() { + "link" => { + let entry_with_header: EntryWithHeader = serde_json::from_str( + &serde_json::to_string(&dht_meta_data.content) + .expect("dht_meta_data should be EntryWithHader"), + ) + .expect("dht_meta_data should be EntryWithHader"); + let link_add = match entry_with_header.entry_body { + Entry::LinkAdd(link_add) => link_add, + _ => unreachable!(), + }; + let link = link_add.link().clone(); + let _ = block_on(add_link(&link, &context.clone())); + } + STATUS_NAME => { + let _crud_status: CrudStatus = serde_json::from_str( + &serde_json::to_string(&dht_meta_data.content) + .expect("dht_meta_data should be crud_status"), + ) + .expect("dht_meta_data should be crud_status"); + // FIXME: block_on hold crud_status metadata in DHT? + } + LINK_NAME => { + let _crud_link: Address = serde_json::from_str( + &serde_json::to_string(&dht_meta_data.content) + .expect("dht_meta_data should be crud_link"), + ) + .expect("dht_meta_data should be crud_link"); + // FIXME: block_on hold crud_link metadata in DHT? + } + _ => {} + } +} diff --git a/core/src/network/mod.rs b/core/src/network/mod.rs index 7d70a7aa93..e9c9afd8b0 100644 --- a/core/src/network/mod.rs +++ b/core/src/network/mod.rs @@ -1,4 +1,5 @@ pub mod actions; +pub mod direct_message; pub mod entry_with_header; pub mod handler; pub mod reducers; @@ -7,7 +8,9 @@ pub mod state; #[cfg(test)] pub mod tests { use crate::{ - instance::tests::test_instance_and_context_by_name, network::actions::get_entry::get_entry, + instance::tests::test_instance_and_context_by_name, + network::actions::{get_entry::get_entry, get_validation_package::get_validation_package}, + workflows::author_entry::author_entry, }; use futures::executor::block_on; use holochain_core_types::{ @@ -70,4 +73,97 @@ pub mod tests { let maybe_entry_with_meta = result.unwrap(); assert!(maybe_entry_with_meta.is_none()); } + + #[test] + fn get_validation_package_roundtrip() { + let wat = r#" +(module + + (memory 1) + (export "memory" (memory 0)) + + (func + (export "__hdk_validate_app_entry") + (param $allocation i32) + (result i32) + + (i32.const 0) + ) + + (func + (export "__hdk_validate_link") + (param $allocation i32) + (result i32) + + (i32.const 0) + ) + + + (func + (export "__hdk_get_validation_package_for_entry_type") + (param $allocation i32) + (result i32) + + ;; This writes "Entry" into memory + (i32.store (i32.const 0) (i32.const 34)) + (i32.store (i32.const 1) (i32.const 69)) + (i32.store (i32.const 2) (i32.const 110)) + (i32.store (i32.const 3) (i32.const 116)) + (i32.store (i32.const 4) (i32.const 114)) + (i32.store (i32.const 5) (i32.const 121)) + (i32.store (i32.const 6) (i32.const 34)) + + (i32.const 7) + ) + + (func + (export "__hdk_get_validation_package_for_link") + (param $allocation i32) + (result i32) + + ;; This writes "Entry" into memory + (i32.store (i32.const 0) (i32.const 34)) + (i32.store (i32.const 1) (i32.const 69)) + (i32.store (i32.const 2) (i32.const 110)) + (i32.store (i32.const 3) (i32.const 116)) + (i32.store (i32.const 4) (i32.const 114)) + (i32.store (i32.const 5) (i32.const 121)) + (i32.store (i32.const 6) (i32.const 34)) + + (i32.const 7) + ) + + (func + (export "__list_capabilities") + (param $allocation i32) + (result i32) + + (i32.const 0) + ) +) + "#; + + let mut dna = create_test_dna_with_wat("test_zome", "test_cap", Some(wat)); + dna.uuid = String::from("get_validation_package_roundtrip"); + let (_, context1) = test_instance_and_context_by_name(dna.clone(), "alice1").unwrap(); + + let entry = test_entry(); + block_on(author_entry(&entry, None, &context1)).expect("Could not author entry"); + + let agent1_state = context1.state().unwrap().agent(); + let header = agent1_state + .chain() + .iter_type(&agent1_state.top_chain_header(), &entry.entry_type()) + .find(|h| h.entry_address() == &entry.address()) + .expect("There must be a header in the author's source chain after commit"); + + let (_, context2) = test_instance_and_context_by_name(dna.clone(), "bob1").unwrap(); + let result = block_on(get_validation_package(header.clone(), &context2)); + + assert!(result.is_ok()); + let maybe_validation_package = result.unwrap(); + assert!(maybe_validation_package.is_some()); + let validation_package = maybe_validation_package.unwrap(); + assert_eq!(validation_package.chain_header, Some(header)); + } } diff --git a/core/src/network/reducers/get_entry.rs b/core/src/network/reducers/get_entry.rs index 92e1b18066..cf56d70776 100644 --- a/core/src/network/reducers/get_entry.rs +++ b/core/src/network/reducers/get_entry.rs @@ -1,35 +1,24 @@ -use boolinator::*; -use crate::{action::ActionWrapper, context::Context, network::state::NetworkState}; -use holochain_core_types::{cas::content::Address, error::HolochainError}; -use holochain_net_connection::{ - net_connection::NetConnection, - protocol_wrapper::{GetDhtData, ProtocolWrapper}, +use crate::{ + action::ActionWrapper, + context::Context, + network::{reducers::send, state::NetworkState}, }; +use holochain_core_types::{cas::content::Address, error::HolochainError}; +use holochain_net_connection::protocol_wrapper::{GetDhtData, ProtocolWrapper}; use std::sync::Arc; fn inner(network_state: &mut NetworkState, address: &Address) -> Result<(), HolochainError> { - (network_state.network.is_some() - && network_state.dna_hash.is_some() & network_state.agent_id.is_some()) - .ok_or("Network not initialized".to_string())?; - - let data = GetDhtData { - msg_id: "?".to_string(), - dna_hash: network_state.dna_hash.clone().unwrap(), - from_agent_id: network_state.agent_id.clone().unwrap(), - address: address.to_string(), - }; - - network_state - .network - .as_mut() - .map(|network| { - network - .lock() - .unwrap() - .send(ProtocolWrapper::GetDht(data).into()) - .map_err(|error| HolochainError::IoError(error.to_string())) - }) - .expect("Network has to be Some because of check above") + network_state.initialized()?; + + send( + network_state, + ProtocolWrapper::GetDht(GetDhtData { + msg_id: "?".to_string(), + dna_hash: network_state.dna_hash.clone().unwrap(), + from_agent_id: network_state.agent_id.clone().unwrap(), + address: address.to_string(), + }), + ) } pub fn reduce_get_entry( @@ -82,7 +71,7 @@ pub fn reduce_get_entry_timeout( mod tests { use crate::{ - action::{Action, ActionWrapper}, + action::{Action, ActionWrapper, NetworkSettings}, context::mock_network_config, instance::tests::test_context, state::test_store, @@ -122,11 +111,11 @@ mod tests { let context = test_context("alice"); let store = test_store(context.clone()); - let action_wrapper = ActionWrapper::new(Action::InitNetwork(( - mock_network_config(), - String::from("abcd"), - String::from("abcd"), - ))); + let action_wrapper = ActionWrapper::new(Action::InitNetwork(NetworkSettings { + config: mock_network_config(), + dna_hash: String::from("abcd"), + agent_id: String::from("abcd"), + })); let store = store.reduce(context.clone(), action_wrapper); let entry = test_entry(); @@ -149,11 +138,11 @@ mod tests { Arc::get_mut(&mut context).unwrap().set_state(store.clone()); - let action_wrapper = ActionWrapper::new(Action::InitNetwork(( - mock_network_config(), - String::from("abcd"), - String::from("abcd"), - ))); + let action_wrapper = ActionWrapper::new(Action::InitNetwork(NetworkSettings { + config: mock_network_config(), + dna_hash: String::from("abcd"), + agent_id: String::from("abcd"), + })); { let mut new_store = store.write().unwrap(); diff --git a/core/src/network/reducers/get_validation_package.rs b/core/src/network/reducers/get_validation_package.rs new file mode 100644 index 0000000000..9c879ed150 --- /dev/null +++ b/core/src/network/reducers/get_validation_package.rs @@ -0,0 +1,40 @@ +use crate::{ + action::ActionWrapper, + context::Context, + network::{direct_message::DirectMessage, reducers::send_message, state::NetworkState}, +}; +use holochain_core_types::{chain_header::ChainHeader, error::HolochainError}; +use std::sync::Arc; + +fn inner(network_state: &mut NetworkState, header: &ChainHeader) -> Result<(), HolochainError> { + network_state.initialized()?; + + let source_address = header + .sources() + .first() + .ok_or(HolochainError::ErrorGeneric( + "No source found in ChainHeader".to_string(), + ))?; + let direct_message = DirectMessage::RequestValidationPackage(header.entry_address().clone()); + + send_message(network_state, source_address, direct_message) +} + +pub fn reduce_get_validation_package( + _context: Arc, + network_state: &mut NetworkState, + action_wrapper: &ActionWrapper, +) { + let action = action_wrapper.action(); + let header = unwrap_to!(action => crate::action::Action::GetValidationPackage); + let entry_address = header.entry_address().clone(); + + let result = match inner(network_state, header) { + Ok(()) => None, + Err(err) => Some(Err(err)), + }; + + network_state + .get_validation_package_results + .insert(entry_address, result); +} diff --git a/core/src/network/reducers/handle_get_result.rs b/core/src/network/reducers/handle_get_result.rs index 3fc6a6a158..e1a0adc82f 100644 --- a/core/src/network/reducers/handle_get_result.rs +++ b/core/src/network/reducers/handle_get_result.rs @@ -1,4 +1,3 @@ -use boolinator::*; use crate::{action::ActionWrapper, context::Context, network::state::NetworkState}; use holochain_core_types::{cas::content::Address, entry::EntryWithMeta, error::HolochainError}; use holochain_net_connection::protocol_wrapper::DhtData; @@ -8,9 +7,7 @@ fn inner( network_state: &mut NetworkState, dht_data: &DhtData, ) -> Result, HolochainError> { - (network_state.network.is_some() - && network_state.dna_hash.is_some() & network_state.agent_id.is_some()) - .ok_or("Network not initialized".to_string())?; + network_state.initialized()?; let res = serde_json::from_str(&serde_json::to_string(&dht_data.content).unwrap()); if let Err(_) = res { diff --git a/core/src/network/reducers/handle_get_validation_package.rs b/core/src/network/reducers/handle_get_validation_package.rs new file mode 100644 index 0000000000..61972c5d27 --- /dev/null +++ b/core/src/network/reducers/handle_get_validation_package.rs @@ -0,0 +1,16 @@ +use crate::{action::ActionWrapper, context::Context, network::state::NetworkState}; +use std::sync::Arc; + +pub fn reduce_handle_get_validation_package( + _context: Arc, + network_state: &mut NetworkState, + action_wrapper: &ActionWrapper, +) { + let action = action_wrapper.action(); + let (address, maybe_validation_package) = + unwrap_to!(action => crate::action::Action::HandleGetValidationPackage); + + network_state + .get_validation_package_results + .insert(address.clone(), Some(Ok(maybe_validation_package.clone()))); +} diff --git a/core/src/network/reducers/init.rs b/core/src/network/reducers/init.rs index 4b7b041c41..dcd1d5d270 100644 --- a/core/src/network/reducers/init.rs +++ b/core/src/network/reducers/init.rs @@ -16,21 +16,21 @@ pub fn reduce_init( action_wrapper: &ActionWrapper, ) { let action = action_wrapper.action(); - let (_, dna_hash, agent_id) = unwrap_to!(action => Action::InitNetwork); - let mut network = P2pNetwork::new(create_handler(&context), &context.network_config).unwrap(); + let network_settings = unwrap_to!(action => Action::InitNetwork); + let mut network = P2pNetwork::new(create_handler(&context), &network_settings.config).unwrap(); let _ = network .send( ProtocolWrapper::TrackApp(TrackAppData { - dna_hash: dna_hash.clone(), - agent_id: agent_id.clone(), + dna_hash: network_settings.dna_hash.clone(), + agent_id: network_settings.agent_id.clone(), }) .into(), ) .and_then(|_| { state.network = Some(Arc::new(Mutex::new(network))); - state.dna_hash = Some(dna_hash.clone()); - state.agent_id = Some(agent_id.clone()); + state.dna_hash = Some(network_settings.dna_hash.clone()); + state.agent_id = Some(network_settings.agent_id.clone()); Ok(()) }); } diff --git a/core/src/network/reducers/mod.rs b/core/src/network/reducers/mod.rs index 13205cf52d..80545e2fb2 100644 --- a/core/src/network/reducers/mod.rs +++ b/core/src/network/reducers/mod.rs @@ -1,23 +1,38 @@ pub mod get_entry; +pub mod get_validation_package; pub mod handle_get_result; +pub mod handle_get_validation_package; pub mod init; pub mod publish; +pub mod resolve_direct_connection; pub mod respond_get; +pub mod send_direct_message; use crate::{ action::{Action, ActionWrapper, NetworkReduceFn}, context::Context, network::{ + direct_message::DirectMessage, reducers::{ get_entry::{reduce_get_entry, reduce_get_entry_timeout}, + get_validation_package::reduce_get_validation_package, handle_get_result::reduce_handle_get_result, + handle_get_validation_package::reduce_handle_get_validation_package, init::reduce_init, publish::reduce_publish, + resolve_direct_connection::reduce_resolve_direct_connection, respond_get::reduce_respond_get, + send_direct_message::reduce_send_direct_message, }, state::NetworkState, }, }; +use holochain_core_types::{cas::content::Address, error::HolochainError}; +use holochain_net_connection::{ + net_connection::NetConnection, + protocol_wrapper::{MessageData, ProtocolWrapper}, +}; +use snowflake::ProcessUniqueId; use std::sync::Arc; /// maps incoming action to the correct handler @@ -25,10 +40,14 @@ fn resolve_reducer(action_wrapper: &ActionWrapper) -> Option { match action_wrapper.action() { Action::GetEntry(_) => Some(reduce_get_entry), Action::GetEntryTimeout(_) => Some(reduce_get_entry_timeout), + Action::GetValidationPackage(_) => Some(reduce_get_validation_package), Action::HandleGetResult(_) => Some(reduce_handle_get_result), + Action::HandleGetValidationPackage(_) => Some(reduce_handle_get_validation_package), Action::InitNetwork(_) => Some(reduce_init), Action::Publish(_) => Some(reduce_publish), + Action::ResolveDirectConnection(_) => Some(reduce_resolve_direct_connection), Action::RespondGet(_) => Some(reduce_respond_get), + Action::SendDirectMessage(_) => Some(reduce_send_direct_message), _ => None, } } @@ -48,3 +67,52 @@ pub fn reduce( None => old_state, } } + +/// Sends the given ProtocolWrapper over the network using the network proxy instance +/// that lives in the NetworkState. +pub fn send( + network_state: &mut NetworkState, + protocol_wrapper: ProtocolWrapper, +) -> Result<(), HolochainError> { + network_state + .network + .as_mut() + .map(|network| { + network + .lock() + .unwrap() + .send(protocol_wrapper.into()) + .map_err(|error| HolochainError::IoError(error.to_string())) + }) + .ok_or(HolochainError::ErrorGeneric( + "Network not intialized".to_string(), + ))? +} + +/// Sends the given DirectMessage to the node given by to_agent_id. +/// This creates a transient connection as every node-to-node communication follows a +/// request-response pattern. This function therefore logs the open connection +/// (expecting a response) in network_state.direct_message_connections. +pub fn send_message( + network_state: &mut NetworkState, + to_agent_id: &Address, + message: DirectMessage, +) -> Result<(), HolochainError> { + let id = ProcessUniqueId::new().to_string(); + + let data = MessageData { + msg_id: id.clone(), + dna_hash: network_state.dna_hash.clone().unwrap(), + to_agent_id: to_agent_id.to_string(), + from_agent_id: network_state.agent_id.clone().unwrap(), + data: serde_json::from_str(&serde_json::to_string(&message).unwrap()).unwrap(), + }; + + println!("SEND MESSAGE: {:?}", data); + + let _ = send(network_state, ProtocolWrapper::SendMessage(data))?; + + network_state.direct_message_connections.insert(id, message); + + Ok(()) +} diff --git a/core/src/network/reducers/publish.rs b/core/src/network/reducers/publish.rs index f694fc6f5b..0d634b4ffc 100644 --- a/core/src/network/reducers/publish.rs +++ b/core/src/network/reducers/publish.rs @@ -1,10 +1,10 @@ -use boolinator::*; use crate::{ action::ActionWrapper, context::Context, network::{ actions::ActionResponse, entry_with_header::{fetch_entry_with_header, EntryWithHeader}, + reducers::send, state::NetworkState, }, nucleus::actions::get_entry::get_entry_crud_meta_from_dht, @@ -15,34 +15,26 @@ use holochain_core_types::{ entry::{entry_type::EntryType, Entry}, error::HolochainError, }; -use holochain_net_connection::{ - net_connection::NetConnection, - protocol_wrapper::{DhtData, DhtMetaData, ProtocolWrapper}, -}; +use holochain_net_connection::protocol_wrapper::{DhtData, DhtMetaData, ProtocolWrapper}; use std::sync::Arc; fn publish_entry( network_state: &mut NetworkState, entry_with_header: &EntryWithHeader, ) -> Result<(), HolochainError> { - let data = DhtData { - msg_id: "?".to_string(), - dna_hash: network_state.dna_hash.clone().unwrap(), - agent_id: network_state.agent_id.clone().unwrap(), - address: entry_with_header.entry_body.address().to_string(), - content: serde_json::from_str(&serde_json::to_string(&entry_with_header).unwrap()).unwrap(), - }; - network_state - .network - .as_mut() - .map(|network| { - network - .lock() - .unwrap() - .send(ProtocolWrapper::PublishDht(data).into()) - .map_err(|error| HolochainError::IoError(error.to_string())) - }) - .expect("Network has to be Some because of check above") + //let entry_with_header = util::EntryWithHeader::from((entry.clone(), header.clone())); + + send( + network_state, + ProtocolWrapper::PublishDht(DhtData { + msg_id: "?".to_string(), + dna_hash: network_state.dna_hash.clone().unwrap(), + agent_id: network_state.agent_id.clone().unwrap(), + address: entry_with_header.entry_body.address().to_string(), + content: serde_json::from_str(&serde_json::to_string(&entry_with_header).unwrap()) + .unwrap(), + }), + ) } fn publish_crud_meta( @@ -51,44 +43,35 @@ fn publish_crud_meta( crud_status: CrudStatus, crud_link: Option
, ) -> Result<(), HolochainError> { - let network = network_state - .network - .as_mut() - .expect("Should have network state-slice"); // publish crud-status - let data = DhtMetaData { - msg_id: "?".to_string(), - dna_hash: network_state.dna_hash.clone().unwrap(), - agent_id: network_state.agent_id.clone().unwrap(), - address: entry_address.to_string(), - attribute: STATUS_NAME.to_string(), - content: serde_json::from_str(&serde_json::to_string(&crud_status).unwrap()).unwrap(), - }; - network - .lock() - .unwrap() - .send(ProtocolWrapper::PublishDhtMeta(data).into()) - .map_err(|error| HolochainError::IoError(error.to_string())) - .expect("Network has to be Some because of check above"); + send( + network_state, + ProtocolWrapper::PublishDhtMeta(DhtMetaData { + msg_id: "?".to_string(), + dna_hash: network_state.dna_hash.clone().unwrap(), + agent_id: network_state.agent_id.clone().unwrap(), + address: entry_address.to_string(), + attribute: STATUS_NAME.to_string(), + content: serde_json::from_str(&serde_json::to_string(&crud_status).unwrap()).unwrap(), + }), + )?; + // publish crud-link if there is one if crud_link.is_none() { return Ok(()); } - let data = DhtMetaData { - msg_id: "?".to_string(), - dna_hash: network_state.dna_hash.clone().unwrap(), - agent_id: network_state.agent_id.clone().unwrap(), - address: entry_address.to_string(), - attribute: LINK_NAME.to_string(), - content: serde_json::from_str(&serde_json::to_string(&crud_link.unwrap()).unwrap()) - .unwrap(), - }; - network - .lock() - .unwrap() - .send(ProtocolWrapper::PublishDhtMeta(data).into()) - .map_err(|error| HolochainError::IoError(error.to_string())) - .expect("Network has to be Some because of check above"); + send( + network_state, + ProtocolWrapper::PublishDhtMeta(DhtMetaData { + msg_id: "?".to_string(), + dna_hash: network_state.dna_hash.clone().unwrap(), + agent_id: network_state.agent_id.clone().unwrap(), + address: entry_address.to_string(), + attribute: LINK_NAME.to_string(), + content: serde_json::from_str(&serde_json::to_string(&crud_link.unwrap()).unwrap()) + .unwrap(), + }), + )?; Ok(()) } @@ -107,26 +90,18 @@ fn publish_link_meta( }; let link = link_add_entry.link().clone(); - let data = DhtMetaData { - msg_id: "?".to_string(), - dna_hash: network_state.dna_hash.clone().unwrap(), - agent_id: network_state.agent_id.clone().unwrap(), - address: link.base().to_string(), - attribute: String::from("link"), - content: serde_json::from_str(&serde_json::to_string(&entry_with_header).unwrap()).unwrap(), - }; - - network_state - .network - .as_mut() - .map(|network| { - network - .lock() - .unwrap() - .send(ProtocolWrapper::PublishDhtMeta(data).into()) - .map_err(|error| HolochainError::IoError(error.to_string())) - }) - .expect("Network has to be Some because of check above") + send( + network_state, + ProtocolWrapper::PublishDhtMeta(DhtMetaData { + msg_id: "?".to_string(), + dna_hash: network_state.dna_hash.clone().unwrap(), + agent_id: network_state.agent_id.clone().unwrap(), + address: link.base().to_string(), + attribute: String::from("link"), + content: serde_json::from_str(&serde_json::to_string(&entry_with_header).unwrap()) + .unwrap(), + }), + ) } fn reduce_publish_inner( @@ -134,9 +109,7 @@ fn reduce_publish_inner( network_state: &mut NetworkState, address: &Address, ) -> Result<(), HolochainError> { - (network_state.network.is_some() - && network_state.dna_hash.is_some() & network_state.agent_id.is_some()) - .ok_or("Network not initialized".to_string())?; + network_state.initialized()?; let entry_with_header = fetch_entry_with_header(&address, &context)?; let (crud_status, maybe_crud_link) = get_entry_crud_meta_from_dht(context, address.clone())? diff --git a/core/src/network/reducers/resolve_direct_connection.rs b/core/src/network/reducers/resolve_direct_connection.rs new file mode 100644 index 0000000000..4ca993968e --- /dev/null +++ b/core/src/network/reducers/resolve_direct_connection.rs @@ -0,0 +1,13 @@ +use crate::{action::ActionWrapper, context::Context, network::state::NetworkState}; +use std::sync::Arc; + +pub fn reduce_resolve_direct_connection( + _context: Arc, + network_state: &mut NetworkState, + action_wrapper: &ActionWrapper, +) { + let action = action_wrapper.action(); + let id = unwrap_to!(action => crate::action::Action::ResolveDirectConnection); + + network_state.direct_message_connections.remove(id); +} diff --git a/core/src/network/reducers/respond_get.rs b/core/src/network/reducers/respond_get.rs index 14ac54ac57..3a25a8a8c8 100644 --- a/core/src/network/reducers/respond_get.rs +++ b/core/src/network/reducers/respond_get.rs @@ -1,14 +1,10 @@ -use boolinator::*; use crate::{ action::ActionWrapper, context::Context, - network::{actions::ActionResponse, state::NetworkState}, + network::{actions::ActionResponse, reducers::send, state::NetworkState}, }; use holochain_core_types::{entry::EntryWithMeta, error::HolochainError}; -use holochain_net_connection::{ - net_connection::NetConnection, - protocol_wrapper::{DhtData, GetDhtData, ProtocolWrapper}, -}; +use holochain_net_connection::protocol_wrapper::{DhtData, GetDhtData, ProtocolWrapper}; use std::sync::Arc; fn reduce_respond_get_inner( @@ -16,29 +12,18 @@ fn reduce_respond_get_inner( get_dht_data: &GetDhtData, maybe_entry: &Option, ) -> Result<(), HolochainError> { - (network_state.network.is_some() - && network_state.dna_hash.is_some() & network_state.agent_id.is_some()) - .ok_or("Network not initialized".to_string())?; - - let data = DhtData { - msg_id: get_dht_data.msg_id.clone(), - dna_hash: network_state.dna_hash.clone().unwrap(), - agent_id: get_dht_data.from_agent_id.clone(), - address: get_dht_data.address.clone(), - content: serde_json::from_str(&serde_json::to_string(&maybe_entry).unwrap()).unwrap(), - }; + network_state.initialized()?; - network_state - .network - .as_mut() - .map(|network| { - network - .lock() - .unwrap() - .send(ProtocolWrapper::GetDhtResult(data).into()) - .map_err(|error| HolochainError::IoError(error.to_string())) - }) - .expect("Network has to be Some because of check above") + send( + network_state, + ProtocolWrapper::GetDhtResult(DhtData { + msg_id: get_dht_data.msg_id.clone(), + dna_hash: network_state.dna_hash.clone().unwrap(), + agent_id: get_dht_data.from_agent_id.clone(), + address: get_dht_data.address.clone(), + content: serde_json::from_str(&serde_json::to_string(&maybe_entry).unwrap()).unwrap(), + }), + ) } pub fn reduce_respond_get( diff --git a/core/src/network/reducers/send_direct_message.rs b/core/src/network/reducers/send_direct_message.rs new file mode 100644 index 0000000000..5ee4e1cb6e --- /dev/null +++ b/core/src/network/reducers/send_direct_message.rs @@ -0,0 +1,44 @@ +use crate::{ + action::{ActionWrapper, DirectMessageData}, + context::Context, + network::{reducers::send, state::NetworkState}, +}; +use holochain_core_types::error::HolochainError; +use holochain_net_connection::protocol_wrapper::{MessageData, ProtocolWrapper}; +use std::sync::Arc; + +fn inner( + network_state: &mut NetworkState, + direct_message_data: &DirectMessageData, +) -> Result<(), HolochainError> { + network_state.initialized()?; + + let data = MessageData { + msg_id: direct_message_data.msg_id.clone(), + dna_hash: network_state.dna_hash.clone().unwrap(), + to_agent_id: direct_message_data.address.to_string(), + from_agent_id: network_state.agent_id.clone().unwrap(), + data: serde_json::from_str(&serde_json::to_string(&direct_message_data.message).unwrap()) + .unwrap(), + }; + + let protocol_object = if direct_message_data.is_response { + ProtocolWrapper::HandleSendResult(data) + } else { + ProtocolWrapper::SendMessage(data) + }; + + send(network_state, protocol_object) +} + +pub fn reduce_send_direct_message( + context: Arc, + network_state: &mut NetworkState, + action_wrapper: &ActionWrapper, +) { + let action = action_wrapper.action(); + let dm_data = unwrap_to!(action => crate::action::Action::SendDirectMessage); + if let Err(error) = inner(network_state, dm_data) { + context.log(format!("Error sending direct message: {:?}", error)); + } +} diff --git a/core/src/network/state.rs b/core/src/network/state.rs index 8ed6a92210..94c611e19e 100644 --- a/core/src/network/state.rs +++ b/core/src/network/state.rs @@ -1,5 +1,12 @@ -use crate::{action::ActionWrapper, network::actions::ActionResponse}; -use holochain_core_types::{cas::content::Address, entry::EntryWithMeta, error::HolochainError}; +use boolinator::*; +use crate::{ + action::ActionWrapper, + network::{actions::ActionResponse, direct_message::DirectMessage}, +}; +use holochain_core_types::{ + cas::content::Address, entry::EntryWithMeta, error::HolochainError, + validation::ValidationPackage, +}; use holochain_net::p2p_network::P2pNetwork; use snowflake; use std::{ @@ -7,19 +14,45 @@ use std::{ sync::{Arc, Mutex}, }; -type ActionMap = HashMap; +type Actions = HashMap; + +/// This represents the state of a get_entry network process: +/// None: process started, but no response yet from the network +/// Some(Err(_)): there was a problem at some point +/// Some(Ok(None)): no problem but also no entry -> it does not exist +/// Some(Ok(Some(entry_with_meta))): we have it type GetEntryWithMetaResult = Option, HolochainError>>; +/// This represents the state of a get_validation_package network process: +/// None: process started, but no response yet from the network +/// Some(Err(_)): there was a problem at some point +/// Some(Ok(None)): no error but also no validation package -> we seem to have asked the wrong +/// agent which actually should not happen. Something weird is going on. +/// Some(Ok(Some(entry))): we have it +type GetValidationPackageResult = Option, HolochainError>>; + #[derive(Clone, Debug)] pub struct NetworkState { /// every action and the result of that action // @TODO this will blow up memory, implement as some kind of dropping/FIFO with a limit? // @see https://github.com/holochain/holochain-rust/issues/166 - pub actions: ActionMap, + pub actions: Actions, pub network: Option>>, pub dna_hash: Option, pub agent_id: Option, + + /// Here we store the results of GET entry processes. + /// None means that we are still waiting for a result from the network. pub get_entry_with_meta_results: HashMap, + + /// Here we store the results of get validation package processes. + /// None means that we are still waiting for a result from the network. + pub get_validation_package_results: HashMap, + + /// This stores every open (= waiting for response) node-to-node messages. + /// Entries get removed when we receive an answer through Action::ResolveDirectConnection. + pub direct_message_connections: HashMap, + id: snowflake::ProcessUniqueId, } @@ -36,12 +69,22 @@ impl NetworkState { network: None, dna_hash: None, agent_id: None, + get_entry_with_meta_results: HashMap::new(), + get_validation_package_results: HashMap::new(), + direct_message_connections: HashMap::new(), + id: snowflake::ProcessUniqueId::new(), } } - pub fn actions(&self) -> ActionMap { + pub fn actions(&self) -> Actions { self.actions.clone() } + + pub fn initialized(&self) -> Result<(), HolochainError> { + (self.network.is_some() && self.dna_hash.is_some() & self.agent_id.is_some()).ok_or( + HolochainError::ErrorGeneric("Network not initialized".to_string()), + ) + } } diff --git a/core/src/nucleus/ribosome/api/debug.rs b/core/src/nucleus/ribosome/api/debug.rs index f684434388..79804667b5 100644 --- a/core/src/nucleus/ribosome/api/debug.rs +++ b/core/src/nucleus/ribosome/api/debug.rs @@ -11,8 +11,7 @@ pub fn invoke_debug(runtime: &mut Runtime, args: &RuntimeArgs) -> ZomeApiResult // TODO #502 - log in logger as DEBUG log-level runtime .context - .log(&format!("zome_log:DEBUG: '{}'", payload)) - .expect("Logger should work"); + .log(format!("zome_log:DEBUG: '{}'", payload)); // Done ribosome_success!() } diff --git a/core/src/nucleus/ribosome/run_dna.rs b/core/src/nucleus/ribosome/run_dna.rs index c0a1e669a4..9f5af48e3f 100644 --- a/core/src/nucleus/ribosome/run_dna.rs +++ b/core/src/nucleus/ribosome/run_dna.rs @@ -168,12 +168,9 @@ pub fn run_dna( } }; // Log & done - runtime - .context - .log(&format!( - "Zome Function '{}' returned: {}", - zome_call.fn_name, return_log_msg, - )) - .expect("Logger should work"); + runtime.context.log(format!( + "Zome Function '{}' returned: {}", + zome_call.fn_name, return_log_msg, + )); return return_result; } diff --git a/core/src/workflows/mod.rs b/core/src/workflows/mod.rs index 077693f11d..4447718183 100644 --- a/core/src/workflows/mod.rs +++ b/core/src/workflows/mod.rs @@ -1,2 +1,3 @@ pub mod author_entry; pub mod get_entry_history; +pub mod respond_validation_package_request; diff --git a/core/src/workflows/respond_validation_package_request.rs b/core/src/workflows/respond_validation_package_request.rs new file mode 100644 index 0000000000..a87e30b474 --- /dev/null +++ b/core/src/workflows/respond_validation_package_request.rs @@ -0,0 +1,47 @@ +use crate::{ + action::{Action, ActionWrapper, DirectMessageData}, + context::Context, + instance::dispatch_action, + network::direct_message::DirectMessage, + nucleus::actions::build_validation_package::build_validation_package, +}; + +use holochain_core_types::{cas::content::Address, entry::Entry, error::HolochainError}; +use std::{convert::TryFrom, sync::Arc}; + +fn get_entry(address: &Address, context: &Arc) -> Result { + let raw = context + .state() + .unwrap() + .agent() + .chain() + .content_storage() + .read() + .unwrap() + .fetch(address)? + .ok_or(HolochainError::ErrorGeneric("Entry not found".to_string()))?; + + Entry::try_from(raw) +} + +pub async fn respond_validation_package_request( + to_agent_id: Address, + msg_id: String, + requested_entry_address: Address, + context: Arc, +) { + let maybe_validation_package = match get_entry(&requested_entry_address, &context) { + Ok(entry) => await!(build_validation_package(&entry, &context)).ok(), + Err(_) => None, + }; + + let direct_message = DirectMessage::ValidationPackage(maybe_validation_package); + let direct_message_data = DirectMessageData { + address: to_agent_id, + message: direct_message, + msg_id, + is_response: true, + }; + let action_wrapper = ActionWrapper::new(Action::SendDirectMessage(direct_message_data)); + dispatch_action(&context.action_channel, action_wrapper); +} diff --git a/core_types/src/entry/mod.rs b/core_types/src/entry/mod.rs index 6b61f8a27e..7e51cd3719 100644 --- a/core_types/src/entry/mod.rs +++ b/core_types/src/entry/mod.rs @@ -12,6 +12,7 @@ use entry::entry_type::{test_app_entry_type, test_app_entry_type_b, AppEntryType use error::{HcResult, HolochainError}; use json::{default_to_json, default_try_from_json, JsonString, RawString}; use link::{link_add::LinkAdd, link_list::LinkList, link_remove::LinkRemove}; +use multihash::Hash; use serde::{ser::SerializeTuple, Deserialize, Deserializer, Serializer}; use snowflake; use std::convert::TryFrom; @@ -100,6 +101,13 @@ impl PartialEq for Entry { } impl AddressableContent for Entry { + fn address(&self) -> Address { + match &self { + Entry::AgentId(agent_id) => agent_id.address(), + _ => Address::encode_from_str(&String::from(self.content()), Hash::SHA2256), + } + } + fn content(&self) -> Content { self.into() } diff --git a/hdk-rust/tests/integration_test.rs b/hdk-rust/tests/integration_test.rs index bee550019a..ddb9d3adda 100755 --- a/hdk-rust/tests/integration_test.rs +++ b/hdk-rust/tests/integration_test.rs @@ -196,7 +196,7 @@ fn can_use_globals() { assert_eq!( result.clone(), Ok(JsonString::from(HashString::from( - "QmfFVhScc1cVzEqTBVLBr6d2FbsHaM5Cn3ynnvM7CUiJp9" + "alex--------------------------------------------------------------------------------ADO_" ))), "result = {:?}", result