Skip to content

Commit

Permalink
Merge pull request #574 from near/main
Browse files Browse the repository at this point in the history
Prod Release 21/02/24
  • Loading branch information
morgsmccauley authored Feb 22, 2024
2 parents 895b761 + 92d5526 commit 5a25fe3
Show file tree
Hide file tree
Showing 33 changed files with 1,756 additions and 1,669 deletions.
2 changes: 1 addition & 1 deletion block-streamer/examples/list_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use block_streamer::ListStreamsRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;
let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?;

let response = client
.list_streams(Request::new(ListStreamsRequest {}))
Expand Down
2 changes: 1 addition & 1 deletion block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamReque

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;
let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?;

let response = client
.start_stream(Request::new(StartStreamRequest {
Expand Down
2 changes: 1 addition & 1 deletion block-streamer/examples/stop_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use block_streamer::StopStreamRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;
let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?;

let response = client
.stop_stream(Request::new(StopStreamRequest {
Expand Down
200 changes: 200 additions & 0 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use registry_types::Rule;
/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
const LAKE_PREFETCH_SIZE: usize = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
Expand Down Expand Up @@ -184,6 +185,16 @@ async fn process_delta_lake_blocks(
affected_account_id,
..
} => {
if affected_account_id
.split(",")
.any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim()))
{
tracing::debug!(
"Skipping fetching index files from delta lake due to wildcard contract filter present in {}",
affected_account_id
);
return Ok(start_block_height);
}
tracing::debug!(
"Fetching block heights starting from {} from delta lake",
start_block_height,
Expand Down Expand Up @@ -358,4 +369,193 @@ mod tests {
.await
.unwrap();
}

#[tokio::test]
async fn skips_delta_lake_for_star_filter() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503700".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "*".to_string(),
status: registry_types::Status::Success,
},
};

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
}

#[tokio::test]
async fn skips_delta_lake_for_multiple_star_filter() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503700".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "*, *.tg".to_string(),
status: registry_types::Status::Success,
},
};

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
}

#[tokio::test]
async fn skips_delta_lake_for_star_filter_after_normal_account() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503700".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "someone.near, *.kaiching".to_string(),
status: registry_types::Status::Success,
},
};

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
}
}
68 changes: 4 additions & 64 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::cmp::Ordering;
use registry_types::StartBlock;

use crate::indexer_config::IndexerConfig;
use crate::migration::MIGRATED_STREAM_VERSION;
use crate::redis::RedisClient;
use crate::registry::IndexerRegistry;

Expand Down Expand Up @@ -105,10 +104,8 @@ async fn synchronise_block_stream(
Ok(())
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
enum StreamStatus {
/// Stream has just been migrated to V2
Migrated,
/// Stream version is synchronized with the registry
Synced,
/// Stream version does not match registry
Expand All @@ -129,10 +126,6 @@ async fn get_stream_status(

let stream_version = stream_version.unwrap();

if stream_version == MIGRATED_STREAM_VERSION {
return Ok(StreamStatus::Migrated);
}

match indexer_config.get_registry_version().cmp(&stream_version) {
Ordering::Equal => Ok(StreamStatus::Synced),
Ordering::Greater => Ok(StreamStatus::Outdated),
Expand All @@ -149,10 +142,8 @@ async fn clear_block_stream_if_needed(
indexer_config: &IndexerConfig,
redis_client: &RedisClient,
) -> anyhow::Result<()> {
if matches!(
stream_status,
StreamStatus::Migrated | StreamStatus::Synced | StreamStatus::New
) || indexer_config.start_block == StartBlock::Continue
if matches!(stream_status, StreamStatus::Synced | StreamStatus::New)
|| indexer_config.start_block == StartBlock::Continue
{
return Ok(());
}
Expand All @@ -167,7 +158,7 @@ async fn determine_start_block_height(
indexer_config: &IndexerConfig,
redis_client: &RedisClient,
) -> anyhow::Result<u64> {
if matches!(stream_status, StreamStatus::Migrated | StreamStatus::Synced) {
if stream_status == &StreamStatus::Synced {
tracing::info!("Resuming block stream");

return get_continuation_block_height(indexer_config, redis_client).await;
Expand Down Expand Up @@ -536,57 +527,6 @@ mod tests {
.unwrap();
}

#[tokio::test]
async fn resumes_stream_post_migration() {
let indexer_config = IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: String::new(),
rule: Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
created_at_block_height: 101,
updated_at_block_height: Some(200),
start_block: StartBlock::Height(1000),
};
let indexer_registry = HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);

let mut redis_client = RedisClient::default();
redis_client
.expect_get_stream_version()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(Some(MIGRATED_STREAM_VERSION)))
.once();
redis_client
.expect_get_last_published_block()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(Some(100)))
.once();
redis_client
.expect_set_stream_version()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| Ok(vec![]));
block_stream_handler.expect_stop().never();
block_stream_handler
.expect_start()
.with(predicate::eq(101), predicate::eq(indexer_config))
.returning(|_, _| Ok(()))
.once();

synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn does_not_start_stream_without_last_published_block() {
let indexer_config = IndexerConfig {
Expand Down
14 changes: 0 additions & 14 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::registry::Registry;
mod block_streams;
mod executors;
mod indexer_config;
mod migration;
mod redis;
mod registry;
mod utils;
Expand Down Expand Up @@ -53,19 +52,6 @@ async fn main() -> anyhow::Result<()> {
loop {
let indexer_registry = registry.fetch().await?;

let allowlist = migration::fetch_allowlist(&redis_client).await?;

migration::migrate_pending_accounts(
&indexer_registry,
&allowlist,
&redis_client,
&executors_handler,
)
.await?;

let indexer_registry =
migration::filter_registry_by_allowlist(indexer_registry, &allowlist).await?;

tokio::try_join!(
synchronise_executors(&indexer_registry, &executors_handler),
synchronise_block_streams(&indexer_registry, &redis_client, &block_streams_handler),
Expand Down
Loading

0 comments on commit 5a25fe3

Please sign in to comment.