Skip to content

Commit

Permalink
address CR and add test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 21, 2022
1 parent 7b0bb73 commit 12aa640
Show file tree
Hide file tree
Showing 10 changed files with 371 additions and 159 deletions.
26 changes: 6 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions components/message_queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ common_util = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
snafu = { workspace = true }
rskafka = { version = "0.3.0", default-features = false, features = ["compression-gzip", "compression-lz4", "compression-snappy"] }
# rskafka = "0.3.0"
time = "0.3.15"
chrono = { workspace = true }
async-trait = { workspace = true }
dashmap = "5.4.0"
log = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }

[dependencies.rskafka]
git = "https://github.com/influxdata/rskafka.git"
rev = "00988a564b1db0249d858065fc110476c075efad"
default-features = false
features = ["compression-gzip", "compression-lz4", "compression-snappy"]

[dev-dependencies]
rand = { workspace = true }
uuid = { version = "1.0", features = ["v4"] }
79 changes: 37 additions & 42 deletions components/message_queue/src/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Kafka implementation's config

use serde_derive::{Deserialize, Serialize};

/// Generic client config that is used for consumers, producers as well as admin
Expand All @@ -6,85 +10,76 @@ use serde_derive::{Deserialize, Serialize};
#[serde(default)]
pub struct Config {
pub client_config: ClientConfig,
pub topic_creation_config: TopicCreationConfig,
pub wal_config: WalConfig,
pub topic_management_config: TopicManagementConfig,
pub consumer_config: ConsumerConfig,
// TODO: may need some config options for producer,
// but it seems nothing needed now.
}

/// Generic client config that is used for consumers, producers as well as admin
/// operations (like "create topic").
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ClientConfig {
/// The endpoint of boost broker, must be set and will panic if not.
/// The endpoint of boost broker, must be set and will panic if found it
/// None.
pub boost_broker: Option<String>,

/// Maximum message size in bytes.
///
/// extracted from `max_message_size`. Defaults to `None` (rskafka default).
/// Defaults to `None` (rskafka default).
pub max_message_size: Option<usize>,

/// Optional SOCKS5 proxy to use for connecting to the brokers.
///
/// extracted from `socks5_proxy`. Defaults to `None`.
/// Defaults to `None`.
pub socks5_proxy: Option<String>,
}

/// Config for topic creation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TopicCreationConfig {
pub struct TopicManagementConfig {
/// Replication factor.
///
/// Extracted from `replication_factor` option. Defaults to `1`.
pub replication_factor: i16,
pub create_replication_factor: i16,

/// Timeout in ms.
/// The maximum amount of time to wait while creating topic.
///
/// Extracted from `timeout_ms` option. Defaults to `5_000`.
pub timeout_ms: i32,
/// Defaults to `5_000`.
pub create_max_wait_ms: i32,

/// The maximum amount of time to wait while deleting records in topic.
///
/// Defaults to `5_000`.
pub delete_max_wait_ms: i32,
}

impl Default for TopicCreationConfig {
impl Default for TopicManagementConfig {
fn default() -> Self {
Self {
replication_factor: 1,
timeout_ms: 5000,
create_replication_factor: 1,
create_max_wait_ms: 5000,
delete_max_wait_ms: 5000,
}
}
}

/// Config for consumers.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct WalConfig {
/// Will wait for at least `min_batch_size` bytes of data
pub struct ConsumerConfig {
/// The maximum amount of time to wait for data before returning.
///
/// Extracted from `consumer_max_wait_ms`. Defaults to `None` (rskafka
/// default).
pub reader_max_wait_ms: Option<i32>,
/// Defaults to `None` (rskafka default).
pub max_wait_ms: Option<i32>,

/// The maximum amount of data to fetch in a single batch
/// The maximum amount of data for the consumer to fetch in a single batch.
///
/// Extracted from `consumer_min_batch_size`. Defaults to `None` (rskafka
/// default).
pub reader_min_batch_size: Option<i32>,
/// Defaults to `None` (rskafka default).
pub min_batch_size: Option<i32>,

/// The maximum amount of time to wait for data before returning
/// Will wait for at least `min_batch_size` bytes of data.
///
/// Extracted from `consumer_max_batch_size`. Defaults to `None` (rskafka
/// default).
pub reader_max_batch_size: Option<i32>,

pub reader_consume_all_wait_ms: i32,
}

impl Default for WalConfig {
fn default() -> Self {
Self {
reader_max_wait_ms: Default::default(),
reader_min_batch_size: Default::default(),
reader_max_batch_size: Default::default(),
reader_consume_all_wait_ms: 5000,
}
}
/// Defaults to `None` (rskafka default).
pub max_batch_size: Option<i32>,
}
Loading

0 comments on commit 12aa640

Please sign in to comment.