From 7cfe7ad06c0c119000d280a6ae03ac08929861b2 Mon Sep 17 00:00:00 2001
From: Josh Stevens
Date: Thu, 30 Jan 2025 17:18:07 +0000
Subject: [PATCH] format

---
 core/src/manifest/stream.rs |  4 ++--
 core/src/streams/clients.rs | 20 +++++++++++---------
 core/src/streams/mod.rs     |  3 +--
 core/src/streams/redis.rs   | 37 +++++++++++++++++++++++--------------
 4 files changed, 37 insertions(+), 27 deletions(-)

diff --git a/core/src/manifest/stream.rs b/core/src/manifest/stream.rs
index bedff37b..27d3bdf5 100644
--- a/core/src/manifest/stream.rs
+++ b/core/src/manifest/stream.rs
@@ -42,7 +42,7 @@ pub struct RedisStreamConfig {
     pub connection_uri: String,
     #[serde(default = "default_pool_size")]
     pub max_pool_size: u32,
-    pub streams: Vec<RedisStreamStreamConfig>
+    pub streams: Vec<RedisStreamStreamConfig>,
 }
 
 fn default_pool_size() -> u32 {
@@ -167,7 +167,7 @@ pub struct StreamsConfig {
     pub kafka: Option<KafkaStreamConfig>,
 
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub redis: Option<RedisStreamConfig>
+    pub redis: Option<RedisStreamConfig>,
 }
 
 impl StreamsConfig {
diff --git a/core/src/streams/clients.rs b/core/src/streams/clients.rs
index 9f2b1c92..6b2651cb 100644
--- a/core/src/streams/clients.rs
+++ b/core/src/streams/clients.rs
@@ -14,11 +14,12 @@ use crate::{
     event::{filter_event_data_by_conditions, EventMessage},
     manifest::stream::{
         KafkaStreamConfig, KafkaStreamQueueConfig, RabbitMQStreamConfig, RabbitMQStreamQueueConfig,
-        SNSStreamTopicConfig, StreamEvent, StreamsConfig, WebhookStreamConfig, RedisStreamConfig, RedisStreamStreamConfig
+        RedisStreamConfig, RedisStreamStreamConfig, SNSStreamTopicConfig, StreamEvent,
+        StreamsConfig, WebhookStreamConfig,
     },
     streams::{
         kafka::{Kafka, KafkaError},
-        RabbitMQ, RabbitMQError, Webhook, WebhookError, SNS, Redis, RedisError
+        RabbitMQ, RabbitMQError, Redis, RedisError, Webhook, WebhookError, SNS,
     },
 };
 
@@ -73,7 +74,7 @@ pub struct KafkaStream {
 
 pub struct RedisStream {
     config: RedisStreamConfig,
-    client: Arc<Redis>
+    client: Arc<Redis>,
 }
 
 pub struct StreamsClients {
@@ -129,7 +130,7 @@ impl StreamsClients {
                     Redis::new(config)
                         .await
                         .unwrap_or_else(|e| panic!("Failed to create Redis client: {:?}", e)),
-                )
+                ),
             })
         } else {
             None
@@ -399,20 +400,21 @@ impl StreamsClients {
                 let filtered_chunk: Vec<Value> = self.filter_chunk_event_data_by_conditions(
                     &config.events,
                     event_message,
-                    chunk
+                    chunk,
                 );
                 let publish_message_id = self.generate_publish_message_id(id, index, &None);
                 let client = Arc::clone(&client);
                 let stream_name = config.stream_name.clone();
-                let publish_message = self.create_chunk_message_json(event_message, &filtered_chunk);
+                let publish_message =
+                    self.create_chunk_message_json(event_message, &filtered_chunk);
 
                 task::spawn(async move {
-                    client.publish(&publish_message_id, &stream_name, &publish_message)
-                        .await?;
+                    client.publish(&publish_message_id, &stream_name, &publish_message).await?;
 
                     Ok(filtered_chunk.len())
                 })
-            }).collect();
+            })
+            .collect();
 
         tasks
     }
diff --git a/core/src/streams/mod.rs b/core/src/streams/mod.rs
index 2e77f89c..52a14a01 100644
--- a/core/src/streams/mod.rs
+++ b/core/src/streams/mod.rs
@@ -12,8 +12,7 @@ mod kafka;
 mod clients;
 mod redis;
 
-pub use redis::{Redis, RedisError};
-
 pub use clients::StreamsClients;
+pub use redis::{Redis, RedisError};
 
 pub const STREAM_MESSAGE_ID_KEY: &str = "x-rindexer-id";
diff --git a/core/src/streams/redis.rs b/core/src/streams/redis.rs
index 66e27e4c..4633d2e8 100644
--- a/core/src/streams/redis.rs
+++ b/core/src/streams/redis.rs
@@ -1,9 +1,14 @@
 use std::sync::Arc;
-use bb8_redis::bb8::{Pool, PooledConnection};
-use bb8_redis::{RedisConnectionManager, redis::{cmd, AsyncCommands}};
-use log::{error};
-use thiserror::Error;
+
+use bb8_redis::{
+    bb8::{Pool, PooledConnection},
+    redis::{cmd, AsyncCommands},
+    RedisConnectionManager,
+};
+use log::error;
 use serde_json::Value;
+use thiserror::Error;
+
 use crate::manifest::stream::RedisStreamConfig;
 
 #[derive(Error, Debug)]
@@ -20,24 +25,23 @@ pub enum RedisError {
 
 #[derive(Debug, Clone)]
 pub struct Redis {
-    client: Arc<Pool<RedisConnectionManager>>
+    client: Arc<Pool<RedisConnectionManager>>,
 }
 
-async fn get_pooled_connection(pool: &Arc<Pool<RedisConnectionManager>>) -> Result<PooledConnection<'_, RedisConnectionManager>, RedisError> {
+async fn get_pooled_connection(
+    pool: &Arc<Pool<RedisConnectionManager>>,
+) -> Result<PooledConnection<'_, RedisConnectionManager>, RedisError> {
     match pool.get().await {
         Ok(c) => Ok(c),
-        Err(err) => {
-            Err(RedisError::PoolError(err))
-        }
+        Err(err) => Err(RedisError::PoolError(err)),
     }
 }
 
 impl Redis {
     pub async fn new(config: &RedisStreamConfig) -> Result<Self, RedisError> {
         let connection_manager = RedisConnectionManager::new(config.connection_uri.as_str())?;
-        let redis_pool = Arc::new(Pool::builder()
-            .max_size(config.max_pool_size)
-            .build(connection_manager).await?
+        let redis_pool = Arc::new(
+            Pool::builder().max_size(config.max_pool_size).build(connection_manager).await?,
         );
 
         let mut connection = get_pooled_connection(&redis_pool).await?;
@@ -46,7 +50,12 @@ impl Redis {
         Ok(Self { client: redis_pool.clone() })
     }
 
-    pub async fn publish(&self, message_id: &str, stream_name: &str, message: &Value) -> Result<(), RedisError> {
+    pub async fn publish(
+        &self,
+        message_id: &str,
+        stream_name: &str,
+        message: &Value,
+    ) -> Result<(), RedisError> {
         // redis stream message ids need to be a timestamp with guaranteed unique identification
         // so instead, we attach the message_id to the message value.
         let mut message_with_id = message.clone();
@@ -60,4 +69,4 @@ impl Redis {
         Ok(())
     }
-}
\ No newline at end of file
+}
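
The comment inside `Redis::publish` describes the design: Redis assigns stream entry ids itself (a millisecond timestamp plus a sequence number, e.g. via XADD with "*"), so the rindexer message id is attached to the JSON payload rather than used as the entry id. The body that does this is elided from the hunk, so the sketch below only illustrates the idea under stated assumptions: `attach_message_id` and the `main` demo are hypothetical names, and it is an assumption that `publish` carries the id under the `STREAM_MESSAGE_ID_KEY` constant exported from `core/src/streams/mod.rs` above.

    use serde_json::{json, Value};

    // Key exported from core/src/streams/mod.rs in this patch; assumed here to be
    // the field under which the message id travels inside the payload.
    const STREAM_MESSAGE_ID_KEY: &str = "x-rindexer-id";

    // Hypothetical helper: clone the outgoing message and attach the rindexer
    // message id, since the Redis stream entry id itself is generated by Redis.
    fn attach_message_id(message: &Value, message_id: &str) -> Value {
        let mut message_with_id = message.clone();
        if let Some(obj) = message_with_id.as_object_mut() {
            obj.insert(STREAM_MESSAGE_ID_KEY.to_string(), json!(message_id));
        }
        message_with_id
    }

    fn main() {
        let event = json!({ "event_name": "Transfer" });
        let enriched = attach_message_id(&event, "0x123-1-0");
        println!("{enriched}");
    }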