Skip to content

Commit

Permalink
refactor(indexer): Optimize delayed local receipts tracking in Indexe…
Browse files Browse the repository at this point in the history
…r Framework (#4373)

* refactor(indexer): Optimize delayed local receipts tracking in Indexer Framework

* update Cargo.lock
  • Loading branch information
khorolets authored Jun 16, 2021
1 parent f9a3d75 commit 7e24d5d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions chain/indexer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.9.2

* Optimize the delayed receipts tracking process introduced in previous version to avoid indexer stuck.

## 0.9.1

* Introduce a hot-fix. Execution outcome for local receipt might appear not in the same block as the receipt. Local receipts are not saved in database and unable to be fetched. To include a receipt in `IndexerExecutionOutcomeWithReceipt` and prevent NEAR Indexer Framework from panic we fetch previous blocks to find corresponding local receipt to include.
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "near-indexer"
version = "0.9.1"
version = "0.9.2"
authors = ["Near Inc <hello@nearprotocol.com>"]
edition = "2018"

Expand Down
79 changes: 62 additions & 17 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ async fn build_streamer_message(
let receipt = if let Some(receipt) = receipt {
receipt
} else {
// TODO(#4317): optimize it https://github.com/near/nearcore/issues/4317
// Receipt might be missing only in case of delayed local receipt
// that appeared in some of the previous blocks
// we will be iterating over previous blocks until we found the receipt
Expand All @@ -141,23 +140,13 @@ async fn build_streamer_message(

prev_block_hash = prev_block.header.prev_hash;

let streamer_message = match build_streamer_message(&client, prev_block).await {
Ok(response) => response,
Err(err) => {
panic!("Unable to build streamer message for previous block: {:?}", err)
}
};
for shard in streamer_message.shards {
if let Some(chunk) = shard.chunk {
if let Some(receipt) = chunk
.receipts
.into_iter()
.find(|rec| rec.receipt_id == execution_outcome.id)
{
break 'find_local_receipt receipt;
}
}
if let Some(receipt) =
find_local_receipt_by_id_in_block(&client, prev_block, execution_outcome.id)
.await?
{
break 'find_local_receipt receipt;
}

prev_block_tried += 1;
}
};
Expand Down Expand Up @@ -192,6 +181,62 @@ async fn build_streamer_message(
Ok(StreamerMessage { block, shards: indexer_shards, state_changes })
}

/// Function that tries to find specific local receipt by it's ID and returns it
/// otherwise returns None
async fn find_local_receipt_by_id_in_block(
client: &Addr<near_client::ViewClientActor>,
block: views::BlockView,
receipt_id: near_primitives::hash::CryptoHash,
) -> Result<Option<views::ReceiptView>, FailedToFetchData> {
let chunks_to_fetch = block
.chunks
.iter()
.filter_map(|c| {
if c.height_included == block.header.height {
Some(c.chunk_hash)
} else {
None
}
})
.collect::<Vec<_>>();
let chunks = fetch_chunks(&client, chunks_to_fetch).await?;
let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?;

let mut shards_outcomes = fetch_outcomes(&client, block.header.hash).await?;

for chunk in chunks {
let views::ChunkView { header, transactions, .. } = chunk;

let outcomes = shards_outcomes
.remove(&header.shard_id)
.expect("Execution outcomes for given shard should be present");

if let Some((transaction, outcome)) =
transactions.into_iter().zip(outcomes.into_iter()).find(|(_, outcome)| {
outcome
.execution_outcome
.outcome
.receipt_ids
.first()
.expect("The transaction ExecutionOutcome should have one receipt id in vec")
== &receipt_id
})
{
let indexer_transaction = IndexerTransactionWithOutcome { transaction, outcome };
let local_receipts = convert_transactions_sir_into_local_receipts(
&client,
&protocol_config_view,
vec![&indexer_transaction],
&block,
)
.await?;

return Ok(local_receipts.into_iter().next());
}
}
Ok(None)
}

/// Function that starts Streamer's busy loop. Every half a seconds it fetches the status
/// compares to already fetched block height and in case it differs fetches new block of given height.
///
Expand Down

0 comments on commit 7e24d5d

Please sign in to comment.