
Commit

cargo fmt
telezhnaya committed Nov 5, 2021
1 parent aa3f60e commit c079cd3
Showing 5 changed files with 78 additions and 68 deletions.
28 changes: 12 additions & 16 deletions src/db_adapters/assets/non_fungible_token_events.rs
@@ -2,7 +2,7 @@ use actix_diesel::dsl::AsyncRunQueryDsl;
 use actix_diesel::{AsyncError, Database};
 use bigdecimal::BigDecimal;
 use diesel::PgConnection;
-use tracing::{error, info, warn};
+use tracing::warn;

 use super::nft_types;
 use crate::models;
@@ -13,7 +13,8 @@ pub(crate) async fn store_nft(
     streamer_message: &near_indexer::StreamerMessage,
 ) -> anyhow::Result<()> {
     for shard in &streamer_message.shards {
-        collect_and_store_nft_events(&pool, &shard, &streamer_message.block.header.timestamp).await?;
+        collect_and_store_nft_events(&pool, &shard, &streamer_message.block.header.timestamp)
+            .await?;
     }
     Ok(())
 }
@@ -39,19 +40,17 @@ async fn collect_and_store_nft_events(
             10,
             "NonFungibleTokenEvent were adding to database".to_string(),
             &nft_events,
-            &is_error_fully_handled
+            &is_error_handled
         );
     }
     Ok(())
 }

-async fn is_error_fully_handled(async_error: &AsyncError<diesel::result::Error>) -> bool {
-    if let actix_diesel::AsyncError::Execute(
-        diesel::result::Error::DatabaseError(
-            diesel::result::DatabaseErrorKind::UniqueViolation,
-            ref error_info,
-        ),
-    ) = *async_error
+async fn is_error_handled(async_error: &AsyncError<diesel::result::Error>) -> bool {
+    if let actix_diesel::AsyncError::Execute(diesel::result::Error::DatabaseError(
+        diesel::result::DatabaseErrorKind::UniqueViolation,
+        ref error_info,
+    )) = *async_error
     {
         let duplicate_constraint = "assets__non_fungible_token_events_pkey";
         let broken_data_constraint = "assets__non_fungible_token_events_unique";
@@ -61,13 +60,12 @@ async fn is_error_fully_handled(async_error: &AsyncError<diesel::result::Error>)
             return true;
         } else if constraint_name == broken_data_constraint {
             warn!(
-            target: crate::INDEXER_FOR_EXPLORER,
-            "NFT: data inconsistency is found"
-        );
+                target: crate::INDEXER_FOR_EXPLORER,
+                "NFT: data inconsistency is found"
+            );
         }
     }

-    // TODO recheck all info/warn/err usages, check that it's added to the contributing guide
     return false;
 }
@@ -104,7 +102,6 @@ fn collect_nft_events(
             }
         };

-        // TODO get rid of unwrap
         let nft_types::NearEvent::Nep171(nep171_event) = event;
         event_logs.push(nep171_event);
     }
@@ -119,7 +116,6 @@ fn collect_nft_events(
         for token_id in mint_event.token_ids {
             nft_events.push(
                 models::assets::non_fungible_token_events::NonFungibleTokenEvent {
-                    // TODO get rid of other to_string() methods
                     emitted_for_receipt_id: outcome.receipt.receipt_id.to_string(),
                     emitted_at_block_timestamp: BigDecimal::from(*block_timestamp),
                     emitted_in_shard_id: BigDecimal::from(*shard_id),
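For reference, here is a dependency-free sketch of the check that the renamed is_error_handled callback performs: a unique violation on the table's primary key means the event is already stored and the insert can be skipped, while a violation of the extra uniqueness constraint signals inconsistent data and only warrants a warning. The DbError enum below is an illustrative stand-in for diesel's error type, not the actual diesel API.

// Simplified stand-in for diesel::result::Error, for illustration only.
#[derive(Debug)]
enum DbError {
    UniqueViolation { constraint_name: String },
    Other(String),
}

fn is_error_handled(error: &DbError) -> bool {
    let duplicate_constraint = "assets__non_fungible_token_events_pkey";
    let broken_data_constraint = "assets__non_fungible_token_events_unique";

    if let DbError::UniqueViolation { constraint_name } = error {
        if constraint_name == duplicate_constraint {
            // The event is already in the database: the error is handled.
            return true;
        } else if constraint_name == broken_data_constraint {
            eprintln!("NFT: data inconsistency is found");
        }
    }
    false
}

fn main() {
    let duplicate = DbError::UniqueViolation {
        constraint_name: "assets__non_fungible_token_events_pkey".to_string(),
    };
    let transient = DbError::Other("connection reset".to_string());

    assert!(is_error_handled(&duplicate)); // safe to stop retrying
    assert!(!is_error_handled(&transient)); // keep retrying
}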
3 changes: 2 additions & 1 deletion src/db_adapters/execution_outcomes.rs
@@ -42,7 +42,8 @@ pub async fn store_execution_outcomes_for_chunk(
         10,
         "Parent Receipt for ExecutionOutcome was fetched".to_string(),
         &execution_outcomes
-    ).unwrap()
+    )
+    .unwrap_or_default()
     .into_iter()
     .collect();

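A note on the .unwrap() to .unwrap_or_default() change: await_retry_or_panic! yields an Option, and a None result (for example, when an error is reported as handled) now falls back to an empty collection instead of panicking. A minimal illustration, with purely illustrative variable names:

fn main() {
    // When the retry macro bails out with None, the caller gets an empty Vec.
    let fetched: Option<Vec<(String, String)>> = None;
    let rows = fetched.unwrap_or_default();
    assert!(rows.is_empty());

    // A successful query is unwrapped as before.
    let fetched = Some(vec![("receipt_id".to_string(), "tx_hash".to_string())]);
    assert_eq!(fetched.unwrap_or_default().len(), 1);
}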
102 changes: 55 additions & 47 deletions src/db_adapters/receipts.rs
@@ -183,35 +183,39 @@ async fn find_tx_hashes_for_receipts(
         });
     }

-    let tx_hashes_for_receipts_via_outcomes: Vec<(String, String)> = crate::await_retry_or_panic!(
-        schema::execution_outcome_receipts::table
-            .inner_join(
-                schema::receipts::table
-                    .on(schema::execution_outcome_receipts::dsl::executed_receipt_id
-                        .eq(schema::receipts::dsl::receipt_id)),
-            )
-            .filter(
-                schema::execution_outcome_receipts::dsl::produced_receipt_id.eq(any(receipts
-                    .clone()
-                    .iter()
-                    .filter(|r| {
-                        matches!(
+    let tx_hashes_for_receipts_via_outcomes: Vec<(String, String)> =
+        crate::await_retry_or_panic!(
+            schema::execution_outcome_receipts::table
+                .inner_join(
+                    schema::receipts::table
+                        .on(schema::execution_outcome_receipts::dsl::executed_receipt_id
+                            .eq(schema::receipts::dsl::receipt_id)),
+                )
+                .filter(
+                    schema::execution_outcome_receipts::dsl::produced_receipt_id.eq(any(
+                        receipts
+                            .clone()
+                            .iter()
+                            .filter(|r| {
+                                matches!(
                             r.receipt,
                             near_indexer::near_primitives::views::ReceiptEnumView::Action { .. }
                         )
-                    })
-                    .map(|r| r.receipt_id.to_string())
-                    .collect::<Vec<String>>())),
-            )
-            .select((
-                schema::execution_outcome_receipts::dsl::produced_receipt_id,
-                schema::receipts::dsl::originated_from_transaction_hash,
-            ))
-            .load_async::<(String, String)>(&pool),
-        10,
-        "Parent Transaction for Receipts were fetched".to_string(),
-        &receipts
-    ).unwrap();
+                            })
+                            .map(|r| r.receipt_id.to_string())
+                            .collect::<Vec<String>>()
+                    )),
+                )
+                .select((
+                    schema::execution_outcome_receipts::dsl::produced_receipt_id,
+                    schema::receipts::dsl::originated_from_transaction_hash,
+                ))
+                .load_async::<(String, String)>(&pool),
+            10,
+            "Parent Transaction for Receipts were fetched".to_string(),
+            &receipts
+        )
+        .unwrap_or_default();

     let found_hashes_len = tx_hashes_for_receipts_via_outcomes.len();
     tx_hashes_for_receipts.extend(tx_hashes_for_receipts_via_outcomes);
@@ -223,30 +227,34 @@ async fn find_tx_hashes_for_receipts(
     receipts
         .retain(|r| !tx_hashes_for_receipts.contains_key(r.receipt_id.to_string().as_str()));

-    let tx_hashes_for_receipt_via_transactions: Vec<(String, String)> = crate::await_retry_or_panic!(
-        schema::transactions::table
-            .filter(
-                schema::transactions::dsl::converted_into_receipt_id.eq(any(receipts
-                    .clone()
-                    .iter()
-                    .filter(|r| {
-                        matches!(
+    let tx_hashes_for_receipt_via_transactions: Vec<(String, String)> =
+        crate::await_retry_or_panic!(
+            schema::transactions::table
+                .filter(
+                    schema::transactions::dsl::converted_into_receipt_id.eq(any(
+                        receipts
+                            .clone()
+                            .iter()
+                            .filter(|r| {
+                                matches!(
                             r.receipt,
                             near_indexer::near_primitives::views::ReceiptEnumView::Action { .. }
                         )
-                    })
-                    .map(|r| r.receipt_id.to_string())
-                    .collect::<Vec<String>>())),
-            )
-            .select((
-                schema::transactions::dsl::converted_into_receipt_id,
-                schema::transactions::dsl::transaction_hash,
-            ))
-            .load_async::<(String, String)>(&pool),
-        10,
-        "Parent Transaction for ExecutionOutcome were fetched".to_string(),
-        &receipts
-    ).unwrap();
+                            })
+                            .map(|r| r.receipt_id.to_string())
+                            .collect::<Vec<String>>()
+                    )),
+                )
+                .select((
+                    schema::transactions::dsl::converted_into_receipt_id,
+                    schema::transactions::dsl::transaction_hash,
+                ))
+                .load_async::<(String, String)>(&pool),
+            10,
+            "Parent Transaction for ExecutionOutcome were fetched".to_string(),
+            &receipts
+        )
+        .unwrap_or_default();

     let found_hashes_len = tx_hashes_for_receipt_via_transactions.len();
     tx_hashes_for_receipts.extend(tx_hashes_for_receipt_via_transactions);
9 changes: 7 additions & 2 deletions src/main.rs
@@ -87,7 +87,8 @@ async fn handle_message(
     };

     // Assets (NFT)
-    let nft_events_future = db_adapters::assets::non_fungible_token_events::store_nft(&pool, &streamer_message);
+    let nft_events_future =
+        db_adapters::assets::non_fungible_token_events::store_nft(&pool, &streamer_message);

     if strict_mode {
         // AccessKeys
@@ -119,7 +120,11 @@ async fn handle_message(
             account_changes_future,
         )?;
     } else {
-        try_join!(execution_outcomes_future, accounts_future, nft_events_future)?;
+        try_join!(
+            execution_outcomes_future,
+            accounts_future,
+            nft_events_future
+        )?;
     }
     Ok(())
 }
4 changes: 2 additions & 2 deletions src/retriable.rs
@@ -1,6 +1,6 @@
 #[macro_export]
 macro_rules! await_retry_or_panic {
-    ($query: expr, $number_of_retries: expr, $error_message: expr, $debug_structs: expr $(, $error_handling_fn:expr)?) => {
+    ($query: expr, $number_of_retries: expr, $error_message: expr, $debug_structs: expr $(, $is_error_handled:expr)? $(,)?) => {
         {
             let mut interval = crate::INTERVAL;
             let mut retry_attempt = 0usize;
@@ -18,7 +18,7 @@ macro_rules! await_retry_or_panic {
                 match $query.await {
                     Ok(res) => break Some(res),
                     Err(async_error) => {
-                        $(if $error_handling_fn(&async_error).await {
+                        $(if $is_error_handled(&async_error).await {
                             break None;
                         })?

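A minimal synchronous sketch of the pattern behind await_retry_or_panic!, with the renamed optional is_error_handled predicate: retry a fallible operation, bail out with None as soon as the predicate reports the error as handled, and otherwise keep retrying with a growing delay. All names below are illustrative; the real macro works on async queries and starts from crate::INTERVAL.

use std::{thread, time::Duration};

// Illustrative synchronous counterpart of the macro: retries `query`,
// returns None early when `is_error_handled` says the error is taken care of,
// and panics once the retries are exhausted.
fn retry_or_panic<T, E: std::fmt::Debug>(
    mut query: impl FnMut() -> Result<T, E>,
    number_of_retries: usize,
    error_message: &str,
    is_error_handled: Option<&dyn Fn(&E) -> bool>,
) -> Option<T> {
    let mut interval = Duration::from_millis(100);
    for retry_attempt in 1..=number_of_retries {
        match query() {
            Ok(res) => return Some(res),
            Err(async_error) => {
                // A handled error (e.g. a duplicate key on insert) stops the
                // retries and surfaces as None to the caller.
                if let Some(handled) = is_error_handled {
                    if handled(&async_error) {
                        return None;
                    }
                }
                eprintln!(
                    "{} failed, attempt {}/{}: {:?}; retrying in {:?}",
                    error_message, retry_attempt, number_of_retries, async_error, interval
                );
                thread::sleep(interval);
                interval *= 2;
            }
        }
    }
    panic!("{}: retries exhausted", error_message);
}

fn main() {
    let mut calls = 0;
    let result = retry_or_panic(
        || {
            calls += 1;
            if calls < 3 {
                Err("transient failure")
            } else {
                Ok(42)
            }
        },
        10,
        "demo query",
        None,
    );
    assert_eq!(result, Some(42)); // succeeded on the third attempt
}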
