Implement UPDATE for the Iceberg connector #12026
Conversation
Force-pushed 2ccef25 to ef32f36
Force-pushed 8136bb5 to 07ce007
Force-pushed 1e115b6 to 0603fdd
Resolved (outdated): plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java (8 threads)
Resolved (outdated): plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/CountingAccessHiveMetastore.java
Resolved (outdated): plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastore.java
Force-pushed 0603fdd to e4f0fad
Resolved (outdated): plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
@findepi AC thanks
Force-pushed e4f0fad to eb1ca9c
    }
    released = true;
    ReentrantLock lock = tableLocks.get(tableName);
    // Currently, metastore locks are always acquired and released in the same thread.
I don't get this, if they are always acquired and released in the same thread what is the point of using locks instead of booleans?
Multiple threads can request the lock but whichever thread acquires it, must be the one to release it because of how ReentrantLock works. If the lock was acquired for the entire query, instead of just the commit block, it'd be harder to ensure that.
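The pattern described above can be sketched as a standalone class (hypothetical class and method names, not the actual FileHiveMetastore code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: any thread may request a table's lock, but ReentrantLock
// requires the acquiring thread to be the releasing one, which is why the lock
// is scoped to the commit block rather than held for the whole query.
class TableLocks
{
    private final Map<String, ReentrantLock> tableLocks = new ConcurrentHashMap<>();

    public void runCommitExclusively(String tableName, Runnable commit)
    {
        ReentrantLock lock = tableLocks.computeIfAbsent(tableName, name -> new ReentrantLock());
        lock.lock(); // whichever thread wins here must also be the one to unlock
        try {
            commit.run();
        }
        finally {
            lock.unlock(); // must happen on the acquiring thread
        }
    }
}
```

Keeping the critical section this narrow makes the same-thread acquire/release guarantee trivial to uphold.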
Resolved (outdated): plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java (2 threads)
    // Currently, metastore locks are always acquired and released in the same thread.
    lock.unlock();
    if (!lock.isLocked()) {
        tableLocks.remove(tableName);
Use remove(tableName, lock) to ensure you remove the exact lock you intend to.
More importantly, I don't think we can base our logic on lock.isLocked(). There can be a thread doing acquire which pulled a lock from the map but hasn't called .lock() on it yet; it would not be correct to remove that lock from the map.
I think we can do one of:
- replace the Lock with some other primitive (like a boolean locked flag) and implement locking based on the fact that the class is synchronized
- implement locking with a CountDownLatch or Semaphore to allow lock cleanup
However, the more I think about this, the more I feel we shouldn't do it here; it's just distracting from the main PR. Let's remove the removal from the map and add a TODO comment here. I don't think such a leak can be a problem in tests, and for more sustained use this will need to be revisited.
// There is a memory leak. TODO: remove unused locks from tableLocks.
// Currently, there is no cleanup, as this is used for testing purposes where the whole metastore instance
// is not long-lived.
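The Semaphore alternative suggested above could look roughly like this (hypothetical names; unlike ReentrantLock, a Semaphore permit may be released by any thread, which is what would make a cleanup strategy feasible — cleanup itself is deliberately left unimplemented in this sketch):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of per-table locking built on Semaphore(1) instead of
// ReentrantLock. Cleanup of unused entries is intentionally not attempted here.
class SemaphoreTableLocks
{
    private final Map<String, Semaphore> tableLocks = new ConcurrentHashMap<>();

    public void lock(String tableName)
            throws InterruptedException
    {
        tableLocks.computeIfAbsent(tableName, name -> new Semaphore(1)).acquire();
    }

    public boolean tryLock(String tableName)
    {
        return tableLocks.computeIfAbsent(tableName, name -> new Semaphore(1)).tryAcquire();
    }

    public void unlock(String tableName)
    {
        // any thread may release the permit, not only the acquirer
        tableLocks.get(tableName).release();
    }
}
```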
bump?
I removed the isLocked call and left a TODO to clean up the locks.
@@ -169,7 +169,8 @@ public void testV2TableWithEqualityDelete()
     Table icebergTable = updateTableToV2(tableName);
     writeEqualityDeleteToNationTable(icebergTable);
     assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
-    assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
why removed?
The line I added below is the same query with an extra column
@@ -169,7 +169,8 @@ public void testV2TableWithEqualityDelete()
     Table icebergTable = updateTableToV2(tableName);
     writeEqualityDeleteToNationTable(icebergTable);
     assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
-    assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
+    // nationkey is before the equality delete column in the table schema, comment is after
+    assertQuery("SELECT nationkey, comment FROM " + tableName, "SELECT nationkey, comment FROM nation WHERE regionkey != 1");
Does this exercise both bugs mentioned in the commit message?
BTW can you come up with a better message than "Fix Iceberg delete filtering bugs"?
Maybe even splitting the commit into two, so that you can call out each fix separately?
It's kinda hard to split up because I pretty much had to re-do everything I had in TrinoDeleteFilter. I'll try to rephrase this.
Thinking about it some more, there was only really one bug in the existing code, having to do with equality deletes. I just also had to change how the interaction with deref pushdown worked in order to fix that bug.
@@ -1247,32 +1305,55 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
         rowDelta.validateNoConflictingDataFiles();
     }

+    if (isUpdate) {
+        rowDelta.validateDeletedFiles();
Why only for update? Maybe add a comment. Or can this be unconditional?
I'll add a comment. Deleting a row in two concurrent commits shouldn't cause validation to fail, but deleting a row and updating it concurrently should fail, since the update might undo the delete.
Sadly, the method names validateDeletedFiles and validateNoConflictingDeleteFiles do not suggest to me that they should be called for updates and shouldn't be called for deletes.
I see that this is what Spark Iceberg does: https://github.com/apache/iceberg/blob/f6e11148d31b408a7aea57a0efcb4428134f6a99/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L191-L194
Keep as is.
Force-pushed eb1ca9c to ac41305
(squashed and rebased)
"Add locking when commiting Iceberg tables to FileHiveMetastore" LGTM
"Fix reading Iceberg equality deletes" LGTM
We don't need to add locking to FileHiveMetastore, especially not a broken implementation with a TODO that seems unlikely to be fixed. Instead, you can use CAS (compare-and-swap) to safely support Iceberg:

if (existingTable.tableType().equalsIgnoreCase("iceberg") && !Objects.equals(
        existingTable.parameters().get("metadata_location"),
        replacementTable.parameters().get("previous_metadata_location"))) {
    throw new MetastoreException("Cannot update Iceberg table: supplied previous location does not match current location");
}

File metastore is not always used for testing purposes. I strongly dislike adding broken stuff to it just for testing purposes. We made the code a lot more complex just to save having a trivial FileMetastoreTableOperations implementation.
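The compare-and-swap check suggested above can be isolated into a small helper (illustrative names, not the actual metastore code), which makes the success condition easy to see: a replacement is allowed only if the writer saw the metadata location it is replacing.

```java
import java.util.Map;
import java.util.Objects;

// Illustrative standalone version of the CAS check: the replacement table must
// name the metadata location it replaces; a mismatch means another writer
// committed in between, and the commit should fail rather than overwrite.
class MetadataLocationCas
{
    public static boolean canReplace(Map<String, String> existingParameters, Map<String, String> replacementParameters)
    {
        return Objects.equals(
                existingParameters.get("metadata_location"),
                replacementParameters.get("previous_metadata_location"));
    }
}
```

A losing writer can then retry from the new current metadata, which is the usual optimistic-concurrency loop for Iceberg commits.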
Force-pushed 25fbe8f to 1b71ff1
Resolved (outdated): core/trino-spi/src/main/java/io/trino/spi/connector/UpdatablePageSource.java
Columns passed to the Iceberg DeleteFilter must be in the same order as they are in the TrinoRow created from the Page in IcebergPageSource, but the `filterSchema` method in TrinoDeleteFilter did not ensure that.
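The ordering requirement described in this commit message can be illustrated with a hypothetical helper (names are illustrative, not Trino's actual `filterSchema` code): required columns are sorted by their position in the page, so that field i of the filter schema lines up with channel i of the row.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: reorder the columns required by a delete filter to
// match the order of the columns in the page, so positional lookups agree.
class DeleteFilterColumnOrder
{
    public static List<String> orderToMatchPage(List<String> requiredColumns, List<String> pageColumns)
    {
        return requiredColumns.stream()
                .sorted(Comparator.comparingInt(pageColumns::indexOf))
                .collect(Collectors.toList());
    }
}
```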
Force-pushed 1a5c9c4 to ef7bd23
@rdblue I had a partition scheme question on this. If the table's partition scheme has been updated since a data file was written, and then a row from the file is updated, should the file with the updated rows get written using the old scheme or the updated one?
@alexjo2144, it's up to you what makes the most sense for the updated rows. For the deletes, you have to make sure that the partition of any delete files matches the data that they apply to.
Yep, I've got that part. Right now the updated rows use the scheme matching the data file, so all the new files are stored with each other in the same directory, which is kinda nice. But I guess if a user has changed the scheme to improve read performance for their queries, it might make sense to respect the new layout. @findepi any opinion?
The Spark implementation uses the current partition spec for updated rows, for what it's worth.
Force-pushed ef7bd23 to 780934f
Thanks Ryan. Piotr voiced a preference for using the current partition spec, so I'll go with that. Just pushed an update.
    .withCleanupQuery(cleanupQuery)
    .experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT)
    .at(boundaryDistributedStage())
    .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node"))
This has to be hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer"), otherwise the test might be flaky (I'm currently fixing it in other places).
See #12274
Updated, thanks
Force-pushed 780934f to e5569bb
@alexjo2144 please mind the CI, it's quite unhappy.
Using v2 merge-on-read deletes. Co-authored-by: Jack Ye <yzhaoqin@amazon.com>
Force-pushed e5569bb to c376da2
Delta failure seems like a flake. It passed on the first of the two empty commit runs. Created an issue: #12300
Description
Add support for updating individual rows using the Iceberg connector. The IcebergUpdatablePageSource will write a new data file containing the new data, as well as a positional delete file removing the old rows from the existing file.
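The mechanism described above — rewriting updated rows into a new data file while a positional delete file masks the old copies — can be sketched as follows (illustrative names; not the actual IcebergUpdatablePageSource code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of merge-on-read UPDATE: updated rows go into a new data
// file, and a positional delete file records (data file path, row position)
// pairs for the old copies of those rows, so readers skip them.
class PositionalUpdateSketch
{
    record PositionDelete(String dataFilePath, long rowPosition) {}

    public static List<PositionDelete> deletesForUpdatedRows(String dataFilePath, List<Long> updatedPositions)
    {
        List<PositionDelete> deletes = new ArrayList<>();
        for (long position : updatedPositions) {
            deletes.add(new PositionDelete(dataFilePath, position));
        }
        return deletes;
    }
}
```

At read time, a position delete applies only to the named data file, which is why delete-file partitioning must match the data it applies to, as discussed earlier in the thread.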
New feature
Iceberg connector
Add support for updating individual rows using the Iceberg connector.
Related issues, pull requests, and links
Based on #11886
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
(x) Documentation PR is available with #12326
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
( ) Release notes entries required with the following suggested text: