Skip to content

Commit

Permalink
refactor: Configure block stream redis stream from coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jan 8, 2024
1 parent f247ad7 commit 3dbc9ee
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 13 deletions.
1 change: 1 addition & 0 deletions block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
redis_stream: "morgs.near/test:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
Expand Down
6 changes: 4 additions & 2 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ message StartStreamRequest {
string function_name = 3;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 4;
// Key of Redis Stream to publish blocks to
string redis_stream = 5;
// Filter rule to apply to incoming blocks
oneof rule {
ActionAnyRule action_any_rule = 5;
ActionFunctionCallRule action_function_call_rule = 6;
ActionAnyRule action_any_rule = 6;
ActionFunctionCallRule action_function_call_rule = 7;
}
}

Expand Down
20 changes: 14 additions & 6 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@ pub struct BlockStream {
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
pub redis_stream: String,
}

impl BlockStream {
pub fn new(indexer_config: IndexerConfig, chain_id: ChainId, version: u64) -> Self {
pub fn new(
indexer_config: IndexerConfig,
chain_id: ChainId,
version: u64,
redis_stream: String,
) -> Self {
Self {
task: None,
indexer_config,
chain_id,
version,
redis_stream,
}
}

Expand All @@ -48,6 +55,7 @@ impl BlockStream {

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

let handle = tokio::spawn(async move {
tokio::select! {
Expand All @@ -67,7 +75,8 @@ impl BlockStream {
delta_lake_client,
lake_s3_config,
&chain_id,
LAKE_PREFETCH_SIZE
LAKE_PREFETCH_SIZE,
redis_stream
) => {
result.map_err(|err| {
tracing::error!(
Expand Down Expand Up @@ -112,6 +121,7 @@ pub(crate) async fn start_block_stream(
lake_s3_config: aws_sdk_s3::Config,
chain_id: &ChainId,
lake_prefetch_size: usize,
redis_stream: String,
) -> anyhow::Result<()> {
tracing::info!(
account_id = indexer.account_id.as_str(),
Expand Down Expand Up @@ -161,10 +171,7 @@ pub(crate) async fn start_block_stream(
for block in &blocks_from_index {
let block = block.to_owned();
redis_client
.xadd(
crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
&[("block_height".to_string(), block)],
)
.xadd(redis_stream.clone(), &[("block_height".to_string(), block)])
.await
.context("Failed to add block to Redis Stream")?;
redis_client
Expand Down Expand Up @@ -304,6 +311,7 @@ mod tests {
lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
indexer_config.clone(),
self.chain_id.clone(),
request.version,
request.redis_stream,
);

block_stream
Expand Down Expand Up @@ -240,6 +241,7 @@ mod tests {
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
redis_stream: "stream".to_string(),
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 1,
Expand Down Expand Up @@ -273,6 +275,7 @@ mod tests {
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
redis_stream: "stream".to_string(),
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 1,
Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/block_streams_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl BlockStreamsHandlerImpl {
account_id: String,
function_name: String,
version: u64,
redis_stream: String,
rule: registry_types::MatchingRule,
) -> anyhow::Result<()> {
let rule = match &rule {
Expand Down Expand Up @@ -93,6 +94,7 @@ impl BlockStreamsHandlerImpl {
account_id,
function_name,
version,
redis_stream,
rule: Some(rule),
});

Expand Down
13 changes: 9 additions & 4 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ async fn synchronise_block_streams(
indexer_config.account_id.to_string(),
indexer_config.function_name.clone(),
registry_version,
indexer_config.get_redis_stream(),
indexer_config.filter.matching_rule.clone(),
)
.await?;
Expand Down Expand Up @@ -386,12 +387,13 @@ mod tests {
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(1),
predicate::eq("morgs.near/test:block_stream".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _, _| Ok(()));
.returning(|_, _, _, _, _, _| Ok(()));

synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler)
.await
Expand Down Expand Up @@ -439,12 +441,13 @@ mod tests {
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(200),
predicate::eq("morgs.near/test:block_stream".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _, _| Ok(()));
.returning(|_, _, _, _, _, _| Ok(()));

synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler)
.await
Expand Down Expand Up @@ -492,12 +495,13 @@ mod tests {
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(101),
predicate::eq("morgs.near/test:block_stream".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _, _| Ok(()));
.returning(|_, _, _, _, _, _| Ok(()));

synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler)
.await
Expand Down Expand Up @@ -635,12 +639,13 @@ mod tests {
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(200),
predicate::eq("morgs.near/test:block_stream".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _, _| Ok(()));
.returning(|_, _, _, _, _, _| Ok(()));

synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler)
.await
Expand Down
1 change: 0 additions & 1 deletion runner/src/server/runner-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ function getRunnerService (StreamHandlerType: typeof StreamHandler): RunnerHandl
const response: ExecutorInfo__Output[] = [];
try {
streamHandlers.forEach((handler, executorId) => {
console.log({ streamHandlers, handler, executorId });
if (handler.indexerConfig?.account_id === undefined || handler.indexerConfig?.function_name === undefined) {
throw new Error(`Stream handler ${executorId} has no/invalid indexer config.`);
}
Expand Down

0 comments on commit 3dbc9ee

Please sign in to comment.