Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
joshstevens19 committed Jan 30, 2025
1 parent 566579b commit 7cfe7ad
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 27 deletions.
4 changes: 2 additions & 2 deletions core/src/manifest/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct RedisStreamConfig {
pub connection_uri: String,
#[serde(default = "default_pool_size")]
pub max_pool_size: u32,
pub streams: Vec<RedisStreamStreamConfig>
pub streams: Vec<RedisStreamStreamConfig>,
}

fn default_pool_size() -> u32 {
Expand Down Expand Up @@ -167,7 +167,7 @@ pub struct StreamsConfig {
pub kafka: Option<KafkaStreamConfig>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub redis: Option<RedisStreamConfig>
pub redis: Option<RedisStreamConfig>,
}

impl StreamsConfig {
Expand Down
20 changes: 11 additions & 9 deletions core/src/streams/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use crate::{
event::{filter_event_data_by_conditions, EventMessage},
manifest::stream::{
KafkaStreamConfig, KafkaStreamQueueConfig, RabbitMQStreamConfig, RabbitMQStreamQueueConfig,
SNSStreamTopicConfig, StreamEvent, StreamsConfig, WebhookStreamConfig, RedisStreamConfig, RedisStreamStreamConfig
RedisStreamConfig, RedisStreamStreamConfig, SNSStreamTopicConfig, StreamEvent,
StreamsConfig, WebhookStreamConfig,
},
streams::{
kafka::{Kafka, KafkaError},
RabbitMQ, RabbitMQError, Webhook, WebhookError, SNS, Redis, RedisError
RabbitMQ, RabbitMQError, Redis, RedisError, Webhook, WebhookError, SNS,
},
};

Expand Down Expand Up @@ -73,7 +74,7 @@ pub struct KafkaStream {

pub struct RedisStream {
config: RedisStreamConfig,
client: Arc<Redis>
client: Arc<Redis>,
}

pub struct StreamsClients {
Expand Down Expand Up @@ -129,7 +130,7 @@ impl StreamsClients {
Redis::new(config)
.await
.unwrap_or_else(|e| panic!("Failed to create Redis client: {:?}", e)),
)
),
})
} else {
None
Expand Down Expand Up @@ -399,20 +400,21 @@ impl StreamsClients {
let filtered_chunk: Vec<Value> = self.filter_chunk_event_data_by_conditions(
&config.events,
event_message,
chunk
chunk,
);

let publish_message_id = self.generate_publish_message_id(id, index, &None);
let client = Arc::clone(&client);
let stream_name = config.stream_name.clone();
let publish_message = self.create_chunk_message_json(event_message, &filtered_chunk);
let publish_message =
self.create_chunk_message_json(event_message, &filtered_chunk);

task::spawn(async move {
client.publish(&publish_message_id, &stream_name, &publish_message)
.await?;
client.publish(&publish_message_id, &stream_name, &publish_message).await?;
Ok(filtered_chunk.len())
})
}).collect();
})
.collect();
tasks
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ mod kafka;
mod clients;

mod redis;
pub use redis::{Redis, RedisError};

pub use clients::StreamsClients;
pub use redis::{Redis, RedisError};

pub const STREAM_MESSAGE_ID_KEY: &str = "x-rindexer-id";
37 changes: 23 additions & 14 deletions core/src/streams/redis.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::sync::Arc;
use bb8_redis::bb8::{Pool, PooledConnection};
use bb8_redis::{RedisConnectionManager, redis::{cmd, AsyncCommands}};
use log::{error};
use thiserror::Error;

use bb8_redis::{
bb8::{Pool, PooledConnection},
redis::{cmd, AsyncCommands},
RedisConnectionManager,
};
use log::error;
use serde_json::Value;
use thiserror::Error;

use crate::manifest::stream::RedisStreamConfig;

#[derive(Error, Debug)]
Expand All @@ -20,24 +25,23 @@ pub enum RedisError {

#[derive(Debug, Clone)]
pub struct Redis {
client: Arc<Pool<RedisConnectionManager>>
client: Arc<Pool<RedisConnectionManager>>,
}

async fn get_pooled_connection(pool: &Arc<Pool<RedisConnectionManager>>) -> Result<PooledConnection<RedisConnectionManager>, RedisError> {
async fn get_pooled_connection(
pool: &Arc<Pool<RedisConnectionManager>>,
) -> Result<PooledConnection<RedisConnectionManager>, RedisError> {
match pool.get().await {
Ok(c) => Ok(c),
Err(err) => {
Err(RedisError::PoolError(err))
}
Err(err) => Err(RedisError::PoolError(err)),
}
}

impl Redis {
pub async fn new(config: &RedisStreamConfig) -> Result<Self, RedisError> {
let connection_manager = RedisConnectionManager::new(config.connection_uri.as_str())?;
let redis_pool = Arc::new(Pool::builder()
.max_size(config.max_pool_size)
.build(connection_manager).await?
let redis_pool = Arc::new(
Pool::builder().max_size(config.max_pool_size).build(connection_manager).await?,
);

let mut connection = get_pooled_connection(&redis_pool).await?;
Expand All @@ -46,7 +50,12 @@ impl Redis {
Ok(Self { client: redis_pool.clone() })
}

pub async fn publish(&self, message_id: &str, stream_name: &str, message: &Value) -> Result<(), RedisError> {
pub async fn publish(
&self,
message_id: &str,
stream_name: &str,
message: &Value,
) -> Result<(), RedisError> {
// redis stream message ids need to be a timestamp with guaranteed unique identification
// so instead, we attach the message_id to the message value.
let mut message_with_id = message.clone();
Expand All @@ -60,4 +69,4 @@ impl Redis {

Ok(())
}
}
}

0 comments on commit 7cfe7ad

Please sign in to comment.