From 843477650ef44b68f2d8cfe4b5459fecb24308c1 Mon Sep 17 00:00:00 2001 From: Bread White Date: Tue, 12 Nov 2024 11:46:18 +0300 Subject: [PATCH 1/3] fix(compose): docker compose fixes --- Dockerfile | 8 ++++---- config/production.toml | 3 +++ docker-compose.yaml | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1dbeedc..8f9e078 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,25 +20,25 @@ FROM chef AS builder WORKDIR /app -RUN apt update && apt install -y curl wget - COPY --from=planner /app/recipe.json recipe.json RUN cargo chef cook --release --recipe-path recipe.json COPY . . -RUN echo -e 'building binary with feature "${FEATURES}"' +RUN echo -e 'building binary with feature "$FEATURES"' RUN cargo install ${FEATURES} --bins --path . # Target layer based on tiny official ubuntu image with neccessary binaries and data to run. FROM ubuntu:rolling +RUN apt update && apt install -y curl + WORKDIR /app COPY --from=builder /app/target/release/news-rss . ENTRYPOINT ["/app/news-rss"] -EXPOSE 2892 +EXPOSE 2865 diff --git a/config/production.toml b/config/production.toml index 0dea285..82375a8 100644 --- a/config/production.toml +++ b/config/production.toml @@ -1,6 +1,9 @@ [logger] level = "info" +[server] +address = "0.0.0.0:2865" + [cache.local] expired_secs = 10368000 diff --git a/docker-compose.yaml b/docker-compose.yaml index 969282f..967dda2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,7 +43,7 @@ services: - redis - pgsql ports: - - '2895:2895' + - '2865:2865' volumes: - './config:/app/config:ro' environment: From b0ccafbed49c680414a954fba09f13ee98df2d29 Mon Sep 17 00:00:00 2001 From: Bread White Date: Mon, 25 Nov 2024 11:40:45 +0300 Subject: [PATCH 2/3] chore(rmq): replaced direct to fanout exchange kind --- config/development.toml | 4 ++-- config/production.toml | 4 ++-- src/publish/rabbit/config.rs | 6 ++++-- src/publish/rabbit/mod.rs | 33 +++++++++------------------------ tests/test_publish_feeds.rs | 9 ++++++--- tests/tests_helper.rs | 32 ++++++++++++-------------------- 6 files changed, 35 insertions(+), 53 deletions(-) diff --git a/config/development.toml b/config/development.toml index 3de4d9f..6e41f96 100644 --- a/config/development.toml +++ b/config/development.toml @@ -17,10 +17,10 @@ expired_secs = 360 address = "amqp://localhost:5672" username = "rmq" password = "rmq" -stream_name = "news-rss-stream" exchange = "news-rss-exchange" routing_key = "news-rss-routing" -capacity_gb = 1 +no_wait = true +durable = false [publish.pgsql] address = "localhost:5432" diff --git a/config/production.toml b/config/production.toml index ad1eec0..9be2691 100644 --- a/config/production.toml +++ b/config/production.toml @@ -17,10 +17,10 @@ expired_secs = 10368000 address = "amqp://rabbitmq:5672" username = "rmq" password = "rmq" -stream_name = "news-rss-stream" exchange = "news-rss-exchange" routing_key = "news-rss-routing" -capacity_gb = 1 +no_wait = true +durable = false [publish.pgsql] address = "pgsql:5432" diff --git a/src/publish/rabbit/config.rs b/src/publish/rabbit/config.rs index 68da14b..6871e57 100644 --- a/src/publish/rabbit/config.rs +++ b/src/publish/rabbit/config.rs @@ -7,10 +7,12 @@ pub struct RabbitConfig { address: String, username: String, password: String, - stream_name: String, exchange: String, routing_key: String, #[getset(skip)] #[getset(get_copy = "pub")] - capacity_gb: u64, + no_wait: bool, + #[getset(skip)] + #[getset(get_copy = "pub")] + durable: bool, } diff --git a/src/publish/rabbit/mod.rs b/src/publish/rabbit/mod.rs index 59b122a..3721f26 100644 --- a/src/publish/rabbit/mod.rs +++ b/src/publish/rabbit/mod.rs @@ -39,35 +39,19 @@ impl ServiceConnect for RabbitPublisher { let exchange_opts = ExchangeDeclareOptions { nowait: true, + durable: false, ..Default::default() }; + channel .exchange_declare( config.exchange(), - ExchangeKind::Direct, + ExchangeKind::Fanout, exchange_opts, FieldTable::default(), ) .await?; - let queue_decl_opts = QueueDeclareOptions { - durable: true, - ..Default::default() - }; - channel - .queue_declare(config.stream_name(), queue_decl_opts, FieldTable::default()) - .await?; - - channel - .queue_bind( - config.stream_name(), - config.exchange(), - config.routing_key(), - QueueBindOptions::default(), - FieldTable::default(), - ) - .await?; - let client = RabbitPublisher { config: Arc::new(config.to_owned()), channel: Arc::new(channel), @@ -82,26 +66,27 @@ impl Publisher for RabbitPublisher { type Error = RabbitPublishError; async fn publish(&self, news: &PublishNews) -> Result<(), Self::Error> { + let exchange = self.config.exchange(); + let routing = self.config.routing_key(); let bytes = serde_json::to_vec(&news)?; let pub_opts = BasicPublishOptions { mandatory: true, immediate: false, }; - let pub_props = BasicProperties::default(); let confirm = self .channel .basic_publish( - self.config.exchange(), - self.config.routing_key(), + exchange, + routing, pub_opts, bytes.as_slice(), - pub_props, + BasicProperties::default(), ) .await? .await?; - tracing::info!("rabbit confirm is: {confirm:?}"); + tracing::info!(exchange=exchange, routing_key=routing, "rabbit confirm: {confirm:?}"); Ok(()) } diff --git a/tests/test_publish_feeds.rs b/tests/test_publish_feeds.rs index 4d4ef8f..42bf226 100644 --- a/tests/test_publish_feeds.rs +++ b/tests/test_publish_feeds.rs @@ -13,6 +13,9 @@ use std::sync::Arc; use std::time::Duration; const TEST_TIME_EXECUTION: u64 = 5; +const TEST_RMQ_QUEUE_NAME: &str = "news-rss"; +const TEST_SOURCE_NAME: &str = "NDTV World News"; +const TEST_TARGET_URL: &str = "https://feeds.feedburner.com/ndtvnews-world-news"; #[tokio::test] async fn test_rss_feeds() -> Result<(), anyhow::Error> { @@ -37,8 +40,8 @@ async fn test_rss_feeds() -> Result<(), anyhow::Error> { let crawler = tests_helper::build_llm_crawler(&config).await?; let rss_config = vec![RssConfig::builder() - .source_name("NDTV World News".to_owned()) - .target_url("https://feeds.feedburner.com/ndtvnews-world-news".to_owned()) + .source_name(TEST_SOURCE_NAME.to_owned()) + .target_url(TEST_TARGET_URL.to_owned()) .max_retries(3) .timeout(10) .interval_secs(5) @@ -60,7 +63,7 @@ async fn test_rss_feeds() -> Result<(), anyhow::Error> { .collect::>(); #[cfg(feature = "test-publish-rabbit")] - let _ = tests_helper::rabbit_consumer(config.publish().rmq()).await?; + let _ = tests_helper::rabbit_consumer(TEST_RMQ_QUEUE_NAME, config.publish().rmq()).await?; tokio::time::sleep(Duration::from_secs(TEST_TIME_EXECUTION)).await; diff --git a/tests/tests_helper.rs b/tests/tests_helper.rs index 90cbaec..95f43c1 100644 --- a/tests/tests_helper.rs +++ b/tests/tests_helper.rs @@ -49,46 +49,38 @@ pub async fn create_llm_completion_route(mock: &MockServer, url: &str, http_meth #[allow(dead_code)] #[allow(unused_assignments)] #[allow(unused_variables)] -pub async fn rabbit_consumer(config: &RabbitConfig) -> Result<(), anyhow::Error> { +pub async fn rabbit_consumer(queue: &str, config: &RabbitConfig) -> Result<(), anyhow::Error> { let conn_props = ConnectionProperties::default(); let connection = Connection::connect(config.address(), conn_props).await?; let channel = connection.create_channel().await?; - let exchange_opts = ExchangeDeclareOptions { - nowait: true, - ..Default::default() - }; - channel - .exchange_declare( - config.exchange(), - ExchangeKind::Direct, - exchange_opts, - FieldTable::default(), - ) - .await?; - let queue_decl_opts = QueueDeclareOptions { - durable: true, + durable: config.durable(), + nowait: config.no_wait(), ..Default::default() }; channel - .queue_declare(config.stream_name(), queue_decl_opts, FieldTable::default()) + .queue_declare(queue, queue_decl_opts, FieldTable::default()) .await?; + let queue_bind_opts = QueueBindOptions { + nowait: config.no_wait(), + }; + channel .queue_bind( - config.stream_name(), + queue, config.exchange(), config.routing_key(), - QueueBindOptions::default(), + queue_bind_opts, FieldTable::default(), ) .await?; let consumer = channel .basic_consume( - config.stream_name(), + queue, TEST_AMQP_CONSUMER_TAG, BasicConsumeOptions::default(), FieldTable::default(), @@ -114,7 +106,7 @@ pub async fn rabbit_consumer(config: &RabbitConfig) -> Result<(), anyhow::Error> delivery .ack(BasicAckOptions::default()) .await - .expect("Failed to ack send_webhook_event message"); + .expect("failed to ack send_webhook_event message"); }); counter += 1; From 89d216511d3051361cb38a4033d9c032d730a3a3 Mon Sep 17 00:00:00 2001 From: Bread White Date: Mon, 25 Nov 2024 13:03:36 +0300 Subject: [PATCH 3/3] chore(ci): fixed fmt and clippy warnigns --- src/publish/rabbit/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/publish/rabbit/mod.rs b/src/publish/rabbit/mod.rs index 3721f26..613e100 100644 --- a/src/publish/rabbit/mod.rs +++ b/src/publish/rabbit/mod.rs @@ -8,7 +8,6 @@ use crate::publish::Publisher; use crate::ServiceConnect; use lapin::options::{BasicPublishOptions, ExchangeDeclareOptions}; -use lapin::options::{QueueBindOptions, QueueDeclareOptions}; use lapin::types::FieldTable; use lapin::{BasicProperties, ConnectionProperties, ExchangeKind}; use lapin::{Channel, Connection}; @@ -86,7 +85,11 @@ impl Publisher for RabbitPublisher { .await? .await?; - tracing::info!(exchange=exchange, routing_key=routing, "rabbit confirm: {confirm:?}"); + tracing::info!( + exchange = exchange, + routing_key = routing, + "rabbit confirm: {confirm:?}" + ); Ok(()) }