Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pallet-Messenger-3 #852

Merged
merged 9 commits into from
Oct 10, 2022
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

349 changes: 270 additions & 79 deletions crates/pallet-messenger/src/lib.rs

Large diffs are not rendered by default.

212 changes: 190 additions & 22 deletions crates/pallet-messenger/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::verification::Proof;
use crate::{
ChannelId, Channels, Config, Error, Event, Inbox, InboxMessageResponses, InitiateChannelParams,
Nonce, Outbox, Pallet,
ChannelId, Channels, Config, Error, Event, Inbox, InboxResponses, InitiateChannelParams, Nonce,
Outbox, OutboxMessageResult, OutboxResponses, Pallet,
};
use codec::{Decode, Encode};
use frame_support::ensure;
use scale_info::TypeInfo;
use sp_messenger::endpoint::{EndpointRequest, EndpointResponse};
use sp_runtime::traits::Get;
use sp_runtime::{ArithmeticError, DispatchError, DispatchResult};

@@ -19,8 +20,7 @@ pub enum ProtocolMessageRequest {
}

/// Defines protocol requests performed on domains.
#[derive(Debug, Encode, Decode, Clone, Eq, PartialEq, TypeInfo)]
pub struct ProtocolMessageResponse(Result<(), DispatchError>);
pub type ProtocolMessageResponse = Result<(), DispatchError>;

/// Protocol message that encompasses request or its response.
#[derive(Debug, Encode, Decode, Clone, Eq, PartialEq, TypeInfo)]
@@ -32,8 +32,10 @@ pub enum RequestResponse<Request, Response> {
/// Payload of the message
#[derive(Debug, Encode, Decode, Clone, Eq, PartialEq, TypeInfo)]
pub enum Payload {
/// Protocol specific payload.
/// Protocol message.
Protocol(RequestResponse<ProtocolMessageRequest, ProtocolMessageResponse>),
/// Endpoint message.
Endpoint(RequestResponse<EndpointRequest, EndpointResponse>),
}

/// Versioned message payload
@@ -55,6 +57,8 @@ pub struct Message<DomainId> {
pub nonce: Nonce,
/// Payload of the message
pub payload: VersionedPayload,
/// Last delivered message response nonce on src_domain.
pub last_delivered_message_response_nonce: Option<Nonce>,
}

/// Cross Domain message contains Message and its proof on src_domain.
@@ -101,6 +105,8 @@ impl<T: Config> Pallet<T> {
channel_id,
nonce: next_outbox_nonce,
payload,
last_delivered_message_response_nonce: channel
.latest_response_received_message_nonce,
};
Outbox::<T>::insert((dst_domain_id, channel_id, next_outbox_nonce), msg);

@@ -120,6 +126,25 @@ impl<T: Config> Pallet<T> {
)
}

/// Removes messages responses from Inbox responses as the src_domain signalled that responses are delivered.
/// all the messages with nonce <= latest_confirmed_nonce are deleted.
fn clean_delivered_message_responses(
dst_domain_id: T::DomainId,
channel_id: ChannelId,
latest_confirmed_nonce: Option<Nonce>,
) {
let mut current_nonce = latest_confirmed_nonce;
while let Some(nonce) = current_nonce {
// fail if we have cleared all the messages
if InboxResponses::<T>::take((dst_domain_id, channel_id, nonce)).is_none() {
return;
}

current_nonce = nonce.checked_sub(Nonce::one())
}
}

