feat(ethexe): delayed messaging #4290

Merged
merged 9 commits on Oct 17, 2024
8 changes: 4 additions & 4 deletions common/src/auxiliary/task_pool.rs
@@ -20,28 +20,28 @@

use super::{AuxiliaryDoubleStorageWrap, BlockNumber, DoubleBTreeMap};
use crate::scheduler::TaskPoolImpl;
use gear_core::{ids::ProgramId, tasks::ScheduledTask};
use gear_core::{ids::ProgramId, tasks::VaraScheduledTask};
use std::cell::RefCell;

/// Task pool implementation that can be used in native, non-wasm runtimes.
pub type AuxiliaryTaskpool<TaskPoolCallbacks> = TaskPoolImpl<
TaskPoolStorageWrap,
ScheduledTask<ProgramId>,
VaraScheduledTask<ProgramId>,
TaskPoolErrorImpl,
TaskPoolErrorImpl,
TaskPoolCallbacks,
>;

std::thread_local! {
pub(crate) static TASKPOOL_STORAGE: RefCell<DoubleBTreeMap<BlockNumber, ScheduledTask<ProgramId>, ()>> = const { RefCell::new(DoubleBTreeMap::new()) };
pub(crate) static TASKPOOL_STORAGE: RefCell<DoubleBTreeMap<BlockNumber, VaraScheduledTask<ProgramId>, ()>> = const { RefCell::new(DoubleBTreeMap::new()) };
}

/// `TaskPool` double storage map manager
pub struct TaskPoolStorageWrap;

impl AuxiliaryDoubleStorageWrap for TaskPoolStorageWrap {
type Key1 = BlockNumber;
type Key2 = ScheduledTask<ProgramId>;
type Key2 = VaraScheduledTask<ProgramId>;
type Value = ();

fn with_storage<F, R>(f: F) -> R
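For context on the storage shape this file touches: a rough, self-contained sketch of the thread-local double-map pattern used by the auxiliary (non-wasm) task pool. The toy `DoubleBTreeMap` and the string task type standing in for `VaraScheduledTask<ProgramId>` are assumptions made here for illustration, not the real `gear_common` types.

```rust
use std::{cell::RefCell, collections::BTreeMap};

// Simplified stand-in for gear's DoubleBTreeMap: two keys map to one value.
struct DoubleBTreeMap<K1: Ord, K2: Ord, V> {
    inner: BTreeMap<K1, BTreeMap<K2, V>>,
}

impl<K1: Ord, K2: Ord, V> DoubleBTreeMap<K1, K2, V> {
    fn new() -> Self {
        Self { inner: BTreeMap::new() }
    }

    fn insert(&mut self, k1: K1, k2: K2, value: V) {
        self.inner.entry(k1).or_insert_with(BTreeMap::new).insert(k2, value);
    }

    fn contains_keys(&self, k1: &K1, k2: &K2) -> bool {
        self.inner.get(k1).map_or(false, |map| map.contains_key(k2))
    }
}

type BlockNumber = u32;
type Task = &'static str; // stand-in for VaraScheduledTask<ProgramId>

std::thread_local! {
    // Per-thread storage, mirroring the role of TASKPOOL_STORAGE above.
    static TASKPOOL: RefCell<DoubleBTreeMap<BlockNumber, Task, ()>> =
        RefCell::new(DoubleBTreeMap::new());
}

fn main() {
    // Schedule a toy task at block 42, then check it is present.
    TASKPOOL.with(|tp| tp.borrow_mut().insert(42, "send_dispatch", ()));
    let scheduled = TASKPOOL.with(|tp| tp.borrow().contains_keys(&42, &"send_dispatch"));
    assert!(scheduled);
}
```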
29 changes: 15 additions & 14 deletions core/src/tasks.rs
@@ -23,13 +23,16 @@ use gsys::Gas;
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use scale_info::TypeInfo;

/// Alias for `ScheduledTask` as used in vara-runtime, generic over the `AccountId` type.
pub type VaraScheduledTask<AccountId> = ScheduledTask<AccountId, MessageId, bool>;

/// Scheduled task sense and required data for processing action.
///
/// CAUTION: NEVER ALLOW `ScheduledTask<AccountId>` BE A BIG DATA.
/// CAUTION: NEVER ALLOW `ScheduledTask` BE A BIG DATA.
/// To avoid redundant migrations only append new variant(s) to the enum
/// with an explicit corresponding scale codec index.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Encode, Decode, TypeInfo, MaxEncodedLen)]
pub enum ScheduledTask<AccountId> {
pub enum ScheduledTask<RFM, SD, SUM> {
// Rent charging section.
// -----
/// Pause program as out of rent one.
@@ -42,7 +45,7 @@

/// Remove message from mailbox as out of rent one.
#[codec(index = 2)]
RemoveFromMailbox(AccountId, MessageId),
RemoveFromMailbox(RFM, MessageId),

/// Remove message from waitlist as out of rent one.
#[codec(index = 3)]
@@ -62,7 +65,7 @@
///
/// The message itself stored in DispatchStash.
#[codec(index = 6)]
SendDispatch(MessageId),
SendDispatch(SD),

/// Delayed message to user sending.
///
@@ -72,7 +75,7 @@
/// What message to send.
message_id: MessageId,
/// Should it be inserted into users mailbox.
to_mailbox: bool,
to_mailbox: SUM,
},

/// Remove gas reservation.
@@ -85,9 +88,9 @@
RemoveResumeSession(u32),
}

impl<AccountId> ScheduledTask<AccountId> {
impl<RFM, SD, SUM> ScheduledTask<RFM, SD, SUM> {
/// Processing function of current task with given handler.
pub fn process_with(self, handler: &mut impl TaskHandler<AccountId>) -> Gas {
pub fn process_with(self, handler: &mut impl TaskHandler<RFM, SD, SUM>) -> Gas {
use ScheduledTask::*;

match self {
@@ -116,15 +119,15 @@ impl<AccountId> ScheduledTask<AccountId> {
}

/// Task handler trait for dealing with required tasks.
pub trait TaskHandler<AccountId> {
pub trait TaskHandler<RFM, SD, SUM> {
// Rent charging section.
// -----
/// Pause program action.
fn pause_program(&mut self, program_id: ProgramId) -> Gas;
/// Remove code action.
fn remove_code(&mut self, code_id: CodeId) -> Gas;
/// Remove from mailbox action.
fn remove_from_mailbox(&mut self, user_id: AccountId, message_id: MessageId) -> Gas;
fn remove_from_mailbox(&mut self, user_id: RFM, message_id: MessageId) -> Gas;
/// Remove from waitlist action.
fn remove_from_waitlist(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas;
/// Remove paused program action.
@@ -136,10 +139,10 @@ pub trait TaskHandler<AccountId> {
fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas;

/// Send delayed message to program action.
fn send_dispatch(&mut self, stashed_message_id: MessageId) -> Gas;
fn send_dispatch(&mut self, stashed_message_id: SD) -> Gas;

/// Send delayed message to user action.
fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: bool) -> Gas;
fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: SUM) -> Gas;

/// Remove gas reservation action.
fn remove_gas_reservation(
@@ -158,7 +161,5 @@ fn task_encoded_size() {
const MAX_SIZE: usize = 256;

// For example we will take `AccountId` = `ProgramId` from `gear_core`.
type AccountId = ProgramId;

assert!(ScheduledTask::<AccountId>::max_encoded_len() <= MAX_SIZE);
assert!(VaraScheduledTask::<ProgramId>::max_encoded_len() <= MAX_SIZE);
}
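To make the new generic parameters concrete: the enum no longer fixes the `RemoveFromMailbox`, `SendDispatch` and `SendUserMessage` keys to Vara's types; each runtime chooses its own. A minimal, hedged sketch of the same task/handler split, with plain `u64` stand-ins for the real id types (not the actual `gear_core` definitions):

```rust
type Gas = u64;
type MessageId = u64;

// RFM, SD and SUM are the per-runtime keys of the three affected variants.
enum ScheduledTask<RFM, SD, SUM> {
    RemoveFromMailbox(RFM, MessageId),
    SendDispatch(SD),
    SendUserMessage { message_id: MessageId, to_mailbox: SUM },
}

trait TaskHandler<RFM, SD, SUM> {
    fn remove_from_mailbox(&mut self, user_id: RFM, message_id: MessageId) -> Gas;
    fn send_dispatch(&mut self, stashed_message_id: SD) -> Gas;
    fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: SUM) -> Gas;
}

impl<RFM, SD, SUM> ScheduledTask<RFM, SD, SUM> {
    // Each variant forwards its payload to the matching handler hook.
    fn process_with(self, handler: &mut impl TaskHandler<RFM, SD, SUM>) -> Gas {
        use ScheduledTask::*;
        match self {
            RemoveFromMailbox(user_id, message_id) => {
                handler.remove_from_mailbox(user_id, message_id)
            }
            SendDispatch(stashed_message_id) => handler.send_dispatch(stashed_message_id),
            SendUserMessage { message_id, to_mailbox } => {
                handler.send_user_message(message_id, to_mailbox)
            }
        }
    }
}

// Toy handler that just counts how many tasks it processed.
struct Counter(u32);

impl TaskHandler<u64, u64, bool> for Counter {
    fn remove_from_mailbox(&mut self, _user_id: u64, _message_id: MessageId) -> Gas {
        self.0 += 1;
        0
    }
    fn send_dispatch(&mut self, _stashed_message_id: u64) -> Gas {
        self.0 += 1;
        0
    }
    fn send_user_message(&mut self, _stashed_message_id: MessageId, _to_mailbox: bool) -> Gas {
        self.0 += 1;
        0
    }
}

fn main() {
    let mut handler = Counter(0);
    let gas = ScheduledTask::SendDispatch(7u64).process_with(&mut handler);
    assert_eq!((gas, handler.0), (0, 1));
}
```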
15 changes: 12 additions & 3 deletions ethexe/common/src/db.rs
@@ -27,11 +27,20 @@ use gear_core::{
code::InstrumentedCode,
ids::{ActorId, CodeId, ProgramId},
};
use gprimitives::H256;
use gprimitives::{MessageId, H256};
use parity_scale_codec::{Decode, Encode};

/// NOTE: key for actor id is (program_id, user_id). only used for mailbox.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<(ProgramId, ActorId)>;
/// `RemoveFromMailbox` key: (message's source program (the mailbox and queue provider), destination user id).
pub type Rfm = (ProgramId, ActorId);

/// `SendDispatch` key: (message's destination program (the stash and queue provider), message id).
pub type Sd = (ProgramId, MessageId);

/// `SendUserMessage` key: message's source program (the mailbox and stash provider).
pub type Sum = ProgramId;

/// NOTE: the generic keys differ from Vara's; they were chosen to match ethexe's storage organization.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<Rfm, Sd, Sum>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)]
pub struct BlockHeader {
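A small illustration of how these aliases plug into the generic enum from `core/src/tasks.rs`. The id structs below are simplified stand-ins for the real `gprimitives` types, introduced here only so the sketch compiles on its own:

```rust
// Stand-ins for gprimitives ids, just to make the aliases compile here.
#[derive(Clone, Copy, Debug)]
struct ProgramId(u64);
#[derive(Clone, Copy, Debug)]
struct ActorId(u64);
#[derive(Clone, Copy, Debug)]
struct MessageId(u64);

enum GenericTask<RFM, SD, SUM> {
    RemoveFromMailbox(RFM, MessageId),
    SendDispatch(SD),
    SendUserMessage { message_id: MessageId, to_mailbox: SUM },
}

// ethexe keys every task by the program whose state must be loaded to
// process it, so the scheduler can fetch that state hash directly.
type Rfm = (ProgramId, ActorId); // program holding the mailbox + destination user
type Sd = (ProgramId, MessageId); // program holding the stash + stashed message id
type Sum = ProgramId; // program holding the stash/mailbox
type ScheduledTask = GenericTask<Rfm, Sd, Sum>;

fn main() {
    let task: ScheduledTask = GenericTask::SendDispatch((ProgramId(1), MessageId(9)));
    if let GenericTask::SendDispatch((program, message)) = task {
        println!("load state of {program:?} to dequeue {message:?}");
    }
}
```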
13 changes: 12 additions & 1 deletion ethexe/db/src/database.rs
@@ -28,7 +28,7 @@ use ethexe_common::{
BlockRequestEvent,
};
use ethexe_runtime_common::state::{
Allocations, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist,
Allocations, DispatchStash, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist,
};
use gear_core::{
code::InstrumentedCode,
@@ -504,6 +504,17 @@ impl Storage for Database {
self.cas.write(&waitlist.encode())
}

fn read_stash(&self, hash: H256) -> Option<DispatchStash> {
self.cas.read(&hash).map(|data| {
DispatchStash::decode(&mut data.as_slice())
.expect("Failed to decode data into `DispatchStash`")
})
}

fn write_stash(&self, stash: DispatchStash) -> H256 {
self.cas.write(&stash.encode())
}

fn read_mailbox(&self, hash: H256) -> Option<Mailbox> {
self.cas.read(&hash).map(|data| {
Mailbox::decode(&mut data.as_slice()).expect("Failed to decode data into `Mailbox`")
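`read_stash`/`write_stash` follow the same content-addressed pattern as the neighbouring `read_mailbox`/`write_mailbox`: writing encodes the value and stores it under its hash, reading loads by hash and decodes, panicking on corrupt data. A rough sketch of that pattern under toy assumptions (a `u64` hash from `DefaultHasher` and raw bytes standing in for the SCALE-encoded `DispatchStash`; not the real CAS backend):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Toy content-addressed store: the key is a hash of the stored bytes.
#[derive(Default)]
struct Cas {
    data: HashMap<u64, Vec<u8>>,
}

impl Cas {
    fn write(&mut self, bytes: &[u8]) -> u64 {
        let mut hasher = DefaultHasher::new();
        bytes.hash(&mut hasher);
        let key = hasher.finish();
        self.data.insert(key, bytes.to_vec());
        key
    }

    fn read(&self, key: u64) -> Option<Vec<u8>> {
        self.data.get(&key).cloned()
    }
}

// Stand-ins for write_stash/read_stash: raw bytes play the role of the
// SCALE-encoded DispatchStash; the real code decodes and panics on failure.
fn write_stash(cas: &mut Cas, stash: &[u8]) -> u64 {
    cas.write(stash)
}

fn read_stash(cas: &Cas, hash: u64) -> Option<Vec<u8>> {
    cas.read(hash)
}

fn main() {
    let mut cas = Cas::default();
    let hash = write_stash(&mut cas, b"stash bytes");
    assert_eq!(read_stash(&cas, hash), Some(b"stash bytes".to_vec()));
}
```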
9 changes: 6 additions & 3 deletions ethexe/processor/src/lib.rs
@@ -85,13 +85,16 @@ impl Processor {
let states = self
.db
.block_start_program_states(block_hash)
.unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?
.ok_or_else(|| {
anyhow!("failed to get block start program states for under-processing block")
})?;

let schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?
let schedule = self.db.block_start_schedule(block_hash).ok_or_else(|| {
anyhow!("failed to get block start schedule for under-processing block")
})?;

let mut in_block_transitions = InBlockTransitions::new(header, states, schedule);

// TODO (breathx): handle resulting addresses that were changed (e.g. top up balance wont be dumped as outcome).
for event in events {
match event {
BlockRequestEvent::Router(event) => {
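The change above turns silently-defaulted lookups into hard errors: a missing pre-computed state or schedule for the block being processed now aborts processing instead of continuing with empty data. The conversion is the usual `Option::ok_or_else` + `anyhow!` pattern; a minimal sketch with a hypothetical lookup function standing in for the database accessors:

```rust
use anyhow::{anyhow, Result};
use std::collections::HashMap;

// Hypothetical lookup table standing in for Database::block_start_schedule.
fn block_start_schedule(db: &HashMap<u64, Vec<String>>, block: u64) -> Option<Vec<String>> {
    db.get(&block).cloned()
}

fn process_block(db: &HashMap<u64, Vec<String>>, block: u64) -> Result<usize> {
    // Previously: .unwrap_or_default() would hide a missing entry; now a
    // missing entry becomes an explicit error that stops processing.
    let schedule = block_start_schedule(db, block)
        .ok_or_else(|| anyhow!("failed to get block start schedule for block {block}"))?;
    Ok(schedule.len())
}

fn main() -> Result<()> {
    let mut db = HashMap::new();
    db.insert(1u64, vec!["task".to_string()]);
    assert_eq!(process_block(&db, 1)?, 1);
    assert!(process_block(&db, 2).is_err());
    Ok(())
}
```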
116 changes: 71 additions & 45 deletions ethexe/runtime/common/src/journal.rs
@@ -1,7 +1,7 @@
use crate::{
state::{
self, ActiveProgram, ComplexStorage, Dispatch, HashAndLen, MaybeHash, Program,
ProgramState, Storage,
ProgramState, Storage, MAILBOX_VALIDITY,
},
InBlockTransitions,
};
@@ -163,49 +163,71 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
unreachable!("deprecated: {dispatch:?}");
}

if delay != 0 {
todo!("delayed sending isn't supported yet");
}

if self
.in_block_transitions
.state_of(&dispatch.destination())
.is_none()
{
if !dispatch.is_reply() {
let expiry = self.in_block_transitions.schedule_task(
state::MAILBOX_VALIDITY.try_into().expect("infallible"),
ScheduledTask::RemoveFromMailbox(
(dispatch.source(), dispatch.destination()),
dispatch.id(),
),
);

self.update_state_with_storage(dispatch.source(), |storage, state| {
state.mailbox_hash =
storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| {
mailbox
.entry(dispatch.destination())
.or_default()
.insert(dispatch.id(), (dispatch.value(), expiry));
})?;
let user_id = dispatch.destination();

Ok(())
});
if !dispatch.is_reply() {
if let Ok(non_zero_delay) = delay.try_into() {
let expiry = self.in_block_transitions.schedule_task(
non_zero_delay,
ScheduledTask::SendUserMessage {
message_id: dispatch.id(),
to_mailbox: dispatch.source(),
},
);

self.update_state_with_storage(dispatch.source(), |storage, state| {
let dispatch = Dispatch::from_stored(storage, dispatch.into_stored());

state.stash_hash =
storage.modify_stash(state.stash_hash.clone(), |stash| {
let r =
stash.insert(dispatch.id, ((dispatch, Some(user_id)), expiry));
debug_assert!(r.is_none());
})?;

Ok(())
});

return;
} else {
let expiry = self.in_block_transitions.schedule_task(
MAILBOX_VALIDITY.try_into().expect("infallible"),
ScheduledTask::RemoveFromMailbox(
(dispatch.source(), user_id),
dispatch.id(),
),
);

self.update_state_with_storage(dispatch.source(), |storage, state| {
state.mailbox_hash =
storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| {
mailbox
.entry(user_id)
.or_default()
.insert(dispatch.id(), (dispatch.value(), expiry));
})?;

Ok(())
});
}
}

// TODO (breathx): send here to in_block_transitions.
let source = dispatch.source();
let message = dispatch.into_parts().1;

let source_state_hash = self
let state_hash = self
.in_block_transitions
.state_of(&source)
.expect("must exist");

self.in_block_transitions.modify_state_with(
source,
source_state_hash,
state_hash,
0,
vec![],
vec![OutgoingMessage::from(message)],
@@ -214,27 +236,31 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
return;
}

let (kind, message) = dispatch.into_parts();
let (id, source, destination, payload, gas_limit, value, details) = message.into_parts();
let destination = dispatch.destination();
let dispatch = Dispatch::from_stored(self.storage, dispatch.into_stored());

let payload_hash = self.storage.write_payload(payload).into();
if let Ok(non_zero_delay) = delay.try_into() {
let expiry = self.in_block_transitions.schedule_task(
non_zero_delay,
ScheduledTask::SendDispatch((destination, dispatch.id)),
);

let dispatch = Dispatch {
id,
kind,
source,
payload_hash,
value,
details,
context: None,
};
self.update_state_with_storage(destination, |storage, state| {
state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| {
let r = stash.insert(dispatch.id, ((dispatch, None), expiry));
debug_assert!(r.is_none());
})?;

self.update_state_with_storage(destination, |storage, state| {
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
queue.push_back(dispatch);
})?;
Ok(())
});
Ok(())
});
} else {
self.update_state_with_storage(destination, |storage, state| {
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
queue.push_back(dispatch);
})?;
Ok(())
});
}
}

fn wait_dispatch(
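In summary, the rewritten `send_dispatch` journal handler now routes by destination and delay: a delayed message to a user is stashed and a `SendUserMessage` task is scheduled; an immediate non-reply to a user still goes to the source program's mailbox (with a `RemoveFromMailbox` expiry task) and is emitted as an outgoing message; a delayed message to a program is stashed under the destination with a `SendDispatch` task; an immediate one is queued directly. A compressed, hedged sketch of that decision tree with simplified types (not the real ethexe state machinery):

```rust
// Simplified stand-ins; the real handler works over hashed per-program state
// through InBlockTransitions and the Storage trait.
#[derive(Debug)]
enum Scheduled {
    SendUserMessage { message_id: u64 },
    SendDispatch { program: u64, message_id: u64 },
    RemoveFromMailbox { program: u64, user: u64, message_id: u64 },
}

#[derive(Debug)]
enum Action {
    /// Put the dispatch into a program's stash and schedule a wake-up task.
    Stash { task: Scheduled },
    /// Insert into the source program's mailbox (the real code also emits an outgoing message).
    Mailbox { user: u64, message_id: u64 },
    /// Push straight into the destination program's queue.
    Queue { program: u64, message_id: u64 },
    /// Reply to a user: only an outgoing message is emitted.
    Outgoing { message_id: u64 },
}

fn route(message_id: u64, source: u64, dest: u64, dest_is_program: bool, is_reply: bool, delay: u32) -> Action {
    if !dest_is_program {
        if !is_reply && delay > 0 {
            // Delayed user message: stash it and deliver `delay` blocks later.
            return Action::Stash { task: Scheduled::SendUserMessage { message_id } };
        }
        if !is_reply {
            // Immediate user message: mailbox entry expiring after MAILBOX_VALIDITY blocks.
            let _expiry_task = Scheduled::RemoveFromMailbox { program: source, user: dest, message_id };
            return Action::Mailbox { user: dest, message_id };
        }
        return Action::Outgoing { message_id };
    }
    if delay > 0 {
        // Delayed program message: stash under the destination and schedule SendDispatch.
        Action::Stash { task: Scheduled::SendDispatch { program: dest, message_id } }
    } else {
        Action::Queue { program: dest, message_id }
    }
}

fn main() {
    println!("{:?}", route(1, 10, 20, false, false, 5)); // delayed user message -> stash
    println!("{:?}", route(2, 10, 20, true, false, 0)); // immediate program message -> queue
}
```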