From c9b738381cbbad21bbc6d4d97ca10f9868a4f641 Mon Sep 17 00:00:00 2001 From: Matt Forbes Date: Fri, 22 Nov 2024 14:51:29 -0800 Subject: [PATCH 1/3] add kafka connection properties to profile --- crates/arroyo-connectors/src/confluent/mod.rs | 1 + crates/arroyo-connectors/src/kafka/mod.rs | 9 +++++++++ crates/arroyo-connectors/src/kafka/profile.json | 10 ++++++++++ 3 files changed, 20 insertions(+) diff --git a/crates/arroyo-connectors/src/confluent/mod.rs b/crates/arroyo-connectors/src/confluent/mod.rs index c694b9ef7..4db2ebf4b 100644 --- a/crates/arroyo-connectors/src/confluent/mod.rs +++ b/crates/arroyo-connectors/src/confluent/mod.rs @@ -84,6 +84,7 @@ impl From for KafkaConfig { password: c.secret, }, schema_registry_enum: Some(c.schema_registry.into()), + connection_properties: vec![], } } } diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 4db46c48a..2f2668ad6 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -105,6 +105,7 @@ impl KafkaConnector { authentication: auth, bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?), schema_registry_enum: schema_registry, + connection_properties: vec![], }) } @@ -926,6 +927,14 @@ pub fn client_configs( } }; + for prop in connection.connection_properties.iter() { + if let Some((k, v)) = prop.split_once('=') { + client_configs.insert(k.to_string(), v.to_string()); + } else { + bail!("invalid connection property: {}", prop); + } + } + if let Some(table) = table { client_configs.extend( table diff --git a/crates/arroyo-connectors/src/kafka/profile.json b/crates/arroyo-connectors/src/kafka/profile.json index 6fe762dee..463472710 100644 --- a/crates/arroyo-connectors/src/kafka/profile.json +++ b/crates/arroyo-connectors/src/kafka/profile.json @@ -106,6 +106,16 @@ "sensitive": ["apiSecret"] } ] + }, + "connectionProperties": { + "type": "array", + "title": "Connection Properties", + "description": "Key-value pairs of rdkafka configuration options", + "examples": ["client.id=arroyo"], + "items": { + "type": "string", + "title": "property" + } } }, "required": ["bootstrapServers", "authentication"] From f7568ddf19d371700b2e1f618bca364957d5f8a9 Mon Sep 17 00:00:00 2001 From: Matt Forbes Date: Mon, 2 Dec 2024 13:14:06 -0800 Subject: [PATCH 2/3] PR feedback --- crates/arroyo-connectors/src/confluent/mod.rs | 2 +- crates/arroyo-connectors/src/kafka/mod.rs | 23 ++++++++----------- .../arroyo-connectors/src/kafka/profile.json | 6 +++-- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/crates/arroyo-connectors/src/confluent/mod.rs b/crates/arroyo-connectors/src/confluent/mod.rs index 4db2ebf4b..27c5cab01 100644 --- a/crates/arroyo-connectors/src/confluent/mod.rs +++ b/crates/arroyo-connectors/src/confluent/mod.rs @@ -84,7 +84,7 @@ impl From for KafkaConfig { password: c.secret, }, schema_registry_enum: Some(c.schema_registry.into()), - connection_properties: vec![], + connection_properties: HashMap::new(), } } } diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 2f2668ad6..894b7edcf 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -105,7 +105,7 @@ impl KafkaConnector { authentication: auth, bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?), schema_registry_enum: schema_registry, - connection_properties: vec![], + connection_properties: HashMap::new(), }) } @@ -927,21 +927,18 @@ pub fn client_configs( } }; - for prop in connection.connection_properties.iter() { - if let Some((k, v)) = prop.split_once('=') { - client_configs.insert(k.to_string(), v.to_string()); - } else { - bail!("invalid connection property: {}", prop); - } + for (k, v) in connection.connection_properties.iter() { + client_configs.insert(k.to_string(), v.to_string()); } if let Some(table) = table { - client_configs.extend( - table - .client_configs - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())), - ); + for (k, v) in table.client_configs.iter() { + if connection.connection_properties.contains_key(k) { + warn!("rdkafka config key {} defined in both connection and table config", k); + } + + client_configs.insert(k.to_string(), v.to_string()); + } } Ok(client_configs) diff --git a/crates/arroyo-connectors/src/kafka/profile.json b/crates/arroyo-connectors/src/kafka/profile.json index 463472710..5cb596971 100644 --- a/crates/arroyo-connectors/src/kafka/profile.json +++ b/crates/arroyo-connectors/src/kafka/profile.json @@ -108,13 +108,15 @@ ] }, "connectionProperties": { - "type": "array", + "type": "object", "title": "Connection Properties", "description": "Key-value pairs of rdkafka configuration options", - "examples": ["client.id=arroyo"], "items": { "type": "string", "title": "property" + }, + "additionalProperties": { + "type": "string" } } }, From 9752a0310e0b910a5aa2c7a73f8975642779bbb1 Mon Sep 17 00:00:00 2001 From: Matt Forbes Date: Mon, 2 Dec 2024 14:19:31 -0800 Subject: [PATCH 3/3] format --- crates/arroyo-connectors/src/kafka/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 894b7edcf..5cb703d33 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -934,7 +934,10 @@ pub fn client_configs( if let Some(table) = table { for (k, v) in table.client_configs.iter() { if connection.connection_properties.contains_key(k) { - warn!("rdkafka config key {} defined in both connection and table config", k); + warn!( + "rdkafka config key {:?} defined in both connection and table config", + k + ); } client_configs.insert(k.to_string(), v.to_string());