Skip to content

Commit

Permalink
add notice about user/program delayed sending
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Oct 16, 2024
1 parent 1b11d5b commit 6bb0548
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
4 changes: 2 additions & 2 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub type Rfm = (ProgramId, ActorId);
/// SendDispatch key; (msgs destinations program (stash and queue provider), message id)
pub type Sd = (ProgramId, MessageId);

/// SendUserMessage key; (msgs sources program (mailbox and stash provider), destination user id)
pub type Sum = (ProgramId, ActorId);
/// SendUserMessage key; (msgs sources program (mailbox and stash provider))
pub type Sum = ProgramId;

/// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<Rfm, Sd, Sum>;
Expand Down
13 changes: 8 additions & 5 deletions ethexe/runtime/common/src/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,15 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
.state_of(&dispatch.destination())
.is_none()
{
let user_id = dispatch.destination();

if !dispatch.is_reply() {
if let Ok(non_zero_delay) = delay.try_into() {
let expiry = self.in_block_transitions.schedule_task(
non_zero_delay,
ScheduledTask::SendUserMessage {
message_id: dispatch.id(),
to_mailbox: (dispatch.source(), dispatch.destination()),
to_mailbox: dispatch.source(),
},
);

Expand All @@ -183,7 +185,8 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {

state.stash_hash =
storage.modify_stash(state.stash_hash.clone(), |stash| {
let r = stash.insert(dispatch.id, (dispatch, expiry));
let r =
stash.insert(dispatch.id, ((dispatch, Some(user_id)), expiry));
debug_assert!(r.is_none());
})?;

Expand All @@ -195,7 +198,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
let expiry = self.in_block_transitions.schedule_task(
MAILBOX_VALIDITY.try_into().expect("infallible"),
ScheduledTask::RemoveFromMailbox(
(dispatch.source(), dispatch.destination()),
(dispatch.source(), user_id),
dispatch.id(),
),
);
Expand All @@ -204,7 +207,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
state.mailbox_hash =
storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| {
mailbox
.entry(dispatch.destination())
.entry(user_id)
.or_default()
.insert(dispatch.id(), (dispatch.value(), expiry));
})?;
Expand Down Expand Up @@ -244,7 +247,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {

self.update_state_with_storage(destination, |storage, state| {
state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| {
let r = stash.insert(dispatch.id, (dispatch, expiry));
let r = stash.insert(dispatch.id, ((dispatch, None), expiry));
debug_assert!(r.is_none());
})?;

Expand Down
21 changes: 11 additions & 10 deletions ethexe/runtime/common/src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,14 @@ impl<'a, S: Storage> TaskHandler<Rfm, Sd, Sum> for Handler<'a, S> {

fn send_dispatch(&mut self, (program_id, message_id): (ProgramId, MessageId)) -> u64 {
self.update_state_with_storage(program_id, |storage, state| {
let ((dispatch, _expiry), new_stash_hash) = storage
let (((dispatch, user_id), _expiry), new_stash_hash) = storage
.modify_stash_if_changed(state.stash_hash.clone(), |stash| {
stash.remove(&message_id)
})?
.ok_or_else(|| anyhow!("failed to find message in stash"))?;

debug_assert!(user_id.is_none());

state.stash_hash = new_stash_hash;
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
queue.push_back(dispatch);
Expand All @@ -107,27 +109,26 @@ impl<'a, S: Storage> TaskHandler<Rfm, Sd, Sum> for Handler<'a, S> {
0
}

fn send_user_message(
&mut self,
stashed_message_id: MessageId,
(program_id, user_id): (ProgramId, ActorId),
) -> u64 {
let mut dispatch = None;
fn send_user_message(&mut self, stashed_message_id: MessageId, program_id: ProgramId) -> u64 {
let mut dispatch_and_user = None;

self.update_state_with_storage(program_id, |storage, state| {
let ((stashed_dispatch, _expiry), new_stash_hash) = storage
let (((dispatch, user_id), _expiry), new_stash_hash) = storage
.modify_stash_if_changed(state.stash_hash.clone(), |stash| {
stash.remove(&stashed_message_id)
})?
.ok_or_else(|| anyhow!("failed to find message in stash"))?;

state.stash_hash = new_stash_hash;
dispatch = Some(stashed_dispatch);
dispatch_and_user = Some((
dispatch,
user_id.expect("the message intended to user contains no id"),
));

Ok(())
});

if let Some(dispatch) = dispatch {
if let Some((dispatch, user_id)) = dispatch_and_user {
let expiry = self.in_block_transitions.schedule_task(
MAILBOX_VALIDITY.try_into().expect("infallible"),
ScheduledTask::RemoveFromMailbox((program_id, user_id), stashed_message_id),
Expand Down
2 changes: 1 addition & 1 deletion ethexe/runtime/common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ pub type MessageQueue = VecDeque<Dispatch>;

pub type Waitlist = BTreeMap<MessageId, ValueWithExpiry<Dispatch>>;

pub type DispatchStash = BTreeMap<MessageId, ValueWithExpiry<Dispatch>>;
pub type DispatchStash = BTreeMap<MessageId, ValueWithExpiry<(Dispatch, Option<ActorId>)>>;

// TODO (breathx): consider here LocalMailbox for each user.
pub type Mailbox = BTreeMap<ActorId, BTreeMap<MessageId, ValueWithExpiry<Value>>>;
Expand Down

0 comments on commit 6bb0548

Please sign in to comment.