From 60f04eca18e1fe833242aef201b5290c3296c5d3 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 14:45:03 +0400 Subject: [PATCH 1/8] put error in place of failure to query block required data --- ethexe/processor/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index abe16b462ab..086b120e13e 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -85,9 +85,13 @@ impl Processor { let states = self .db .block_start_program_states(block_hash) - .unwrap_or_default(); // TODO (breathx): shouldn't it be a panic? + .ok_or_else(|| { + anyhow!("failed to get block start program states for under-processing block") + })?; - let schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic? + let schedule = self.db.block_start_schedule(block_hash).ok_or_else(|| { + anyhow!("failed to get block start schedule for under-processing block") + })?; let mut in_block_transitions = InBlockTransitions::new(header, states, schedule); From caee96389b4630504c8f61ffa953641337dad589 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 14:45:38 +0400 Subject: [PATCH 2/8] remove solved todo --- ethexe/processor/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 086b120e13e..2ae8e701ef7 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -95,7 +95,6 @@ impl Processor { let mut in_block_transitions = InBlockTransitions::new(header, states, schedule); - // TODO (breathx): handle resulting addresses that were changed (e.g. top up balance wont be dumped as outcome). for event in events { match event { BlockRequestEvent::Router(event) => { From aac91505bbbb1b365a3ebcc7286a3459c52f6e12 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 15:13:21 +0400 Subject: [PATCH 3/8] impl dispatch stash in storage; provide api --- ethexe/db/src/database.rs | 13 +++++++- ethexe/runtime/common/src/state.rs | 51 ++++++++++++++++++++++++++++++ ethexe/runtime/src/wasm/storage.rs | 13 +++++++- 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index b76d51cea4a..da82d71f0d2 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -28,7 +28,7 @@ use ethexe_common::{ BlockRequestEvent, }; use ethexe_runtime_common::state::{ - Allocations, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist, + Allocations, DispatchStash, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist, }; use gear_core::{ code::InstrumentedCode, @@ -504,6 +504,17 @@ impl Storage for Database { self.cas.write(&waitlist.encode()) } + fn read_stash(&self, hash: H256) -> Option { + self.cas.read(&hash).map(|data| { + DispatchStash::decode(&mut data.as_slice()) + .expect("Failed to decode data into `DispatchStash`") + }) + } + + fn write_stash(&self, stash: DispatchStash) -> H256 { + self.cas.write(&stash.encode()) + } + fn read_mailbox(&self, hash: H256) -> Option { self.cas.read(&hash).map(|data| { Mailbox::decode(&mut data.as_slice()).expect("Failed to decode data into `Mailbox`") diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index a48b0db6fc1..1b44fb217d8 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -144,6 +144,8 @@ pub struct ProgramState { pub queue_hash: MaybeHash, /// Hash of waiting messages list, see [`Waitlist`]. pub waitlist_hash: MaybeHash, + /// Hash of dispatch stash, see [`DispatchStash`]. + pub stash_hash: MaybeHash, /// Hash of mailboxed messages, see [`Mailbox`]. pub mailbox_hash: MaybeHash, /// Reducible balance. @@ -163,6 +165,7 @@ impl ProgramState { }), queue_hash: MaybeHash::Empty, waitlist_hash: MaybeHash::Empty, + stash_hash: MaybeHash::Empty, mailbox_hash: MaybeHash::Empty, balance: 0, executable_balance: 0, @@ -251,6 +254,8 @@ pub type MessageQueue = VecDeque; pub type Waitlist = BTreeMap>; +pub type DispatchStash = BTreeMap>; + // TODO (breathx): consider here LocalMailbox for each user. pub type Mailbox = BTreeMap>>; @@ -277,6 +282,12 @@ pub trait Storage { /// Writes waitlist and returns its hash. fn write_waitlist(&self, waitlist: Waitlist) -> H256; + /// Reads dispatch stash by its hash. + fn read_stash(&self, hash: H256) -> Option; + + /// Writes dispatch stash and returns its hash. + fn write_stash(&self, stash: DispatchStash) -> H256; + /// Reads mailbox by mailbox hash. fn read_mailbox(&self, hash: H256) -> Option; @@ -409,6 +420,46 @@ pub trait ComplexStorage: Storage { Ok(res) } + /// Usage: for optimized performance, please remove entries if empty. + /// Always updates storage. + fn modify_stash( + &self, + stash_hash: MaybeHash, + f: impl FnOnce(&mut DispatchStash), + ) -> Result { + self.modify_stash_if_changed(stash_hash, |stash| { + f(stash); + Some(()) + }) + .map(|v| v.expect("`Some` passed above; infallible").1) + } + + /// Usage: for optimized performance, please remove entries if empty. + /// DispatchStash is treated changed if f() returns Some. + fn modify_stash_if_changed( + &self, + stash_hash: MaybeHash, + f: impl FnOnce(&mut DispatchStash) -> Option, + ) -> Result> { + let mut stash = stash_hash.with_hash_or_default_result(|stash_hash| { + self.read_stash(stash_hash) + .ok_or_else(|| anyhow!("failed to read dispatch stash by its hash ({stash_hash})")) + })?; + + let res = if let Some(v) = f(&mut stash) { + let maybe_hash = stash + .is_empty() + .then_some(MaybeHash::Empty) + .unwrap_or_else(|| self.write_stash(stash).into()); + + Some((v, maybe_hash)) + } else { + None + }; + + Ok(res) + } + fn modify_queue( &self, queue_hash: MaybeHash, diff --git a/ethexe/runtime/src/wasm/storage.rs b/ethexe/runtime/src/wasm/storage.rs index c7d8566112e..eb650ceecba 100644 --- a/ethexe/runtime/src/wasm/storage.rs +++ b/ethexe/runtime/src/wasm/storage.rs @@ -20,7 +20,10 @@ use super::interface::database_ri; use alloc::{collections::BTreeMap, vec::Vec}; use core_processor::configs::BlockInfo; use ethexe_runtime_common::{ - state::{Allocations, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist}, + state::{ + Allocations, DispatchStash, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, + Waitlist, + }, RuntimeInterface, }; use gear_core::{memory::PageBuf, message::Payload, pages::GearPage}; @@ -64,6 +67,10 @@ impl Storage for RuntimeInterfaceStorage { database_ri::read_unwrapping(&hash) } + fn read_stash(&self, hash: H256) -> Option { + database_ri::read_unwrapping(&hash) + } + fn read_mailbox(&self, hash: H256) -> Option { database_ri::read_unwrapping(&hash) } @@ -100,6 +107,10 @@ impl Storage for RuntimeInterfaceStorage { database_ri::write(waitlist) } + fn write_stash(&self, stash: DispatchStash) -> H256 { + database_ri::write(stash) + } + fn write_mailbox(&self, mailbox: Mailbox) -> H256 { database_ri::write(mailbox) } From db0d7b89ed7b779e75da255b0380fea622422d17 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 15:39:26 +0400 Subject: [PATCH 4/8] impl accessing stash in journal handling --- ethexe/runtime/common/src/journal.rs | 108 +++++++++++++++++---------- 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 253ba7604e9..17a16d7f133 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -163,38 +163,62 @@ impl JournalHandler for Handler<'_, S> { unreachable!("deprecated: {dispatch:?}"); } - if delay != 0 { - todo!("delayed sending isn't supported yet"); - } - if self .in_block_transitions .state_of(&dispatch.destination()) .is_none() { if !dispatch.is_reply() { - let expiry = self.in_block_transitions.schedule_task( - state::MAILBOX_VALIDITY.try_into().expect("infallible"), - ScheduledTask::RemoveFromMailbox( - (dispatch.source(), dispatch.destination()), - dispatch.id(), - ), - ); + if let Ok(non_zero_delay) = delay.try_into() { + // TODO (breathx): specify which program sent it. FIX WITHIN THE PR. + let expiry = self.in_block_transitions.schedule_task( + non_zero_delay, + ScheduledTask::SendUserMessage { + message_id: dispatch.id(), + to_mailbox: true, + }, + ); + + self.update_state_with_storage(dispatch.source(), |storage, state| { + let dispatch = Dispatch::from_stored(storage, dispatch.into_stored()); + + state.stash_hash = + storage.modify_stash(state.stash_hash.clone(), |stash| { + let r = stash.insert(dispatch.id, (dispatch, expiry)); + debug_assert!(r.is_none()); + })?; + + Ok(()) + }); + } else { + let expiry = self.in_block_transitions.schedule_task( + state::MAILBOX_VALIDITY.try_into().expect("infallible"), + ScheduledTask::RemoveFromMailbox( + (dispatch.source(), dispatch.destination()), + dispatch.id(), + ), + ); + + self.update_state_with_storage(dispatch.source(), |storage, state| { + state.mailbox_hash = + storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| { + mailbox + .entry(dispatch.destination()) + .or_default() + .insert(dispatch.id(), (dispatch.value(), expiry)); + })?; + + Ok(()) + }); + } - self.update_state_with_storage(dispatch.source(), |storage, state| { - state.mailbox_hash = - storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| { - mailbox - .entry(dispatch.destination()) - .or_default() - .insert(dispatch.id(), (dispatch.value(), expiry)); - })?; + return; + } - Ok(()) - }); + if delay != 0 { + unreachable!("delayed sending of replies is forbidden"); } - // TODO (breathx): send here to in_block_transitions. let source = dispatch.source(); let message = dispatch.into_parts().1; @@ -214,27 +238,31 @@ impl JournalHandler for Handler<'_, S> { return; } - let (kind, message) = dispatch.into_parts(); - let (id, source, destination, payload, gas_limit, value, details) = message.into_parts(); + let destination = dispatch.destination(); + let dispatch = Dispatch::from_stored(self.storage, dispatch.into_stored()); - let payload_hash = self.storage.write_payload(payload).into(); + if let Ok(non_zero_delay) = delay.try_into() { + // TODO (breathx): specify which program sent it. FIX WITHIN THE PR. + let expiry = self + .in_block_transitions + .schedule_task(non_zero_delay, ScheduledTask::SendDispatch(dispatch.id)); - let dispatch = Dispatch { - id, - kind, - source, - payload_hash, - value, - details, - context: None, - }; + self.update_state_with_storage(destination, |storage, state| { + state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| { + let r = stash.insert(dispatch.id, (dispatch, expiry)); + debug_assert!(r.is_none()); + })?; - self.update_state_with_storage(destination, |storage, state| { - state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { - queue.push_back(dispatch); - })?; - Ok(()) - }); + Ok(()) + }); + } else { + self.update_state_with_storage(destination, |storage, state| { + state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { + queue.push_back(dispatch); + })?; + Ok(()) + }); + } } fn wait_dispatch( From ca543f69d42b5e2a59407dfa8ae40bf1194dea5e Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 16:15:36 +0400 Subject: [PATCH 5/8] make scheduled task generic-ugly, but flexible across implemented environments --- common/src/auxiliary/task_pool.rs | 8 ++++---- core/src/tasks.rs | 29 ++++++++++++++------------- ethexe/common/src/db.rs | 15 +++++++++++--- ethexe/runtime/common/src/journal.rs | 11 +++++----- ethexe/runtime/common/src/schedule.rs | 11 +++++++--- gsdk/src/metadata/generated.rs | 6 +++--- gtest/src/manager/journal.rs | 6 +++++- gtest/src/manager/task.rs | 6 +++--- gtest/src/state/task_pool.rs | 20 +++++++++--------- pallets/gear-program/src/lib.rs | 4 ++-- pallets/gear-scheduler/src/lib.rs | 4 ++-- pallets/gear/src/lib.rs | 4 ++-- pallets/gear/src/manager/journal.rs | 2 +- pallets/gear/src/manager/task.rs | 6 +++--- pallets/gear/src/mock.rs | 2 +- 15 files changed, 76 insertions(+), 58 deletions(-) diff --git a/common/src/auxiliary/task_pool.rs b/common/src/auxiliary/task_pool.rs index 372d3956f08..25ee5762428 100644 --- a/common/src/auxiliary/task_pool.rs +++ b/common/src/auxiliary/task_pool.rs @@ -20,20 +20,20 @@ use super::{AuxiliaryDoubleStorageWrap, BlockNumber, DoubleBTreeMap}; use crate::scheduler::TaskPoolImpl; -use gear_core::{ids::ProgramId, tasks::ScheduledTask}; +use gear_core::{ids::ProgramId, tasks::VaraScheduledTask}; use std::cell::RefCell; /// Task pool implementation that can be used in a native, non-wasm runtimes. pub type AuxiliaryTaskpool = TaskPoolImpl< TaskPoolStorageWrap, - ScheduledTask, + VaraScheduledTask, TaskPoolErrorImpl, TaskPoolErrorImpl, TaskPoolCallbacks, >; std::thread_local! { - pub(crate) static TASKPOOL_STORAGE: RefCell, ()>> = const { RefCell::new(DoubleBTreeMap::new()) }; + pub(crate) static TASKPOOL_STORAGE: RefCell, ()>> = const { RefCell::new(DoubleBTreeMap::new()) }; } /// `TaskPool` double storage map manager @@ -41,7 +41,7 @@ pub struct TaskPoolStorageWrap; impl AuxiliaryDoubleStorageWrap for TaskPoolStorageWrap { type Key1 = BlockNumber; - type Key2 = ScheduledTask; + type Key2 = VaraScheduledTask; type Value = (); fn with_storage(f: F) -> R diff --git a/core/src/tasks.rs b/core/src/tasks.rs index a1a89732360..fb420ee4c64 100644 --- a/core/src/tasks.rs +++ b/core/src/tasks.rs @@ -23,13 +23,16 @@ use gsys::Gas; use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use scale_info::TypeInfo; +/// Alias for ScheduledTask used in vara-runtime, generic across AccountId used. +pub type VaraScheduledTask = ScheduledTask; + /// Scheduled task sense and required data for processing action. /// -/// CAUTION: NEVER ALLOW `ScheduledTask` BE A BIG DATA. +/// CAUTION: NEVER ALLOW `ScheduledTask` BE A BIG DATA. /// To avoid redundant migrations only append new variant(s) to the enum /// with an explicit corresponding scale codec index. #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Encode, Decode, TypeInfo, MaxEncodedLen)] -pub enum ScheduledTask { +pub enum ScheduledTask { // Rent charging section. // ----- /// Pause program as out of rent one. @@ -42,7 +45,7 @@ pub enum ScheduledTask { /// Remove message from mailbox as out of rent one. #[codec(index = 2)] - RemoveFromMailbox(AccountId, MessageId), + RemoveFromMailbox(RFM, MessageId), /// Remove message from waitlist as out of rent one. #[codec(index = 3)] @@ -62,7 +65,7 @@ pub enum ScheduledTask { /// /// The message itself stored in DispatchStash. #[codec(index = 6)] - SendDispatch(MessageId), + SendDispatch(SD), /// Delayed message to user sending. /// @@ -72,7 +75,7 @@ pub enum ScheduledTask { /// What message to send. message_id: MessageId, /// Should it be inserted into users mailbox. - to_mailbox: bool, + to_mailbox: SUM, }, /// Remove gas reservation. @@ -85,9 +88,9 @@ pub enum ScheduledTask { RemoveResumeSession(u32), } -impl ScheduledTask { +impl ScheduledTask { /// Processing function of current task with given handler. - pub fn process_with(self, handler: &mut impl TaskHandler) -> Gas { + pub fn process_with(self, handler: &mut impl TaskHandler) -> Gas { use ScheduledTask::*; match self { @@ -116,7 +119,7 @@ impl ScheduledTask { } /// Task handler trait for dealing with required tasks. -pub trait TaskHandler { +pub trait TaskHandler { // Rent charging section. // ----- /// Pause program action. @@ -124,7 +127,7 @@ pub trait TaskHandler { /// Remove code action. fn remove_code(&mut self, code_id: CodeId) -> Gas; /// Remove from mailbox action. - fn remove_from_mailbox(&mut self, user_id: AccountId, message_id: MessageId) -> Gas; + fn remove_from_mailbox(&mut self, user_id: RFM, message_id: MessageId) -> Gas; /// Remove from waitlist action. fn remove_from_waitlist(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas; /// Remove paused program action. @@ -136,10 +139,10 @@ pub trait TaskHandler { fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas; /// Send delayed message to program action. - fn send_dispatch(&mut self, stashed_message_id: MessageId) -> Gas; + fn send_dispatch(&mut self, stashed_message_id: SD) -> Gas; /// Send delayed message to user action. - fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: bool) -> Gas; + fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: SUM) -> Gas; /// Remove gas reservation action. fn remove_gas_reservation( @@ -158,7 +161,5 @@ fn task_encoded_size() { const MAX_SIZE: usize = 256; // For example we will take `AccountId` = `ProgramId` from `gear_core`. - type AccountId = ProgramId; - - assert!(ScheduledTask::::max_encoded_len() <= MAX_SIZE); + assert!(VaraScheduledTask::::max_encoded_len() <= MAX_SIZE); } diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 3e660b6e425..e7705c310a4 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -27,11 +27,20 @@ use gear_core::{ code::InstrumentedCode, ids::{ActorId, CodeId, ProgramId}, }; -use gprimitives::H256; +use gprimitives::{MessageId, H256}; use parity_scale_codec::{Decode, Encode}; -/// NOTE: key for actor id is (program_id, user_id). only used for mailbox. -pub type ScheduledTask = gear_core::tasks::ScheduledTask<(ProgramId, ActorId)>; +/// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id) +pub type Rfm = (ProgramId, ActorId); + +/// SendDispatch key; (msgs destinations program (stash and queue provider), message id) +pub type Sd = (ProgramId, MessageId); + +/// SendUserMessage key; (msgs sources program (mailbox and stash provider), destination user id) +pub type Sum = (ProgramId, ActorId); + +/// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe. +pub type ScheduledTask = gear_core::tasks::ScheduledTask; #[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)] pub struct BlockHeader { diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 17a16d7f133..41f77fa8d50 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -170,12 +170,11 @@ impl JournalHandler for Handler<'_, S> { { if !dispatch.is_reply() { if let Ok(non_zero_delay) = delay.try_into() { - // TODO (breathx): specify which program sent it. FIX WITHIN THE PR. let expiry = self.in_block_transitions.schedule_task( non_zero_delay, ScheduledTask::SendUserMessage { message_id: dispatch.id(), - to_mailbox: true, + to_mailbox: (dispatch.source(), dispatch.destination()), }, ); @@ -242,10 +241,10 @@ impl JournalHandler for Handler<'_, S> { let dispatch = Dispatch::from_stored(self.storage, dispatch.into_stored()); if let Ok(non_zero_delay) = delay.try_into() { - // TODO (breathx): specify which program sent it. FIX WITHIN THE PR. - let expiry = self - .in_block_transitions - .schedule_task(non_zero_delay, ScheduledTask::SendDispatch(dispatch.id)); + let expiry = self.in_block_transitions.schedule_task( + non_zero_delay, + ScheduledTask::SendDispatch((destination, dispatch.id)), + ); self.update_state_with_storage(destination, |storage, state| { state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| { diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 1928ef12048..9a499264e1b 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -3,6 +3,7 @@ use crate::{ InBlockTransitions, }; use anyhow::Result; +use ethexe_common::db::{Rfm, Sd, Sum}; use gear_core::{ids::ProgramId, tasks::TaskHandler}; use gprimitives::{ActorId, CodeId, MessageId, ReservationId, H256}; @@ -29,7 +30,7 @@ impl Handler<'_, S> { } } -impl<'a, S: Storage> TaskHandler<(ProgramId, ActorId)> for Handler<'a, S> { +impl<'a, S: Storage> TaskHandler for Handler<'a, S> { fn remove_from_mailbox( &mut self, (_program_id, _user_id): (ProgramId, ActorId), @@ -37,10 +38,14 @@ impl<'a, S: Storage> TaskHandler<(ProgramId, ActorId)> for Handler<'a, S> { ) -> u64 { unimplemented!("TODO (breathx)") } - fn send_dispatch(&mut self, _stashed_message_id: MessageId) -> u64 { + fn send_dispatch(&mut self, (_program_id, _message_id): (ProgramId, MessageId)) -> u64 { unimplemented!("TODO (breathx)") } - fn send_user_message(&mut self, _stashed_message_id: MessageId, _to_mailbox: bool) -> u64 { + fn send_user_message( + &mut self, + _stashed_message_id: MessageId, + (_program_id, _user_id): (ProgramId, ActorId), + ) -> u64 { unimplemented!("TODO (breathx)") } // TODO (breathx): consider deprecation of delayed wakes + non-concrete waits. diff --git a/gsdk/src/metadata/generated.rs b/gsdk/src/metadata/generated.rs index de5dd321109..4d3c64a6c0b 100644 --- a/gsdk/src/metadata/generated.rs +++ b/gsdk/src/metadata/generated.rs @@ -901,7 +901,7 @@ pub mod runtime_types { pub mod tasks { use super::runtime_types; #[derive(Debug, crate::gp::Decode, crate::gp::DecodeAsType, crate::gp::Encode)] - pub enum ScheduledTask<_0> { + pub enum ScheduledTask<_0, _1, _2> { #[codec(index = 0)] PauseProgram(runtime_types::gprimitives::ActorId), #[codec(index = 1)] @@ -921,11 +921,11 @@ pub mod runtime_types { runtime_types::gprimitives::MessageId, ), #[codec(index = 6)] - SendDispatch(runtime_types::gprimitives::MessageId), + SendDispatch(_1), #[codec(index = 7)] SendUserMessage { message_id: runtime_types::gprimitives::MessageId, - to_mailbox: ::core::primitive::bool, + to_mailbox: _2, }, #[codec(index = 8)] RemoveGasReservation( diff --git a/gtest/src/manager/journal.rs b/gtest/src/manager/journal.rs index f0a6f6e3281..07eefcda2ac 100644 --- a/gtest/src/manager/journal.rs +++ b/gtest/src/manager/journal.rs @@ -410,7 +410,11 @@ impl JournalHandler for ExtManager { program_id: ProgramId, expiration: u32, ) { - >::remove_gas_reservation(self, program_id, reservation_id); + >::remove_gas_reservation( + self, + program_id, + reservation_id, + ); let _ = self .task_pool diff --git a/gtest/src/manager/task.rs b/gtest/src/manager/task.rs index 4076a09acf5..910a422be7e 100644 --- a/gtest/src/manager/task.rs +++ b/gtest/src/manager/task.rs @@ -26,11 +26,11 @@ use gear_core::{ gas_metering::TaskWeights, ids::{CodeId, MessageId, ProgramId, ReservationId}, message::{DispatchKind, ReplyMessage}, - tasks::{ScheduledTask, TaskHandler}, + tasks::{ScheduledTask, TaskHandler, VaraScheduledTask}, }; use gear_core_errors::{ErrorReplyReason, SignalCode}; -pub(crate) fn get_maximum_task_gas(task: &ScheduledTask) -> Gas { +pub(crate) fn get_maximum_task_gas(task: &VaraScheduledTask) -> Gas { use ScheduledTask::*; let weights = TaskWeights::default(); match task { @@ -55,7 +55,7 @@ pub(crate) fn get_maximum_task_gas(task: &ScheduledTask) -> Gas { } } -impl TaskHandler for ExtManager { +impl TaskHandler for ExtManager { fn pause_program(&mut self, _program_id: ProgramId) -> GearCommonGas { log::debug!("Program rent logic is disabled."); diff --git a/gtest/src/state/task_pool.rs b/gtest/src/state/task_pool.rs index b06de027c8c..893a0d2ce4b 100644 --- a/gtest/src/state/task_pool.rs +++ b/gtest/src/state/task_pool.rs @@ -27,7 +27,7 @@ use gear_common::{ storage::KeyIterableByKeyMap, ProgramId, }; -use gear_core::tasks::ScheduledTask; +use gear_core::tasks::VaraScheduledTask; /// Task pool manager which operates under the hood over /// [`gear_common::auxiliary::task_pool::AuxiliaryTaskpool`]. @@ -42,7 +42,7 @@ impl TaskPoolManager { pub(crate) fn add( &self, block_number: BlockNumber, - task: ScheduledTask, + task: VaraScheduledTask, ) -> Result<(), TaskPoolErrorImpl> { as TaskPool>::add(block_number, task) } @@ -57,7 +57,7 @@ impl TaskPoolManager { pub(crate) fn contains( &self, block_number: &BlockNumber, - task: &ScheduledTask, + task: &VaraScheduledTask, ) -> bool { as TaskPool>::contains(block_number, task) } @@ -66,7 +66,7 @@ impl TaskPoolManager { pub(crate) fn delete( &self, block_number: BlockNumber, - task: ScheduledTask, + task: VaraScheduledTask, ) -> Result<(), TaskPoolErrorImpl> { as TaskPool>::delete(block_number, task) } @@ -92,22 +92,22 @@ impl TaskPoolCallbacks for TaskPoolCallbacksImpl { #[cfg(test)] mod tests { use super::TaskPoolManager; - use gear_core::{ids::ProgramId, tasks::ScheduledTask}; + use gear_core::{ids::ProgramId, tasks::VaraScheduledTask}; #[test] fn test_taskpool() { let manager = TaskPoolManager; let block_1_tasks = [ - ScheduledTask::::SendDispatch(42.into()), - ScheduledTask::::SendUserMessage { + VaraScheduledTask::::SendDispatch(42.into()), + VaraScheduledTask::::SendUserMessage { message_id: 422.into(), to_mailbox: true, }, ]; let block_2_tasks = [ - ScheduledTask::::RemoveGasReservation(922.into(), 1.into()), - ScheduledTask::::RemoveFromWaitlist(42.into(), 44.into()), + VaraScheduledTask::::RemoveGasReservation(922.into(), 1.into()), + VaraScheduledTask::::RemoveFromWaitlist(42.into(), 44.into()), ]; block_1_tasks @@ -148,7 +148,7 @@ mod tests { assert!(!manager.contains(&2, task)); } - let task = ScheduledTask::::RemoveFromMailbox(422.into(), 16.into()); + let task = VaraScheduledTask::::RemoveFromMailbox(422.into(), 16.into()); manager.add(3, task.clone()).unwrap(); manager.add(4, task.clone()).unwrap(); manager.delete(4, task.clone()).unwrap(); diff --git a/pallets/gear-program/src/lib.rs b/pallets/gear-program/src/lib.rs index aa64b059d25..23a7cfdc6f5 100644 --- a/pallets/gear-program/src/lib.rs +++ b/pallets/gear-program/src/lib.rs @@ -161,7 +161,7 @@ pub mod pallet { memory::PageBuf, pages::{numerated::tree::IntervalsTree, GearPage, WasmPage}, program::{MemoryInfix, Program}, - tasks::ScheduledTask, + tasks::VaraScheduledTask, }; use sp_runtime::DispatchError; @@ -173,7 +173,7 @@ pub mod pallet { /// Scheduler. type Scheduler: Scheduler< BlockNumber = BlockNumberFor, - Task = ScheduledTask, + Task = VaraScheduledTask, >; /// Custom block number tracker. diff --git a/pallets/gear-scheduler/src/lib.rs b/pallets/gear-scheduler/src/lib.rs index 2c0c938e663..86575f15519 100644 --- a/pallets/gear-scheduler/src/lib.rs +++ b/pallets/gear-scheduler/src/lib.rs @@ -64,7 +64,7 @@ pub mod pallet { traits::{Get, StorageVersion}, }; use frame_system::pallet_prelude::*; - use gear_core::tasks::ScheduledTask; + use gear_core::tasks::VaraScheduledTask; use sp_runtime::DispatchError; use sp_std::{convert::TryInto, marker::PhantomData}; @@ -140,7 +140,7 @@ pub mod pallet { type AccountId = ::AccountId; /// Task type of the scheduler. - type Task = ScheduledTask>; + type Task = VaraScheduledTask>; // Below goes storages and their gear's wrapper implementations. // diff --git a/pallets/gear/src/lib.rs b/pallets/gear/src/lib.rs index c573ce5623e..8aaaebe0461 100644 --- a/pallets/gear/src/lib.rs +++ b/pallets/gear/src/lib.rs @@ -89,7 +89,7 @@ use gear_core::{ ids::{prelude::*, CodeId, MessageId, ProgramId, ReservationId}, message::*, percent::Percent, - tasks::ScheduledTask, + tasks::VaraScheduledTask, }; use gear_lazy_pages_common::LazyPagesInterface; use gear_lazy_pages_interface::LazyPagesRuntimeInterface; @@ -253,7 +253,7 @@ pub mod pallet { type Scheduler: Scheduler< BlockNumber = BlockNumberFor, Cost = u64, - Task = ScheduledTask, + Task = VaraScheduledTask, >; /// Message Queue processing routing provider. diff --git a/pallets/gear/src/manager/journal.rs b/pallets/gear/src/manager/journal.rs index 4b43523921b..79a3bdeddad 100644 --- a/pallets/gear/src/manager/journal.rs +++ b/pallets/gear/src/manager/journal.rs @@ -602,7 +602,7 @@ where program_id: ProgramId, expiration: u32, ) { - >::remove_gas_reservation( + >::remove_gas_reservation( self, program_id, reservation_id, diff --git a/pallets/gear/src/manager/task.rs b/pallets/gear/src/manager/task.rs index e494c04bff3..127ec18fcef 100644 --- a/pallets/gear/src/manager/task.rs +++ b/pallets/gear/src/manager/task.rs @@ -33,11 +33,11 @@ use core::cmp; use gear_core::{ ids::{CodeId, MessageId, ProgramId, ReservationId}, message::{DispatchKind, Payload, ReplyMessage}, - tasks::{ScheduledTask, TaskHandler}, + tasks::{ScheduledTask, TaskHandler, VaraScheduledTask}, }; use gear_core_errors::{ErrorReplyReason, SignalCode}; -pub fn get_maximum_task_gas(task: &ScheduledTask) -> Gas { +pub fn get_maximum_task_gas(task: &VaraScheduledTask) -> Gas { use ScheduledTask::*; match task { @@ -67,7 +67,7 @@ pub fn get_maximum_task_gas(task: &ScheduledTask) -> Ga } } -impl TaskHandler for ExtManager +impl TaskHandler for ExtManager where T::AccountId: Origin, { diff --git a/pallets/gear/src/mock.rs b/pallets/gear/src/mock.rs index c229f625596..603ff539ee6 100644 --- a/pallets/gear/src/mock.rs +++ b/pallets/gear/src/mock.rs @@ -228,7 +228,7 @@ pub fn get_weight_of_adding_task() -> Weight { TaskPoolOf::::add( 100, - ScheduledTask::RemoveFromMailbox(USER_2, Default::default()), + VaraScheduledTask::RemoveFromMailbox(USER_2, Default::default()), ) .unwrap_or_else(|e| unreachable!("Scheduling logic invalidated! {:?}", e)); From 324dff1405b349622cb6cca99a9adb9afcee0785 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 18:02:47 +0400 Subject: [PATCH 6/8] implement delayed tasks handling --- ethexe/runtime/common/src/journal.rs | 16 ++- ethexe/runtime/common/src/schedule.rs | 140 ++++++++++++++++++++++++-- ethexe/runtime/common/src/state.rs | 26 +++++ 3 files changed, 161 insertions(+), 21 deletions(-) diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 41f77fa8d50..166a0dc4722 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -1,7 +1,7 @@ use crate::{ state::{ self, ActiveProgram, ComplexStorage, Dispatch, HashAndLen, MaybeHash, Program, - ProgramState, Storage, + ProgramState, Storage, MAILBOX_VALIDITY, }, InBlockTransitions, }; @@ -189,9 +189,11 @@ impl JournalHandler for Handler<'_, S> { Ok(()) }); + + return; } else { let expiry = self.in_block_transitions.schedule_task( - state::MAILBOX_VALIDITY.try_into().expect("infallible"), + MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox( (dispatch.source(), dispatch.destination()), dispatch.id(), @@ -210,25 +212,19 @@ impl JournalHandler for Handler<'_, S> { Ok(()) }); } - - return; - } - - if delay != 0 { - unreachable!("delayed sending of replies is forbidden"); } let source = dispatch.source(); let message = dispatch.into_parts().1; - let source_state_hash = self + let state_hash = self .in_block_transitions .state_of(&source) .expect("must exist"); self.in_block_transitions.modify_state_with( source, - source_state_hash, + state_hash, 0, vec![], vec![OutgoingMessage::from(message)], diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 9a499264e1b..6446b845303 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -1,10 +1,15 @@ use crate::{ - state::{ComplexStorage, ProgramState, Storage}, + state::{ComplexStorage, Dispatch, MaybeHash, ProgramState, Storage, MAILBOX_VALIDITY}, InBlockTransitions, }; +use alloc::vec; use anyhow::Result; -use ethexe_common::db::{Rfm, Sd, Sum}; -use gear_core::{ids::ProgramId, tasks::TaskHandler}; +use ethexe_common::{ + db::{Rfm, ScheduledTask, Sd, Sum}, + router::{OutgoingMessage, ValueClaim}, +}; +use gear_core::{ids::ProgramId, message::ReplyMessage, tasks::TaskHandler}; +use gear_core_errors::SuccessReplyReason; use gprimitives::{ActorId, CodeId, MessageId, ReservationId, H256}; pub struct Handler<'a, S: Storage> { @@ -33,21 +38,134 @@ impl Handler<'_, S> { impl<'a, S: Storage> TaskHandler for Handler<'a, S> { fn remove_from_mailbox( &mut self, - (_program_id, _user_id): (ProgramId, ActorId), - _message_id: MessageId, + (program_id, user_id): (ProgramId, ActorId), + message_id: MessageId, ) -> u64 { - unimplemented!("TODO (breathx)") + let mut value_claim = None; + + let state_hash = self.update_state_with_storage(program_id, |storage, state| { + // TODO (breathx): FIX WITHIN THE PR, this removal should be infallible, isn't it? + let Some(((claimed_value, expiry), new_mailbox_hash)) = storage + .modify_mailbox_if_changed(state.mailbox_hash.clone(), |mailbox| { + let local_mailbox = mailbox.get_mut(&user_id)?; + let claimed_value = local_mailbox.remove(&message_id)?; + + if local_mailbox.is_empty() { + mailbox.remove(&user_id); + } + + Some(claimed_value) + })? + else { + return Ok(()); + }; + + state.mailbox_hash = new_mailbox_hash; + + value_claim = Some(ValueClaim { + message_id, + destination: user_id, + value: claimed_value, + }); + + let reply = Dispatch::reply( + message_id, + user_id, + MaybeHash::Empty, + 0, + SuccessReplyReason::Auto, + ); + + state.queue_hash = + storage.modify_queue(state.queue_hash.clone(), |queue| queue.push_back(reply))?; + + Ok(()) + }); + + if let Some(value_claim) = value_claim { + self.in_block_transitions + .modify_state_with(program_id, state_hash, 0, vec![value_claim], vec![]) + .expect("can't be None"); + } + + 0 } - fn send_dispatch(&mut self, (_program_id, _message_id): (ProgramId, MessageId)) -> u64 { - unimplemented!("TODO (breathx)") + + fn send_dispatch(&mut self, (program_id, message_id): (ProgramId, MessageId)) -> u64 { + self.update_state_with_storage(program_id, |storage, state| { + // TODO (breathx): FIX WITHIN THE PR, this removal should be infallible, isn't it? + let Some(((dispatch, _expiry), new_stash_hash)) = storage + .modify_stash_if_changed(state.stash_hash.clone(), |stash| { + stash.remove(&message_id) + })? + else { + return Ok(()); + }; + + state.stash_hash = new_stash_hash; + state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { + queue.push_back(dispatch); + })?; + + Ok(()) + }); + + 0 } + fn send_user_message( &mut self, - _stashed_message_id: MessageId, - (_program_id, _user_id): (ProgramId, ActorId), + stashed_message_id: MessageId, + (program_id, user_id): (ProgramId, ActorId), ) -> u64 { - unimplemented!("TODO (breathx)") + let mut dispatch = None; + + self.update_state_with_storage(program_id, |storage, state| { + // TODO (breathx): FIX WITHIN THE PR, this removal should be infallible, isn't it? + let Some(((stashed_dispatch, _expiry), new_stash_hash)) = storage + .modify_stash_if_changed(state.stash_hash.clone(), |stash| { + stash.remove(&stashed_message_id) + })? + else { + return Ok(()); + }; + + state.stash_hash = new_stash_hash; + dispatch = Some(stashed_dispatch); + + Ok(()) + }); + + if let Some(dispatch) = dispatch { + let expiry = self.in_block_transitions.schedule_task( + MAILBOX_VALIDITY.try_into().expect("infallible"), + ScheduledTask::RemoveFromMailbox((program_id, user_id), stashed_message_id), + ); + + let state_hash = self.update_state_with_storage(program_id, |storage, state| { + state.mailbox_hash = + storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| { + let r = mailbox + .entry(user_id) + .or_default() + .insert(dispatch.id, (dispatch.value, expiry)); + + debug_assert!(r.is_none()); + })?; + + Ok(()) + }); + + let outgoing_message = dispatch.into_outgoing(self.storage, user_id); + + self.in_block_transitions + .modify_state_with(program_id, state_hash, 0, vec![], vec![outgoing_message]) + .expect("must be") + } + + 0 } + // TODO (breathx): consider deprecation of delayed wakes + non-concrete waits. fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> u64 { log::trace!("Running scheduled task wake message {message_id} to {program_id}"); diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 1b44fb217d8..1c92c9481c8 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -24,6 +24,7 @@ use alloc::{ }; use anyhow::{anyhow, Result}; use core::num::NonZero; +use ethexe_common::router::OutgoingMessage; use gear_core::{ code::InstrumentedCode, ids::{prelude::MessageIdExt as _, ProgramId}, @@ -246,6 +247,31 @@ impl Dispatch { context, } } + + pub fn into_outgoing(self, storage: &S, destination: ActorId) -> OutgoingMessage { + let Self { + id, + payload_hash, + value, + details, + .. + } = self; + + let payload = payload_hash.with_hash_or_default(|payload_hash| { + storage + .read_payload(payload_hash) + .expect("must be found") + .into_vec() + }); + + OutgoingMessage { + id, + destination, + payload, + value, + reply_details: details.and_then(|d| d.to_reply_details()), + } + } } pub type ValueWithExpiry = (T, u32); From 1b11d5bc6015637597fa483fce6a9ec9f22ef92f Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 18:18:59 +0400 Subject: [PATCH 7/8] fail on inability to find message on delayed task --- ethexe/runtime/common/src/schedule.rs | 30 ++++++++------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 6446b845303..07b91de11ad 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -3,7 +3,7 @@ use crate::{ InBlockTransitions, }; use alloc::vec; -use anyhow::Result; +use anyhow::{anyhow, Result}; use ethexe_common::{ db::{Rfm, ScheduledTask, Sd, Sum}, router::{OutgoingMessage, ValueClaim}, @@ -44,8 +44,7 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { let mut value_claim = None; let state_hash = self.update_state_with_storage(program_id, |storage, state| { - // TODO (breathx): FIX WITHIN THE PR, this removal should be infallible, isn't it? - let Some(((claimed_value, expiry), new_mailbox_hash)) = storage + let ((claimed_value, expiry), new_mailbox_hash) = storage .modify_mailbox_if_changed(state.mailbox_hash.clone(), |mailbox| { let local_mailbox = mailbox.get_mut(&user_id)?; let claimed_value = local_mailbox.remove(&message_id)?; @@ -56,9 +55,7 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { Some(claimed_value) })? - else { - return Ok(()); - }; + .ok_or_else(|| anyhow!("failed to find message in mailbox"))?; state.mailbox_hash = new_mailbox_hash; @@ -93,14 +90,11 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { fn send_dispatch(&mut self, (program_id, message_id): (ProgramId, MessageId)) -> u64 { self.update_state_with_storage(program_id, |storage, state| { - // TODO (breathx): FIX WITHIN THE PR, this removal should be infallible, isn't it? - let Some(((dispatch, _expiry), new_stash_hash)) = storage + let ((dispatch, _expiry), new_stash_hash) = storage .modify_stash_if_changed(state.stash_hash.clone(), |stash| { stash.remove(&message_id) })? - else { - return Ok(()); - }; + .ok_or_else(|| anyhow!("failed to find message in stash"))?; state.stash_hash = new_stash_hash; state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { @@ -121,14 +115,11 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { let mut dispatch = None; self.update_state_with_storage(program_id, |storage, state| { - // TODO (breathx): FIX WITHIN THE PR, this removal should be infallible, isn't it? - let Some(((stashed_dispatch, _expiry), new_stash_hash)) = storage + let ((stashed_dispatch, _expiry), new_stash_hash) = storage .modify_stash_if_changed(state.stash_hash.clone(), |stash| { stash.remove(&stashed_message_id) })? - else { - return Ok(()); - }; + .ok_or_else(|| anyhow!("failed to find message in stash"))?; state.stash_hash = new_stash_hash; dispatch = Some(stashed_dispatch); @@ -170,15 +161,12 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> u64 { log::trace!("Running scheduled task wake message {message_id} to {program_id}"); - // TODO (breathx): don't update state if not changed? self.update_state_with_storage(program_id, |storage, state| { - let Some(((dispatch, _expiry), new_waitlist_hash)) = storage + let ((dispatch, _expiry), new_waitlist_hash) = storage .modify_waitlist_if_changed(state.waitlist_hash.clone(), |waitlist| { waitlist.remove(&message_id) })? - else { - return Ok(()); - }; + .ok_or_else(|| anyhow!("failed to find message in waitlist"))?; state.waitlist_hash = new_waitlist_hash; state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { From 6bb05485954680fe793e45384918dd6a8a09e8ba Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 16 Oct 2024 18:44:29 +0400 Subject: [PATCH 8/8] add notice about user/program delayed sending --- ethexe/common/src/db.rs | 4 ++-- ethexe/runtime/common/src/journal.rs | 13 ++++++++----- ethexe/runtime/common/src/schedule.rs | 21 +++++++++++---------- ethexe/runtime/common/src/state.rs | 2 +- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index e7705c310a4..3d1c2c2e95e 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -36,8 +36,8 @@ pub type Rfm = (ProgramId, ActorId); /// SendDispatch key; (msgs destinations program (stash and queue provider), message id) pub type Sd = (ProgramId, MessageId); -/// SendUserMessage key; (msgs sources program (mailbox and stash provider), destination user id) -pub type Sum = (ProgramId, ActorId); +/// SendUserMessage key; (msgs sources program (mailbox and stash provider)) +pub type Sum = ProgramId; /// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe. pub type ScheduledTask = gear_core::tasks::ScheduledTask; diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 166a0dc4722..67ace2fdbe3 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -168,13 +168,15 @@ impl JournalHandler for Handler<'_, S> { .state_of(&dispatch.destination()) .is_none() { + let user_id = dispatch.destination(); + if !dispatch.is_reply() { if let Ok(non_zero_delay) = delay.try_into() { let expiry = self.in_block_transitions.schedule_task( non_zero_delay, ScheduledTask::SendUserMessage { message_id: dispatch.id(), - to_mailbox: (dispatch.source(), dispatch.destination()), + to_mailbox: dispatch.source(), }, ); @@ -183,7 +185,8 @@ impl JournalHandler for Handler<'_, S> { state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| { - let r = stash.insert(dispatch.id, (dispatch, expiry)); + let r = + stash.insert(dispatch.id, ((dispatch, Some(user_id)), expiry)); debug_assert!(r.is_none()); })?; @@ -195,7 +198,7 @@ impl JournalHandler for Handler<'_, S> { let expiry = self.in_block_transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox( - (dispatch.source(), dispatch.destination()), + (dispatch.source(), user_id), dispatch.id(), ), ); @@ -204,7 +207,7 @@ impl JournalHandler for Handler<'_, S> { state.mailbox_hash = storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| { mailbox - .entry(dispatch.destination()) + .entry(user_id) .or_default() .insert(dispatch.id(), (dispatch.value(), expiry)); })?; @@ -244,7 +247,7 @@ impl JournalHandler for Handler<'_, S> { self.update_state_with_storage(destination, |storage, state| { state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| { - let r = stash.insert(dispatch.id, (dispatch, expiry)); + let r = stash.insert(dispatch.id, ((dispatch, None), expiry)); debug_assert!(r.is_none()); })?; diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 07b91de11ad..76867dd762a 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -90,12 +90,14 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { fn send_dispatch(&mut self, (program_id, message_id): (ProgramId, MessageId)) -> u64 { self.update_state_with_storage(program_id, |storage, state| { - let ((dispatch, _expiry), new_stash_hash) = storage + let (((dispatch, user_id), _expiry), new_stash_hash) = storage .modify_stash_if_changed(state.stash_hash.clone(), |stash| { stash.remove(&message_id) })? .ok_or_else(|| anyhow!("failed to find message in stash"))?; + debug_assert!(user_id.is_none()); + state.stash_hash = new_stash_hash; state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { queue.push_back(dispatch); @@ -107,27 +109,26 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { 0 } - fn send_user_message( - &mut self, - stashed_message_id: MessageId, - (program_id, user_id): (ProgramId, ActorId), - ) -> u64 { - let mut dispatch = None; + fn send_user_message(&mut self, stashed_message_id: MessageId, program_id: ProgramId) -> u64 { + let mut dispatch_and_user = None; self.update_state_with_storage(program_id, |storage, state| { - let ((stashed_dispatch, _expiry), new_stash_hash) = storage + let (((dispatch, user_id), _expiry), new_stash_hash) = storage .modify_stash_if_changed(state.stash_hash.clone(), |stash| { stash.remove(&stashed_message_id) })? .ok_or_else(|| anyhow!("failed to find message in stash"))?; state.stash_hash = new_stash_hash; - dispatch = Some(stashed_dispatch); + dispatch_and_user = Some(( + dispatch, + user_id.expect("the message intended to user contains no id"), + )); Ok(()) }); - if let Some(dispatch) = dispatch { + if let Some((dispatch, user_id)) = dispatch_and_user { let expiry = self.in_block_transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox((program_id, user_id), stashed_message_id), diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 1c92c9481c8..09de4c925df 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -280,7 +280,7 @@ pub type MessageQueue = VecDeque; pub type Waitlist = BTreeMap>; -pub type DispatchStash = BTreeMap>; +pub type DispatchStash = BTreeMap)>>; // TODO (breathx): consider here LocalMailbox for each user. pub type Mailbox = BTreeMap>>;