diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index e7705c310a4..3d1c2c2e95e 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -36,8 +36,8 @@ pub type Rfm = (ProgramId, ActorId); /// SendDispatch key; (msgs destinations program (stash and queue provider), message id) pub type Sd = (ProgramId, MessageId); -/// SendUserMessage key; (msgs sources program (mailbox and stash provider), destination user id) -pub type Sum = (ProgramId, ActorId); +/// SendUserMessage key; (msgs sources program (mailbox and stash provider)) +pub type Sum = ProgramId; /// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe. pub type ScheduledTask = gear_core::tasks::ScheduledTask; diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 166a0dc4722..67ace2fdbe3 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -168,13 +168,15 @@ impl JournalHandler for Handler<'_, S> { .state_of(&dispatch.destination()) .is_none() { + let user_id = dispatch.destination(); + if !dispatch.is_reply() { if let Ok(non_zero_delay) = delay.try_into() { let expiry = self.in_block_transitions.schedule_task( non_zero_delay, ScheduledTask::SendUserMessage { message_id: dispatch.id(), - to_mailbox: (dispatch.source(), dispatch.destination()), + to_mailbox: dispatch.source(), }, ); @@ -183,7 +185,8 @@ impl JournalHandler for Handler<'_, S> { state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| { - let r = stash.insert(dispatch.id, (dispatch, expiry)); + let r = + stash.insert(dispatch.id, ((dispatch, Some(user_id)), expiry)); debug_assert!(r.is_none()); })?; @@ -195,7 +198,7 @@ impl JournalHandler for Handler<'_, S> { let expiry = self.in_block_transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox( - (dispatch.source(), dispatch.destination()), + (dispatch.source(), user_id), dispatch.id(), ), ); @@ -204,7 +207,7 @@ impl JournalHandler for Handler<'_, S> { state.mailbox_hash = storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| { mailbox - .entry(dispatch.destination()) + .entry(user_id) .or_default() .insert(dispatch.id(), (dispatch.value(), expiry)); })?; @@ -244,7 +247,7 @@ impl JournalHandler for Handler<'_, S> { self.update_state_with_storage(destination, |storage, state| { state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| { - let r = stash.insert(dispatch.id, (dispatch, expiry)); + let r = stash.insert(dispatch.id, ((dispatch, None), expiry)); debug_assert!(r.is_none()); })?; diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index 07b91de11ad..76867dd762a 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -90,12 +90,14 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { fn send_dispatch(&mut self, (program_id, message_id): (ProgramId, MessageId)) -> u64 { self.update_state_with_storage(program_id, |storage, state| { - let ((dispatch, _expiry), new_stash_hash) = storage + let (((dispatch, user_id), _expiry), new_stash_hash) = storage .modify_stash_if_changed(state.stash_hash.clone(), |stash| { stash.remove(&message_id) })? .ok_or_else(|| anyhow!("failed to find message in stash"))?; + debug_assert!(user_id.is_none()); + state.stash_hash = new_stash_hash; state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| { queue.push_back(dispatch); @@ -107,27 +109,26 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { 0 } - fn send_user_message( - &mut self, - stashed_message_id: MessageId, - (program_id, user_id): (ProgramId, ActorId), - ) -> u64 { - let mut dispatch = None; + fn send_user_message(&mut self, stashed_message_id: MessageId, program_id: ProgramId) -> u64 { + let mut dispatch_and_user = None; self.update_state_with_storage(program_id, |storage, state| { - let ((stashed_dispatch, _expiry), new_stash_hash) = storage + let (((dispatch, user_id), _expiry), new_stash_hash) = storage .modify_stash_if_changed(state.stash_hash.clone(), |stash| { stash.remove(&stashed_message_id) })? .ok_or_else(|| anyhow!("failed to find message in stash"))?; state.stash_hash = new_stash_hash; - dispatch = Some(stashed_dispatch); + dispatch_and_user = Some(( + dispatch, + user_id.expect("the message intended to user contains no id"), + )); Ok(()) }); - if let Some(dispatch) = dispatch { + if let Some((dispatch, user_id)) = dispatch_and_user { let expiry = self.in_block_transitions.schedule_task( MAILBOX_VALIDITY.try_into().expect("infallible"), ScheduledTask::RemoveFromMailbox((program_id, user_id), stashed_message_id), diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 1c92c9481c8..09de4c925df 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -280,7 +280,7 @@ pub type MessageQueue = VecDeque; pub type Waitlist = BTreeMap>; -pub type DispatchStash = BTreeMap>; +pub type DispatchStash = BTreeMap)>>; // TODO (breathx): consider here LocalMailbox for each user. pub type Mailbox = BTreeMap>>;