Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(solis-pending): add modification to starknet service to gather r… #12

Merged
merged 1 commit into from
May 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 152 additions & 44 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,51 @@ use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, trace, warn};
use url::Url;
use std::collections::HashSet;

use super::{Error, MessagingConfig, Messenger, MessengerResult, LOG_TARGET};

/// As messaging in starknet is only possible with EthAddress in the `to_address`
/// field, we have to set magic value to understand what the user want to do.
/// In the case of execution -> the felt 'EXE' will be passed.
/// And for normal messages, the felt 'MSG' is used.
/// Those values are very not likely a valid account address on starknet.
const MSG_MAGIC: FieldElement = felt!("0x4d5347");
const EXE_MAGIC: FieldElement = felt!("0x455845");

pub const HASH_EXEC: FieldElement = felt!("0xee");

pub struct EventCache {
processed_events: AsyncRwLock<HashSet<String>>,
}

impl EventCache {
pub fn new() -> Self {
EventCache {
processed_events: AsyncRwLock::new(HashSet::new()),
}
}

pub async fn is_event_processed(&self, event_id: &String) -> bool {
let events = self.processed_events.read().await;
events.contains(event_id)
}

pub async fn mark_event_as_processed(&self, event_id: String) {
let mut events = self.processed_events.write().await;
events.insert(event_id);
}

pub async fn clear(&self) {
let mut events = self.processed_events.write().await;
events.clear();
}
}

pub struct StarknetMessaging<EF: katana_executor::ExecutorFactory + Send + Sync> {
chain_id: FieldElement,
provider: AnyProvider,
wallet: LocalWallet,
sender_account_address: FieldElement,
messaging_contract_address: FieldElement,
hooker: Arc<AsyncRwLock<dyn KatanaHooker<EF> + Send + Sync>>,
cache_lock: AsyncRwLock<()>,
event_cache: Arc<EventCache>,
}

impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
Expand Down Expand Up @@ -65,6 +90,8 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
sender_account_address,
messaging_contract_address,
hooker,
cache_lock: AsyncRwLock::new(()),
event_cache: Arc::new(EventCache::new()),
})
}

Expand All @@ -82,11 +109,9 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
from_block: Some(from_block),
to_block: Some(to_block),
address: Some(self.messaging_contract_address),
// TODO: this might come from the configuration actually.
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

Expand All @@ -95,7 +120,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?;

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
Expand All @@ -114,7 +138,58 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
Ok(block_to_events)
}

/// Sends an invoke TX on starknet.
async fn fetch_pending_events(
&self,
chain_id: ChainId,
chunk_size: u64,
) -> Result<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;

loop {
let filter = EventFilter {
from_block: Some(BlockId::Tag(BlockTag::Pending)),
to_block: Some(BlockId::Tag(BlockTag::Pending)),
address: Some(self.messaging_contract_address),
keys: None,
};

let event_page = self.provider.get_events(filter.clone(), continuation_token.clone(), chunk_size).await?;

for event in event_page.events {
let event_id = event.transaction_hash.to_string(); // Assuming `transaction_hash` is the unique identifier for the event

if self.event_cache.is_event_processed(&event_id).await {
continue;
}

if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(&event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
l1_handler_txs.push(tx);
self.event_cache.mark_event_as_processed(event_id).await;
}
}
}
}

continuation_token = event_page.continuation_token;

if continuation_token.is_none() {
break;
}
}

Ok(l1_handler_txs)
}

async fn send_invoke_tx(&self, calls: Vec<Call>) -> Result<FieldElement> {
let signer = Arc::new(&self.wallet);

Expand All @@ -128,7 +203,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {

account.set_block_id(BlockId::Tag(BlockTag::Pending));

// TODO: we need to have maximum fee configurable.
let execution = account.execute(calls).fee_estimate_multiplier(10f64);
let estimated_fee = (execution.estimate_fee().await?.overall_fee) * 10u64.into();
let execution_with_fee = execution.max_fee(estimated_fee);
Expand All @@ -149,7 +223,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
}
}

/// Sends messages hashes to settlement layer by sending a transaction.
async fn send_hashes(&self, mut hashes: Vec<FieldElement>) -> MessengerResult<FieldElement> {
hashes.retain(|&x| x != HASH_EXEC);

Expand Down Expand Up @@ -205,7 +278,7 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
};

if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
// Nothing to fetch, we can skip waiting for the next tick.
return Ok((chain_latest_block, vec![]));
}

Expand All @@ -228,15 +301,8 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
Error::SendError
})?;

