Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create initial Coordinator V2 service #444

Merged
merged 46 commits into from
Jan 11, 2024
Merged
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8e6e95d
feat: Create crate for coordinator control service
morgsmccauley Dec 12, 2023
c2f857b
feat: Fetch and print registry contract
morgsmccauley Dec 12, 2023
dd26099
feat: Used shared registry types
morgsmccauley Dec 14, 2023
025b315
refactor: Extract registry fetch to own mod
morgsmccauley Dec 14, 2023
3e05d9a
refactor: Use structured logging
morgsmccauley Dec 17, 2023
4ba5296
feat: Start all streams with stubbed data
morgsmccauley Dec 18, 2023
126296d
feat: Start stream with filter specified in registry
morgsmccauley Dec 18, 2023
64d835b
feat: Add `update_at`/`created_at` fields to registry
morgsmccauley Dec 18, 2023
c91f814
chore: Temporarily mock registry
morgsmccauley Dec 18, 2023
b19c607
feat: Write `last_indexed_block` per block stream
morgsmccauley Dec 18, 2023
0db45b0
feat: Start block stream from desired height
morgsmccauley Dec 18, 2023
a849a1a
refactor: Abstract block stream handling away
morgsmccauley Dec 18, 2023
32ca6b7
refactor: Allow mocking of `Registry`
morgsmccauley Dec 18, 2023
34b21f7
refactor: Allow mocking of `BlockStreamHandler`
morgsmccauley Dec 18, 2023
574b629
test: Add unit test for control loop
morgsmccauley Dec 18, 2023
b8cbcee
refactor: Rename for clarity
morgsmccauley Dec 18, 2023
19fe056
test: Test starting of block streams
morgsmccauley Dec 18, 2023
dd4f587
feat: Add `version` to block streams
morgsmccauley Dec 18, 2023
d011a22
feat: Write block stream version from coordinator
morgsmccauley Dec 18, 2023
bb157e7
feat: Stop streams not in registry
morgsmccauley Dec 19, 2023
fbc71d5
feat: Ignores streams with matching versions
morgsmccauley Dec 19, 2023
b803a03
feat: Restart streams with mismatched versions
morgsmccauley Dec 19, 2023
e0c3714
refactor: Avoid `.clone()`
morgsmccauley Dec 21, 2023
09cca2b
feat: Add support for `ActionFunctionCallRule`
morgsmccauley Dec 21, 2023
124cec9
feat: Log block stream requests
morgsmccauley Dec 21, 2023
a0568ed
fix: Map correct status values
morgsmccauley Dec 21, 2023
b70b25d
feat: Skip historical/delta lake processing for function/event rules
morgsmccauley Dec 21, 2023
4a81033
chore: Pin `near-lake-framework` to `0.7.5`
morgsmccauley Dec 21, 2023
dbdc1df
feat: Continuously loop registry config synchronization
morgsmccauley Dec 21, 2023
b2d8720
chore: Remove stubbed registry contract
morgsmccauley Dec 21, 2023
6e89697
chore: Use `near-primitives` feature for `registry-types`
morgsmccauley Jan 3, 2024
67e60fd
refactor: Rename `synchronise_registry_config` -> `synchronise_block_…
morgsmccauley Jan 7, 2024
83a2d92
feat: Start executors
morgsmccauley Jan 7, 2024
2977df0
feat: Add error context
morgsmccauley Jan 7, 2024
20b0fb4
feat: Restart executors with mismatched versions
morgsmccauley Jan 8, 2024
73609d8
feat: Stop executors not in registry
morgsmccauley Jan 8, 2024
a9cf247
refactor: Configure block stream redis stream from coordinator
morgsmccauley Jan 8, 2024
240ba3d
fix: Enable (hack) mocking by pinning lake framework version
morgsmccauley Jan 8, 2024
165111f
ci: Add checks for Coordinator
morgsmccauley Jan 8, 2024
62a3334
refactor: Rename to `last_published_block` as it has not been executed
morgsmccauley Jan 8, 2024
d7aa296
chore: Remove "historical" name references from logging
morgsmccauley Jan 11, 2024
e373d4c
fix: Publish blocks to correct redis stream
morgsmccauley Jan 11, 2024
abc7272
chore: Fix spelling
morgsmccauley Jan 11, 2024
762a078
fixup! fix: Publish blocks to correct redis stream
morgsmccauley Jan 11, 2024
d250f1f
refactor: Rename for clarity
morgsmccauley Jan 11, 2024
46745d2
fix: Fix rate limiting on protoc setup
morgsmccauley Jan 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test: Test starting of block streams
morgsmccauley committed Jan 8, 2024
commit 19fe056ce342e5f0c71759d122b690fa899f520e
203 changes: 163 additions & 40 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
@@ -62,53 +62,176 @@ async fn synchronise_registry_config(
mod tests {
use super::*;

use mockall::predicate;
use std::collections::HashMap;

use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule, Status};

use crate::registry::IndexerConfig;

#[tokio::test]
async fn something() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
mod block_stream {
use super::*;

#[tokio::test]
async fn uses_start_block_height_when_set() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 1,
updated_at_block_height: None,
start_block_height: Some(100),
},
created_at_block_height: 1,
updated_at_block_height: None,
start_block_height: None,
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| Ok(1));

let mut block_stream_handler = BlockStreamHandler::default();
block_stream_handler
.expect_start()
.returning(|_, _, _, _| Ok(()));

let _ =
synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler).await;
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(100),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn uses_updated_at_when_no_start_block_height() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 101,
updated_at_block_height: Some(200),
start_block_height: None,
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(200),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn uses_created_at_when_no_updated_at_block_height() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 101,
updated_at_block_height: None,
start_block_height: None,
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(101),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}
}
}
6 changes: 4 additions & 2 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;

use redis::{aio::ConnectionManager, FromRedisValue, RedisError, ToRedisArgs};
pub use redis::RedisError;
use redis::{aio::ConnectionManager, FromRedisValue, ToRedisArgs};

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
@@ -21,7 +22,7 @@ impl RedisClientImpl {
Ok(Self { connection })
}

pub async fn get<T, U>(&self, key: T) -> Result<U, RedisError>
pub async fn get<T, U>(&self, key: T) -> anyhow::Result<U>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Send + Sync + 'static,
@@ -32,5 +33,6 @@ impl RedisClientImpl {
.arg(key)
.query_async(&mut self.connection.clone())
.await
.map_err(|e| e.into())
}
}