/// Process the incoming messages from given domain_id and channel_id.
pub(crate) fn process_inbox_messages(
dst_domain_id: T::DomainId,
channel_id: ChannelId,
@@ -132,28 +157,67 @@ impl<T: Config> Pallet<T> {
let mut messages_processed = 0;
while let Some(msg) = Inbox::<T>::take((dst_domain_id, channel_id, next_inbox_nonce)) {
let response = match msg.payload {
VersionedPayload::V0(Payload::Protocol(msg)) => {
Self::process_incoming_protocol_message(dst_domain_id, channel_id, msg)
// process incoming protocol message.
VersionedPayload::V0(Payload::Protocol(RequestResponse::Request(req))) => {
Payload::Protocol(RequestResponse::Response(
Self::process_incoming_protocol_message_req(dst_domain_id, channel_id, req),
))
}

// process incoming endpoint message.
VersionedPayload::V0(Payload::Endpoint(RequestResponse::Request(req))) => {
let response = if let Some(endpoint_handler) =
T::get_endpoint_response_handler(&req.dst_endpoint)
{
endpoint_handler.message(dst_domain_id, req)
} else {
Err(Error::<T>::NoMessageHandler.into())
};

Payload::Endpoint(RequestResponse::Response(response))
}

// return error for all the remaining branches
VersionedPayload::V0(payload) => match payload {
Payload::Protocol(_) => Payload::Protocol(RequestResponse::Response(Err(
Error::<T>::InvalidMessagePayload.into(),
))),
Payload::Endpoint(_) => Payload::Endpoint(RequestResponse::Response(Err(
Error::<T>::InvalidMessagePayload.into(),
))),
},
};

InboxMessageResponses::<T>::insert(
InboxResponses::<T>::insert(
(dst_domain_id, channel_id, next_inbox_nonce),
Message {
src_domain_id: T::SelfDomainId::get(),
dst_domain_id,
channel_id,
nonce: next_inbox_nonce,
payload: VersionedPayload::V0(Payload::Protocol(RequestResponse::Response(
ProtocolMessageResponse(response),
))),
payload: VersionedPayload::V0(response),
// this nonce is not considered in response context.
last_delivered_message_response_nonce: None,
},
);

Self::deposit_event(Event::InboxMessageResponse {
domain_id: dst_domain_id,
channel_id,
nonce: next_inbox_nonce,
});

next_inbox_nonce = next_inbox_nonce
.checked_add(Nonce::one())
.ok_or(DispatchError::Arithmetic(ArithmeticError::Overflow))?;
messages_processed += 1;

// clean any delivered inbox responses
Self::clean_delivered_message_responses(
dst_domain_id,
channel_id,
msg.last_delivered_message_response_nonce,
)
}

if messages_processed > 0 {
@@ -171,21 +235,125 @@ impl<T: Config> Pallet<T> {
Ok(())
}

fn process_incoming_protocol_message(
fn process_incoming_protocol_message_req(
domain_id: T::DomainId,
channel_id: ChannelId,
req_resp: RequestResponse<ProtocolMessageRequest, ProtocolMessageResponse>,
req: ProtocolMessageRequest,
) -> Result<(), DispatchError> {
match req_resp {
RequestResponse::Request(req) => match req {
ProtocolMessageRequest::ChannelOpen(_) => {
Self::do_open_channel(domain_id, channel_id)
}
ProtocolMessageRequest::ChannelClose => {
Self::do_close_channel(domain_id, channel_id)
match req {
ProtocolMessageRequest::ChannelOpen(_) => Self::do_open_channel(domain_id, channel_id),
ProtocolMessageRequest::ChannelClose => Self::do_close_channel(domain_id, channel_id),
}
}

fn process_incoming_protocol_message_response(
domain_id: T::DomainId,
channel_id: ChannelId,
req: ProtocolMessageRequest,
resp: ProtocolMessageResponse,
) -> DispatchResult {
match (req, resp) {
// channel open request is accepted by dst_domain.
// open channel on our end.
(ProtocolMessageRequest::ChannelOpen(_), Ok(_)) => {
Self::do_open_channel(domain_id, channel_id)
}

// for rest of the branches we dont care about the outcome and return Ok
// for channel close request, we do not care about the response as channel is already closed.
// for channel open request and request is rejected, channel is left in init state and no new messages are accepted.
_ => Ok(()),
}
}

pub(crate) fn process_outbox_message_responses(
dst_domain_id: T::DomainId,
channel_id: ChannelId,
) -> DispatchResult {
// fetch the next message response nonce to process
// starts with nonce 0
let mut last_message_response_nonce = Channels::<T>::get(dst_domain_id, channel_id)
.ok_or(Error::<T>::MissingChannel)?
.latest_response_received_message_nonce;

let mut next_message_response_nonce = last_message_response_nonce
.and_then(|nonce| nonce.checked_add(Nonce::one()))
.unwrap_or(Nonce::zero());

// TODO(ved): maybe a bound of number of message responses to process in a single call?
let mut messages_processed = 0;
while let Some(resp_msg) =
OutboxResponses::<T>::take((dst_domain_id, channel_id, next_message_response_nonce))
{
// fetch original request
let req_msg =
Outbox::<T>::take((dst_domain_id, channel_id, next_message_response_nonce))
.ok_or(Error::<T>::MissingMessage)?;

let resp = match (req_msg.payload, resp_msg.payload) {
// process incoming protocol outbox message response.
(
VersionedPayload::V0(Payload::Protocol(RequestResponse::Request(req))),
VersionedPayload::V0(Payload::Protocol(RequestResponse::Response(resp))),
) => Self::process_incoming_protocol_message_response(
dst_domain_id,
channel_id,
req,
resp,
),

// process incoming endpoint outbox message response.
(
VersionedPayload::V0(Payload::Endpoint(RequestResponse::Request(req))),
VersionedPayload::V0(Payload::Endpoint(RequestResponse::Response(resp))),
) => {
if let Some(endpoint_handler) =
T::get_endpoint_response_handler(&req.dst_endpoint)
{
endpoint_handler.message_response(dst_domain_id, req, resp)
} else {
Err(Error::<T>::NoMessageHandler.into())
}
}
},
RequestResponse::Response(_) => Err(Error::<T>::InvalidMessagePayload.into()),

(_, _) => Err(Error::<T>::InvalidMessagePayload.into()),
};

// deposit event notifying the message status.
match resp {
Ok(_) => Self::deposit_event(Event::OutboxMessageResult {
domain_id: dst_domain_id,
channel_id,
nonce: next_message_response_nonce,
result: OutboxMessageResult::Ok,
}),
Err(err) => Self::deposit_event(Event::OutboxMessageResult {
domain_id: dst_domain_id,
channel_id,
nonce: next_message_response_nonce,
result: OutboxMessageResult::Err(err),
}),
}

last_message_response_nonce = Some(next_message_response_nonce);
next_message_response_nonce = next_message_response_nonce
.checked_add(Nonce::one())
.ok_or(DispatchError::Arithmetic(ArithmeticError::Overflow))?;
messages_processed += 1;
}

if messages_processed > 0 {
Channels::<T>::mutate(
dst_domain_id,
channel_id,
|maybe_channel| -> DispatchResult {
let channel = maybe_channel.as_mut().ok_or(Error::<T>::MissingChannel)?;
channel.latest_response_received_message_nonce = last_message_response_nonce;
Ok(())
},
)?;
}

Ok(())
}
}
242 changes: 159 additions & 83 deletions crates/pallet-messenger/src/mock.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,138 @@
use crate::{ChannelId, Channels};
use frame_support::parameter_types;
use crate::{ChannelId, Channels, Config, InboxResponses, Nonce, Outbox, StateRootOf};
use frame_support::storage::generator::StorageDoubleMap;
use frame_support::traits::{ConstU16, ConstU32, ConstU64};
use sp_core::storage::StorageKey;
use sp_core::H256;
use sp_runtime::testing::Header;
use sp_runtime::traits::{BlakeTwo256, IdentityLookup};
use sp_messenger::endpoint::{EndpointHandler, EndpointRequest, EndpointResponse};
use sp_runtime::traits::BlakeTwo256;
use sp_runtime::DispatchResult;
use sp_state_machine::backend::Backend;
use sp_state_machine::{prove_read, InMemoryBackend};
use sp_std::vec::Vec;
use sp_trie::StorageProof;

type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Test>;
type Block = frame_system::mocking::MockBlock<Test>;

frame_support::construct_runtime!(
pub struct Test where
Block = Block,
NodeBlock = Block,
UncheckedExtrinsic = UncheckedExtrinsic,
{
System: frame_system::{Pallet, Call, Config, Storage, Event<T>},
SystemDomainTracker: mock_system_domain_tracker::{Pallet, Storage},
Messenger: crate::{Pallet, Call, Event<T>}
pub(crate) type DomainId = u64;

pub type TestExternalities = sp_state_machine::TestExternalities<BlakeTwo256>;

macro_rules! impl_runtime {
($runtime:ty, $domain_id:literal) => {
use crate::mock::{mock_system_domain_tracker, DomainId, TestExternalities, MockEndpoint};
use frame_support::parameter_types;
use sp_core::H256;
use sp_runtime::testing::Header;
use sp_runtime::traits::{BlakeTwo256, ConstU16, ConstU32, ConstU64, IdentityLookup};
use sp_std::vec::Vec;
use sp_messenger::endpoint::{EndpointHandler, Endpoint};

type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Runtime>;
type Block = frame_system::mocking::MockBlock<Runtime>;

frame_support::construct_runtime!(
pub struct Runtime where
Block = Block,
NodeBlock = Block,
UncheckedExtrinsic = UncheckedExtrinsic,
{
System: frame_system::{Pallet, Call, Config, Storage, Event<T>},
SystemDomainTracker: mock_system_domain_tracker::{Pallet, Storage},
Messenger: crate::{Pallet, Call, Event<T>}
}
);


impl frame_system::Config for $runtime {
type BaseCallFilter = frame_support::traits::Everything;
type BlockWeights = ();
type BlockLength = ();
type DbWeight = ();
type Origin = Origin;
type Call = Call;
type Index = u64;
type BlockNumber = u64;
type Hash = H256;
type Hashing = BlakeTwo256;
type AccountId = u64;
type Lookup = IdentityLookup<Self::AccountId>;
type Header = Header;
type Event = Event;
type BlockHashCount = ConstU64<250>;
type Version = ();
type PalletInfo = PalletInfo;
type AccountData = ();
type OnNewAccount = ();
type OnKilledAccount = ();
type SystemWeightInfo = ();
type SS58Prefix = ConstU16<42>;
type OnSetCode = ();
type MaxConsumers = ConstU32<16>;
}

parameter_types! {
pub const ExistentialDeposit: u64 = 1;
}

impl mock_system_domain_tracker::Config for $runtime {}

parameter_types! {
pub const SelfDomainId: DomainId = $domain_id;
}

impl crate::Config for $runtime {
type Event = Event;
type DomainId = DomainId;
type SelfDomainId = SelfDomainId;
type SystemDomainTracker = SystemDomainTracker;
/// function to fetch endpoint response handler by Endpoint.
fn get_endpoint_response_handler(
_endpoint: &Endpoint,
) -> Option<Box<dyn EndpointHandler<Self::DomainId>>>{
Some(Box::new(MockEndpoint{}))
}
}

pub fn new_test_ext() -> TestExternalities {
let t = frame_system::GenesisConfig::default()
.build_storage::<Runtime>()
.unwrap();

let mut t: TestExternalities = t.into();
t.execute_with(|| System::set_block_number(1));
t
}
};
}

pub struct MockEndpoint {}
impl EndpointHandler<DomainId> for MockEndpoint {
fn message(&self, _src_domain_id: DomainId, req: EndpointRequest) -> EndpointResponse {
let req = req.payload;
assert_eq!(req, vec![1, 2, 3, 4]);
Ok(vec![5, 6, 7, 8])
}

fn message_response(
&self,
_dst_domain_id: DomainId,
_req: EndpointRequest,
resp: EndpointResponse,
) -> DispatchResult {
let resp = resp.unwrap();
assert_eq!(resp, vec![5, 6, 7, 8]);
Ok(())
}
);

impl frame_system::Config for Test {
type BaseCallFilter = frame_support::traits::Everything;
type BlockWeights = ();
type BlockLength = ();
type DbWeight = ();
type Origin = Origin;
type Call = Call;
type Index = u64;
type BlockNumber = u64;
type Hash = H256;
type Hashing = BlakeTwo256;
type AccountId = u64;
type Lookup = IdentityLookup<Self::AccountId>;
type Header = Header;
type Event = Event;
type BlockHashCount = ConstU64<250>;
type Version = ();
type PalletInfo = PalletInfo;
type AccountData = ();
type OnNewAccount = ();
type OnKilledAccount = ();
type SystemWeightInfo = ();
type SS58Prefix = ConstU16<42>;
type OnSetCode = ();
type MaxConsumers = ConstU32<16>;
}

parameter_types! {
pub const ExistentialDeposit: u64 = 1;
pub(crate) mod domain_a {
impl_runtime!(Runtime, 1);
}

pub(crate) type DomainId = u64;
pub(crate) mod domain_b {
impl_runtime!(Runtime, 2);
}

#[frame_support::pallet]
mod mock_system_domain_tracker {
pub(crate) mod mock_system_domain_tracker {
use frame_support::pallet_prelude::*;
use sp_core::H256;
use sp_messenger::SystemDomainTracker;
use sp_messenger::SystemDomainTracker as SystemDomainTrackerT;

#[pallet::config]
pub trait Config: frame_system::Config {}
@@ -77,53 +146,60 @@ mod mock_system_domain_tracker {
#[pallet::storage]
pub(super) type StateRoot<T: Config> = StorageValue<_, H256, ValueQuery>;

impl<T: Config> SystemDomainTracker<H256> for Pallet<T> {
impl<T: Config> SystemDomainTrackerT<H256> for Pallet<T> {
fn latest_state_roots() -> Vec<H256> {
vec![StateRoot::<T>::get()]
}
}
}

impl mock_system_domain_tracker::Config for Test {}

parameter_types! {
pub const SelfDomainId: DomainId = 0;
}

impl crate::Config for Test {
type Event = Event;
type DomainId = DomainId;
type SelfDomainId = SelfDomainId;
type SystemDomainTracker = SystemDomainTracker;
}

pub fn new_test_ext() -> sp_io::TestExternalities {
let t = frame_system::GenesisConfig::default()
.build_storage::<Test>()
.unwrap();

let mut t: sp_io::TestExternalities = t.into();
t.execute_with(|| System::set_block_number(1));
t
impl<T: Config> Pallet<T> {
pub fn set_state_root(state_root: H256) {
StateRoot::<T>::put(state_root)
}
}
}

fn storage_proof_for_key(
backend: InMemoryBackend<sp_core::Blake2Hasher>,
fn storage_proof_for_key<T: Config>(
backend: InMemoryBackend<T::Hashing>,
key: StorageKey,
) -> (H256, StorageProof) {
) -> (StateRootOf<T>, StorageProof) {
let state_version = sp_runtime::StateVersion::default();
let root = backend.storage_root(std::iter::empty(), state_version).0;
let proof = StorageProof::new(prove_read(backend, &[key]).unwrap().iter_nodes());
(root, proof)
}

pub(crate) fn storage_proof_of_channels(
backend: InMemoryBackend<sp_core::Blake2Hasher>,
domain_id: DomainId,
pub(crate) fn storage_proof_of_channels<T: Config>(
backend: InMemoryBackend<T::Hashing>,
domain_id: T::DomainId,
channel_id: ChannelId,
) -> (StateRootOf<T>, StorageKey, StorageProof) {
let key = Channels::<T>::storage_double_map_final_key(domain_id, channel_id);
let storage_key = StorageKey(key);
let (root, proof) = storage_proof_for_key::<T>(backend, storage_key.clone());
(root, storage_key, proof)
}

pub(crate) fn storage_proof_of_outbox_messages<T: Config>(
backend: InMemoryBackend<T::Hashing>,
domain_id: T::DomainId,
channel_id: ChannelId,
nonce: Nonce,
) -> (StateRootOf<T>, StorageKey, StorageProof) {
let key = Outbox::<T>::hashed_key_for((domain_id, channel_id, nonce));
let storage_key = StorageKey(key);
let (root, proof) = storage_proof_for_key::<T>(backend, storage_key.clone());
(root, storage_key, proof)
}

pub(crate) fn storage_proof_of_inbox_message_responses<T: Config>(
backend: InMemoryBackend<T::Hashing>,
domain_id: T::DomainId,
channel_id: ChannelId,
) -> (H256, StorageKey, StorageProof) {
let key = Channels::<Test>::storage_double_map_final_key(domain_id, channel_id);
nonce: Nonce,
) -> (StateRootOf<T>, StorageKey, StorageProof) {
let key = InboxResponses::<T>::hashed_key_for((domain_id, channel_id, nonce));
let storage_key = StorageKey(key);
let (root, proof) = storage_proof_for_key(backend, storage_key.clone());
let (root, proof) = storage_proof_for_key::<T>(backend, storage_key.clone());
(root, storage_key, proof)
}
487 changes: 434 additions & 53 deletions crates/pallet-messenger/src/tests.rs

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions crates/sp-messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,8 +14,14 @@ include = [
]

[dependencies]

codec = { package = "parity-scale-codec", version = "3.1.5", default-features = false, features = ["derive"] }
scale-info = { version = "2.1.2", default-features = false, features = ["derive"] }
sp-runtime = { version = "6.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "1a7c28721fa77ecce9632ad9ce473f2d3cf1a598" }

[features]
default = ["std"]
std = []
std = [
"codec/std",
"scale-info/std",
"sp-runtime/std"
]
47 changes: 47 additions & 0 deletions crates/sp-messenger/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use codec::{Decode, Encode};
use scale_info::TypeInfo;
use sp_runtime::{DispatchError, DispatchResult};

/// Endpoint as defined in the formal spec.
/// Endpoint is an application that can send and receive messages from other domains.
#[derive(Debug, Encode, Decode, Clone, Eq, PartialEq, TypeInfo)]
pub enum Endpoint {
/// Id of the endpoint on a specific domain.
Id(u64),
}

/// Endpoint request or response payload.
pub type EndpointPayload = Vec<u8>;

/// Request sent by src_endpoint to dst_endpoint.
#[derive(Debug, Encode, Decode, Clone, Eq, PartialEq, TypeInfo)]
pub struct EndpointRequest {
pub src_endpoint: Endpoint,
pub dst_endpoint: Endpoint,
pub payload: EndpointPayload,
}

/// Response for the message request.
pub type EndpointResponse = Result<EndpointPayload, DispatchError>;

/// Sender provides abstraction on sending messages to other domains.
pub trait Sender<DomainId> {
/// sends a message to dst_domain_id.
fn send_message(dst_domain_id: DomainId, req: EndpointRequest) -> DispatchResult;
}

/// Handler to
/// - handle message request from other domains.
/// - handle requested message responses from other domains.
pub trait EndpointHandler<DomainId> {
/// Triggered by pallet-messenger when a new inbox message is received and bound for this handler.
fn message(&self, src_domain_id: DomainId, req: EndpointRequest) -> EndpointResponse;

/// Triggered by pallet-messenger when a response for a request is received from dst_domain_id.
fn message_response(
&self,
dst_domain_id: DomainId,
req: EndpointRequest,
resp: EndpointResponse,
) -> DispatchResult;
}
2 changes: 2 additions & 0 deletions crates/sp-messenger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@
#![cfg_attr(not(feature = "std"), no_std)]

pub mod endpoint;

/// A trait used by domains to track and fetch info about system domain.
pub trait SystemDomainTracker<StateRoot> {
/// Get the latest state roots of the K-deep System domain blocks.