Skip to content

Commit

Permalink
Support subscribing to transaction status via ws (#646)
Browse files Browse the repository at this point in the history
  • Loading branch information
FabijanC authored Nov 15, 2024
1 parent 248d5e8 commit a598805
Show file tree
Hide file tree
Showing 11 changed files with 692 additions and 131 deletions.
7 changes: 1 addition & 6 deletions crates/starknet-devnet-core/src/starknet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,12 +1094,7 @@ impl Starknet {
transaction_hash: TransactionHash,
) -> DevnetResult<TransactionStatus> {
let transaction = self.transactions.get(&transaction_hash).ok_or(Error::NoTransaction)?;

Ok(TransactionStatus {
finality_status: transaction.finality_status,
failure_reason: transaction.execution_info.revert_error.clone(),
execution_status: transaction.execution_result.status(),
})
Ok(transaction.get_status())
}

pub fn simulate_transactions(
Expand Down
16 changes: 14 additions & 2 deletions crates/starknet-devnet-core/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use starknet_types::rpc::transaction_receipt::{
DeployTransactionReceipt, FeeAmount, FeeInUnits, TransactionReceipt,
};
use starknet_types::rpc::transactions::{
DeclareTransaction, DeployAccountTransaction, InvokeTransaction, Transaction, TransactionTrace,
TransactionType, TransactionWithHash,
DeclareTransaction, DeployAccountTransaction, InvokeTransaction, Transaction,
TransactionStatus, TransactionTrace, TransactionType, TransactionWithHash,
};

use crate::constants::UDC_CONTRACT_ADDRESS;
Expand Down Expand Up @@ -228,6 +228,18 @@ impl StarknetTransaction {
}
}

pub fn get_block_number(&self) -> Option<BlockNumber> {
self.block_number
}

pub fn get_status(&self) -> TransactionStatus {
TransactionStatus {
finality_status: self.finality_status,
failure_reason: self.execution_info.revert_error.clone(),
execution_status: self.execution_result.status(),
}
}

pub fn get_trace(&self) -> Option<TransactionTrace> {
self.trace.clone()
}
Expand Down
159 changes: 127 additions & 32 deletions crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use starknet_core::error::Error;
use starknet_core::starknet::starknet_config::BlockGenerationOn;
use starknet_rs_core::types::{BlockId, BlockTag};
use starknet_types::starknet_api::block::BlockStatus;
use starknet_types::starknet_api::block::{BlockNumber, BlockStatus};

use super::error::ApiError;
use super::models::{BlockInput, SubscriptionIdInput};
use super::models::{BlockInput, SubscriptionIdInput, TransactionBlockInput};
use super::{JsonRpcHandler, JsonRpcSubscriptionRequest};
use crate::rpc_core::request::Id;
use crate::subscribe::{SocketId, SubscriptionNotification};
use crate::subscribe::{NewTransactionStatus, SocketId, Subscription, SubscriptionNotification};

/// The definitions of JSON-RPC read endpoints defined in starknet_ws_api.json
impl JsonRpcHandler {
Expand All @@ -20,7 +21,9 @@ impl JsonRpcHandler {
JsonRpcSubscriptionRequest::NewHeads(data) => {
self.subscribe_new_heads(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::TransactionStatus => todo!(),
JsonRpcSubscriptionRequest::TransactionStatus(data) => {
self.subscribe_tx_status(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::PendingTransactions => todo!(),
JsonRpcSubscriptionRequest::Events => todo!(),
JsonRpcSubscriptionRequest::Unsubscribe(SubscriptionIdInput { subscription_id }) => {
Expand All @@ -37,29 +40,23 @@ impl JsonRpcHandler {
}
}

/// starknet_subscribeNewHeads
/// Checks if an optional block ID is provided. Validates that the block exists and is not too
/// many blocks in the past. If it is a valid block, the user is notified of all blocks from the
/// old up to the latest, and subscribed to new ones. If no block ID specified, the user is just
/// subscribed to new blocks.
pub async fn subscribe_new_heads(
/// Returns (starting block number, latest block number). Returns an error in case the starting
/// block does not exist or there are too many blocks.
async fn convert_to_block_number_range(
&self,
block_input: Option<BlockInput>,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
let latest_tag = BlockId::Tag(BlockTag::Latest);
let block_id = if let Some(BlockInput { block }) = block_input {
block.into()
} else {
// if no block ID input, this eventually just subscribes the user to new blocks
latest_tag
};

mut starting_block_id: BlockId,
) -> Result<(u64, u64), ApiError> {
let starknet = self.api.starknet.lock().await;

// Convert pending to latest to prevent getting block_number = 0
// Info on 2024/11/12: Pending block_id shall be disallowed
starting_block_id = match starting_block_id {
BlockId::Tag(BlockTag::Pending) => BlockId::Tag(BlockTag::Latest),
other => other,
};

// checking the block's existence; aborted blocks treated as not found
let query_block = match starknet.get_block(&block_id) {
let query_block = match starknet.get_block(&starting_block_id) {
Ok(block) => match block.status() {
BlockStatus::Rejected => Err(ApiError::BlockNotFound),
_ => Ok(block),
Expand All @@ -68,27 +65,47 @@ impl JsonRpcHandler {
Err(other) => Err(ApiError::StarknetDevnetError(other)),
}?;

let latest_block = starknet.get_block(&latest_tag)?;
let latest_block = starknet.get_block(&BlockId::Tag(BlockTag::Latest))?;

let query_block_number = query_block.block_number().0;
let latest_block_number = latest_block.block_number().0;

let blocks_back_amount = if query_block_number > latest_block_number {
0
// safe to subtract, ensured by previous checks
if latest_block_number - query_block_number > 1024 {
return Err(ApiError::TooManyBlocksBack);
}

Ok((query_block_number, latest_block_number))
}

/// starknet_subscribeNewHeads
/// Checks if an optional block ID is provided. Validates that the block exists and is not too
/// many blocks in the past. If it is a valid block, the user is notified of all blocks from the
/// old up to the latest, and subscribed to new ones. If no block ID specified, the user is just
/// subscribed to new blocks.
async fn subscribe_new_heads(
&self,
block_input: Option<BlockInput>,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
let block_id = if let Some(BlockInput { block }) = block_input {
block.into()
} else {
latest_block_number - query_block_number
// if no block ID input, this eventually just subscribes the user to new blocks
BlockId::Tag(BlockTag::Latest)
};

if blocks_back_amount > 1024 {
return Err(ApiError::TooManyBlocksBack);
}
let (query_block_number, latest_block_number) =
self.convert_to_block_number_range(block_id).await?;

// perform the actual subscription
let mut sockets = self.api.sockets.lock().await;
let socket_context = sockets.get_mut(&socket_id).ok_or(ApiError::StarknetDevnetError(
Error::UnexpectedInternalError { msg: format!("Unregistered socket ID: {socket_id}") },
))?;
let subscription_id = socket_context.subscribe(rpc_request_id).await;
let subscription_id =
socket_context.subscribe(rpc_request_id, Subscription::NewHeads).await;

if let BlockId::Tag(_) = block_id {
// if the specified block ID is a tag (i.e. latest/pending), no old block handling
Expand All @@ -97,15 +114,93 @@ impl JsonRpcHandler {

// Notifying of old blocks. latest_block_number inclusive?
// Yes, only if block_id != latest/pending (handled above)
let starknet = self.api.starknet.lock().await;
for block_n in query_block_number..=latest_block_number {
let old_block = starknet
.get_block(&BlockId::Number(block_n))
.map_err(ApiError::StarknetDevnetError)?;

let notification = SubscriptionNotification::NewHeadsNotification(old_block.into());
let old_header = old_block.into();
let notification = SubscriptionNotification::NewHeads(Box::new(old_header));
socket_context.notify(subscription_id, notification).await;
}

Ok(())
}

/// Based on block generation mode and specified block ID, decide on subscription's sensitivity:
/// notify of changes in pending or latest block
fn get_subscription_tag(&self, block_id: BlockId) -> BlockTag {
match self.starknet_config.block_generation_on {
BlockGenerationOn::Transaction => BlockTag::Latest,
BlockGenerationOn::Demand | BlockGenerationOn::Interval(_) => match block_id {
BlockId::Tag(tag) => tag,
BlockId::Hash(_) | BlockId::Number(_) => BlockTag::Pending,
},
}
}

async fn subscribe_tx_status(
&self,
transaction_block_input: TransactionBlockInput,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
let TransactionBlockInput { transaction_hash, block } = transaction_block_input;

let query_block_id = if let Some(block_id) = block {
block_id.0
} else {
// if no block ID input, this eventually just subscribes the user to new blocks
BlockId::Tag(BlockTag::Latest)
};

let (query_block_number, latest_block_number) =
self.convert_to_block_number_range(query_block_id).await?;

// perform the actual subscription
let mut sockets = self.api.sockets.lock().await;
let socket_context = sockets.get_mut(&socket_id).ok_or(ApiError::StarknetDevnetError(
Error::UnexpectedInternalError { msg: format!("Unregistered socket ID: {socket_id}") },
))?;

// TODO if tx present, but in a block before the one specified, no point in subscribing -
// its status shall never change (unless considering block abortion). It would make
// sense to just add a ReorgSubscription
let subscription = Subscription::TransactionStatus {
tag: self.get_subscription_tag(query_block_id),
transaction_hash,
};
let subscription_id = socket_context.subscribe(rpc_request_id, subscription).await;

let starknet = self.api.starknet.lock().await;

if let Some(tx) = starknet.transactions.get(&transaction_hash) {
let notification = SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash,
status: tx.get_status(),
});
match tx.get_block_number() {
Some(BlockNumber(block_number))
if (query_block_number <= block_number
&& block_number <= latest_block_number
&& query_block_id != BlockId::Tag(BlockTag::Pending)) =>
{
// if the number of the block when the tx was added is between
// specified/query block number and latest, notify the client
socket_context.notify(subscription_id, notification).await;
}
None if query_block_id == BlockId::Tag(BlockTag::Pending) => {
// if tx stored but no block number, it means it's pending, so only notify
// if the specified block ID is pending
socket_context.notify(subscription_id, notification).await;
}
_ => tracing::debug!("Tx status subscription: tx not reachable"),
}
} else {
tracing::debug!("Tx status subscription: tx not yet received")
}

Ok(())
}
}
Loading

0 comments on commit a598805

Please sign in to comment.