idColumns() {
return idColumns;
}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordRouter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordRouter.java
new file mode 100644
index 000000000000..d4135a2a0fc4
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordRouter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * Abstract class for routing records to tables.
+ *
+ * <p>Extend this class to implement custom routing of records to tables.
+ */
+public abstract class RecordRouter {
+
+ private final IcebergWriterFactory writerFactory;
+ private final IcebergSinkConfig config;
+ private final Map<String, RecordWriter> writers = Maps.newHashMap();
+
+ public RecordRouter(Catalog catalog, IcebergSinkConfig config) {
+ this.config = config;
+ this.writerFactory = new IcebergWriterFactory(catalog, config);
+ }
+
+ public abstract void routeRecord(SinkRecord record);
+
+ /** Get the configuration passed to the Kafka Connect plugin. */
+ public IcebergSinkConfig config() {
+ return config;
+ }
+
+ /**
+ * Write the record to the table.
+ *
+ * @param tableName The name of the table to write the record to.
+ * @param record The record to write to the table.
+ * @param ignoreMissingTable If true, missing tables are ignored and no error is thrown.
+ */
+ public void writeToTable(String tableName, SinkRecord record, boolean ignoreMissingTable) {
+ writers
+ .computeIfAbsent(
+ tableName, notUsed -> writerFactory.createWriter(tableName, record, ignoreMissingTable))
+ .write(record);
+ }
+
+ /**
+ * Extract the value of a field from the record as a string.
+ *
+ * @param record The Kafka record to extract the value from.
+ * @param field The name of the field to extract.
+ * @return The value of the field in the record as a string.
+ */
+ public String extractFieldFromRecordValue(SinkRecord record, String field) {
+ if (record.value() == null) {
+ return null;
+ }
+ Object value = RecordUtils.extractFromRecordValue(record.value(), field);
+ return value == null ? null : value.toString();
+ }
+
+ void close() {
+ writers.values().forEach(RecordWriter::close);
+ }
+
+ List<IcebergWriterResult> completeWrite() {
+ return writers.values().stream()
+ .flatMap(writer -> writer.complete().stream())
+ .collect(Collectors.toList());
+ }
+
+ void clearWriters() {
+ writers.clear();
+ }
+
+ /** Route records to all configured tables. */
+ public static class AllTablesRecordRouter extends RecordRouter {
+ public AllTablesRecordRouter(Catalog catalog, IcebergSinkConfig config) {
+ super(catalog, config);
+ }
+
+ @Override
+ public void routeRecord(SinkRecord record) {
+ config().tables().forEach(tableName -> writeToTable(tableName, record, false));
+ }
+ }
+
+ /** Route records to tables based on a regex that matches the value of a field in the data. */
+ public static class StaticRecordRouter extends RecordRouter {
+ private final String routeField;
+ private final Map<String, Pattern> tablePatterns = Maps.newHashMap();
+
+ public StaticRecordRouter(Catalog catalog, IcebergSinkConfig config) {
+ super(catalog, config);
+ this.routeField = config.tablesRouteField();
+ Preconditions.checkNotNull(routeField, "Route field cannot be null with static routing");
+ config
+ .tables()
+ .forEach(
+ tableName ->
+ tablePatterns.put(tableName, config.tableConfig(tableName).routeRegex()));
+ }
+
+ @Override
+ public void routeRecord(SinkRecord record) {
+ String routeValue = extractFieldFromRecordValue(record, routeField);
+ if (routeValue != null) {
+ tablePatterns.forEach(
+ (tableName, tablePattern) -> {
+ if (tablePattern != null && tablePattern.matcher(routeValue).matches()) {
+ writeToTable(tableName, record, false);
+ }
+ });
+ }
+ }
+ }
+
+ /** Route records to the table specified in a field in the data. */
+ public static class DynamicRecordRouter extends RecordRouter {
+ private final String routeField;
+
+ public DynamicRecordRouter(Catalog catalog, IcebergSinkConfig config) {
+ super(catalog, config);
+ routeField = config.tablesRouteField();
+ Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+ }
+
+ @Override
+ public void routeRecord(SinkRecord record) {
+ String routeValue = extractFieldFromRecordValue(record, routeField);
+ if (routeValue != null) {
+ String tableName = routeValue.toLowerCase(Locale.ROOT);
+ writeToTable(tableName, record, true);
+ }
+ }
+ }
+
+ /** Route records to tables whose configured topics include the record's topic. */
+ public static class TopicNameRecordRouter extends RecordRouter {
+ private final Map<String, List<String>> topicTablesMap = Maps.newHashMap();
+
+ public TopicNameRecordRouter(Catalog catalog, IcebergSinkConfig config) {
+ super(catalog, config);
+ config
+ .tables()
+ .forEach(
+ tableName -> {
+ Iterable<String> topics =
+ Splitter.on(',').split(config.tableConfig(tableName).topics());
+ for (String topic : topics) {
+ topicTablesMap.computeIfAbsent(topic, k -> Lists.newArrayList()).add(tableName);
+ }
+ });
+ }
+
+ @Override
+ public void routeRecord(SinkRecord record) {
+ topicTablesMap
+ .getOrDefault(record.topic(), Collections.emptyList())
+ .forEach(tableName -> writeToTable(tableName, record, true));
+ }
+ }
+
+ /** Route records to tables using a regex that matches the record's topic. */
+ public static class TopicRegexRecordRouter extends RecordRouter {
+ private final Map<String, Pattern> tablePatterns = Maps.newHashMap();
+
+ public TopicRegexRecordRouter(Catalog catalog, IcebergSinkConfig config) {
+ super(catalog, config);
+ config
+ .tables()
+ .forEach(
+ tableName ->
+ tablePatterns.putIfAbsent(tableName, config.tableConfig(tableName).topicRegex()));
+ }
+
+ @Override
+ public void routeRecord(SinkRecord record) {
+ // Check the record's topic against each table's configured topic regex
+ tablePatterns.forEach(
+ (tableName, tablePattern) -> {
+ if (tablePattern != null && tablePattern.matcher(record.topic()).matches()) {
+ writeToTable(tableName, record, false);
+ }
+ });
+ }
+ }
+}
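
The Javadoc in RecordRouter above says the class is meant to be extended for custom routing, and SinkWriter (next diff) instantiates such a class reflectively through a (Catalog, IcebergSinkConfig) constructor. As a minimal sketch of what a plug-in router could look like — the package, class name, and "target-table" header are illustrative, not part of this change:

package com.example;

import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.RecordRouter;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

/** Illustrative custom router: sends each record to the table named in a header. */
public class HeaderRecordRouter extends RecordRouter {

  // SinkWriter resolves exactly this constructor shape via reflection,
  // so any custom router must declare it.
  public HeaderRecordRouter(Catalog catalog, IcebergSinkConfig config) {
    super(catalog, config);
  }

  @Override
  public void routeRecord(SinkRecord record) {
    // "target-table" is a hypothetical header name chosen for this sketch
    Header header = record.headers().lastWithName("target-table");
    if (header != null && header.value() != null) {
      // Ignore missing tables instead of failing the task, as DynamicRecordRouter does
      writeToTable(header.value().toString(), record, true);
    }
  }
}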
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
index f81155e13777..7da8182dd5fa 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
@@ -18,47 +18,57 @@
*/
package org.apache.iceberg.connect.data;
+import java.lang.reflect.InvocationTargetException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.IcebergSinkConfig;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
public class SinkWriter {
- private final IcebergSinkConfig config;
- private final IcebergWriterFactory writerFactory;
- private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;
+ private final RecordRouter router;
public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
- this.config = config;
- this.writerFactory = new IcebergWriterFactory(catalog, config);
- this.writers = Maps.newHashMap();
this.sourceOffsets = Maps.newHashMap();
+ if (config.dynamicTablesEnabled()) {
+ router = new RecordRouter.DynamicRecordRouter(catalog, config);
+ } else if (config.tablesRouteWith() == null && config.tablesRouteField() != null) {
+ router = new RecordRouter.StaticRecordRouter(catalog, config);
+ } else if (config.tablesRouteWith() != null) {
+ try {
+ router =
+ config
+ .tablesRouteWith()
+ .getDeclaredConstructor(Catalog.class, IcebergSinkConfig.class)
+ .newInstance(catalog, config);
+ } catch (NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new IllegalArgumentException(
+ "Cannot create router from iceberg.tables.route-with", e);
+ }
+ } else {
+ router = new RecordRouter.AllTablesRecordRouter(catalog, config);
+ }
}
public void close() {
- writers.values().forEach(RecordWriter::close);
+ router.close();
}
public SinkWriterResult completeWrite() {
- List<IcebergWriterResult> writerResults =
- writers.values().stream()
- .flatMap(writer -> writer.complete().stream())
- .collect(Collectors.toList());
+ List<IcebergWriterResult> writerResults = router.completeWrite();
Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
- writers.clear();
+ router.clearWriters();
sourceOffsets.clear();
return new SinkWriterResult(writerResults, offsets);
@@ -79,63 +89,6 @@ private void save(SinkRecord record) {
new TopicPartition(record.topic(), record.kafkaPartition()),
new Offset(record.kafkaOffset() + 1, timestamp));
- if (config.dynamicTablesEnabled()) {
- routeRecordDynamically(record);
- } else {
- routeRecordStatically(record);
- }
- }
-
- private void routeRecordStatically(SinkRecord record) {
- String routeField = config.tablesRouteField();
-
- if (routeField == null) {
- // route to all tables
- config
- .tables()
- .forEach(
- tableName -> {
- writerForTable(tableName, record, false).write(record);
- });
-
- } else {
- String routeValue = extractRouteValue(record.value(), routeField);
- if (routeValue != null) {
- config
- .tables()
- .forEach(
- tableName -> {
- Pattern regex = config.tableConfig(tableName).routeRegex();
- if (regex != null && regex.matcher(routeValue).matches()) {
- writerForTable(tableName, record, false).write(record);
- }
- });
- }
- }
- }
-
- private void routeRecordDynamically(SinkRecord record) {
- String routeField = config.tablesRouteField();
- Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
-
- String routeValue = extractRouteValue(record.value(), routeField);
- if (routeValue != null) {
- String tableName = routeValue.toLowerCase(Locale.ROOT);
- writerForTable(tableName, record, true).write(record);
- }
- }
-
- private String extractRouteValue(Object recordValue, String routeField) {
- if (recordValue == null) {
- return null;
- }
- Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeField);
- return routeValue == null ? null : routeValue.toString();
- }
-
- private RecordWriter writerForTable(
- String tableName, SinkRecord sample, boolean ignoreMissingTable) {
- return writers.computeIfAbsent(
- tableName, notUsed -> writerFactory.createWriter(tableName, sample, ignoreMissingTable));
+ router.routeRecord(record);
}
}
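
For reference, the constructor above selects a router in this order: dynamic routing if enabled, field-based static routing if only a route field is set, a user-supplied class if iceberg.tables.route-with is set, otherwise fan-out to all configured tables. A connector config exercising the reflective branch might look like the sketch below; iceberg.tables.route-with appears in the error message above, but the other property spellings are assumptions inferred from the IcebergSinkConfig accessor names, so check them against the sink's documentation.

# illustrative connector properties (names other than route-with are assumed)
iceberg.tables=db.events
iceberg.tables.route-with=com.example.HeaderRecordRouter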
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
index 4a17b926fc56..d0123bfa3576 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
@@ -105,6 +105,43 @@ public void testDefaultNoRoute() {
assertThat(writerResults.size()).isEqualTo(0);
}
+ @Test
+ public void testTopicRegexRoute() {
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.topicRegex()).thenReturn(Pattern.compile("topic"));
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tablesRouteWith())
+ .thenAnswer(invocation -> RecordRouter.TopicRegexRecordRouter.class);
+ when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+
+ Map<String, Object> value = ImmutableMap.of();
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(1);
+ IcebergWriterResult writerResult = writerResults.get(0);
+ assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testTopicNameRoute() {
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.topics()).thenReturn("topic");
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tablesRouteWith())
+ .thenAnswer(invocation -> RecordRouter.TopicNameRecordRouter.class);
+ when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+
+ Map<String, Object> value = ImmutableMap.of();
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(1);
+ IcebergWriterResult writerResult = writerResults.get(0);
+ assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+ }
+
@Test
public void testStaticRoute() {
TableSinkConfig tableConfig = mock(TableSinkConfig.class);