Skip to content

Commit

Permalink
Consistent expiry period for async message and block operations (#4722)
Browse files Browse the repository at this point in the history
* Asc message execution - requery message bytecode after each message execution (#4710)

* Requery bytecode

* cargo fmt

* fix call stack inconsistency (#4709)

* Improve async message checks (#4706)

* Improve async message checks

* Change checks for async messages

* Add unit tests

* Fix ledger change to take into account cancelled message balance change (#4715)

* Take again the speculative changes after async message cancellation

* use .apply() to merge the two LedgerChanges

* Fix: we cannot combine two ledger changes with apply

* avoid cloning the changes

* Remove comment

* Fix async msg same slot (#4718)

* fix open rpc spec (#4716)

* Add eliminated_new_messages in eliminated_msg

---------

Co-authored-by: Modship <yeskinokay@gmail.com>

* Consistent expiry period for async message and block operations

* Update message validity for expiration

* Minor comment fix

---------

Co-authored-by: Leo-Besancon <lb@massa.net>
Co-authored-by: Modship <yeskinokay@gmail.com>
  • Loading branch information
3 people authored Aug 1, 2024
1 parent 5f9f2a7 commit 736a9ed
Showing 1 changed file with 57 additions and 11 deletions.
68 changes: 57 additions & 11 deletions massa-execution-worker/src/speculative_async_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ impl SpeculativeAsyncPool {
}

/// Takes a batch of asynchronous messages to execute,
/// removing them from the speculative asynchronous pool and settling their deletion from it in the changes accumulator.
/// removing them from the speculative asynchronous pool and settling their deletion from it
/// in the changes accumulator.
///
/// # Arguments
/// * `slot`: slot at which the batch is taken (allows filtering by validity interval)
Expand All @@ -122,9 +123,10 @@ impl SpeculativeAsyncPool {

for (message_id, message_info) in message_infos.iter() {
let corrected_max_gas = message_info.max_gas.saturating_add(async_msg_cst_gas_cost);
// Note: SecureShareOperation.get_validity_range(...) returns RangeInclusive
// so to be consistent here, use >= & <= checks
if available_gas >= corrected_max_gas
&& slot >= message_info.validity_start
&& slot < message_info.validity_end
&& Self::is_message_ready_to_execute(&slot, &message_info.validity_start, &message_info.validity_end)
&& message_info.can_be_executed
{
available_gas -= corrected_max_gas;
Expand Down Expand Up @@ -158,26 +160,27 @@ impl SpeculativeAsyncPool {
) -> Vec<(AsyncMessageId, AsyncMessage)> {
// Update the messages_info: remove messages that should be removed
// Filter out all messages for which the validity end is expired.
// Note that the validity_end bound is NOT included in the validity interval of the message.
// Note: that the validity_end bound is included in the validity interval of the message.

let mut eliminated_infos = Vec::new();
self.message_infos.retain(|id, info| {
if *slot < info.validity_end {
true
} else {

if Self::is_message_expired(slot, &info.validity_end) {
eliminated_infos.push((*id, info.clone()));
false
} else {
true
}
});

let mut eliminated_new_messages = Vec::new();
self.pool_changes.0.retain(|k, v| match v {
SetUpdateOrDelete::Set(message) => {
if *slot < message.validity_end {
true
} else {
if Self::is_message_expired(slot, &message.validity_end) {
eliminated_new_messages.push((*k, v.clone()));
false
} else {
true
}
}
SetUpdateOrDelete::Update(_v) => true,
Expand All @@ -190,7 +193,7 @@ impl SpeculativeAsyncPool {
SetUpdateOrDelete::Delete => None,
}));

// Truncate message pool to its max size, removing non-prioritary items
// Truncate message pool to its max size, removing non-priority items
let excess_count = self
.message_infos
.len()
Expand Down Expand Up @@ -310,9 +313,52 @@ impl SpeculativeAsyncPool {

msgs
}

/// Return true if a message (given its validity end) is expired
/// Must be consistent with is_message_valid
fn is_message_expired(slot: &Slot, message_validity_end: &Slot) -> bool {
// Note: SecureShareOperation.get_validity_range(...) returns RangeInclusive
// (for operation validity) so apply the same rule for message validity
*slot > *message_validity_end
}

/// Return true if a message (given its validity_start & validity end) is ready to execute
/// Must be consistent with is_message_expired
fn is_message_ready_to_execute(slot: &Slot, message_validity_start: &Slot, message_validity_end: &Slot) -> bool {
// Note: SecureShareOperation.get_validity_range(...) returns RangeInclusive
// (for operation validity) so apply the same rule for message validity
slot >= message_validity_start
&& slot <= message_validity_end
}
}

/// Check in the ledger changes if a message trigger has been triggered
fn is_triggered(filter: &AsyncMessageTrigger, ledger_changes: &LedgerChanges) -> bool {
ledger_changes.has_changes(&filter.address, filter.datastore_key.clone())
}

#[cfg(test)]
mod tests {
use super::*;

// Test if is_message_expired & is_message_ready_to_execute are consistent
#[test]
fn test_validity() {
let slot1 = Slot::new(6, 0);
let slot2 = Slot::new(9, 0);
let slot_validity_start = Slot::new(4, 0);
let slot_validity_end = Slot::new(8, 0);

assert!(!SpeculativeAsyncPool::is_message_expired(&slot1, &slot_validity_end));
assert!(SpeculativeAsyncPool::is_message_ready_to_execute(&slot1, &slot_validity_start, &slot_validity_end));

assert!(!SpeculativeAsyncPool::is_message_expired(&slot_validity_start, &slot_validity_end));
assert!(SpeculativeAsyncPool::is_message_ready_to_execute(&slot_validity_start, &slot_validity_start, &slot_validity_end));

assert!(!SpeculativeAsyncPool::is_message_expired(&slot_validity_end, &slot_validity_end));
assert!(SpeculativeAsyncPool::is_message_ready_to_execute(&slot_validity_end, &slot_validity_start, &slot_validity_end));

assert!(SpeculativeAsyncPool::is_message_expired(&slot2, &slot_validity_end));
assert!(!SpeculativeAsyncPool::is_message_ready_to_execute(&slot2, &slot_validity_start, &slot_validity_end));
}
}

0 comments on commit 736a9ed

Please sign in to comment.