From e2da55a029a923abdee791ecb995bd845ca3e200 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Tue, 11 Feb 2025 01:35:05 -0300 Subject: [PATCH] feat: add deduplicate opt for create topic cli --- Cargo.lock | 2 +- crates/fluvio-cli/src/client/topic/create.rs | 45 +++++++++++++++++++ .../fluvio-controlplane-metadata/Cargo.toml | 2 +- .../src/topic/config.rs | 4 +- .../src/services/public_api/topic/create.rs | 2 +- tests/cli/fluvio_smoke_tests/topic-basic.bats | 2 +- 6 files changed, 51 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbc0e42c13..766008710d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2694,7 +2694,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.30.1" +version = "0.30.2" dependencies = [ "anyhow", "base64 0.22.1", diff --git a/crates/fluvio-cli/src/client/topic/create.rs b/crates/fluvio-cli/src/client/topic/create.rs index 6a1dbb326d..e8ed1fc76d 100644 --- a/crates/fluvio-cli/src/client/topic/create.rs +++ b/crates/fluvio-cli/src/client/topic/create.rs @@ -24,12 +24,18 @@ use fluvio_sc_schema::shared::validate_resource_name; use fluvio_sc_schema::mirror::MirrorSpec; use fluvio_sc_schema::topic::HomeMirrorConfig; use fluvio_sc_schema::topic::MirrorConfig; +use fluvio_sc_schema::topic::Bounds; +use fluvio_sc_schema::topic::Deduplication; +use fluvio_sc_schema::topic::Filter; +use fluvio_sc_schema::topic::Transform; use fluvio::Fluvio; use fluvio::FluvioAdmin; use fluvio::metadata::topic::TopicSpec; use crate::CliError; +const DEFAULT_DEDUP_FILTER: &str = "fluvio/dedup-bloom-filter@0.1.0"; + #[derive(Debug, Parser)] pub struct CreateTopicOpt { /// The name of the Topic to create @@ -229,6 +235,12 @@ impl CreateTopicOpt { topic_spec.set_compression_type(compression_type); } + if self.setting.deduplicate { + let deduplication = + create_deduplication(self.setting.dedup_count, self.setting.dedup_age); + topic_spec.set_deduplication(Some(deduplication)); + } + topic_spec.set_system(self.setting.system); if self.setting.segment_size.is_some() || self.setting.max_partition_size.is_some() { @@ -258,6 +270,21 @@ fn validate(name: &str, _spec: &TopicSpec) -> Result<()> { Ok(()) } +fn create_deduplication(dedup_count: u64, dedup_age: Option) -> Deduplication { + Deduplication { + bounds: Bounds { + count: dedup_count, + age: dedup_age, + }, + filter: Filter { + transform: Transform { + uses: DEFAULT_DEDUP_FILTER.to_string(), + with: Default::default(), + }, + }, + } +} + #[derive(Debug, Parser)] #[group(id = "config-arg")] pub struct TopicConfigOpt { @@ -280,6 +307,24 @@ pub struct TopicConfigOpt { #[arg(long, value_name = "bytes")] max_partition_size: Option, + /// Deduplicate records in the topic + #[arg(long)] + deduplicate: bool, + + /// Number of records to keep in deduplication filter + #[arg( + long, + value_name = "integer", + requires = "deduplicate", + default_value = "5" + )] + dedup_count: u64, + + /// Age of records to keep in deduplication filter + /// Ex: '1h', '2d 10s', '7 days' (default) + #[arg(long, value_name = "time", value_parser=parse_duration, requires = "deduplicate")] + dedup_age: Option, + /// Flag to create a system topic /// System topics are for internal operations #[arg(long, short = 's', hide = true)] diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index 06639a5ee8..62eb95b69a 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.30.1" +version = "0.30.2" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-controlplane-metadata/src/topic/config.rs b/crates/fluvio-controlplane-metadata/src/topic/config.rs index 949aa01233..f406eab1d0 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/config.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/config.rs @@ -236,7 +236,7 @@ deduplication: age: 1m filter: transform: - uses: infinyon/dedup-filter@0.1.0 + uses: fluvio/dedup-bloom-filter@0.1.0 "#; //when @@ -373,7 +373,7 @@ compression: }, filter: Filter { transform: Transform { - uses: "infinyon/dedup-filter@0.1.0".to_string(), + uses: "fluvio/dedup-bloom-filter@0.1.0".to_string(), with: Default::default(), }, }, diff --git a/crates/fluvio-sc/src/services/public_api/topic/create.rs b/crates/fluvio-sc/src/services/public_api/topic/create.rs index c05c660f14..0cd6e70eb4 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/create.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/create.rs @@ -130,7 +130,7 @@ async fn validate_topic_request( sm_name.to_string(), ErrorCode::DeduplicationSmartModuleNotLoaded, Some(format!( - "{}\nHint: try `fluvio hub download {sm_name}` and repeat this operation", + "{}\nHint: try `fluvio hub sm download {sm_name}` and repeat this operation", ErrorCode::DeduplicationSmartModuleNotLoaded )), ); diff --git a/tests/cli/fluvio_smoke_tests/topic-basic.bats b/tests/cli/fluvio_smoke_tests/topic-basic.bats index 4e336369f5..99fadbc9e0 100644 --- a/tests/cli/fluvio_smoke_tests/topic-basic.bats +++ b/tests/cli/fluvio_smoke_tests/topic-basic.bats @@ -23,7 +23,7 @@ setup_file() { TOPIC_NAME_SYSTEM=$(random_string) export TOPIC_NAME_SYSTEM - DEDUP_FILTER_NAME="dedup-filter" + DEDUP_FILTER_NAME="dedup-bloom-filter" export DEDUP_FILTER_NAME cat <$TOPIC_CONFIG_PATH