Merged
@@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -32,37 +33,59 @@
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StringType;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.UUIDType;

/** Element representing a table identifier, with namespace and name. */
public class TableReference implements IndexedRecord {

private String catalog;
private List<String> namespace;
private String name;
private UUID uuid;
private final Schema avroSchema;

static final int CATALOG = 10_600;
static final int NAMESPACE = 10_601;
static final int NAME = 10_603;
static final int TABLE_UUID = 10_604;

public static final StructType ICEBERG_SCHEMA =
StructType.of(
NestedField.required(CATALOG, "catalog", StringType.get()),
NestedField.required(
NAMESPACE, "namespace", ListType.ofRequired(NAMESPACE + 1, StringType.get())),
NestedField.required(NAME, "name", StringType.get()));
NestedField.required(NAME, "name", StringType.get()),
NestedField.optional(TABLE_UUID, "table_uuid", UUIDType.get()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TableReference.class);

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link TableReference#of(String,
* TableIdentifier, UUID)}
*/
@Deprecated
public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
return new TableReference(
catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name(), null);
}

public static TableReference of(String catalog, TableIdentifier tableIdentifier, UUID tableUuid) {
return new TableReference(
catalog,
Arrays.asList(tableIdentifier.namespace().levels()),
tableIdentifier.name(),
tableUuid);
}

// Used by Avro reflection to instantiate this class when reading events
public TableReference(Schema avroSchema) {
this.avroSchema = avroSchema;
}

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link TableReference#of(String,
* TableIdentifier, UUID)}.
*/
@Deprecated
public TableReference(String catalog, List<String> namespace, String name) {
Preconditions.checkNotNull(catalog, "Catalog cannot be null");
Preconditions.checkNotNull(namespace, "Namespace cannot be null");
Expand All @@ -73,10 +96,25 @@ public TableReference(String catalog, List<String> namespace, String name) {
this.avroSchema = AVRO_SCHEMA;
}

private TableReference(String catalog, List<String> namespace, String name, UUID uuid) {
Preconditions.checkNotNull(catalog, "Catalog cannot be null");
Preconditions.checkNotNull(namespace, "Namespace cannot be null");
Preconditions.checkNotNull(name, "Name cannot be null");
this.catalog = catalog;
this.namespace = namespace;
this.name = name;
this.uuid = uuid;
this.avroSchema = AVRO_SCHEMA;
}

public String catalog() {
return catalog;
}

public UUID uuid() {
return uuid;
}

public TableIdentifier identifier() {
Namespace icebergNamespace = Namespace.of(namespace.toArray(new String[0]));
return TableIdentifier.of(icebergNamespace, name);
@@ -103,6 +141,9 @@ public void put(int i, Object v) {
case NAME:
this.name = v == null ? null : v.toString();
return;
case TABLE_UUID:
this.uuid = (UUID) v;
return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -117,6 +158,8 @@ public Object get(int i) {
return namespace;
case NAME:
return name;
case TABLE_UUID:
return uuid;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
@@ -133,11 +176,12 @@ public boolean equals(Object o) {
TableReference that = (TableReference) o;
return Objects.equals(catalog, that.catalog)
&& Objects.equals(namespace, that.namespace)
&& Objects.equals(name, that.name);
&& Objects.equals(name, that.name)
&& Objects.equals(uuid, that.uuid);
}

@Override
public int hashCode() {
return Objects.hash(catalog, namespace, name);
return Objects.hash(catalog, namespace, name, uuid);
}
}
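For context, a short sketch of how the new factory behaves; the class and methods are from the diff above, while the harness itself is hypothetical. Because table_uuid is an optional field, events built with the deprecated two-argument factory (or written by older connector versions) decode with a null UUID, and older readers simply ignore the new field, as the put() default branch above shows:

import java.util.UUID;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.TableReference;

public class TableReferenceUuidExample {
  public static void main(String[] args) {
    TableIdentifier identifier = TableIdentifier.of("db", "tbl");

    // New factory: carries the table UUID so the coordinator can detect that a
    // table was dropped and re-created between write and commit.
    TableReference withUuid = TableReference.of("catalog", identifier, UUID.randomUUID());

    // Deprecated factory: uuid() is null, matching events from older writers.
    TableReference withoutUuid = TableReference.of("catalog", identifier);

    System.out.println(withUuid.uuid());       // a random UUID
    System.out.println(withoutUuid.uuid());    // null
    System.out.println(withUuid.identifier()); // db.tbl
  }
}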
@@ -21,8 +21,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.Test;

public class TestEventSerialization {
@@ -50,7 +50,7 @@ public void testDataWrittenSerialization() {
new DataWritten(
EventTestUtil.SPEC.partitionType(),
commitId,
new TableReference("catalog", Collections.singletonList("db"), "tbl"),
TableReference.of("catalog", TableIdentifier.of("db", "tbl"), UUID.randomUUID()),
Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()),
Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile())));

@@ -97,7 +97,7 @@ public void testCommitToTableSerialization() {
"cg-connector",
new CommitToTable(
commitId,
new TableReference("catalog", Collections.singletonList("db"), "tbl"),
TableReference.of("catalog", TableIdentifier.of("db", "tbl"), UUID.randomUUID()),
1L,
EventTestUtil.now()));

@@ -193,6 +193,7 @@ private String offsetsToJson(Map<Integer, Long> offsets) {
}
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void commitToTable(
TableReference tableReference,
List<Envelope> envelopeList,
@@ -207,6 +208,14 @@
return;
}

if (!Objects.equals(table.uuid(), tableReference.uuid())) {
LOG.warn(
"Skipping commits to table {} due to target table mismatch. Expected: {} Received: {}",
tableIdentifier,
table.uuid(),
tableReference.uuid());
}

String branch = config.tableConfig(tableIdentifier.toString()).commitBranch();

// Control topic partition offsets may include a subset of partition ids if there were no

Review thread (on the UUID check above):

Contributor: I wonder about the case where someone does a library upgrade and has events without the UUID in the tableReference. What would happen in this case?

Contributor: The events will be skipped. We should probably only check the UUID if the ref UUID is not null, even though that isn't ideal.

Review thread (on lines +212 to +216, the warning above):

Contributor: Do we need to return here? It seems like we are just logging.

Contributor: Good catch, we need to return here.

Contributor: I'll open a PR to address these two issues; I approved a little too soon.
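Given the threads above, the likely follow-up is to make the check null-safe and to short-circuit on mismatch. A minimal sketch of that suggestion (hypothetical; this reflects the reviewers' proposal, not code from this PR):

// Hypothetical follow-up per the review discussion: only compare UUIDs when the
// event actually carries one (older writers send null), and return so the
// mismatched commit is really skipped rather than only logged.
if (tableReference.uuid() != null && !Objects.equals(table.uuid(), tableReference.uuid())) {
  LOG.warn(
      "Skipping commits to table {} due to target table mismatch. Expected: {} Received: {}",
      tableIdentifier,
      table.uuid(),
      tableReference.uuid());
  return;
}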
@@ -32,7 +32,6 @@
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -101,7 +100,7 @@ protected boolean receive(Envelope envelope) {
new DataWritten(
writeResult.partitionStruct(),
commitId,
TableReference.of(config.catalogName(), writeResult.tableIdentifier()),
writeResult.tableReference(),
writeResult.dataFiles(),
writeResult.deleteFiles())))
.collect(Collectors.toList());
@@ -24,8 +24,8 @@
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
@@ -35,23 +35,23 @@

class IcebergWriter implements RecordWriter {
private final Table table;
private final String tableName;
private final TableReference tableReference;
private final IcebergSinkConfig config;
private final List<IcebergWriterResult> writerResults;

private RecordConverter recordConverter;
private TaskWriter<Record> writer;

IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
IcebergWriter(Table table, TableReference tableReference, IcebergSinkConfig config) {
this.table = table;
this.tableName = tableName;
this.tableReference = tableReference;
this.config = config;
this.writerResults = Lists.newArrayList();
initNewWriter();
}

private void initNewWriter() {
this.writer = RecordUtils.createTableWriter(table, tableName, config);
this.writer = RecordUtils.createTableWriter(table, tableReference, config);
this.recordConverter = new RecordConverter(table, config);
}

@@ -107,7 +107,7 @@ private void flush() {

writerResults.add(
new IcebergWriterResult(
TableIdentifier.parse(tableName),
tableReference,
Arrays.asList(writeResult.dataFiles()),
Arrays.asList(writeResult.deleteFiles()),
table.spec().partitionType()));
@@ -20,6 +20,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
@@ -28,6 +29,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -67,7 +69,15 @@ RecordWriter createWriter(String tableName, SinkRecord sample, boolean ignoreMis
}
}

return new IcebergWriter(table, tableName, config);
UUID tableUuid = table.uuid();
if (tableUuid == null) {
LOG.warn(
"Table {} does not have a UUID, this may cause issues with commit coordination on table replace",
identifier);
}
TableReference tableReference = TableReference.of(catalog.name(), identifier, tableUuid);

return new IcebergWriter(table, tableReference, config);
}

@VisibleForTesting
@@ -22,28 +22,54 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.types.Types.StructType;

public class IcebergWriterResult {

private final TableIdentifier tableIdentifier;
private final TableReference tableReference;
private final List<DataFile> dataFiles;
private final List<DeleteFile> deleteFiles;
private final StructType partitionStruct;

public IcebergWriterResult(
TableReference tableReference,
List<DataFile> dataFiles,
List<DeleteFile> deleteFiles,
StructType partitionStruct) {
this.tableReference = tableReference;
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.partitionStruct = partitionStruct;
}

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
* IcebergWriterResult#IcebergWriterResult(TableReference, List, List, StructType)} instead
*/
@Deprecated
public IcebergWriterResult(
TableIdentifier tableIdentifier,
List<DataFile> dataFiles,
List<DeleteFile> deleteFiles,
StructType partitionStruct) {
this.tableIdentifier = tableIdentifier;
this.tableReference = TableReference.of("unknown", tableIdentifier);
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.partitionStruct = partitionStruct;
}

public TableReference tableReference() {
return tableReference;
}

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@code tableReference().identifier()}
* instead
*/
@Deprecated
public TableIdentifier tableIdentifier() {
return tableIdentifier;
return tableReference.identifier();
}

public List<DataFile> dataFiles() {
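To make the deprecation bridge concrete, a short hypothetical usage (the harness and imports are assumptions, not part of the PR) showing what callers of the old constructor now get:

import java.util.Collections;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types.StructType;

// Assumes IcebergWriterResult is importable from the connector module.
public class WriterResultBridgeExample {
  public static void main(String[] args) {
    // Deprecated path: no catalog or UUID is available here, so the bridge
    // records the catalog as "unknown" and leaves the table UUID null.
    IcebergWriterResult legacy =
        new IcebergWriterResult(
            TableIdentifier.of("db", "tbl"),
            Collections.emptyList(),
            Collections.emptyList(),
            StructType.of());

    System.out.println(legacy.tableReference().catalog()); // "unknown"
    System.out.println(legacy.tableReference().uuid());    // null
    System.out.println(legacy.tableIdentifier());          // db.tbl, via tableReference().identifier()
  }
}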
@@ -27,6 +27,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.data.GenericFileWriterFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileWriterFactory;
@@ -95,7 +96,7 @@ private static Object valueFromMap(Map<String, ?> parent, List<String> fields) {
}

public static TaskWriter<Record> createTableWriter(
Table table, String tableName, IcebergSinkConfig config) {
Table table, TableReference tableReference, IcebergSinkConfig config) {
Map<String, String> tableProps = Maps.newHashMap(table.properties());
tableProps.putAll(config.writeProps());

@@ -113,7 +114,7 @@ public static TaskWriter<Record> createTableWriter(
Set<Integer> identifierFieldIds = table.schema().identifierFieldIds();

// override the identifier fields if the config is set
List<String> idCols = config.tableConfig(tableName).idColumns();
List<String> idCols = config.tableConfig(tableReference.identifier().name()).idColumns();
if (!idCols.isEmpty()) {
identifierFieldIds =
idCols.stream()
@@ -33,6 +33,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.AvroUtil;
import org.apache.iceberg.connect.events.CommitComplete;
import org.apache.iceberg.connect.events.CommitToTable;
@@ -189,7 +190,7 @@ private UUID coordinatorTest(
new DataWritten(
StructType.of(),
commitId,
new TableReference("catalog", ImmutableList.of("db"), "tbl"),
TableReference.of("catalog", TableIdentifier.of("db", "tbl"), UUID.randomUUID()),
dataFiles,
deleteFiles));
bytes = AvroUtil.encode(commitResponse);