[lake/paimon] Support paimon tiering factory #853
Pull Request Overview
This PR introduces support for paimon tiering in Fluss, providing a new tiering factory implementation along with writers, committers, serializers, and corresponding tests.
- Added generic type support to the LakeTieringFactory interface and its usage in PaimonLakeStorage.
- Implemented classes for writing and committing paimon data (e.g., PaimonLakeWriter, MergeTreeWriter, AppendOnlyWriter) along with serializers (PaimonWriteResultSerializer, PaimonCommittableSerializer).
- Added comprehensive tests for paimon record conversions and tiering functionality (PaimonTieringTest, FlussRecordAsPaimonRowTest).
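The generic factory described in the list above can be sketched as follows. This is a minimal illustration under stated assumptions, not the Fluss API: `SketchWriter`, `SketchCommitter`, `SketchTieringFactory`, and `CountingTieringFactory` are hypothetical names, and the method shapes are guesses made only to show why the type parameters help. Each lake format can plug in its own write-result and committable types without casts.

```java
import java.util.List;

/** Hypothetical writer: consumes records and returns a format-specific result on complete(). */
interface SketchWriter<WriteResultT> {
    void write(String record);

    WriteResultT complete();
}

/** Hypothetical committer helper: merges many write results into one committable. */
interface SketchCommitter<WriteResultT, CommittableT> {
    CommittableT toCommittable(List<WriteResultT> results);
}

/** Hypothetical generic factory; the type parameters tie writer and committer together. */
interface SketchTieringFactory<WriteResultT, CommittableT> {
    SketchWriter<WriteResultT> createWriter();

    SketchCommitter<WriteResultT, CommittableT> createCommitter();
}

/** Toy implementation: the write result is a record count, the committable is the total. */
class CountingTieringFactory implements SketchTieringFactory<Integer, Long> {
    @Override
    public SketchWriter<Integer> createWriter() {
        return new SketchWriter<Integer>() {
            private int count;

            @Override
            public void write(String record) {
                count++; // a real writer would buffer or flush the record
            }

            @Override
            public Integer complete() {
                return count;
            }
        };
    }

    @Override
    public SketchCommitter<Integer, Long> createCommitter() {
        return results -> {
            long total = 0;
            for (int r : results) {
                total += r;
            }
            return total;
        };
    }
}
```

In this toy version, two writers that receive two and one records respectively produce a committable of 3 once their results are merged.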
Reviewed Changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java | Updated method signature to return a generic LakeTieringFactory. |
| fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java | Added tests for writing and committing records with paimon tiering. |
| fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java | Added tests for verifying the conversion of Fluss records to Paimon rows. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java | Introduced new conversion utilities for mapping between Fluss and Paimon constructs. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java | Implemented a record writer for Paimon’s primary-key table using merge tree strategy. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java | Implemented a record writer for Paimon’s append-only table. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java | Defined a common abstract base for writing records to Paimon. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java | Added serializer for write results. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResult.java | Introduced a write result class to convey commit information. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeWriter.java | Provided an implementation of LakeWriter for paimon tiering. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java | Added the paimon tiering factory implementation. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java | Implemented the LakeCommitter for paimon with commit logic. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittableSerializer.java | Added serializer for committable objects. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittable.java | Introduced a committable class to encapsulate manifest commit information. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCatalogProvider.java | Added a provider for creating Paimon catalogs. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java | Provided an adapter to wrap Fluss records as Paimon InternalRow instances. |
| fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java | Updated to return a proper LakeTieringFactory for paimon. |
| fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java | Extended the interface to support generic types and include CommitterInitContext. |
| fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeStorage.java | Updated LakeStorage to use a generic LakeTieringFactory. |
| fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/CommitterInitContext.java | Introduced context for committer initialization. |
     * @throws IOException if an I/O error occurs
     */
    - void commit(CommittableT committable) throws IOException;
    + long commit(CommittableT committable) throws IOException;
Do all lake formats use a long for the snapshot id? If not, a generic type would be preferable.
Yes. In our earlier investigation, all known lake formats (Paimon, Iceberg, Delta, Hudi) can use a long to represent a snapshot or a similar concept, such as the timeline in Hudi.
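The signature change under discussion can be illustrated with a small sketch. It assumes a hypothetical interface `SketchLakeCommitter` and a toy `InMemoryCommitter`; neither is the actual Fluss or Paimon code. The point is only that `commit` hands back a `long` snapshot id that the caller can use afterwards.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the committer interface under review; not the actual Fluss API. */
interface SketchLakeCommitter<CommittableT> {
    /** Commits the committable and returns the id of the snapshot it produced. */
    long commit(CommittableT committable) throws IOException;
}

/** Toy in-memory committer: each successful commit yields the next snapshot id. */
class InMemoryCommitter implements SketchLakeCommitter<List<String>> {
    private final List<List<String>> snapshots = new ArrayList<>();

    @Override
    public long commit(List<String> committable) throws IOException {
        if (committable.isEmpty()) {
            throw new IOException("nothing to commit");
        }
        snapshots.add(new ArrayList<>(committable));
        return snapshots.size(); // snapshot ids start at 1 in this sketch
    }
}
```

A format whose snapshot concept is not naturally numeric would need a mapping to `long` in its own committer; the sketch does not attempt to show that.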
    }

    @Override
    public long commit(PaimonCommittable committable) throws IOException {
For successful commits, I couldn't find the logic that notifies the Fluss coordinator. Could you clarify this?
A follow-up PR will introduce a committer operator for Flink, which will notify the Fluss coordinator after calling the commit method.
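The commit-then-notify ordering described in this reply might look roughly like the sketch below. The follow-up PR is not part of this conversation, so everything here is an assumption: `SnapshotCommitter`, `CommitThenNotify`, and the `LongConsumer` callback are hypothetical names, and the real operator may be structured quite differently. The sketch only shows that the coordinator is told about a snapshot id after `commit` has returned successfully.

```java
import java.io.IOException;
import java.util.function.LongConsumer;

/** Hypothetical committer shape: returns the committed snapshot id. */
interface SnapshotCommitter<T> {
    long commit(T committable) throws IOException;
}

/** Hypothetical operator logic: commit first, notify the coordinator only on success. */
final class CommitThenNotify<T> {
    private final SnapshotCommitter<T> committer;
    private final LongConsumer coordinatorNotifier; // stand-in for a coordinator call

    CommitThenNotify(SnapshotCommitter<T> committer, LongConsumer coordinatorNotifier) {
        this.committer = committer;
        this.coordinatorNotifier = coordinatorNotifier;
    }

    long commitAndNotify(T committable) throws IOException {
        // If commit throws, the notifier below is never reached.
        long snapshotId = committer.commit(committable);
        coordinatorNotifier.accept(snapshotId);
        return snapshotId;
    }
}
```

Keeping the notification outside the committer itself, as the reply suggests, leaves the `commit` method free of any coordinator dependency.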
leonardBang
left a comment
Thanks @luoyuxia for the clarification, +1
Purpose
Linked issue: close #438
Brief change log
- PaimonLakeTieringFactory to create PaimonLakeWriter and PaimonLakeCommitter
- PaimonLakeWriter/PaimonLakeCommitter use the Paimon SDK to write/commit to Paimon
Note:
- TableWrite#writeBundle(BinaryRow partition, int bucket, BundleRecords bundle) to write ArrowBatch of Fluss to Paimon directly.
Tests
- PaimonTieringTest to verify the logic of PaimonLakeWriter/PaimonLakeCommitter
API and Format
Documentation