-
Notifications
You must be signed in to change notification settings - Fork 3k
Kafka Connect: validate table uuid on commit #14979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...onnect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
Show resolved
Hide resolved
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
Outdated
Show resolved
Hide resolved
...onnect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
Show resolved
Hide resolved
|
|
||
| if (expectedUuid != null && !expectedUuid.equals(payloadTableUuid)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of skipping should we fail instead, otherwise can it lead to data loss as the offsets will committed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is one of the difficult things with KC and errors like this. If we fail, then you have events in the control topic, which will remain. The only options at that point are to clear the control topic or move the offsets forward. However, doing that can result in losing commit data for other tables since events for multiple tables are all intermixed.
I don't think there's really a safe recovery and if the data needs to be recovered, you would create a new consumer group for the control topic and reset the consumer offset back to prior to these events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to skipping, we also skip if the table name is not found. I feel we could move this check up, to right after the table load and verify that the UUID matches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bryanck has a good point here. The envelopes are already collected by TableReference before reaching this point. All we need is to ensure that the latest loaded table is consistent with the table reference used to partition the messages.
| LOG.warn( | ||
| "Skipping commits to table {} due to target table mismatch. Expected: {} Received: {}", | ||
| tableIdentifier, | ||
| table.uuid(), | ||
| tableReference.uuid()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to return here ? seems like we are just logging ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, we need to return here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll open a PR to address these two issues, I approved a little too soon.
| return; | ||
| } | ||
|
|
||
| if (!Objects.equals(table.uuid(), tableReference.uuid())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder the case where lets say some one does a library upgrade and it has events without the uuid in the tableReference, what would happen in this case ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The events will be skipped. We should probably only check the UUID if the ref UUID is not null, even though that isn't ideal.
|
I opened #15011 to fix a couple of things @singhpk234 pointed out. |
Kafka connect events for data written currently include the name of the target table but don't capture the UUID. Since the coordinator loads the table at time of commit and then processes the events, this can result in a number of issues if the table was dropped/moved and recreated including:
This PR validates that the UUID of the table used when constructing the writer is consistent with the target table UUID at commit time.