Skip to content

Commit

Permalink
feat: Remove allowlist from Coordinator V2 (#570)
Browse files Browse the repository at this point in the history
Now that both dev/prod have been completely migrated, we can remove the
`allowlist`. This makes the V2 architecture the default.

We don't need to remove the `denylist` from Coordinator V2, that process
has already been stopped.
  • Loading branch information
morgsmccauley authored Feb 21, 2024
1 parent 345e15a commit 89e96e6
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 605 deletions.
68 changes: 4 additions & 64 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::cmp::Ordering;
use registry_types::StartBlock;

use crate::indexer_config::IndexerConfig;
use crate::migration::MIGRATED_STREAM_VERSION;
use crate::redis::RedisClient;
use crate::registry::IndexerRegistry;

Expand Down Expand Up @@ -105,10 +104,8 @@ async fn synchronise_block_stream(
Ok(())
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
enum StreamStatus {
/// Stream has just been migrated to V2
Migrated,
/// Stream version is synchronized with the registry
Synced,
/// Stream version does not match registry
Expand All @@ -129,10 +126,6 @@ async fn get_stream_status(

let stream_version = stream_version.unwrap();

if stream_version == MIGRATED_STREAM_VERSION {
return Ok(StreamStatus::Migrated);
}

match indexer_config.get_registry_version().cmp(&stream_version) {
Ordering::Equal => Ok(StreamStatus::Synced),
Ordering::Greater => Ok(StreamStatus::Outdated),
Expand All @@ -149,10 +142,8 @@ async fn clear_block_stream_if_needed(
indexer_config: &IndexerConfig,
redis_client: &RedisClient,
) -> anyhow::Result<()> {
if matches!(
stream_status,
StreamStatus::Migrated | StreamStatus::Synced | StreamStatus::New
) || indexer_config.start_block == StartBlock::Continue
if matches!(stream_status, StreamStatus::Synced | StreamStatus::New)
|| indexer_config.start_block == StartBlock::Continue
{
return Ok(());
}
Expand All @@ -167,7 +158,7 @@ async fn determine_start_block_height(
indexer_config: &IndexerConfig,
redis_client: &RedisClient,
) -> anyhow::Result<u64> {
if matches!(stream_status, StreamStatus::Migrated | StreamStatus::Synced) {
if stream_status == &StreamStatus::Synced {
tracing::info!("Resuming block stream");

return get_continuation_block_height(indexer_config, redis_client).await;
Expand Down Expand Up @@ -536,57 +527,6 @@ mod tests {
.unwrap();
}

#[tokio::test]
async fn resumes_stream_post_migration() {
let indexer_config = IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: String::new(),
rule: Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
created_at_block_height: 101,
updated_at_block_height: Some(200),
start_block: StartBlock::Height(1000),
};
let indexer_registry = HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);

let mut redis_client = RedisClient::default();
redis_client
.expect_get_stream_version()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(Some(MIGRATED_STREAM_VERSION)))
.once();
redis_client
.expect_get_last_published_block()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(Some(100)))
.once();
redis_client
.expect_set_stream_version()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| Ok(vec![]));
block_stream_handler.expect_stop().never();
block_stream_handler
.expect_start()
.with(predicate::eq(101), predicate::eq(indexer_config))
.returning(|_, _| Ok(()))
.once();

synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn does_not_start_stream_without_last_published_block() {
let indexer_config = IndexerConfig {
Expand Down
14 changes: 0 additions & 14 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::registry::Registry;
mod block_streams;
mod executors;
mod indexer_config;
mod migration;
mod redis;
mod registry;
mod utils;
Expand Down Expand Up @@ -53,19 +52,6 @@ async fn main() -> anyhow::Result<()> {
loop {
let indexer_registry = registry.fetch().await?;

let allowlist = migration::fetch_allowlist(&redis_client).await?;

migration::migrate_pending_accounts(
&indexer_registry,
&allowlist,
&redis_client,
&executors_handler,
)
.await?;

let indexer_registry =
migration::filter_registry_by_allowlist(indexer_registry, &allowlist).await?;

tokio::try_join!(
synchronise_executors(&indexer_registry, &executors_handler),
synchronise_block_streams(&indexer_registry, &redis_client, &block_streams_handler),
Expand Down
Loading

0 comments on commit 89e96e6

Please sign in to comment.