From 59574995215849bad60088b10fc1435eb1623112 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 26 Aug 2024 07:14:48 +0000 Subject: [PATCH 1/4] feat: introduce `create_topics` opt --- config/config.md | 10 ++++++---- config/metasrv.example.toml | 7 ++++++- config/standalone.example.toml | 7 ++++++- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/config/config.md b/config/config.md index c66eb44d4ed4..d0108cbf6dd8 100644 --- a/config/config.md +++ b/config/config.md @@ -67,9 +67,10 @@ | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | -| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.
**It's only used when the provider is `kafka`**. | +| `wal.create_topic` | Bool | `true` | Create topics for WAL
Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) | +| `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
**It's only used when the provider is `kafka`**. | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
**It's only used when the provider is `kafka`**. | | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.
**It's only used when the provider is `kafka`**. | | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.
**It's only used when the provider is `kafka`**. | | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. | @@ -287,9 +288,10 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | +| `wal.create_topic` | Bool | `true` | Create topics for WAL
Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) | +| `wal.num_topics` | Integer | `64` | Number of topics. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. | | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 41e9306ebd78..0546d3126d44 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -99,7 +99,11 @@ provider = "raft_engine" ## The broker endpoints of the Kafka cluster. broker_endpoints = ["127.0.0.1:9092"] -## Number of topics to be created upon start. +## Create topics for WAL +## Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) +create_topic = true + +## Number of topics. num_topics = 64 ## Topic selector type. @@ -108,6 +112,7 @@ num_topics = 64 selector_type = "round_robin" ## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. +## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. topic_name_prefix = "greptimedb_wal_topic" ## Expected number of replicas of each partition. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 9258397bb82d..308165bc3c93 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -171,7 +171,11 @@ sync_period = "10s" ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] -## Number of topics to be created upon start. +## Create topics for WAL +## Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) +create_topic = true + +## Number of topics. ## **It's only used when the provider is `kafka`**. num_topics = 64 @@ -182,6 +186,7 @@ num_topics = 64 selector_type = "round_robin" ## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. +## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. ## **It's only used when the provider is `kafka`**. topic_name_prefix = "greptimedb_wal_topic" From 63ac0eba5e42d174a13f979a885ad95976e7a5d9 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 26 Aug 2024 07:24:31 +0000 Subject: [PATCH 2/4] feat: allow skipping topic creation --- .../wal_options_allocator/kafka/topic_manager.rs | 4 ++++ src/common/wal/src/config.rs | 2 ++ src/common/wal/src/config/kafka/common.rs | 2 +- src/common/wal/src/config/kafka/datanode.rs | 5 +++++ src/common/wal/src/config/kafka/metasrv.rs | 16 +++++++++++++++- 5 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 060f82d8d71e..83cee3a61053 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -76,6 +76,10 @@ impl TopicManager { /// The initializer first tries to restore persisted topics from the kv backend. /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. pub async fn start(&self) -> Result<()> { + // Skip creating topics. + if !self.config.create_topic { + return Ok(()); + } let num_topics = self.config.kafka_topic.num_topics; ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 90f3e44f9c4a..5c429e26edd4 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -53,6 +53,7 @@ impl From for MetasrvWalConfig { connection: config.connection, backoff: config.backoff, kafka_topic: config.kafka_topic, + create_topic: config.create_topic, }), } } @@ -188,6 +189,7 @@ mod tests { replication_factor: 1, create_topic_timeout: Duration::from_secs(30), }, + create_topic: true, }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index f68ddfa5d8b2..d12c651f23ba 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -187,7 +187,7 @@ impl Default for KafkaConnectionConfig { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct KafkaTopicConfig { - /// Number of topics to be created upon start. + /// Number of topics. pub num_topics: usize, /// Number of partitions per topic. pub num_partitions: i32, diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 84e9da6bccfa..44404c099f0b 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -40,6 +40,10 @@ pub struct DatanodeKafkaConfig { /// The kafka topic config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, + // Create topic for WAL. + // Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics). + pub create_topic: bool, + // Create index for WAL. pub create_index: bool, #[serde(with = "humantime_serde")] pub dump_index_interval: Duration, @@ -54,6 +58,7 @@ impl Default for DatanodeKafkaConfig { consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), kafka_topic: KafkaTopicConfig::default(), + create_topic: true, create_index: true, dump_index_interval: Duration::from_secs(60), } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index f61047315cda..dbbe34dab81b 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -18,7 +18,7 @@ use super::common::KafkaConnectionConfig; use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; /// Kafka wal configurations for metasrv. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvKafkaConfig { /// The kafka connection config. @@ -30,4 +30,18 @@ pub struct MetasrvKafkaConfig { /// The kafka config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, + // Create topics for WAL. + // Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics). + pub create_topic: bool, +} + +impl Default for MetasrvKafkaConfig { + fn default() -> Self { + Self { + connection: Default::default(), + backoff: Default::default(), + kafka_topic: Default::default(), + create_topic: true, + } + } } From 5bdd312ea2e2487b8a32811e6bb3f8fb8cd5c619 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 26 Aug 2024 07:44:50 +0000 Subject: [PATCH 3/4] chore: refine docs --- config/config.md | 4 ++-- config/metasrv.example.toml | 5 +++-- config/standalone.example.toml | 5 +++-- src/common/wal/src/config/kafka/datanode.rs | 1 - src/common/wal/src/config/kafka/metasrv.rs | 1 - 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/config/config.md b/config/config.md index d0108cbf6dd8..539f7ca9f7e8 100644 --- a/config/config.md +++ b/config/config.md @@ -67,7 +67,7 @@ | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | -| `wal.create_topic` | Bool | `true` | Create topics for WAL
Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) | +| `wal.create_topic` | Bool | `true` | Create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
**It's only used when the provider is `kafka`**. | @@ -288,7 +288,7 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.create_topic` | Bool | `true` | Create topics for WAL
Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) | +| `wal.create_topic` | Bool | `true` | Create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 0546d3126d44..ae8a6916ae77 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -99,8 +99,9 @@ provider = "raft_engine" ## The broker endpoints of the Kafka cluster. broker_endpoints = ["127.0.0.1:9092"] -## Create topics for WAL -## Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) +## Create topics for WAL. +## Set to `true` to automatically create topics for WAL. +## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` create_topic = true ## Number of topics. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 308165bc3c93..fd5219183ba3 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -171,8 +171,9 @@ sync_period = "10s" ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] -## Create topics for WAL -## Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics) +## Create topics for WAL. +## Set to `true` to automatically create topics for WAL. +## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` create_topic = true ## Number of topics. diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 44404c099f0b..a23e81f8894f 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -41,7 +41,6 @@ pub struct DatanodeKafkaConfig { #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, // Create topic for WAL. - // Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics). pub create_topic: bool, // Create index for WAL. pub create_index: bool, diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index dbbe34dab81b..cbbcd1c97be3 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -31,7 +31,6 @@ pub struct MetasrvKafkaConfig { #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, // Create topics for WAL. - // Set to false to use existing topics. It will then use topics named topic_name_prefix_[0..num_topics). pub create_topic: bool, } From b324e5c44b613203d64cd0966c0d927fd74beb8b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 26 Aug 2024 08:14:43 +0000 Subject: [PATCH 4/4] chore: apply suggestions from CR --- config/config.md | 4 ++-- config/metasrv.example.toml | 4 ++-- config/standalone.example.toml | 4 ++-- .../meta/src/wal_options_allocator/kafka/topic_manager.rs | 2 +- src/common/wal/src/config.rs | 4 ++-- src/common/wal/src/config/kafka/datanode.rs | 6 +++--- src/common/wal/src/config/kafka/metasrv.rs | 6 +++--- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/config/config.md b/config/config.md index 539f7ca9f7e8..6d4837ae46d1 100644 --- a/config/config.md +++ b/config/config.md @@ -67,7 +67,7 @@ | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | -| `wal.create_topic` | Bool | `true` | Create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | +| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
**It's only used when the provider is `kafka`**. | @@ -288,7 +288,7 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.create_topic` | Bool | `true` | Create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | +| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index ae8a6916ae77..57be17b52021 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -99,10 +99,10 @@ provider = "raft_engine" ## The broker endpoints of the Kafka cluster. broker_endpoints = ["127.0.0.1:9092"] -## Create topics for WAL. +## Automatically create topics for WAL. ## Set to `true` to automatically create topics for WAL. ## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` -create_topic = true +auto_create_topics = true ## Number of topics. num_topics = 64 diff --git a/config/standalone.example.toml b/config/standalone.example.toml index fd5219183ba3..7a9a09dc5ab4 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -171,10 +171,10 @@ sync_period = "10s" ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] -## Create topics for WAL. +## Automatically create topics for WAL. ## Set to `true` to automatically create topics for WAL. ## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` -create_topic = true +auto_create_topics = true ## Number of topics. ## **It's only used when the provider is `kafka`**. diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 83cee3a61053..3f1ffb4c45c5 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -77,7 +77,7 @@ impl TopicManager { /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. pub async fn start(&self) -> Result<()> { // Skip creating topics. - if !self.config.create_topic { + if !self.config.auto_create_topics { return Ok(()); } let num_topics = self.config.kafka_topic.num_topics; diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 5c429e26edd4..052311b5af4d 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -53,7 +53,7 @@ impl From for MetasrvWalConfig { connection: config.connection, backoff: config.backoff, kafka_topic: config.kafka_topic, - create_topic: config.create_topic, + auto_create_topics: config.auto_create_topics, }), } } @@ -189,7 +189,7 @@ mod tests { replication_factor: 1, create_topic_timeout: Duration::from_secs(30), }, - create_topic: true, + auto_create_topics: true, }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index a23e81f8894f..27f693204014 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -40,8 +40,8 @@ pub struct DatanodeKafkaConfig { /// The kafka topic config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, - // Create topic for WAL. - pub create_topic: bool, + // Automatically create topics for WAL. + pub auto_create_topics: bool, // Create index for WAL. pub create_index: bool, #[serde(with = "humantime_serde")] @@ -57,7 +57,7 @@ impl Default for DatanodeKafkaConfig { consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), kafka_topic: KafkaTopicConfig::default(), - create_topic: true, + auto_create_topics: true, create_index: true, dump_index_interval: Duration::from_secs(60), } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index cbbcd1c97be3..abde6b74448b 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -30,8 +30,8 @@ pub struct MetasrvKafkaConfig { /// The kafka config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, - // Create topics for WAL. - pub create_topic: bool, + // Automatically create topics for WAL. + pub auto_create_topics: bool, } impl Default for MetasrvKafkaConfig { @@ -40,7 +40,7 @@ impl Default for MetasrvKafkaConfig { connection: Default::default(), backoff: Default::default(), kafka_topic: Default::default(), - create_topic: true, + auto_create_topics: true, } } }