Delta: Support Snapshot Delta Lake Table to Iceberg Table #6449
Conversation
…o do the migration
Currently, I have added an abstract class called BaseMigrateDeltaLakeTableAction. This class takes in an icebergCatalog and a Delta Lake table's path, as suggested in #5331 (review).
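To make the shape of that class concrete, here is a minimal sketch of what such an abstract action could look like. This is an illustration under assumptions (the field layout and the newTableIdentifier parameter are mine), not the exact code in this PR.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Sketch only: an abstract action holding the Iceberg catalog and the source Delta table
// location, leaving format-specific work (like metrics extraction) to subclasses.
public abstract class BaseMigrateDeltaLakeTableAction {
  private final Catalog icebergCatalog;
  private final String deltaTableLocation;
  private final TableIdentifier newTableIdentifier; // assumed parameter for the target table

  protected BaseMigrateDeltaLakeTableAction(
      Catalog icebergCatalog, String deltaTableLocation, TableIdentifier newTableIdentifier) {
    this.icebergCatalog = icebergCatalog;
    this.deltaTableLocation = deltaTableLocation;
    this.newTableIdentifier = newTableIdentifier;
  }

  // Metrics extraction needs a format-specific module (e.g. iceberg-parquet), so it stays abstract.
  protected abstract Metrics getMetricsForFile(Table table, String fullFilePath, FileFormat format);
}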
      .build();
}

protected abstract Metrics getMetricsForFile(Table table, String fullFilePath, FileFormat format);
Based on my understanding, we must access the relevant file format's package (e.g. iceberg-parquet) to get the metrics. Hence, I had to make this method abstract, since this class currently lives in iceberg-core. I feel like we may want to implement this in some other way (e.g. make this class more abstract, add a new package called iceberg-migration, etc.). Please correct me if I misunderstand something about processing data files.
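For illustration, a subclass that is allowed to depend on iceberg-parquet could implement the abstract method roughly as follows. ParquetUtil.fileMetrics and MetricsConfig.forTable are existing Iceberg APIs; the surrounding wiring is an assumption of mine, not the PR's actual code.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.ParquetUtil;

@Override
protected Metrics getMetricsForFile(Table table, String fullFilePath, FileFormat format) {
  InputFile file = table.io().newInputFile(fullFilePath);
  if (format == FileFormat.PARQUET) {
    // ParquetUtil lives in iceberg-parquet, which is why iceberg-core alone cannot do this.
    return ParquetUtil.fileMetrics(file, MetricsConfig.forTable(table));
  }
  throw new UnsupportedOperationException("Unsupported file format: " + format);
}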
Agree. I was proposing iceberg-deltalake because, for example, we could also later have iceberg-hudi, and people could then take one dependency for their migration purposes instead of multiple.
I love seeing this functionality, but I'm not sure it should be a first-class citizen in the repo. The issue is that this would require us to pull in all the Delta dependencies, which would increase the complexity of our build as well as the work of keeping up with versioning. Is there a way to accomplish this without as heavy a dependency requirement?
+1, what about having a separate module? The Delta dependencies could be marked as compileOnly, so no additional dependency would be pulled into the runtime jar. Basically it's like any other vendor integration. Would that work for minimizing dependencies?
Thanks for taking this over from me and Eric, really appreciate the help! As a starting point, I think Russell and you also agree that this is better placed in a separate module, so let's first do that and see how it goes.
…mber of data files migrated
would be good to also get input from @rdblue or @danielcweeks here.
 *
 * <pre>
 * root
 * |-- address_nested: struct (nullable = true)
Should we eventually be testing with all supported types rather than hardcoding?
Agree. Thank you for pointing this out. I added another dataframe which includes all the data types that Delta Lake supports (and that are also supported by Iceberg). I will discuss the details below.
* use assertj for all tests
* add null check for the spark integration method
* use a method to generate the hardcode dataframe
* drop iceberg table afterwards
* add typetest table
* test all delta lake types
* test conversion of NullType
* fix format issue
* add a second dataframe
* refactor the integration test
* correctly decoded delta's path
* fix wrong decoding
* fix wrong decoding 2
@nastra Thank you very much for your reviews! Based on your suggestions, I made some refactors to the integration test and also added additional checks to ensure correctness.
.withColumn("shortCol", expr("CAST(longCol AS SHORT)")) | ||
.withColumn("mapCol", expr("MAP(longCol, decimalCol)")) | ||
.withColumn("arrayCol", expr("ARRAY(longCol)")) | ||
.withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")); |
This dataframe aims to include all the data types supported by Delta Lake tables that are also supported by Iceberg after the conversion.
The included types are:
- ArrayType
- BinaryType
- BooleanType
- ByteType
- DateType
- DecimalType
- DoubleType
- FloatType
- IntegerType
- LongType
- MapType
- ShortType
- StringType
- StructType
- TimeStampType
Ref: Delta Lake Types
NullType is not supported by Iceberg, and thus will be caught in the unit test for schema conversion.
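For reference, such a dataframe can be generated from the Spark session roughly as follows. This is a hedged sketch, not the exact test code: spark is assumed to be an existing SparkSession, and the column names are illustrative.

import static org.apache.spark.sql.functions.current_date;
import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Start from a LongType column and derive the other supported types from it.
Dataset<Row> typeTestDf =
    spark.range(0, 5)
        .toDF("longCol")
        .withColumn("intCol", expr("CAST(longCol AS INT)"))
        .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
        .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
        .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
        .withColumn("booleanCol", expr("longCol > 2"))
        .withColumn("stringCol", expr("CAST(longCol AS STRING)"))
        .withColumn("binaryCol", expr("CAST(stringCol AS BINARY)"))
        .withColumn("dateCol", date_add(current_date(), 1))
        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
        .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
        .withColumn("arrayCol", expr("ARRAY(longCol)"))
        .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"));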
.withColumn("mapCol1", expr("MAP(structCol1, structCol2)")) | ||
.withColumn("mapCol2", expr("MAP(longCol, dateString)")) | ||
.withColumn("mapCol3", expr("MAP(dateCol, arrayCol)")) | ||
.withColumn("structCol3", expr("STRUCT(structCol2, mapCol3, arrayCol)")); |
I removed the hardcoded dataframe and used the Spark session to generate a nested dataframe. This dataframe helps us verify the correctness of the recursive schema conversion and the name mapping.
SnapshotDeltaLakeTable.Result result =
    DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
            spark, newTableIdentifier, typeTestTableLocation)
        .tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
* remove a redundant map collector in commitDeltaVersionLogToIcebergTransaction
* get the earliest possible version rather than hard code from 0
* add unit test to check if table exists
* refactor action extracted from the versionlog
* fix format issue
* move non-share table write operation to the test itself, instead of in before()
* fix type
this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
this.deltaLakeFileIO = new HadoopFileIO(conf);
// get the earliest version available in the delta lake table
this.deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L);
Sometimes the earliest available version may not be 0. It is better to let deltaLog tell us which version is the earliest one that is available.
dataFileActions =
    deltaLog.getSnapshotForVersionAsOf(deltaStartVersion).getAllFiles().stream()
        .map(addFile -> (Action) addFile)
        .collect(Collectors.toList());
It is more logically reasonable to extract the data files from the snapshot of the start version instead of from the version log, since there may be versions that have already been deleted from the table.
Also, doing this allows us to add options later so that users can choose which version to start the snapshot from.
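To illustrate how those actions then become Iceberg data files, the core of the commit path looks roughly like the sketch below. DataFiles.builder, AppendFiles, and Transaction are the real Iceberg APIs; deltaTableLocation, buildMetrics, and the Parquet-only assumption are illustrative stand-ins for the PR's actual helpers.

import java.util.List;
import io.delta.standalone.actions.AddFile;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Transaction;

// Sketch: convert each Delta AddFile into an Iceberg DataFile and stage them in one append.
void appendDeltaFiles(Transaction transaction, PartitionSpec spec, List<AddFile> addFiles) {
  AppendFiles append = transaction.newAppend();
  for (AddFile addFile : addFiles) {
    String fullPath = deltaTableLocation + "/" + addFile.getPath(); // assumed path resolution
    DataFile dataFile =
        DataFiles.builder(spec)
            .withPath(fullPath)
            .withFormat(FileFormat.PARQUET) // assuming Parquet data files
            .withFileSizeInBytes(addFile.getSize())
            .withMetrics(buildMetrics(fullPath)) // illustrative metrics helper
            .build();
    // For a partitioned table, partition values from addFile.getPartitionValues() would also be set.
    append.appendFile(dataFile);
  }
  // Stage this version's files; the transaction itself is committed once all version logs are replayed.
  append.commit();
}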
…eberg # Conflicts: # versions.props
this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
this.deltaLakeFileIO = new ResolvingFileIO();
this.deltaLakeFileIO.initialize(ImmutableMap.of());
this.deltaLakeFileIO.setConf(conf);
I switched to use ResolvingFileIO here. The reason I directly initialize the IO here is that, based on the previous discussion (#6449 (comment)), we would like to maintain consistency between the reader and the writer. Since the deltaLog relies on the Hadoop configuration, deltaLakeFileIO should also use the Hadoop configuration instead of other table properties. Hence, we initialize the IO using an empty map.
@jackye1995, @danielcweeks, could you please help me confirm the ResolvingFileIO initialization process here? Thank you very much!
I think we actually want to use HadoopFileIO specifically in this case, because the Hadoop configuration should dictate how the Delta files are read. If we use ResolvingFileIO, then for a Delta table on S3 it requires users to supply another set of configurations for S3FileIO, instead of using the S3 FileSystem that is already configured to access the Delta logs.
Thank you for your explanation. I had misunderstood and thought that ResolvingFileIO would load the Hadoop configuration into S3FileIO during instantiation. After re-reading the source code, I now understand this is not the case, because S3FileIO is not even Hadoop-configurable. I will revert the implementation here to HadoopFileIO to maintain consistency between the DeltaLog and the FileIO.
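For clarity, the reverted initialization would then be a minimal variant of the earlier snippet (assuming the same constructor context), with both readers driven by the same Hadoop Configuration:

// Keep the Delta log reader and the Iceberg FileIO consistent: both use the Hadoop conf,
// so whatever file system already serves the Delta logs also serves the data files.
this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
this.deltaLakeFileIO = new HadoopFileIO(conf);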
LGTM and thanks for the contribution! Minor nits that might not be worth blocking.
} else if (path.endsWith(ORC_SUFFIX)) {
  return FileFormat.ORC;
} else {
  throw new ValidationException("Cannot determine file format from path %s", path);
nit: not sure if we should continue to handle non-Parquet suffixes and treat this as "cannot determine" rather than "do not support" here after we removed the dependencies, as now they will probably fail with Cannot get metrics from file format.
Thank you for pointing this out. I removed support for ORC and Avro and changed the error message to "do not support...".
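A hedged sketch of what the resulting Parquet-only check could look like (the constant and method names here are illustrative, not necessarily those in the PR):

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.exceptions.ValidationException;

// Only Parquet data files are supported after dropping the ORC/Avro dependencies.
private static FileFormat determineFileFormat(String path) {
  if (path.endsWith(PARQUET_SUFFIX)) { // PARQUET_SUFFIX: assumed constant, e.g. ".parquet"
    return FileFormat.PARQUET;
  }
  throw new ValidationException("Do not support file format in path %s", path);
}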
Looks like all the comments are addressed. Synced offline with @danielcweeks and @nastra, and I believe they don't have further comments either. Given that this PR has been open for quite some time, and there are quite a few subsequent updates I would like to see, I will go ahead and merge it and create some issues tracking features that can be added to improve the migration experience. Thanks @JonasJ-ap and @ericlgoodman for the contribution, and thanks everyone for the review!
Co-authored-by: ericlgoodman <erigood@amazon.com>
This PR follows #5331 and introduces a new module called iceberg-delta-lake, which provides support for snapshotting a Delta Lake table to an Iceberg table by translating the schema and committing all logs through an Iceberg transaction. The current implementation relies on delta-standalone to read the deltaLog of the given Delta tables. Since delta-standalone only supports tables with minReaderVersion=1 and minWriterVersion=2, the snapshot action does not support higher-versioned features such as ColumnMapping. Most of the tests for this module are based on Spark 3.3 and therefore are under the integrationTest task.
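For readers who want to try it, the integration snippet quoted above suggests usage roughly like the following from a Spark application. This is a hedged sketch: spark is assumed to be a SparkSession with an Iceberg catalog configured, the identifier and location strings are placeholders, and DeltaLakeToIcebergMigrationSparkIntegration is the helper used in the integration tests.

import org.apache.iceberg.delta.SnapshotDeltaLakeTable;

// Snapshot an existing Delta Lake table (by location) into a new Iceberg table.
SnapshotDeltaLakeTable.Result result =
    DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
            spark, "my_catalog.my_db.my_table", "s3://bucket/path/to/delta/table")
        .execute();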