Skip to content

Commit

Permalink
a draft of message queue and its kafka impl.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 16, 2022
1 parent a8fb2c9 commit 7b0bb73
Show file tree
Hide file tree
Showing 8 changed files with 651 additions and 26 deletions.
113 changes: 87 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"components/skiplist",
"components/table_kv",
"components/tracing_util",
"components/message_queue",
"df_operator",
"interpreters",
"meta_client",
Expand Down Expand Up @@ -96,6 +97,7 @@ tracing_util = { path = "components/tracing_util" }
tonic = "0.8.1"
tokio = { version = "1.0", features = ["full"] }
wal = { path = "wal" }
message_queue = { path = "components/message_queue" }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
Expand Down
24 changes: 24 additions & 0 deletions components/message_queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "message_queue"
version = "0.1.0"

[package.edition]
workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
common_util = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
snafu = { workspace = true }
rskafka = { version = "0.3.0", default-features = false, features = ["compression-gzip", "compression-lz4", "compression-snappy"] }
# rskafka = "0.3.0"
time = "0.3.15"
async-trait = { workspace = true }
dashmap = "5.4.0"
log = { workspace = true }
futures = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
90 changes: 90 additions & 0 deletions components/message_queue/src/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use serde_derive::{Deserialize, Serialize};

/// Generic client config that is used for consumers, producers as well as admin
/// operations (like "create topic").
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub client_config: ClientConfig,
pub topic_creation_config: TopicCreationConfig,
pub wal_config: WalConfig,
}

/// Generic client config that is used for consumers, producers as well as admin
/// operations (like "create topic").
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ClientConfig {
/// The endpoint of boost broker, must be set and will panic if not.
pub boost_broker: Option<String>,

/// Maximum message size in bytes.
///
/// extracted from `max_message_size`. Defaults to `None` (rskafka default).
pub max_message_size: Option<usize>,

/// Optional SOCKS5 proxy to use for connecting to the brokers.
///
/// extracted from `socks5_proxy`. Defaults to `None`.
pub socks5_proxy: Option<String>,
}

/// Config for topic creation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TopicCreationConfig {
/// Replication factor.
///
/// Extracted from `replication_factor` option. Defaults to `1`.
pub replication_factor: i16,

/// Timeout in ms.
///
/// Extracted from `timeout_ms` option. Defaults to `5_000`.
pub timeout_ms: i32,
}

impl Default for TopicCreationConfig {
fn default() -> Self {
Self {
replication_factor: 1,
timeout_ms: 5000,
}
}
}

/// Config for consumers.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct WalConfig {
/// Will wait for at least `min_batch_size` bytes of data
///
/// Extracted from `consumer_max_wait_ms`. Defaults to `None` (rskafka
/// default).
pub reader_max_wait_ms: Option<i32>,

/// The maximum amount of data to fetch in a single batch
///
/// Extracted from `consumer_min_batch_size`. Defaults to `None` (rskafka
/// default).
pub reader_min_batch_size: Option<i32>,

/// The maximum amount of time to wait for data before returning
///
/// Extracted from `consumer_max_batch_size`. Defaults to `None` (rskafka
/// default).
pub reader_max_batch_size: Option<i32>,

pub reader_consume_all_wait_ms: i32,
}

impl Default for WalConfig {
fn default() -> Self {
Self {
reader_max_wait_ms: Default::default(),
reader_min_batch_size: Default::default(),
reader_max_batch_size: Default::default(),
reader_consume_all_wait_ms: 5000,
}
}
}
Loading

0 comments on commit 7b0bb73

Please sign in to comment.