Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(solis-pending): add modification to starknet service to gather r… #12

Merged
merged 1 commit into from
May 28, 2024

Conversation

kwiss
Copy link

@kwiss kwiss commented May 25, 2024

Enhance gather_messages function to handle pending and confirmed events

Pull Request Description

This pull request enhances the gather_messages function to ensure no events are missed by fetching and processing events from both confirmed and pending blocks within the Starknet blockchain.

Summary of Changes

The gather_messages function now:

  1. Fetches and processes events from a specified range of confirmed blocks (from_block to to_block).
  2. Fetches and processes events from the pending block to include the latest transactions.
  3. Fetches and processes events from the latest confirmed block after handling pending events to ensure no events are missed.
  4. Clears the cache of processed events and rechecks pending events to confirm no events were missed during the cache clearing process.

Detailed Explanation

  1. Fetching Latest Block Number:
let chain_latest_block: u64 = match self.provider.block_number().await {
    Ok(n) => n,
    Err(e) => {
        warn!(
            target: LOG_TARGET,
            "Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
        );
        return Err(Error::SendError);
    }
};

The function correctly handles errors in fetching the latest block number, logging a warning and returning an error.

  1. Checking from_block Against Latest Block:
if from_block > chain_latest_block {
    // Nothing to fetch, we can skip waiting for the next tick.
    return Ok((chain_latest_block, vec![]));
}

This ensures the function does not process blocks that are not yet confirmed.

  1. Determining to_block:
let to_block = if from_block + max_blocks + 1 < chain_latest_block {
    from_block + max_blocks
} else {
    chain_latest_block
};

The to_block is determined correctly to avoid going beyond the latest block.

  1. Fetching and Processing Events from Confirmed Blocks:
let block_to_events = self
    .fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
    .await
    .map_err(|e| {
        error!(target: LOG_TARGET, "Error fetching events: {:?}", e);
        Error::SendError
    })?;

for block_events in block_to_events.values() {
    for event in block_events {
        if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
            if let Ok((from, to, selector)) = info_from_event(event) {
                let hooker = Arc::clone(&self.hooker);
                let is_message_accepted = hooker
                    .read()
                    .await
                    .verify_message_to_appchain(from, to, selector)
                    .await;

                if is_message_accepted {
                    l1_handler_txs.push(tx);
                }
            }
        }
    }
}

Events are fetched and processed correctly, converting them into L1HandlerTx transactions if they meet the required criteria.

  1. Handling Pending Block Events:
let cache_lock = self.cache_lock.write().await;

let pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
    error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
    Error::SendError
})?;
l1_handler_txs.extend(pending_events);

A lock is acquired to ensure atomicity, and pending events are fetched and processed.

  1. Fetching Latest Block Number Again:
let latest_block_number = match self.provider.block_number().await {
    Ok(n) => n,
    Err(e) => {
        warn!(
            target: LOG_TARGET,
            "Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
        );
        return Err(Error::SendError);
    }
};

The latest block number is fetched again to ensure no new blocks were missed.

  1. Fetching and Processing Confirmed Events from the Latest Block:
let confirmed_events = self.fetch_events(BlockId::Number(latest_block_number), BlockId::Number(latest_block_number)).await.map_err(|e| {
    error!(target: LOG_TARGET, "Error fetching confirmed block events: {:?}", e);
    Error::SendError
})?;

for block_events in confirmed_events.values() {
    for event in block_events {
        let event_id = event.transaction_hash.to_string();
        if !self.event_cache.is_event_processed(&event_id).await {
            if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
                if let Ok((from, to, selector)) = info_from_event(event) {
                    let hooker = Arc::clone(&self.hooker);
                    let is_message_accepted = hooker
                        .read()
                        .await
                        .verify_message_to_appchain(from, to, selector)
                        .await;

                    if is_message_accepted {
                        l1_handler_txs.push(tx);
                    }
                }
            }
        }
    }
}

Confirmed events from the latest block are fetched and processed to ensure none are missed.

  1. Clearing the Cache and Rechecking Pending Events:
self.event_cache.clear().await;

