Skip to content

Commit

Permalink
feat: Restart streams with mismatched versions
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 19, 2023
1 parent b36b005 commit 3241d19
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ async fn synchronise_registry_config(
if active_block_stream.version == registry_version {
continue;
}

block_streams_handler
.stop(active_block_stream.stream_id)
.await?;
}

let start_block_height = if let Some(start_block_height) =
Expand Down Expand Up @@ -347,5 +351,73 @@ mod tests {
.await
.unwrap();
}

#[tokio::test]
async fn restarts_streams_with_mismatched_versions() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 101,
updated_at_block_height: Some(200),
start_block_height: Some(1000),
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| {
Ok(vec![block_streamer::StreamInfo {
stream_id: "stream_id".to_string(),
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 101,
}])
});
block_stream_handler
.expect_stop()
.with(predicate::eq("stream_id".to_string()))
.returning(|_| Ok(()))
.once();
block_stream_handler
.expect_start()
.with(
predicate::eq(1000),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(200),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}
}
}

0 comments on commit 3241d19

Please sign in to comment.