From 668d681141047b1bc323d1ee6d44e05db59eb1fa Mon Sep 17 00:00:00 2001 From: Denis Jakupovic Date: Mon, 10 Nov 2025 12:49:31 +0100 Subject: [PATCH 1/4] add dynamic routing with namespace awareness --- .../iceberg/connect/IcebergSinkConfig.java | 11 +++++++++++ .../iceberg/connect/data/SinkWriter.java | 6 +++++- .../iceberg/connect/data/TestSinkWriter.java | 18 ++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..16947ae23e2a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -67,6 +67,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String CATALOG_NAME_PROP = "iceberg.catalog"; private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; + private static final String TABLES_ROUTE_NAMESPACE_PROP = "iceberg.tables.route-namespace"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; @@ -127,6 +128,12 @@ private static ConfigDef newConfigDef() { false, Importance.MEDIUM, "Enable dynamic routing to tables based on a record value"); + configDef.define( + TABLES_ROUTE_NAMESPACE_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Target namespace for routing records to tables"); configDef.define( TABLES_ROUTE_FIELD_PROP, ConfigDef.Type.STRING, @@ -335,6 +342,10 @@ public boolean dynamicTablesEnabled() { return getBoolean(TABLES_DYNAMIC_PROP); } + 
public String tablesRouteNamespace() { + return getString(TABLES_ROUTE_NAMESPACE_PROP); + } + public String tablesRouteField() { return getString(TABLES_ROUTE_FIELD_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index f81155e13777..6a408cb4c05a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -115,12 +115,16 @@ private void routeRecordStatically(SinkRecord record) { } private void routeRecordDynamically(SinkRecord record) { + String routeNamespace = config.tablesRouteNamespace(); String routeField = config.tablesRouteField(); Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing"); String routeValue = extractRouteValue(record.value(), routeField); if (routeValue != null) { - String tableName = routeValue.toLowerCase(Locale.ROOT); + String tableName = (routeNamespace !=null) + ? routeNamespace.toLowerCase() + "." 
+ routeValue.toLowerCase(Locale.ROOT) + : routeValue.toLowerCase(Locale.ROOT); + writerForTable(tableName, record, true).write(record); } } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index a14ebcab7336..98e0724e90a9 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -61,6 +61,7 @@ public class TestSinkWriter { optional(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get()), optional(3, "date", Types.StringType.get())); + private static final String ROUTE_NAMESPACE = "namespace"; private static final String ROUTE_FIELD = "fld"; @BeforeEach @@ -153,6 +154,23 @@ public void testDynamicRoute() { assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER); } + @Test + public void testDynamicRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.dynamicTablesEnabled()).thenReturn(true); + when(config.tablesRouteNamespace()).thenReturn(ROUTE_NAMESPACE); + when(config.tablesRouteField()).thenReturn(ROUTE_FIELD); + + Map value = ImmutableMap.of(ROUTE_FIELD, ROUTE_NAMESPACE); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults).hasSize(1); + IcebergWriterResult writerResult = writerResults.get(0); + assertThat(writerResult.tableIdentifier()).isEqualTo(ROUTE_NAMESPACE + "." 
+ ROUTE_FIELD); + } + @Test public void testDynamicNoRoute() { IcebergSinkConfig config = mock(IcebergSinkConfig.class); From 337548182035b9ff07234dcef5ca7f292c513e92 Mon Sep 17 00:00:00 2001 From: Denis Jakupovic Date: Mon, 10 Nov 2025 12:56:56 +0100 Subject: [PATCH 2/4] fix test assert --- .../java/org/apache/iceberg/connect/data/TestSinkWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index 98e0724e90a9..621992fdd71e 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -155,7 +155,7 @@ public void testDynamicRoute() { } @Test - public void testDynamicRoute() { + public void testDynamicNamespaceRoute() { IcebergSinkConfig config = mock(IcebergSinkConfig.class); when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); From b06cff00aa5134ecf2851b8552ced031d56de182 Mon Sep 17 00:00:00 2001 From: Denis Jakupovic Date: Mon, 10 Nov 2025 13:00:15 +0100 Subject: [PATCH 3/4] fix formatting --- .../java/org/apache/iceberg/connect/data/SinkWriter.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 6a408cb4c05a..88ca00ee9a03 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -121,10 +121,11 @@ private void routeRecordDynamically(SinkRecord record) { 
String routeValue = extractRouteValue(record.value(), routeField); if (routeValue != null) { - String tableName = (routeNamespace !=null) - ? routeNamespace.toLowerCase() + "." + routeValue.toLowerCase(Locale.ROOT) - : routeValue.toLowerCase(Locale.ROOT); - + String tableName = + (routeNamespace != null) + ? routeNamespace.toLowerCase(Locale.ROOT) + "." + routeValue.toLowerCase(Locale.ROOT) + : routeValue.toLowerCase(Locale.ROOT); + writerForTable(tableName, record, true).write(record); } } From 6b4de88db8cc423dc11776d1d4ddfd356d8357f6 Mon Sep 17 00:00:00 2001 From: Denis Jakupovic Date: Mon, 10 Nov 2025 15:24:27 +0100 Subject: [PATCH 4/4] added route prefix for dynamic routing --- .../org/apache/iceberg/connect/IcebergSinkConfig.java | 11 +++++++++++ .../org/apache/iceberg/connect/data/SinkWriter.java | 3 +++ 2 files changed, 14 insertions(+) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 16947ae23e2a..a10896750d60 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -68,6 +68,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; private static final String TABLES_ROUTE_NAMESPACE_PROP = "iceberg.tables.route-namespace"; + private static final String TABLES_ROUTE_PREFIX_PROP = "iceberg.tables.route-prefix"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; @@ -134,6 +135,12 @@ private static ConfigDef 
newConfigDef() { null, Importance.MEDIUM, "Target namespace for routing records to tables"); + configDef.define( + TABLES_ROUTE_PREFIX_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Target prefix for routing records to tables"); configDef.define( TABLES_ROUTE_FIELD_PROP, ConfigDef.Type.STRING, @@ -346,6 +353,10 @@ public String tablesRouteNamespace() { return getString(TABLES_ROUTE_NAMESPACE_PROP); } + public String tablesRoutePrefix() { + return getString(TABLES_ROUTE_PREFIX_PROP); + } + public String tablesRouteField() { return getString(TABLES_ROUTE_FIELD_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 88ca00ee9a03..9951a34c4e69 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -120,6 +120,9 @@ private void routeRecordDynamically(SinkRecord record) { Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing"); String routeValue = extractRouteValue(record.value(), routeField); + if (routeValue != null && config.tablesRoutePrefix() != null) { + routeValue = config.tablesRoutePrefix() + "_" + routeValue; + } if (routeValue != null) { String tableName = (routeNamespace != null)