let rechecked_pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
    error!(target: LOG_TARGET, "Error rechecking pending events: {:?}", e);
    Error::SendError
})?;
l1_handler_txs.extend(rechecked_pending_events);

drop(cache_lock); // Release the lock

The cache is cleared, and pending events are fetched again to ensure no events are missed during cache clearing.

Complete Updated gather_messages Function

#[async_trait]
impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetMessaging<EF> {
    type MessageHash = FieldElement;
    type MessageTransaction = L1HandlerTx;

    async fn gather_messages(
        &self,
        from_block: u64,
        max_blocks: u64,
        chain_id: ChainId,
    ) -> MessengerResult<(u64, Vec<Self::MessageTransaction>)> {
        let chain_latest_block: u64 = match self.provider.block_number().await {
            Ok(n) => n,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
                );
                return Err(Error::SendError);
            }
        };

        if from_block > chain_latest_block {
            // Nothing to fetch, we can skip waiting for the next tick.
            return Ok((chain_latest_block, vec![]));
        }

        // +1 as the from_block counts as 1 block fetched.
        let to_block = if from_block + max_blocks + 1 < chain_latest_block {
            from_block + max_blocks
        } else {
            chain_latest_block
        };

        let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

        info!(target: LOG_TARGET, "Gathering messages from block {} to block {}", from_block, to_block);

        let block_to_events = self
            .fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
            .await
            .map_err(|e| {
                error!(target: LOG_TARGET, "Error fetching events: {:?}", e);
                Error::SendError
            })?;

        for block_events in block_to_events.values() {
            for event in block_events {
                if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
                    if let Ok((from, to, selector)) = info_from_event(event) {
                        let hooker = Arc::clone(&self.hooker);
                        let is_message_accepted = hooker
                            .read()
                            .await
                            .verify_message_to_appchain(from, to, selector)
                            .await;

                        if is_message_accepted {
                            l1_handler_txs.push(tx);
                        }
                    }
                }
            }
        }

        // Now, handle pending block events
        {
            // Use a lock to ensure atomicity
            let cache_lock = self.cache_lock.write().await;

            // Fetch pending block events
            let pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
                error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
                Error::SendError
            })?;
            l1_handler_txs.extend(pending_events);

            // Get the latest block number again to ensure we didn't miss any new blocks
            let latest_block_number = match self.provider.block_number().await {
                Ok(n) => n,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
                    );
                    return Err(Error::SendError);
                }
            };

            // Fetch all events from the latest block to ensure none are missed
            let confirmed_events = self.fetch_events(BlockId::Number(latest_block_number), BlockId::Number(latest_block_number)).await.map_err(|e| {
                error!(target: LOG_TARGET, "Error fetching confirmed block events: {:?}", e);
                Error::SendError
            })?;
            
            for block_events in confirmed_events.values() {
                for event in block_events {
                    let event_id = event.transaction_hash.to_string();
                    if !self.event_cache.is_event_processed(&event_id).await {


                        if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
                            if let Ok((from, to, selector)) = info_from_event(event) {
                                let hooker = Arc::clone(&self.hooker);
                                let is_message_accepted = hooker
                                    .read()
                                    .await
                                    .verify_message_to_appchain(from, to, selector)
                                    .await;

                                if is_message_accepted {
                                    l1_handler_txs.push(tx);
                                }
                            }
                        }
                    }
                }
            }

            self.event_cache.clear().await;

            // Fetch pending events again to ensure no events were missed during the cache clearing
            let rechecked_pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
                error!(target: LOG_TARGET, "Error rechecking pending events: {:?}", e);
                Error::SendError
            })?;
            l1_handler_txs.extend(rechecked_pending_events);

            drop(cache_lock); // Release the lock
        }

        Ok((to_block, l1_handler_txs))
    }
}

@ybensacq ybensacq self-requested a review May 25, 2024 09:32
@ybensacq
Copy link

Nice explanation of changes, and the code is all good from my perspective.

@kwiss kwiss marked this pull request as ready for review May 28, 2024 13:56
@kwiss kwiss merged commit 64a15cb into solis-v0.7.0-alpha May 28, 2024
@kwiss kwiss deleted the feat/solis-pending-gather branch May 28, 2024 14:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants