Skip to content

Commit

Permalink
Merge pull request #970 from near/main
Browse files Browse the repository at this point in the history
Prod Release 06/08/24
  • Loading branch information
darunrs authored Aug 6, 2024
2 parents cc7fac4 + 3be5862 commit 3f95e60
Show file tree
Hide file tree
Showing 27 changed files with 1,988 additions and 720 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/coordinator-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ jobs:
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.75.0
override: true
profile: minimal
components: rustfmt
- name: Check
working-directory: ./coordinator
run: cargo check
Expand All @@ -33,6 +40,13 @@ jobs:
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.75.0
override: true
profile: minimal
components: rustfmt
- name: Test
working-directory: ./coordinator
run: cargo test
Expand Down
30 changes: 18 additions & 12 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,22 @@ impl BlockStream {
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
fn start_health_monitoring_task(
&self,
redis: Arc<RedisClient>,
start_block_height: near_indexer_primitives::types::BlockHeight,
) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();
let stalled_timeout_seconds = 120;

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

let mut last_processed_block = Some(start_block_height - 1);
loop {
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds))
.await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
Expand Down Expand Up @@ -183,6 +187,11 @@ impl BlockStream {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"No block has been processed for {stalled_timeout_seconds} seconds"
);
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
Expand Down Expand Up @@ -266,7 +275,8 @@ impl BlockStream {

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());
let monitor_handle =
self.start_health_monitoring_task(redis.clone(), start_block_height.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
Expand Down Expand Up @@ -533,11 +543,7 @@ mod tests {
predicate::eq("near-lake-data-mainnet".to_string()),
predicate::eq("000091940840/block.json"),
)
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
"2023-12-09",
))
});
.returning(move |_, _| Ok(crate::test_utils::generate_block_with_date("2023-12-09")));

let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default();

Expand Down Expand Up @@ -663,7 +669,7 @@ mod tests {
predicate::eq("000107503704/block.json"),
)
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
Ok(crate::test_utils::generate_block_with_date(
&chrono::Utc::now().format("%Y-%m-%d").to_string(),
))
});
Expand Down
22 changes: 15 additions & 7 deletions block-streamer/src/receiver_blocks/receiver_blocks_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use async_stream::try_stream;
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono::{DateTime, Duration, TimeZone, Timelike, Utc};
use near_lake_framework::near_indexer_primitives;
use regex::Regex;

Expand Down Expand Up @@ -165,7 +165,15 @@ impl ReceiverBlocksProcessor {
try_stream! {
let start_date = self.get_nearest_block_date(start_block_height).await?;
let contract_pattern_type = ContractPatternType::from(contract_pattern.as_str());
let mut current_date = start_date;
let mut current_date = start_date
.with_hour(0)
.unwrap()
.with_minute(0)
.unwrap()
.with_second(0)
.unwrap()
.with_nanosecond(0)
.unwrap();

while current_date <= Utc::now() {
let base_64_bitmaps: Vec<Base64Bitmap> = self.query_base_64_bitmaps(&contract_pattern_type, &current_date).await?;
Expand Down Expand Up @@ -288,13 +296,13 @@ mod tests {
}

#[tokio::test]
async fn collect_block_heights_from_one_day() {
async fn collect_block_heights_from_today() {
let mut mock_s3_client = crate::s3_client::S3Client::default();
mock_s3_client
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&Utc::now().format("%Y-%m-%d").to_string(),
&Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
))
});

Expand Down Expand Up @@ -326,7 +334,7 @@ mod tests {
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&Utc::now().format("%Y-%m-%d").to_string(),
&Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
))
});

Expand Down Expand Up @@ -362,8 +370,8 @@ mod tests {
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&(Utc::now() - Duration::days(2))
.format("%Y-%m-%d")
&(Utc::now() - Duration::days(2) + Duration::minutes(10))
.format("%Y-%m-%dT%H:%M:%S")
.to_string(),
))
});
Expand Down
2 changes: 1 addition & 1 deletion block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ mod tests {
predicate::always(),
)
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
Ok(crate::test_utils::generate_block_with_date(
&chrono::Utc::now().format("%Y-%m-%d").to_string(),
))
});
Expand Down
7 changes: 6 additions & 1 deletion block-streamer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,16 @@ pub fn utc_date_time_from_date_string(date: &str) -> chrono::DateTime<chrono::Ut
chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive_date_time)
}

pub fn generate_block_with_timestamp(date: &str) -> String {
pub fn generate_block_with_date(date: &str) -> String {
let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d")
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
return generate_block_with_timestamp(&naive_date.format("%Y-%m-%dT%H:%M:%S").to_string());
}

pub fn generate_block_with_timestamp(date: &str) -> String {
let naive_date = chrono::NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%S").unwrap();

let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000;

Expand Down
Loading

0 comments on commit 3f95e60

Please sign in to comment.