Skip to content

Commit

Permalink
feat: message queue and its kafka impl (#306)
Browse files Browse the repository at this point in the history
* a draft of message queue and its kafka impl.

* address CR and add test.
  • Loading branch information
Rachelint authored Oct 22, 2022
1 parent 0a28ba6 commit d00687d
Show file tree
Hide file tree
Showing 10 changed files with 862 additions and 25 deletions.
97 changes: 72 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ members = [
"components/skiplist",
"components/table_kv",
"components/tracing_util",
"components/message_queue",
"df_operator",
"interpreters",
"meta_client",
Expand Down Expand Up @@ -98,6 +99,7 @@ tracing_util = { path = "components/tracing_util" }
tonic = "0.8.1"
tokio = { version = "1.0", features = ["full"] }
wal = { path = "wal" }
message_queue = { path = "components/message_queue" }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
Expand Down
28 changes: 28 additions & 0 deletions components/message_queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "message_queue"
version = "0.1.0"

[package.edition]
workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
common_util = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
snafu = { workspace = true }
chrono = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }

[dependencies.rskafka]
git = "https://github.com/influxdata/rskafka.git"
rev = "00988a564b1db0249d858065fc110476c075efad"
default-features = false
features = ["compression-gzip", "compression-lz4", "compression-snappy"]

[dev-dependencies]
uuid = { version = "1.0", features = ["v4"] }
85 changes: 85 additions & 0 deletions components/message_queue/src/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Kafka implementation's config

use serde_derive::{Deserialize, Serialize};

/// Generic client config that is used for consumers, producers as well as admin
/// operations (like "create topic").
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub client_config: ClientConfig,
pub topic_management_config: TopicManagementConfig,
pub consumer_config: ConsumerConfig,
// TODO: may need some config options for producer,
// but it seems nothing needed now.
}

#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ClientConfig {
/// The endpoint of boost broker, must be set and will panic if found it
/// None.
pub boost_broker: Option<String>,

/// Maximum message size in bytes.
///
/// Defaults to `None` (rskafka default).
pub max_message_size: Option<usize>,

/// Optional SOCKS5 proxy to use for connecting to the brokers.
///
/// Defaults to `None`.
pub socks5_proxy: Option<String>,
}

/// Config for topic creation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TopicManagementConfig {
/// Replication factor.
///
/// Extracted from `replication_factor` option. Defaults to `1`.
pub create_replication_factor: i16,

/// The maximum amount of time to wait while creating topic.
///
/// Defaults to `5_000`.
pub create_max_wait_ms: i32,

/// The maximum amount of time to wait while deleting records in topic.
///
/// Defaults to `5_000`.
pub delete_max_wait_ms: i32,
}

impl Default for TopicManagementConfig {
fn default() -> Self {
Self {
create_replication_factor: 1,
create_max_wait_ms: 5000,
delete_max_wait_ms: 5000,
}
}
}

/// Config for consumers.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ConsumerConfig {
/// The maximum amount of time to wait for data before returning.
///
/// Defaults to `None` (rskafka default).
pub max_wait_ms: Option<i32>,

/// The maximum amount of data for the consumer to fetch in a single batch.
///
/// Defaults to `None` (rskafka default).
pub min_batch_size: Option<i32>,

/// Will wait for at least `min_batch_size` bytes of data.
///
/// Defaults to `None` (rskafka default).
pub max_batch_size: Option<i32>,
}
Loading

0 comments on commit d00687d

Please sign in to comment.