Skip to content

Commit

Permalink
fix: Pragma Dispatch ExEx behavior (#7)
Browse files Browse the repository at this point in the history
* feat(exex_behavior):

* feat(exex_behavior): Updated addresses

* feat(exex_behavior):

* feat(exex_behavior): Better logs

* feat(exex_behavior): quick refacto

* feat(exex_behavior):

* feat(exex_behavior): Refactoring

* feat(exex_behavior): Better logs

* feat(exex_behavior): reliable arg order
  • Loading branch information
akhercha authored Oct 9, 2024
1 parent 3a6bc3b commit 893e971
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- fix: Pragma's ExEx refresh behavior
- feat: `exex_pragma_dispatch` implementation
- feat: Madara ExExs proof of concept
- fix: override chain config
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mp-block = { workspace = true }
mp-chain-config = { workspace = true }
mp-convert = { workspace = true }
mp-exex = { workspace = true }
mp-receipt = { workspace = true }
mp-rpc = { workspace = true }
mp-transactions = { workspace = true }
mp-utils = { workspace = true }
Expand Down
121 changes: 86 additions & 35 deletions crates/node/src/extensions/pragma_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{sync::Arc, time::Duration};

use anyhow::bail;
use futures::StreamExt;
use mp_block::MadaraPendingBlock;
use mp_rpc::Starknet;
use starknet_api::felt;
use starknet_core::types::{
Expand All @@ -23,81 +24,129 @@ use mp_transactions::broadcasted_to_blockifier;
use tokio::time::sleep;

const PENDING_BLOCK: BlockId = BlockId::Tag(BlockTag::Pending);
// Update the feed ids from the Feed Registry every 500 blocks. (500s)
const UPDATE_FEEDS_INTERVAL: u64 = 500;

lazy_static::lazy_static! {
// TODO: Keystore path?
pub static ref ACCOUNT_ADDRESS: Felt = felt!("0x4a2b383d808b7285cc98b2309f974f5111633c84fd82c9375c118485d2d57ba");
pub static ref ACCOUNT_ADDRESS: Felt = felt!("0x9ea0674c4d7b87b4afcb4c4ddc783b0c07e758778b9a1d133adc97cddfe38f");
pub static ref PRIVATE_KEY: SigningKey = SigningKey::from_secret_scalar(felt!("0x7a9779748888c95d96bbbce041b5109c6ffc0c4f30561c0170384a5922d9e91"));

// TODO: Replace by the correct addresses
pub static ref PRAGMA_FEEDS_REGISTRY_ADDRESS: Felt = felt!("0x18d070bcad4b53eb0c716c13d36e5f0d798e52bd87a2a25f0de477b5902c9ff");
pub static ref PRAGMA_DISPATCHER_ADDRESS: Felt = felt!("0x42d0ccae2cd3647df3bf9379d74efc93851370b12338a5aa6a676e381396b5");
pub static ref PRAGMA_FEEDS_REGISTRY_ADDRESS: Felt = felt!("0x13c3404ff9802442d0bf389afcf2fab9201b47c2268fcaa4bd36ba1978af76");
pub static ref PRAGMA_DISPATCHER_ADDRESS: Felt = felt!("0x38d9b85bf3623681aaa37b1c591b07237dee8b17a11eaac53ddc07a306fefe2");

pub static ref MAX_FEE: Felt = felt!("2386F26FC10000"); // 0.01 eth

// NewFeedId event selector
pub static ref NEW_FEED_ID_SELECTOR: Felt = felt!("0x012eaeb62184f1ca53999ece2d2273b81f9c64bc057a93dad05e09f970b030f9");
// RemovedFeedId event selector
pub static ref REMOVED_FEED_ID_SELECTOR: Felt = felt!("0x02a45c5a3b53e7afa46712156f544cec1b9d4679804036a16ec9521389117be4");

// Empty feed list. Used instead of [`Vec::is_empty`].
// The first element is the length of the vec & after are the elements.
pub static ref EMPTY_FEEDS: Vec<Felt> = vec![Felt::ZERO];
}

/// 🧩 Pragma main ExEx.
/// At the end of each produced block by the node, adds a new dispatch transaction
/// using the Pragma Dispatcher contract.
pub async fn exex_pragma_dispatch(mut ctx: ExExContext) -> anyhow::Result<()> {
let mut feed_ids: Vec<Felt> = Vec::new();
let mut last_fetch_block = 0;
// Feed ids that will be dispatched.
// The first element is the length of the vec & after are the elements.
let mut feed_ids: Vec<Felt> = get_feed_ids_from_registry(&ctx.starknet).await.unwrap_or(vec![Felt::ZERO]);
log::info!("🧩 Pragma's ExEx: Initialized feed IDs from Registry. Total feeds: {}", feed_ids[0]);

while let Some(notification) = ctx.notifications.next().await {
let block_number = match notification {
ExExNotification::BlockProduced { block: _, block_number } => block_number,
let (block, block_number) = match notification {
ExExNotification::BlockProduced { block, block_number } => (block, block_number),
ExExNotification::BlockSynced { block_number } => {
ctx.events.send(ExExEvent::FinishedHeight(block_number))?;
continue;
}
};

if should_update_feed_ids(block_number.0, last_fetch_block, &feed_ids) {
match update_feed_ids(&ctx.starknet, block_number.0, &mut feed_ids).await {
Ok(()) => last_fetch_block = block_number.0,
Err(_) => {
ctx.events.send(ExExEvent::FinishedHeight(block_number))?;
continue;
}
}
// Will update in-place the feed ids vec
if let Err(e) = update_feed_ids_if_necessary(&ctx.starknet, &block, block_number.0, &mut feed_ids).await {
log::error!("🧩 [#{}] Pragma's ExEx: Error while updating feed IDs: {:?}", block_number, e);
ctx.events.send(ExExEvent::FinishedHeight(block_number))?;
continue;
}

if feed_ids.is_empty() {
if feed_ids == *EMPTY_FEEDS {
log::warn!("🧩 [#{}] Pragma's ExEx: No feed IDs available, skipping dispatch", block_number);
ctx.events.send(ExExEvent::FinishedHeight(block_number))?;
continue;
}

if let Err(e) = process_dispatch_transaction(&ctx, &feed_ids, block_number.0).await {
log::error!("🧩 [#{}] Pragma's ExEx: Error processing dispatch transaction: {:?}", block_number, e);
if let Err(e) = process_dispatch_transaction(&ctx, block_number.0, &feed_ids).await {
log::error!("🧩 [#{}] Pragma's ExEx: Error while processing dispatch transaction: {:?}", block_number, e);
}

ctx.events.send(ExExEvent::FinishedHeight(block_number))?;
}
Ok(())
}

fn should_update_feed_ids(current_block: u64, last_fetch_block: u64, feed_ids: &[Felt]) -> bool {
current_block.saturating_sub(last_fetch_block) >= UPDATE_FEEDS_INTERVAL || feed_ids.is_empty()
}
/// Update the feed ids list if necessary.
/// It means:
/// * if the feed id list is empty,
/// * if we find the event [NewFeedId] or [RemovedFeedId] in the block's events.
async fn update_feed_ids_if_necessary(
starknet: &Arc<Starknet>,
block: &MadaraPendingBlock,
block_number: u64,
feed_ids: &mut Vec<Felt>,
) -> anyhow::Result<()> {
// If the list is empty, it may be because the contract wasn't deployed before.
// Requery.
if *feed_ids == *EMPTY_FEEDS {
*feed_ids = get_feed_ids_from_registry(starknet).await?;
log::info!("🧩 [#{}] Pragma's ExEx: Refreshed all feeds. Total feeds: {}", block_number, feed_ids[0]);
return Ok(());
}

async fn update_feed_ids(starknet: &Arc<Starknet>, block_number: u64, feed_ids: &mut Vec<Felt>) -> anyhow::Result<()> {
match get_feed_ids_from_registry(starknet).await {
Ok(new_feed_ids) => {
*feed_ids = new_feed_ids;
log::info!("🧩 [#{}] Pragma's ExEx: 📜 Updated feed IDs: {:?}", block_number, feed_ids);
Ok(())
}
Err(e) => {
log::warn!("🧩 [#{}] Pragma's ExEx: Failed to fetch feed IDs: {:?}", block_number, e);
Err(e)
for receipt in &block.inner.receipts {
if let mp_receipt::TransactionReceipt::Invoke(invoke_receipt) = receipt {
for event in &invoke_receipt.events {
if event.from_address != *PRAGMA_FEEDS_REGISTRY_ADDRESS {
continue;
}
if event.keys.is_empty() || event.data.len() != 2 {
continue;
}
let selector = event.keys[0];
let feed_id = event.data[1];
if selector == *NEW_FEED_ID_SELECTOR {
if !feed_ids.contains(&feed_id) {
feed_ids.push(feed_id);
feed_ids[0] += Felt::ONE;
log::info!(
"🧩 [#{}] Pragma's ExEx: Added new feed ID \"0x{:x}\". Total feeds: {}",
block_number,
feed_id,
feed_ids[0]
);
}
} else if selector == *REMOVED_FEED_ID_SELECTOR {
if let Some(pos) = feed_ids.iter().position(|x| *x == feed_id) {
feed_ids.remove(pos);
feed_ids[0] -= Felt::ONE;
log::info!(
"🧩 [#{}] Pragma's ExEx: Removed feed ID \"0x{:x}\". Total feeds: {}",
block_number,
feed_id,
feed_ids[0]
);
}
}
}
}
}

Ok(())
}

async fn process_dispatch_transaction(ctx: &ExExContext, feed_ids: &[Felt], block_number: u64) -> anyhow::Result<()> {
/// Create a Dispatch tx and sends it.
/// Logs info about the tx status.
async fn process_dispatch_transaction(ctx: &ExExContext, block_number: u64, feed_ids: &[Felt]) -> anyhow::Result<()> {
let invoke_result = create_and_add_dispatch_tx(&ctx.starknet, feed_ids, block_number).await?;
let status = get_transaction_status(&ctx.starknet, &invoke_result.transaction_hash).await?;

Expand All @@ -116,6 +165,7 @@ async fn process_dispatch_transaction(ctx: &ExExContext, feed_ids: &[Felt], bloc
Ok(())
}

/// Creates & Invoke the Dispatch TX.
async fn create_and_add_dispatch_tx(
starknet: &Arc<Starknet>,
feed_ids: &[Felt],
Expand All @@ -127,6 +177,7 @@ async fn create_and_add_dispatch_tx(
Ok(invoke_result)
}

/// Check the status of a transaction & log info about it.
async fn handle_accepted_transaction(
ctx: &ExExContext,
transaction_hash: &Felt,
Expand Down

0 comments on commit 893e971

Please sign in to comment.