Kafka Connect: Add mechanisms for routing records by topic name #11623

Open · wants to merge 2 commits into base: main
102 changes: 99 additions & 3 deletions docs/docs/kafka-connect.md
@@ -62,6 +62,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
|--------------------------------------------|------------------------------------------------------------------------------------------------------------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-with | Class to use for routing records from topics to tables |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
@@ -76,6 +77,8 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.table.\<table name\>.topics | Comma-separated list of topic names to route to the table |
| iceberg.table.\<table name\>.topic-regex | The regex used to match a record's Kafka topic to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
@@ -86,9 +89,28 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will
contain the name of the table.
If `iceberg.tables.route-with` is `org.apache.iceberg.connect.data.RecordRouter$DynamicRecordRouter`,
or `iceberg.tables.dynamic-enabled` is `true`, you must specify `iceberg.tables.route-field`, the field that
contains the target table name. Tables are created dynamically and named using the value of that field.
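
A minimal sketch of a dynamic-routing config is shown below; the topic name, route-field value, and catalog settings
are placeholders rather than part of this change. Each record's `db_table` field value (for example
`default.events_create`) selects the destination table.

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "topics": "events",
    "iceberg.tables.route-with": "org.apache.iceberg.connect.data.RecordRouter$DynamicRecordRouter",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}
```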

If `iceberg.tables.route-with` is `org.apache.iceberg.connect.data.RecordRouter$StaticRecordRouter`,
or `iceberg.tables.route-field` is specified but `iceberg.tables.dynamic-enabled` is `false`, you must specify
`iceberg.tables` and `iceberg.table.\<table name\>.route-regex` for each table. Records are routed only to
tables whose route regex matches the value of the route field.
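
A minimal static-routing sketch, assuming a `type` field whose value distinguishes the two kinds of events; the
field name, regex values, and catalog settings are placeholders:

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-with": "org.apache.iceberg.connect.data.RecordRouter$StaticRecordRouter",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}
```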

If `iceberg.tables.route-with` is `org.apache.iceberg.connect.data.RecordRouter$TopicNameRecordRouter`, then you must
specify `iceberg.tables` and `iceberg.table.\<table name\>.topics` for each table. Records are routed to each table
whose `topics` list contains the record's topic.

If `iceberg.tables.route-with` is `org.apache.iceberg.connect.data.RecordRouter$TopicRegexRecordRouter`,
then you must specify `iceberg.tables` and `iceberg.table.\<table name\>.topic-regex` for each table. Records are
routed to each table whose topic regex matches the record's topic.

If `iceberg.tables.route-with` is unset and no `iceberg.tables.route-field` is specified, records from all
topics are routed to all tables.

For backwards compatibility, setting `iceberg.tables.dynamic-enabled` to `true` will take precedence over the
`iceberg.tables.route-with` configuration.

### Kafka configuration

@@ -350,3 +372,77 @@ See above for creating two tables.
}
}
```

### Topic name based multi-table routing

This example assumes that the source topics `events_create` and `events_list` already exist.

This example writes to tables whose `topics` property includes the record's topic. Records read from the
topic `events_create` are routed to the `default.events_create` table, and records read from the topic
`events_list` are routed to the `default.events_list` table.

#### Create two destination tables

See above for creating two tables.

#### Connector config

```json
{
"name": "events-sink",
"config": {
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events_create,events_list",
"iceberg.tables": "default.events_list,default.events_create",
"iceberg.tables.route-with": "org.apache.iceberg.connect.data.RecordRouter$TopicNameRecordRouter",
"iceberg.table.default.events_list.topics": "events_list",
"iceberg.table.default.events_create.topics": "events_create",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
}
}
```

### Topic regex based multi-table routing

This example assumes that the source topics `events_create` and `events_list` already exist.

This example writes to tables whose `topic-regex` property matches the record's topic. Records read from the
topic `events_create` are routed to the `default.events_create` table, and records read from the topic
`events_list` are routed to the `default.events_list` table. Regex-based routing is particularly useful when the
connector's source topics are specified with the `topics.regex` configuration.

#### Create two destination tables

See above for creating two tables.

#### Connector config

```json
{
"name": "events-sink",
"config": {
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events_create,events_list",
"iceberg.tables": "default.events_list,default.events_create",
"iceberg.tables.route-with": "org.apache.iceberg.connect.data.RecordRouter$TopicRegexRecordRouter",
"iceberg.table.default.events_list.topic-regex": "events_list",
"iceberg.table.default.events_create.topic-regex": "events_create",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
}
}
```

### Custom routing of records

Routing records from Kafka to tables can be customized by providing your own implementation of the `RecordRouter`.
Implementations must inherit `org.apache.iceberg.connect.data.RecordRouter`. To write a record to a table,
implementations can call the `writeToTable` method of the `RecordRouter`. You can then set `iceberg.tables.route-with`
to the class name of your plugin. The class must be available at runtime to the Kafka connect workers.
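
As an illustration only, a custom router might look roughly like the sketch below. The `RecordRouter` API is not
reproduced in this diff, so the `route` method name, the `writeToTable` parameter order, and the record type are
assumptions made for this example, not the actual interface.

```java
package com.example.connect;

import org.apache.iceberg.connect.data.RecordRouter;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Hypothetical router that derives the destination table from the record's Kafka topic,
 * e.g. records from topic "events_create" are written to "default.events_create".
 * Method names and signatures are assumed for illustration; see RecordRouter for the real API.
 */
public class TopicPrefixRecordRouter extends RecordRouter {

  @Override
  public void route(SinkRecord record) {
    // Build the destination table name from the topic (assumed routing hook and signature).
    String tableName = "default." + record.topic();
    writeToTable(tableName, record);
  }
}
```

Packaged into a jar on the Kafka Connect plugin path, such a class would then be referenced by setting
`iceberg.tables.route-with` to `com.example.connect.TopicPrefixRecordRouter`.
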
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationTopicRoutingTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "table1";
private static final String TEST_TABLE2 = "table2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);
private static final String TEST_TOPIC1 = "topic1";
private static final String TEST_TOPIC2 = "topic2";

@BeforeEach
public void before() {
createTopic(TEST_TOPIC1, TEST_TOPIC_PARTITIONS);
createTopic(TEST_TOPIC2, TEST_TOPIC_PARTITIONS);
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(TEST_TOPIC1);
deleteTopic(TEST_TOPIC2);
catalog().dropTable(TABLE_IDENTIFIER1);
catalog().dropTable(TABLE_IDENTIFIER2);
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
public void testTopicNameRouter(String branch) {
// partitioned table
catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
// unpartitioned table
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", String.format("%s,%s", TEST_TOPIC1, TEST_TOPIC2))
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config(
"iceberg.tables.route-with",
"org.apache.iceberg.connect.data.RecordRouter$TopicNameRecordRouter")
.config(
"iceberg.tables",
String.format("%s.%s,%s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config(String.format("iceberg.table.%s.%s.topics", TEST_DB, TEST_TABLE1), TEST_TOPIC1)
.config(String.format("iceberg.table.%s.%s.topics", TEST_DB, TEST_TABLE2), TEST_TOPIC2)
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

runTest(connectorConfig, branch, useSchema);
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
public void testTopicRegexRouter(String branch) {
// partitioned table
catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
// unpartitioned table
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", String.format("%s,%s", TEST_TOPIC1, TEST_TOPIC2))
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config(
"iceberg.tables.route-with",
"org.apache.iceberg.connect.data.RecordRouter$TopicRegexRecordRouter")
.config(
"iceberg.tables",
String.format("%s.%s,%s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config(String.format("iceberg.table.%s.%s.topic-regex", TEST_DB, TEST_TABLE1), ".*1")
.config(String.format("iceberg.table.%s.%s.topic-regex", TEST_DB, TEST_TABLE2), ".*2")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

runTest(connectorConfig, branch, useSchema);
}

private void runTest(KafkaConnectUtils.Config connectorConfig, String branch, boolean useSchema) {

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);

TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "test1");
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "test2");
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "test3");

send(TEST_TOPIC1, event1, useSchema);
send(TEST_TOPIC2, event2, useSchema);
send(TEST_TOPIC2, event3, useSchema);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
assertThat(files.get(0).recordCount()).isEqualTo(1);
assertSnapshotProps(TABLE_IDENTIFIER1, branch);

files = dataFiles(TABLE_IDENTIFIER2, branch);
assertThat(files).hasSize(2);
assertThat(files.get(0).recordCount()).isEqualTo(1);
assertThat(files.get(1).recordCount()).isEqualTo(1);
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
}
}
@@ -28,6 +28,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.connect.data.RecordRouter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -53,6 +54,8 @@ public class IcebergSinkConfig extends AbstractConfig {
public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP =
"iceberg.coordinator.transactional.suffix";
private static final String ROUTE_REGEX = "route-regex";
private static final String TOPICS = "topics";
private static final String TOPIC_REGEX = "topic-regex";
private static final String ID_COLUMNS = "id-columns";
private static final String PARTITION_BY = "partition-by";
private static final String COMMIT_BRANCH = "commit-branch";
@@ -66,6 +69,7 @@ public class IcebergSinkConfig extends AbstractConfig {

private static final String CATALOG_NAME_PROP = "iceberg.catalog";
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_ROUTE_WITH_PROP = "iceberg.tables.route-with";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
@@ -114,6 +118,12 @@ private static ConfigDef newConfigDef() {
null,
Importance.HIGH,
"Comma-delimited list of destination tables");
configDef.define(
TABLES_ROUTE_WITH_PROP,
ConfigDef.Type.CLASS,
null,
Importance.MEDIUM,
"Routing mechanism for the tables: FIELD_VALUE, FIELD_REGEX, TOPIC_NAME, TOPIC_REGEX");
configDef.define(
TABLES_DYNAMIC_PROP,
ConfigDef.Type.BOOLEAN,
@@ -306,6 +316,10 @@ public boolean dynamicTablesEnabled() {
return getBoolean(TABLES_DYNAMIC_PROP);
}

public <T extends RecordRouter> Class<T> tablesRouteWith() {
return (Class<T>) getClass(TABLES_ROUTE_WITH_PROP);
}

public String tablesRouteField() {
return getString(TABLES_ROUTE_FIELD_PROP);
}
@@ -332,6 +346,11 @@ public TableSinkConfig tableConfig(String tableName) {
String routeRegexStr = tableConfig.get(ROUTE_REGEX);
Pattern routeRegex = routeRegexStr == null ? null : Pattern.compile(routeRegexStr);

String topics = tableConfig.get(TOPICS);

String topicRegexStr = tableConfig.get(TOPIC_REGEX);
Pattern topicRegex = topicRegexStr == null ? null : Pattern.compile(topicRegexStr);

String idColumnsStr = tableConfig.getOrDefault(ID_COLUMNS, tablesDefaultIdColumns());
List<String> idColumns = stringToList(idColumnsStr, ",");

@@ -342,7 +361,8 @@ public TableSinkConfig tableConfig(String tableName) {
String commitBranch =
tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch());

return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch);
return new TableSinkConfig(
routeRegex, topics, topicRegex, idColumns, partitionBy, commitBranch);
});
}