for (block_number, block_events) in block_to_events.iter() {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

for event in block_events.iter() {
for block_events in block_to_events.values() {
for event in block_events {
if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(event) {
let hooker = Arc::clone(&self.hooker);
Expand All @@ -254,6 +320,70 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
}

// Now, handle pending block events
{
// Use a lock to ensure atomicity
let cache_lock = self.cache_lock.write().await;

// Fetch pending block events
let pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
Error::SendError
})?;
l1_handler_txs.extend(pending_events);

// Get the latest block number again to ensure we didn't miss any new blocks
let latest_block_number = match self.provider.block_number().await {
Ok(n) => n,
Err(e) => {
warn!(
target: LOG_TARGET,
"Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
);
return Err(Error::SendError);
}
};

// Fetch all events from the latest block to ensure none are missed
let confirmed_events = self.fetch_events(BlockId::Number(latest_block_number), BlockId::Number(latest_block_number)).await.map_err(|e| {
error!(target: LOG_TARGET, "Error fetching confirmed block events: {:?}", e);
Error::SendError
})?;

for block_events in confirmed_events.values() {
for event in block_events {
let event_id = event.transaction_hash.to_string();
if !self.event_cache.is_event_processed(&event_id).await {
if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
l1_handler_txs.push(tx);
}
}
}
}
}
}

self.event_cache.clear().await;

// Fetch pending events again to ensure no events were missed during the cache clearing
let rechecked_pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
error!(target: LOG_TARGET, "Error rechecking pending events: {:?}", e);
Error::SendError
})?;
l1_handler_txs.extend(rechecked_pending_events);

drop(cache_lock); // Release the lock
}

Ok((to_block, l1_handler_txs))
}

Expand Down Expand Up @@ -297,18 +427,11 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
}

/// Parses messages sent by cairo contracts to compute their hashes.
///
/// Messages can also be labelled as EXE, which in this case generate a `Call`
/// additionally to the hash.
fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement>, Vec<Call>)> {
let mut hashes: Vec<FieldElement> = vec![];
let mut calls: Vec<Call> = vec![];

for m in messages {
// Field `to_address` is restricted to eth addresses space. So the
// `to_address` is set to 'EXE'/'MSG' to indicate that the message
// has to be executed or sent normally.
let magic = m.to_address;

if magic == EXE_MAGIC {
Expand All @@ -325,23 +448,14 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement
let selector = m.payload[1];

let mut calldata = vec![];
// We must exclude the `to_address` and `selector` from the actual payload.
if m.payload.len() >= 3 {
calldata.extend(m.payload[2..].to_vec());
}

calls.push(Call { to, selector, calldata });
hashes.push(HASH_EXEC);
} else if magic == MSG_MAGIC {
// In the case of a regular message, we compute the message's hash
// which will then be sent in a transaction to be registered.

// As `to_address` is used by the magic, the `to_address` we want
// is the first element of the payload.
let to_address = m.payload[0];

// Then, the payload must be changed to only keep the rest of the
// data, without the first element that was the `to_address`.
let payload = &m.payload[1..];

let mut buf: Vec<u8> = vec![];
Expand All @@ -354,7 +468,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement

hashes.push(starknet_keccak(&buf));
} else {
// Skip the message if no valid magic number is found.
warn!(target: LOG_TARGET, magic = ?magic, "Invalid message to_address magic value.");
continue;
}
Expand All @@ -377,14 +490,11 @@ fn l1_handler_tx_from_event(event: &EmittedEvent, chain_id: ChainId) -> Result<L
error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted.");
}

// See contract appchain_messaging.cairo for MessageSentToAppchain event.
let from_address = event.keys[2];
let to_address = event.keys[3];
let entry_point_selector = event.data[0];
let nonce = event.data[1];

// Skip the length of the serialized array for the payload which is data[2].
// Payload starts at data[3].
let mut calldata = vec![from_address];
calldata.extend(&event.data[3..]);

Expand All @@ -395,7 +505,6 @@ fn l1_handler_tx_from_event(event: &EmittedEvent, chain_id: ChainId) -> Result<L
calldata,
chain_id,
message_hash,
// This is the min value paid on L1 for the message to be sent to L2.
paid_fee_on_l1: 30000_u128,
entry_point_selector,
version: FieldElement::ZERO,
Expand All @@ -416,7 +525,6 @@ fn info_from_event(event: &EmittedEvent) -> Result<(FieldElement, FieldElement,
error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted");
}

// See contract appchain_messaging.cairo for MessageSentToAppchain event.
let from_address = event.keys[2];
let to_address = event.keys[3];
let entry_point_selector = event.data[0];
Expand Down Expand Up @@ -550,8 +658,8 @@ mod tests {
chain_id,
message_hash,
paid_fee_on_l1: 30000_u128,
version: FieldElement::ZERO,
entry_point_selector: selector,
version: FieldElement::ZERO,
contract_address: to_address.into(),
};

Expand Down