diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
index f30eac892400..cd1341822c58 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -32,6 +33,7 @@
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StringType;
 import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.UUIDType;
 
 /** Element representing a table identifier, with namespace and name. */
 public class TableReference implements IndexedRecord {
@@ -39,23 +41,39 @@ public class TableReference implements IndexedRecord {
   private String catalog;
   private List<String> namespace;
   private String name;
+  private UUID uuid;
   private final Schema avroSchema;
 
   static final int CATALOG = 10_600;
   static final int NAMESPACE = 10_601;
   static final int NAME = 10_603;
+  static final int TABLE_UUID = 10_604;
 
   public static final StructType ICEBERG_SCHEMA =
       StructType.of(
           NestedField.required(CATALOG, "catalog", StringType.get()),
           NestedField.required(
               NAMESPACE, "namespace", ListType.ofRequired(NAMESPACE + 1, StringType.get())),
-          NestedField.required(NAME, "name", StringType.get()));
+          NestedField.required(NAME, "name", StringType.get()),
+          NestedField.optional(TABLE_UUID, "table_uuid", UUIDType.get()));
 
   private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TableReference.class);
 
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link TableReference#of(String,
+   *     TableIdentifier, UUID)}
+   */
+  @Deprecated
   public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
     return new TableReference(
-        catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
+        catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name(), null);
+  }
+
+  public static TableReference of(String catalog, TableIdentifier tableIdentifier, UUID tableUuid) {
+    return new TableReference(
+        catalog,
+        Arrays.asList(tableIdentifier.namespace().levels()),
+        tableIdentifier.name(),
+        tableUuid);
   }
 
   // Used by Avro reflection to instantiate this class when reading events
@@ -63,6 +81,11 @@ public TableReference(Schema avroSchema) {
     this.avroSchema = avroSchema;
   }
 
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link TableReference#of(String,
+   *     TableIdentifier, UUID)}.
+   */
+  @Deprecated
   public TableReference(String catalog, List<String> namespace, String name) {
     Preconditions.checkNotNull(catalog, "Catalog cannot be null");
     Preconditions.checkNotNull(namespace, "Namespace cannot be null");
@@ -73,10 +96,25 @@ public TableReference(String catalog, List<String> namespace, String name) {
     this.avroSchema = AVRO_SCHEMA;
   }
 
+  private TableReference(String catalog, List<String> namespace, String name, UUID uuid) {
+    Preconditions.checkNotNull(catalog, "Catalog cannot be null");
+    Preconditions.checkNotNull(namespace, "Namespace cannot be null");
+    Preconditions.checkNotNull(name, "Name cannot be null");
+    this.catalog = catalog;
+    this.namespace = namespace;
+    this.name = name;
+    this.uuid = uuid;
+    this.avroSchema = AVRO_SCHEMA;
+  }
+
   public String catalog() {
     return catalog;
   }
 
+  public UUID uuid() {
+    return uuid;
+  }
+
   public TableIdentifier identifier() {
     Namespace icebergNamespace = Namespace.of(namespace.toArray(new String[0]));
     return TableIdentifier.of(icebergNamespace, name);
@@ -103,6 +141,9 @@ public void put(int i, Object v) {
       case NAME:
         this.name = v == null ? null : v.toString();
         return;
+      case TABLE_UUID:
+        this.uuid = (UUID) v;
+        return;
       default:
         // ignore the object, it must be from a newer version of the format
     }
@@ -117,6 +158,8 @@ public Object get(int i) {
         return namespace;
       case NAME:
         return name;
+      case TABLE_UUID:
+        return uuid;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
     }
@@ -133,11 +176,12 @@ public boolean equals(Object o) {
     TableReference that = (TableReference) o;
     return Objects.equals(catalog, that.catalog)
         && Objects.equals(namespace, that.namespace)
-        && Objects.equals(name, that.name);
+        && Objects.equals(name, that.name)
+        && Objects.equals(uuid, that.uuid);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(catalog, namespace, name);
+    return Objects.hash(catalog, namespace, name, uuid);
   }
 }
diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/TestEventSerialization.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/TestEventSerialization.java
index 10054c66400d..eea2e6373019 100644
--- a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/TestEventSerialization.java
+++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/TestEventSerialization.java
@@ -21,8 +21,8 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.UUID;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.junit.jupiter.api.Test;
 
 public class TestEventSerialization {
@@ -50,7 +50,7 @@ public void testDataWrittenSerialization() {
             new DataWritten(
                 EventTestUtil.SPEC.partitionType(),
                 commitId,
-                new TableReference("catalog", Collections.singletonList("db"), "tbl"),
+                TableReference.of("catalog", TableIdentifier.of("db", "tbl"), UUID.randomUUID()),
                 Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()),
                 Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile())));
 
@@ -97,7 +97,7 @@ public void testCommitToTableSerialization() {
             "cg-connector",
             new CommitToTable(
                 commitId,
-                new TableReference("catalog", Collections.singletonList("db"), "tbl"),
+                TableReference.of("catalog", TableIdentifier.of("db", "tbl"), UUID.randomUUID()),
                 1L,
                 EventTestUtil.now()));
 
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index 1e0fa3286fe5..02a09db4764c 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -193,6 +193,7 @@ private String offsetsToJson(Map<Integer, Long> offsets) {
     }
   }
 
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private void commitToTable(
       TableReference tableReference,
       List<Envelope> envelopeList,
@@ -207,6 +208,17 @@ private void commitToTable(
       return;
     }
 
+    if (table.uuid() != null
+        && tableReference.uuid() != null
+        && !table.uuid().equals(tableReference.uuid())) {
+      LOG.warn(
+          "Skipping commits to table {} due to target table mismatch. Expected: {} Received: {}",
+          tableIdentifier,
+          table.uuid(),
+          tableReference.uuid());
+      return;
+    }
+
     String branch = config.tableConfig(tableIdentifier.toString()).commitBranch();
 
     // Control topic partition offsets may include a subset of partition ids if there were no
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
index 27c5b9622fd3..903be7070370 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
@@ -32,7 +32,6 @@
 import org.apache.iceberg.connect.events.Event;
 import org.apache.iceberg.connect.events.PayloadType;
 import org.apache.iceberg.connect.events.StartCommit;
-import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.connect.events.TopicPartitionOffset;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -101,7 +100,7 @@ protected boolean receive(Envelope envelope) {
                         new DataWritten(
                             writeResult.partitionStruct(),
                             commitId,
-                            TableReference.of(config.catalogName(), writeResult.tableIdentifier()),
+                            writeResult.tableReference(),
                             writeResult.dataFiles(),
                             writeResult.deleteFiles())))
             .collect(Collectors.toList());
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
index b5be5b3a0047..a1b47e8c0e92 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
@@ -24,8 +24,8 @@
 import java.util.List;
 import java.util.Locale;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
@@ -35,23 +35,23 @@ class IcebergWriter implements RecordWriter {
 
   private final Table table;
-  private final String tableName;
+  private final TableReference tableReference;
   private final IcebergSinkConfig config;
   private final List<IcebergWriterResult> writerResults;
   private RecordConverter recordConverter;
   private TaskWriter<Record> writer;
 
-  IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
+  IcebergWriter(Table table, TableReference tableReference, IcebergSinkConfig config) {
     this.table = table;
-    this.tableName = tableName;
+    this.tableReference = tableReference;
     this.config = config;
     this.writerResults = Lists.newArrayList();
     initNewWriter();
   }
 
   private void initNewWriter() {
-    this.writer = RecordUtils.createTableWriter(table, tableName, config);
+    this.writer = RecordUtils.createTableWriter(table, tableReference, config);
     this.recordConverter = new RecordConverter(table, config);
   }
 
@@ -107,7 +107,7 @@ private void flush() {
 
     writerResults.add(
         new IcebergWriterResult(
-            TableIdentifier.parse(tableName),
+            tableReference,
             Arrays.asList(writeResult.dataFiles()),
             Arrays.asList(writeResult.deleteFiles()),
             table.spec().partitionType()));
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 92f5af2d7a87..afb68f170136 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -20,6 +20,7 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
@@ -28,6 +29,7 @@
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -67,7 +69,15 @@ RecordWriter createWriter(String tableName, SinkRecord sample, boolean ignoreMis
       }
     }
 
-    return new IcebergWriter(table, tableName, config);
+    UUID tableUuid = table.uuid();
+    if (tableUuid == null) {
+      LOG.warn(
+          "Table {} does not have a UUID, this may cause issues with commit coordination on table replace",
+          identifier);
+    }
+    TableReference tableReference = TableReference.of(catalog.name(), identifier, tableUuid);
+
+    return new IcebergWriter(table, tableReference, config);
   }
 
   @VisibleForTesting
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
index 58695a5572b5..5667399cd74e 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
@@ -22,28 +22,54 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.types.Types.StructType;
 
 public class IcebergWriterResult {
 
-  private final TableIdentifier tableIdentifier;
+  private final TableReference tableReference;
   private final List<DataFile> dataFiles;
   private final List<DeleteFile> deleteFiles;
   private final StructType partitionStruct;
 
+  public IcebergWriterResult(
+      TableReference tableReference,
+      List<DataFile> dataFiles,
+      List<DeleteFile> deleteFiles,
+      StructType partitionStruct) {
+    this.tableReference = tableReference;
+    this.dataFiles = dataFiles;
+    this.deleteFiles = deleteFiles;
+    this.partitionStruct = partitionStruct;
+  }
+
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
+   *     IcebergWriterResult#IcebergWriterResult(TableReference, List, List, StructType)} instead
+   */
+  @Deprecated
   public IcebergWriterResult(
       TableIdentifier tableIdentifier,
       List<DataFile> dataFiles,
       List<DeleteFile> deleteFiles,
       StructType partitionStruct) {
-    this.tableIdentifier = tableIdentifier;
+    this.tableReference = TableReference.of("unknown", tableIdentifier);
    this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
     this.partitionStruct = partitionStruct;
   }
 
+  public TableReference tableReference() {
+    return tableReference;
+  }
+
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0; use {@code tableReference().identifier()}
+   *     instead
+   */
+  @Deprecated
   public TableIdentifier tableIdentifier() {
-    return tableIdentifier;
+    return tableReference.identifier();
   }
 
   public List<DataFile> dataFiles() {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
index d4bf4ce2a4d3..5ed820c8cbe1 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.data.GenericFileWriterFactory;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileWriterFactory;
@@ -95,7 +96,7 @@ private static Object valueFromMap(Map<?, ?> parent, List<String> fields) {
   }
 
   public static TaskWriter<Record> createTableWriter(
-      Table table, String tableName, IcebergSinkConfig config) {
+      Table table, TableReference tableReference, IcebergSinkConfig config) {
     Map<String, String> tableProps = Maps.newHashMap(table.properties());
     tableProps.putAll(config.writeProps());
 
@@ -113,7 +114,7 @@ public static TaskWriter<Record> createTableWriter(
     Set<Integer> identifierFieldIds = table.schema().identifierFieldIds();
 
     // override the identifier fields if the config is set
-    List<String> idCols = config.tableConfig(tableName).idColumns();
+    List<String> idCols = config.tableConfig(tableReference.identifier().toString()).idColumns();
     if (!idCols.isEmpty()) {
       identifierFieldIds =
           idCols.stream()
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index 60a085781171..a8fbce7a8b01 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.events.AvroUtil;
 import org.apache.iceberg.connect.events.CommitComplete;
 import org.apache.iceberg.connect.events.CommitToTable;
@@ -189,7 +190,7 @@ private UUID coordinatorTest(
             new DataWritten(
                 StructType.of(),
                 commitId,
-                new TableReference("catalog", ImmutableList.of("db"), "tbl"),
+                TableReference.of("catalog", TableIdentifier.of("db", "tbl"), UUID.randomUUID()),
                 dataFiles,
                 deleteFiles));
     bytes = AvroUtil.encode(commitResponse);
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
index a14ebcab7336..6baf72117d04 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
@@ -91,7 +91,7 @@ public void testDefaultRoute() {
     List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
     assertThat(writerResults).hasSize(1);
     IcebergWriterResult writerResult = writerResults.get(0);
-    assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+    assertThat(writerResult.tableReference().identifier()).isEqualTo(TABLE_IDENTIFIER);
   }
 
   @Test
@@ -119,7 +119,7 @@ public void testStaticRoute() {
     List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
     assertThat(writerResults).hasSize(1);
     IcebergWriterResult writerResult = writerResults.get(0);
-    assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+    assertThat(writerResult.tableReference().identifier()).isEqualTo(TABLE_IDENTIFIER);
   }
 
   @Test
@@ -150,7 +150,7 @@ public void testDynamicRoute() {
     List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
     assertThat(writerResults).hasSize(1);
     IcebergWriterResult writerResult = writerResults.get(0);
-    assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+    assertThat(writerResult.tableReference().identifier()).isEqualTo(TABLE_IDENTIFIER);
   }
 
   @Test
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java
index d25bfde85c0b..30b60fb3c542 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java
@@ -25,11 +25,14 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.UUID;
 import org.apache.iceberg.LocationProviders;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.encryption.PlaintextEncryptionManager;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
@@ -73,7 +76,9 @@ public void before() {
 
   protected WriteResult writeTest(
       List<Record> rows, IcebergSinkConfig config, Class<?> expectedWriterClass) {
-    try (TaskWriter<Record> writer = RecordUtils.createTableWriter(table, "name", config)) {
+    TableReference tableReference =
+        TableReference.of("test_catalog", TableIdentifier.of("name"), UUID.randomUUID());
+    try (TaskWriter<Record> writer = RecordUtils.createTableWriter(table, tableReference, config)) {
      assertThat(writer.getClass()).isEqualTo(expectedWriterClass);
 
       rows.forEach(
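
Note: the snippet below is not part of the patch; it is a minimal usage sketch of the API surface introduced above, assuming the kafka-connect-events module is on the classpath (catalog and table names are illustrative). The new three-argument TableReference.of carries the table UUID that Coordinator.commitToTable later compares against the live table's UUID before committing:

    UUID tableUuid = UUID.randomUUID();
    TableReference ref =
        TableReference.of("my_catalog", TableIdentifier.of("db", "tbl"), tableUuid);

    // identifier() rebuilds the TableIdentifier from namespace + name, and
    // uuid() exposes the new optional table_uuid field; it may be null for
    // events produced by older workers, in which case the coordinator skips
    // the UUID check and commits as before.
    assert ref.identifier().equals(TableIdentifier.of("db", "tbl"));
    assert tableUuid.equals(ref.uuid());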