diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/CommitterInitContext.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/CommitterInitContext.java new file mode 100644 index 0000000000..f180bf1ef3 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/CommitterInitContext.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.fluss.lakehouse.committer;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.metadata.TablePath;

/**
 * Context handed over when a {@code LakeCommitter} is created.
 *
 * <p>The only piece of information it currently exposes is the path of the table.
 *
 * @since 0.7
 */
@PublicEvolving
public interface CommitterInitContext {

    /**
     * Gets the path of the table this context was created for.
     *
     * @return the table path
     */
    TablePath tablePath();
}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java index c5c98c390b..4ead042399 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java @@ -45,7 +45,8 @@ public interface LakeCommitter extends AutoCloseable * Commits the given committable object.
* * @param committable the committable object + * @return the committed snapshot ID * @throws IOException if an I/O error occurs */ - void commit(CommittableT committable) throws IOException; + long commit(CommittableT committable) throws IOException; } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeStorage.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeStorage.java index 97103ad1b4..b51f11341b 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeStorage.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeStorage.java @@ -33,7 +33,7 @@ public interface LakeStorage { * * @return the lake tiering factory */ - LakeTieringFactory createLakeTieringFactory(); + LakeTieringFactory createLakeTieringFactory(); /** Create lake catalog. */ LakeCatalog createLakeCatalog(); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java index 8d6490dd41..f80c5fbf82 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java @@ -17,10 +17,12 @@ package com.alibaba.fluss.lakehouse.writer; import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.lakehouse.committer.CommitterInitContext; import com.alibaba.fluss.lakehouse.committer.LakeCommitter; import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer; import java.io.IOException; +import java.io.Serializable; /** * The LakeTieringFactory interface defines how to create lake writers and committers. 
It provides @@ -32,7 +34,7 @@ * @since 0.7 */ @PublicEvolving -public interface LakeTieringFactory { +public interface LakeTieringFactory extends Serializable { /** * Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows. @@ -57,7 +59,8 @@ LakeWriter createLakeWriter(WriterInitContext writerInitContext) * @return the lake committer * @throws IOException if an I/O error occurs */ - LakeCommitter createLakeCommitter() throws IOException; + LakeCommitter createLakeCommitter( + CommitterInitContext committerInitContext) throws IOException; /** * Returns the serializer for committable objects. diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java index 0f7a58d999..2e304c8de3 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java @@ -17,6 +17,9 @@ package com.alibaba.fluss.lake.paimon; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable; +import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory; +import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult; import com.alibaba.fluss.lakehouse.lakestorage.LakeStorage; import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory; @@ -30,8 +33,8 @@ public PaimonLakeStorage(Configuration configuration) { } @Override - public LakeTieringFactory createLakeTieringFactory() { - throw new UnsupportedOperationException("createLakeTieringFactory is not supported yet"); + public LakeTieringFactory createLakeTieringFactory() { + return new PaimonLakeTieringFactory(paimonConfig); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java new file mode 100644 index 0000000000..8908467dda --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.TimestampLtz; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; + +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind; + +/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. 
*/
public class FlussRecordAsPaimonRow implements InternalRow {

    /** Number of system columns that follow the Fluss columns: bucket, offset, timestamp. */
    private static final int SYSTEM_FIELD_COUNT = 3;

    private final int bucket;
    private LogRecord logRecord;
    private int originRowFieldCount;
    private com.alibaba.fluss.row.InternalRow internalRow;

    public FlussRecordAsPaimonRow(int bucket) {
        this.bucket = bucket;
    }

    /**
     * Points this wrapper at the given record, caching the record's row and that row's field
     * count. Every accessor below reads from the record set most recently through this method.
     */
    public void setFlussRecord(LogRecord logRecord) {
        this.logRecord = logRecord;
        this.internalRow = logRecord.getRow();
        this.originRowFieldCount = internalRow.getFieldCount();
    }

    @Override
    public int getFieldCount() {
        // Fluss fields first, then the three system fields: bucket, offset, timestamp
        return originRowFieldCount + SYSTEM_FIELD_COUNT;
    }

    @Override
    public RowKind getRowKind() {
        return toRowKind(logRecord.getChangeType());
    }

    @Override
    public void setRowKind(RowKind rowKind) {
        // no-op: the row kind is always derived from the wrapped record's change type
    }

    @Override
    public boolean isNullAt(int pos) {
        // positions past the Fluss fields are the system fields, which are never null
        return pos < originRowFieldCount && internalRow.isNullAt(pos);
    }

    @Override
    public boolean getBoolean(int pos) {
        return internalRow.getBoolean(pos);
    }

    @Override
    public byte getByte(int pos) {
        return internalRow.getByte(pos);
    }

    @Override
    public short getShort(int pos) {
        return internalRow.getShort(pos);
    }

    @Override
    public int getInt(int pos) {
        // the position right after the Fluss fields is the bucket system column
        return pos == originRowFieldCount ? bucket : internalRow.getInt(pos);
    }

    @Override
    public long getLong(int pos) {
        int offsetFromLastField = pos - originRowFieldCount;
        if (offsetFromLastField == 1) {
            // offset system column
            return logRecord.logOffset();
        }
        if (offsetFromLastField == 2) {
            // timestamp system column
            return logRecord.timestamp();
        }
        // a field of the wrapped Fluss row
        return internalRow.getLong(pos);
    }

    @Override
    public float getFloat(int pos) {
        return internalRow.getFloat(pos);
    }

    @Override
    public double getDouble(int pos) {
        return internalRow.getDouble(pos);
    }

    @Override
    public BinaryString getString(int pos) {
        byte[] stringBytes = internalRow.getString(pos).toBytes();
        return BinaryString.fromBytes(stringBytes);
    }

    @Override
    public Decimal getDecimal(int pos, int precision, int scale) {
        com.alibaba.fluss.row.Decimal source = internalRow.getDecimal(pos, precision, scale);
        return source.isCompact()
                ? Decimal.fromUnscaledLong(source.toUnscaledLong(), precision, scale)
                : Decimal.fromBigDecimal(source.toBigDecimal(), precision, scale);
    }

    @Override
    public Timestamp getTimestamp(int pos, int precision) {
        if (pos == originRowFieldCount + 2) {
            // timestamp system column
            return Timestamp.fromEpochMillis(logRecord.timestamp());
        }
        // NOTE(review): every other timestamp position is read through getTimestampLtz; whether
        // timestamp columns without local time zone can reach this method is not visible from
        // this class - confirm against the supported column types.
        boolean compact = TimestampLtz.isCompact(precision);
        TimestampLtz value = internalRow.getTimestampLtz(pos, precision);
        if (compact) {
            return Timestamp.fromEpochMillis(value.getEpochMillisecond());
        }
        return Timestamp.fromEpochMillis(
                value.getEpochMillisecond(), value.getNanoOfMillisecond());
    }

    @Override
    public byte[] getBinary(int pos) {
        return internalRow.getBytes(pos);
    }

    @Override
    public InternalArray getArray(int pos) {
        throw new UnsupportedOperationException(
                "getArray is not support for Fluss record currently.");
    }

    @Override
    public InternalMap getMap(int pos) {
        throw new UnsupportedOperationException(
                "getMap is not support for Fluss record currently.");
    }

    @Override
    public InternalRow getRow(int pos, int pos1) {
        throw new UnsupportedOperationException(
                "getRow is not support for Fluss record currently.");
    }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCatalogProvider.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCatalogProvider.java new file mode 100644 index 0000000000..dbc197136c --- /dev/null +++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCatalogProvider.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.fluss.lake.paimon.tiering;

import com.alibaba.fluss.config.Configuration;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

import java.io.Serializable;

/** Serializable holder of the Paimon configuration that builds a Paimon {@link Catalog}. */
public class PaimonCatalogProvider implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Configuration paimonConfig;

    public PaimonCatalogProvider(Configuration paimonConfig) {
        this.paimonConfig = paimonConfig;
    }

    /**
     * Builds a catalog from the held configuration.
     *
     * <p>Nothing is cached here: each call goes through {@link CatalogFactory} again.
     *
     * @return the catalog created from the configuration
     */
    public Catalog get() {
        Options catalogOptions = Options.fromMap(paimonConfig.toMap());
        CatalogContext catalogContext = CatalogContext.create(catalogOptions);
        return CatalogFactory.createCatalog(catalogContext);
    }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittable.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittable.java new file mode 100644 index 0000000000..dc28248d39 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittable.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import org.apache.paimon.manifest.ManifestCommittable; + +/** The committable that derived from {@link PaimonWriteResult} to commit to Paimon. */ +public class PaimonCommittable { + + private final ManifestCommittable manifestCommittable; + + public PaimonCommittable(ManifestCommittable manifestCommittable) { + this.manifestCommittable = manifestCommittable; + } + + public ManifestCommittable manifestCommittable() { + return manifestCommittable; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittableSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittableSerializer.java new file mode 100644 index 0000000000..86c548f32f --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonCommittableSerializer.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.fluss.lake.paimon.tiering;

import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer;

import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;

import java.io.IOException;

/**
 * The serializer of {@link PaimonCommittable}.
 *
 * <p>The payload is produced by Paimon's {@link ManifestCommittableSerializer}. Two independent
 * version numbers are involved: {@link #CURRENT_VERSION} versions this wrapper, while the Paimon
 * serializer reports its own version through {@code getVersion()}.
 */
public class PaimonCommittableSerializer implements SimpleVersionedSerializer<PaimonCommittable> {

    private static final int CURRENT_VERSION = 1;

    private final ManifestCommittableSerializer manifestCommittableSerializer =
            new ManifestCommittableSerializer();

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    /**
     * Serializes the wrapped {@link ManifestCommittable} with Paimon's serializer.
     *
     * @param paimonCommittable the committable to serialize
     * @return the bytes produced by {@link ManifestCommittableSerializer}
     * @throws IOException if Paimon fails to serialize the committable
     */
    @Override
    public byte[] serialize(PaimonCommittable paimonCommittable) throws IOException {
        ManifestCommittable manifestCommittable = paimonCommittable.manifestCommittable();
        return manifestCommittableSerializer.serialize(manifestCommittable);
    }

    /**
     * Restores a {@link PaimonCommittable} from bytes written by {@link #serialize}.
     *
     * @param version the version of this wrapper serializer the bytes were written with
     * @param serialized the serialized committable
     * @return the restored committable
     * @throws UnsupportedOperationException if {@code version} is not {@link #CURRENT_VERSION}
     * @throws IOException if Paimon fails to deserialize the payload
     */
    @Override
    public PaimonCommittable deserialize(int version, byte[] serialized) throws IOException {
        if (version != CURRENT_VERSION) {
            throw new UnsupportedOperationException(
                    "Expecting PaimonCommittable version to be "
                            + CURRENT_VERSION
                            + ", but found "
                            + version
                            + ".");
        }
        // `version` belongs to this wrapper, not to Paimon. serialize() writes the payload with
        // this very Paimon serializer instance, so it must be read back with the Paimon
        // serializer's own version rather than with the wrapper's.
        ManifestCommittable manifestCommittable =
                manifestCommittableSerializer.deserialize(
                        manifestCommittableSerializer.getVersion(), serialized);
        return new PaimonCommittable(manifestCommittable);
    }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java new file mode 100644 index 0000000000..9aaf2b7dfe --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import com.alibaba.fluss.lakehouse.committer.LakeCommitter; +import com.alibaba.fluss.metadata.TablePath; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitCallback; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; +import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER; + +/** Implementation of {@link LakeCommitter} for Paimon. 
*/ +public class PaimonLakeCommitter implements LakeCommitter { + + private final Catalog paimonCatalog; + private final FileStoreTable fileStoreTable; + private FileStoreCommit fileStoreCommit; + + public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, TablePath tablePath) + throws IOException { + this.paimonCatalog = paimonCatalogProvider.get(); + this.fileStoreTable = getTable(tablePath); + } + + @Override + public PaimonCommittable toCommitable(List paimonWriteResults) + throws IOException { + ManifestCommittable committable = new ManifestCommittable(COMMIT_IDENTIFIER); + for (PaimonWriteResult paimonWriteResult : paimonWriteResults) { + committable.addFileCommittable(paimonWriteResult.commitMessage()); + } + return new PaimonCommittable(committable); + } + + @Override + public long commit(PaimonCommittable committable) throws IOException { + ManifestCommittable manifestCommittable = committable.manifestCommittable(); + PaimonCommitCallback paimonCommitCallback = new PaimonCommitCallback(); + try { + fileStoreCommit = + fileStoreTable + .store() + .newCommit( + FLUSS_LAKE_TIERING_COMMIT_USER, + Collections.singletonList(paimonCommitCallback)); + fileStoreCommit.commit(manifestCommittable, Collections.emptyMap()); + return checkNotNull( + paimonCommitCallback.commitSnapshotId, + "Paimon committed snapshot id must be non-null."); + } catch (Throwable t) { + if (fileStoreCommit != null) { + // if any error happen while commit, abort the commit to clean committable + fileStoreCommit.abort(manifestCommittable.fileCommittables()); + } + throw new IOException(t); + } + } + + @Override + public void close() throws Exception { + try { + if (fileStoreCommit != null) { + fileStoreCommit.close(); + } + if (paimonCatalog != null) { + paimonCatalog.close(); + } + } catch (Exception e) { + throw new IOException("Fail to close PaimonLakeCommitter.", e); + } + } + + private FileStoreTable getTable(TablePath tablePath) throws IOException { + try { + return 
(FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath)); + } catch (Exception e) { + throw new IOException("Fail to get table " + tablePath + " in Paimon."); + } + } + + private static class PaimonCommitCallback implements CommitCallback { + + private Long commitSnapshotId = null; + + @Override + public void call(List list, Snapshot snapshot) { + this.commitSnapshotId = snapshot.id(); + } + + @Override + public void retry(ManifestCommittable manifestCommittable) { + // do-nothing + } + + @Override + public void close() throws Exception { + // do-nothing + } + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java new file mode 100644 index 0000000000..d2954fc0a5 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.fluss.lake.paimon.tiering;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lakehouse.committer.CommitterInitContext;
import com.alibaba.fluss.lakehouse.committer.LakeCommitter;
import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer;
import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory;
import com.alibaba.fluss.lakehouse.writer.LakeWriter;
import com.alibaba.fluss.lakehouse.writer.WriterInitContext;

import java.io.IOException;

/**
 * Implementation of {@link LakeTieringFactory} for Paimon.
 *
 * <p>The only state is a {@link PaimonCatalogProvider} built from the Paimon configuration. It
 * is passed to every writer and committer this factory creates.
 */
public class PaimonLakeTieringFactory
        implements LakeTieringFactory<PaimonWriteResult, PaimonCommittable> {

    public static final String FLUSS_LAKE_TIERING_COMMIT_USER = "fluss_lake_tiering";

    private static final long serialVersionUID = 1L;

    private final PaimonCatalogProvider paimonCatalogProvider;

    public PaimonLakeTieringFactory(Configuration paimonConfig) {
        this.paimonCatalogProvider = new PaimonCatalogProvider(paimonConfig);
    }

    /**
     * Creates a {@link PaimonLakeWriter} for the given writer context.
     *
     * @throws IOException if the writer cannot be created
     */
    @Override
    public LakeWriter<PaimonWriteResult> createLakeWriter(WriterInitContext writerInitContext)
            throws IOException {
        return new PaimonLakeWriter(paimonCatalogProvider, writerInitContext);
    }

    @Override
    public SimpleVersionedSerializer<PaimonWriteResult> getWriteResultSerializer() {
        return new PaimonWriteResultSerializer();
    }

    /**
     * Creates a {@link PaimonLakeCommitter} for the table named by the context.
     *
     * @throws IOException if the committer cannot be created
     */
    @Override
    public LakeCommitter<PaimonWriteResult, PaimonCommittable> createLakeCommitter(
            CommitterInitContext committerInitContext) throws IOException {
        return new PaimonLakeCommitter(paimonCatalogProvider, committerInitContext.tablePath());
    }

    @Override
    public SimpleVersionedSerializer<PaimonCommittable> getCommitableSerializer() {
        return new PaimonCommittableSerializer();
    }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeWriter.java new file mode 100644 index 0000000000..3b58d234e4 --- /dev/null +++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.fluss.lake.paimon.tiering;

import com.alibaba.fluss.lake.paimon.tiering.append.AppendOnlyWriter;
import com.alibaba.fluss.lake.paimon.tiering.mergetree.MergeTreeWriter;
import com.alibaba.fluss.lakehouse.writer.LakeWriter;
import com.alibaba.fluss.lakehouse.writer.WriterInitContext;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.LogRecord;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;

import java.io.IOException;

import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;

/**
 * Implementation of {@link LakeWriter} for Paimon.
 *
 * <p>Records are delegated to an {@link AppendOnlyWriter} when the Paimon table has no primary
 * keys, and to a {@link MergeTreeWriter} otherwise.
 */
public class PaimonLakeWriter implements LakeWriter<PaimonWriteResult> {

    private final Catalog paimonCatalog;
    private final RecordWriter recordWriter;

    /**
     * Creates the catalog, loads the table named by the context and builds the record writer.
     *
     * @param paimonCatalogProvider creates the Paimon catalog owned by this writer
     * @param writerInitContext supplies the table path, table bucket and partition
     * @throws IOException if the table cannot be loaded; the catalog is closed before rethrowing
     */
    public PaimonLakeWriter(
            PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext)
            throws IOException {
        this.paimonCatalog = paimonCatalogProvider.get();
        RecordWriter writer;
        try {
            FileStoreTable fileStoreTable = getTable(writerInitContext.tablePath());
            writer =
                    fileStoreTable.primaryKeys().isEmpty()
                            ? new AppendOnlyWriter(
                                    fileStoreTable,
                                    writerInitContext.tableBucket(),
                                    writerInitContext.partition())
                            : new MergeTreeWriter(
                                    fileStoreTable,
                                    writerInitContext.tableBucket(),
                                    writerInitContext.partition());
        } catch (IOException | RuntimeException e) {
            // the half-built writer is never returned, so nobody else could close the catalog
            try {
                paimonCatalog.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.recordWriter = writer;
    }

    /**
     * Writes one Fluss record through the underlying record writer.
     *
     * @throws IOException wrapping any failure of the underlying writer
     */
    @Override
    public void write(LogRecord record) throws IOException {
        try {
            recordWriter.write(record);
        } catch (Exception e) {
            throw new IOException("Fail to write Fluss record to Paimon.", e);
        }
    }

    /**
     * Completes the write and wraps the resulting commit message.
     *
     * @return the write result carrying the commit message of the record writer
     * @throws IOException wrapping any failure of the underlying writer
     */
    @Override
    public PaimonWriteResult complete() throws IOException {
        CommitMessage commitMessage;
        try {
            commitMessage = recordWriter.complete();
        } catch (Exception e) {
            throw new IOException("Fail to complete Paimon write.", e);
        }
        return new PaimonWriteResult(commitMessage);
    }

    /**
     * Closes the record writer and then the catalog. The catalog is closed even when closing the
     * record writer fails.
     *
     * @throws IOException if either close fails; a second failure is attached as suppressed
     */
    @Override
    public void close() throws IOException {
        Exception failure = null;
        if (recordWriter != null) {
            try {
                recordWriter.close();
            } catch (Exception e) {
                failure = e;
            }
        }
        if (paimonCatalog != null) {
            try {
                paimonCatalog.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw new IOException("Fail to close PaimonLakeWriter.", failure);
        }
    }

    /** Loads the Paimon table for the Fluss table path, keeping the root cause on failure. */
    private FileStoreTable getTable(TablePath tablePath) throws IOException {
        try {
            return (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
        } catch (Exception e) {
            throw new IOException("Fail to get table " + tablePath + " in Paimon.", e);
        }
    }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResult.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResult.java new file mode 100644 index 0000000000..c6cab3ece7 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResult.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import org.apache.paimon.table.sink.CommitMessage; + +import java.io.Serializable; + +/** The write result of Paimon lake writer to pass to commiter to commit. */ +public class PaimonWriteResult implements Serializable { + + private static final long serialVersionUID = 1L; + + private final CommitMessage commitMessage; + + public PaimonWriteResult(CommitMessage commitMessage) { + this.commitMessage = commitMessage; + } + + public CommitMessage commitMessage() { + return commitMessage; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java new file mode 100644 index 0000000000..71b758f24b --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer; + +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageSerializer; + +import java.io.IOException; + +/** The {@link SimpleVersionedSerializer} for {@link PaimonWriteResult}. */ +public class PaimonWriteResultSerializer implements SimpleVersionedSerializer { + + private static final int CURRENT_VERSION = 1; + + private final CommitMessageSerializer messageSer = new CommitMessageSerializer(); + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(PaimonWriteResult paimonWriteResult) throws IOException { + CommitMessage commitMessage = paimonWriteResult.commitMessage(); + return messageSer.serialize(commitMessage); + } + + @Override + public PaimonWriteResult deserialize(int version, byte[] serialized) throws IOException { + if (version != CURRENT_VERSION) { + throw new UnsupportedOperationException( + "Expecting PaimonWriteResult version to be " + + CURRENT_VERSION + + ", but found " + + version + + "."); + } + CommitMessage commitMessage = messageSer.deserialize(version, serialized); + return new PaimonWriteResult(commitMessage); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java new file mode 100644 index 0000000000..5a8263432a --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.LogRecord; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableWriteImpl; + +import javax.annotation.Nullable; + +import java.util.List; + +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimonBinaryRow; +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** A base class to write {@link LogRecord} to Paimon.
*/ +public abstract class RecordWriter implements AutoCloseable { + + protected final TableWriteImpl tableWrite; + protected final int bucket; + @Nullable protected final BinaryRow partition; + protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow; + + public RecordWriter( + TableWriteImpl tableWrite, TableBucket tableBucket, @Nullable String partition) { + this.tableWrite = tableWrite; + this.bucket = tableBucket.getBucket(); + this.partition = toPaimonBinaryRow(partition); + this.flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(tableBucket.getBucket()); + } + + public abstract void write(LogRecord record) throws Exception; + + CommitMessage complete() throws Exception { + List commitMessages = tableWrite.prepareCommit(); + checkState(commitMessages.size() == 1, "The size of CommitMessage must be 1."); + return commitMessages.get(0); + } + + public void close() throws Exception { + tableWrite.close(); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java new file mode 100644 index 0000000000..bc26b51b14 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.paimon.tiering.append; + +import com.alibaba.fluss.lake.paimon.tiering.RecordWriter; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.LogRecord; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableWriteImpl; + +import javax.annotation.Nullable; + +import static com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; + +/** A {@link RecordWriter} to write to Paimon's append-only table. */ +public class AppendOnlyWriter extends RecordWriter { + + public AppendOnlyWriter( + FileStoreTable fileStoreTable, TableBucket tableBucket, @Nullable String partition) { + //noinspection unchecked + super( + (TableWriteImpl) + // todo: set ioManager to support write-buffer-spillable + fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER), + tableBucket, + partition); + } + + @Override + public void write(LogRecord record) throws Exception { + flussRecordAsPaimonRow.setFlussRecord(record); + // hacky, call internal method tableWrite.getWrite() to support + // to write to given partition, otherwise, it'll always extract a partition from Paimon row + // which may be costly + tableWrite.getWrite().write(partition, bucket, flussRecordAsPaimonRow); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java new file mode 100644 index 0000000000..826044c3de --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering.mergetree; + +import com.alibaba.fluss.lake.paimon.tiering.RecordWriter; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.LogRecord; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.RowKeyExtractor; +import org.apache.paimon.table.sink.TableWriteImpl; + +import javax.annotation.Nullable; + +import static com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind; + +/** A {@link RecordWriter} to write to Paimon's primary-key table. 
*/ +public class MergeTreeWriter extends RecordWriter { + + private final KeyValue keyValue = new KeyValue(); + + private final RowKeyExtractor rowKeyExtractor; + + public MergeTreeWriter( + FileStoreTable fileStoreTable, TableBucket tableBucket, @Nullable String partition) { + super(createTableWrite(fileStoreTable), tableBucket, partition); + this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor(); + } + + private static TableWriteImpl createTableWrite(FileStoreTable fileStoreTable) { + //noinspection unchecked + return (TableWriteImpl) fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER); + } + + @Override + public void write(LogRecord record) throws Exception { + flussRecordAsPaimonRow.setFlussRecord(record); + rowKeyExtractor.setRecord(flussRecordAsPaimonRow); + keyValue.replace( + rowKeyExtractor.trimmedPrimaryKey(), + KeyValue.UNKNOWN_SEQUENCE, + toRowKind(record.getChangeType()), + flussRecordAsPaimonRow); + // hacky, call internal method tableWrite.getWrite() to support + // to write to given partition, otherwise, it'll always extract a partition from Paimon row + // which may be costly + tableWrite.getWrite().write(partition, bucket, keyValue); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java new file mode 100644 index 0000000000..484eabed1b --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.utils; + +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.record.ChangeType; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.types.RowKind; + +import javax.annotation.Nullable; + +/** Utils for conversion between Paimon and Fluss. */ +public class PaimonConversions { + + public static RowKind toRowKind(ChangeType changeType) { + switch (changeType) { + case APPEND_ONLY: + case INSERT: + return RowKind.INSERT; + case UPDATE_BEFORE: + return RowKind.UPDATE_BEFORE; + case UPDATE_AFTER: + return RowKind.UPDATE_AFTER; + case DELETE: + return RowKind.DELETE; + default: + throw new IllegalArgumentException("Unsupported change type: " + changeType); + } + } + + public static Identifier toPaimon(TablePath tablePath) { + return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); + } + + public static BinaryRow toPaimonBinaryRow(@Nullable String value) { + if (value == null) { + return BinaryRow.EMPTY_ROW; + } + BinaryRow binaryRow = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(binaryRow); + writer.writeString(0, BinaryString.fromString(value)); + writer.complete(); + return binaryRow; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java new file mode 100644 index 0000000000..ae75da0cf9 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import com.alibaba.fluss.record.GenericRecord; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.TimestampLtz; + +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY; +import static com.alibaba.fluss.record.ChangeType.DELETE; +import static com.alibaba.fluss.record.ChangeType.INSERT; +import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER; +import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FlussRecordAsPaimonRow}. 
*/ +class FlussRecordAsPaimonRowTest { + + @Test + void testLogTableRecordAllTypes() { + // Construct a FlussRecordAsPaimonRow instance + int bucket = 0; + FlussRecordAsPaimonRow flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(bucket); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(14); + genericRow.setField(0, true); + genericRow.setField(1, (byte) 1); + genericRow.setField(2, (short) 2); + genericRow.setField(3, 3); + genericRow.setField(4, 4L); + genericRow.setField(5, 5.1f); + genericRow.setField(6, 6.0d); + genericRow.setField(7, BinaryString.fromString("string")); + genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2)); + genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20, 0)); + genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L)); + genericRow.setField(11, TimestampLtz.fromEpochMillis(1698235273182L, 45678)); + genericRow.setField(12, new byte[] {1, 2, 3, 4}); + genericRow.setField(13, null); + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + // verify FlussRecordAsPaimonRow normal columns + assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue(); + assertThat(flussRecordAsPaimonRow.getByte(1)).isEqualTo((byte) 1); + assertThat(flussRecordAsPaimonRow.getShort(2)).isEqualTo((short) 2); + assertThat(flussRecordAsPaimonRow.getInt(3)).isEqualTo(3); + assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(4L); + assertThat(flussRecordAsPaimonRow.getFloat(5)).isEqualTo(5.1f); + assertThat(flussRecordAsPaimonRow.getDouble(6)).isEqualTo(6.0d); + assertThat(flussRecordAsPaimonRow.getString(7).toString()).isEqualTo("string"); + assertThat(flussRecordAsPaimonRow.getDecimal(8, 5, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("0.09")); + assertThat(flussRecordAsPaimonRow.getDecimal(9, 20, 0).toBigDecimal()) + .isEqualTo(new BigDecimal(10)); + 
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 3).getMillisecond()) + .isEqualTo(1698235273182L); + assertThat(flussRecordAsPaimonRow.getTimestamp(11, 6).getMillisecond()) + .isEqualTo(1698235273182L); + assertThat(flussRecordAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 2, 3, 4}); + assertThat(flussRecordAsPaimonRow.isNullAt(13)).isTrue(); + // verify FlussRecordAsPaimonRow system columns + assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(bucket); + assertThat(flussRecordAsPaimonRow.getLong(15)).isEqualTo(logOffset); + assertThat(flussRecordAsPaimonRow.getLong(16)).isEqualTo(timeStamp); + assertThat(flussRecordAsPaimonRow.getTimestamp(16, 4)) + .isEqualTo(Timestamp.fromEpochMillis(timeStamp)); + assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT); + + assertThat(flussRecordAsPaimonRow.getFieldCount()) + .isEqualTo( + 14 + + + // 3 is for system columns + 3); + } + + @Test + void testPrimaryKeyTableRecord() { + int bucket = 0; + FlussRecordAsPaimonRow flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(bucket); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, true); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, INSERT, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue(); + // normal columns + system columns + assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(4); + // verify rowkind + assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT); + + logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_BEFORE, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE); + + logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_AFTER, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + 
assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.UPDATE_AFTER); + + logRecord = new GenericRecord(logOffset, timeStamp, DELETE, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java new file mode 100644 index 0000000000..650a295955 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java @@ -0,0 +1,402 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.paimon.tiering; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lakehouse.committer.LakeCommitter; +import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lakehouse.writer.LakeWriter; +import com.alibaba.fluss.lakehouse.writer.WriterInitContext; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.GenericRecord; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.utils.types.Tuple2; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.CloseableIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static 
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon; +import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; +import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; +import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; +import static com.alibaba.fluss.record.ChangeType.DELETE; +import static com.alibaba.fluss.record.ChangeType.INSERT; +import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER; +import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE; +import static com.alibaba.fluss.utils.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; + +/** The UT for tiering to Paimon via {@link PaimonLakeTieringFactory}. */ +class PaimonTieringTest { + + private @TempDir File tempWarehouseDir; + private PaimonLakeTieringFactory paimonLakeTieringFactory; + private Catalog paimonCatalog; + + @BeforeEach + void beforeEach() { + Configuration configuration = new Configuration(); + configuration.setString("warehouse", tempWarehouseDir.toString()); + paimonLakeTieringFactory = new PaimonLakeTieringFactory(configuration); + paimonCatalog = + CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(configuration.toMap()))); + } + + private static Stream tieringWriteArgs() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false)); + } + + @ParameterizedTest + @MethodSource("tieringWriteArgs") + void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception { + int bucketNum = 3; + TablePath tablePath = + TablePath.of( + "paimon", + String.format( + "test_tiering_table_%s_%s", + isPrimaryKeyTable ? "primary_key" : "log", + isPartitioned ? "partitioned" : "non_partitioned")); + createTable( + tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable ? 
bucketNum : null); + + List paimonWriteResults = new ArrayList<>(); + SimpleVersionedSerializer writeResultSerializer = + paimonLakeTieringFactory.getWriteResultSerializer(); + SimpleVersionedSerializer committableSerializer = + paimonLakeTieringFactory.getCommitableSerializer(); + + Map, List> recordsByBucket = new HashMap<>(); + List partitions = + isPartitioned ? Arrays.asList("p1", "p2", "p3") : Collections.singletonList(null); + // first, write data + for (int bucket = 0; bucket < bucketNum; bucket++) { + for (String partition : partitions) { + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, bucket, partition)) { + Tuple2 partitionBucket = Tuple2.of(partition, bucket); + Tuple2, List> writeAndExpectRecords = + isPrimaryKeyTable + ? genPrimaryKeyTableRecords(partition, bucket) + : genLogTableRecords(partition, bucket, 10); + List writtenRecords = writeAndExpectRecords.f0; + List expectRecords = writeAndExpectRecords.f1; + recordsByBucket.put(partitionBucket, expectRecords); + for (LogRecord logRecord : writtenRecords) { + lakeWriter.write(logRecord); + } + // serialize/deserialize writeResult + PaimonWriteResult paimonWriteResult = lakeWriter.complete(); + byte[] serialized = writeResultSerializer.serialize(paimonWriteResult); + paimonWriteResults.add( + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), serialized)); + } + } + } + + // second, commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath)) { + // serialize/deserialize committable + PaimonCommittable paimonCommittable = lakeCommitter.toCommitable(paimonWriteResults); + byte[] serialized = committableSerializer.serialize(paimonCommittable); + paimonCommittable = + committableSerializer.deserialize( + committableSerializer.getVersion(), serialized); + long snapshot = lakeCommitter.commit(paimonCommittable); + assertThat(snapshot).isEqualTo(1); + } + + // then, check data + for (int bucket = 0; bucket < 3; bucket++) { + for (String partition : 
partitions) { + Tuple2 partitionBucket = Tuple2.of(partition, bucket); + List expectRecords = recordsByBucket.get(partitionBucket); + CloseableIterator actualRecords = + getPaimonRows(tablePath, partition, isPrimaryKeyTable, bucket); + if (isPrimaryKeyTable) { + verifyPrimaryKeyTableRecord(actualRecords, expectRecords, bucket); + } else { + verifyLogTableRecords(actualRecords, expectRecords, bucket); + } + } + } + } + + private void verifyLogTableRecords( + CloseableIterator actualRecords, + List expectRecords, + int expectBucket) + throws Exception { + for (LogRecord expectRecord : expectRecords) { + InternalRow actualRow = actualRecords.next(); + // check normal columns: + assertThat(actualRow.getInt(0)).isEqualTo(expectRecord.getRow().getInt(0)); + assertThat(actualRow.getString(1).toString()) + .isEqualTo(expectRecord.getRow().getString(1).toString()); + assertThat(actualRow.getString(2).toString()) + .isEqualTo(expectRecord.getRow().getString(2).toString()); + + // check system columns: __bucket, __offset, __timestamp + assertThat(actualRow.getInt(3)).isEqualTo(expectBucket); + assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset()); + assertThat(actualRow.getTimestamp(5, 6).getMillisecond()) + .isEqualTo(expectRecord.timestamp()); + } + assertThat(actualRecords.hasNext()).isFalse(); + actualRecords.close(); + } + + private void verifyPrimaryKeyTableRecord( + CloseableIterator actualRecords, + List expectRecords, + int expectBucket) + throws Exception { + for (LogRecord expectRecord : expectRecords) { + InternalRow actualRow = actualRecords.next(); + // check normal columns: + assertThat(actualRow.getInt(0)).isEqualTo(expectRecord.getRow().getInt(0)); + assertThat(actualRow.getString(1).toString()) + .isEqualTo(expectRecord.getRow().getString(1).toString()); + assertThat(actualRow.getString(2).toString()) + .isEqualTo(expectRecord.getRow().getString(2).toString()); + + // check system columns: __bucket, __offset, __timestamp + 
assertThat(actualRow.getInt(3)).isEqualTo(expectBucket); + assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset()); + assertThat(actualRow.getTimestamp(5, 6).getMillisecond()) + .isEqualTo(expectRecord.timestamp()); + } + assertThat(actualRecords.hasNext()).isFalse(); + actualRecords.close(); + } + + private Tuple2, List> genLogTableRecords( + @Nullable String partition, int bucket, int numRecords) { + List logRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + GenericRow genericRow = new GenericRow(3); + genericRow.setField(0, i); + genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); + genericRow.setField( + 2, + partition == null + ? BinaryString.fromString("bucket" + bucket) + : BinaryString.fromString(partition)); + LogRecord logRecord = + new GenericRecord( + i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow); + logRecords.add(logRecord); + } + return Tuple2.of(logRecords, logRecords); + } + + private Tuple2, List> genPrimaryKeyTableRecords( + @Nullable String partition, int bucket) { + int offset = -1; + // gen +I, -U, +U, -D + List rows = genKvRow(partition, bucket, 0, 0, 4); + List writtenLogRecords = + new ArrayList<>( + Arrays.asList( + toRecord(++offset, rows.get(0), INSERT), + toRecord(++offset, rows.get(1), UPDATE_BEFORE), + toRecord(++offset, rows.get(2), UPDATE_AFTER), + toRecord(++offset, rows.get(3), DELETE))); + List expectLogRecords = new ArrayList<>(); + + // gen +I, -U, +U + rows = genKvRow(partition, bucket, 1, 4, 7); + writtenLogRecords.addAll( + Arrays.asList( + toRecord(++offset, rows.get(0), INSERT), + toRecord(++offset, rows.get(1), UPDATE_BEFORE), + toRecord(++offset, rows.get(2), UPDATE_AFTER))); + expectLogRecords.add(toRecord(offset, rows.get(2), UPDATE_AFTER)); + + // gen +I, +U + rows = genKvRow(partition, bucket, 2, 7, 9); + writtenLogRecords.addAll( + Arrays.asList( + toRecord(++offset, rows.get(0), INSERT), + toRecord(++offset, rows.get(1), 
UPDATE_AFTER))); + expectLogRecords.add(toRecord(offset, rows.get(1), UPDATE_AFTER)); + + // gen +I + rows = genKvRow(partition, bucket, 3, 9, 10); + writtenLogRecords.add(toRecord(++offset, rows.get(0), INSERT)); + expectLogRecords.add(toRecord(offset, rows.get(0), INSERT)); + + return Tuple2.of(writtenLogRecords, expectLogRecords); + } + + private List genKvRow( + @Nullable String partition, int bucket, int key, int from, int to) { + List rows = new ArrayList<>(); + for (int i = from; i < to; i++) { + GenericRow genericRow = new GenericRow(3); + genericRow.setField(0, key); + genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); + genericRow.setField( + 2, + partition == null + ? BinaryString.fromString("bucket" + bucket) + : BinaryString.fromString(partition)); + rows.add(genericRow); + } + return rows; + } + + private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeType) { + return new GenericRecord(offset, System.currentTimeMillis(), changeType, row); + } + + private CloseableIterator getPaimonRows( + TablePath tablePath, @Nullable String partition, boolean isPrimaryKeyTable, int bucket) + throws Exception { + Identifier identifier = toPaimon(tablePath); + FileStoreTable fileStoreTable = (FileStoreTable) paimonCatalog.getTable(identifier); + + ReadBuilder readBuilder = fileStoreTable.newReadBuilder(); + + if (partition != null) { + readBuilder = + readBuilder.withPartitionFilter(Collections.singletonMap("c3", partition)); + } + List splits = new ArrayList<>(); + if (isPrimaryKeyTable) { + splits = readBuilder.withBucketFilter(b -> b == bucket).newScan().plan().splits(); + } else { + // for log table, we can't filter by bucket directly, filter file by __bucket column + for (Split split : readBuilder.newScan().plan().splits()) { + DataSplit dataSplit = (DataSplit) split; + List dataFileMetas = dataSplit.dataFiles(); + checkState(dataFileMetas.size() == 1); + DataFileMeta dataFileMeta = dataFileMetas.get(0); + // 
filter by __bucket column + if (dataFileMeta.valueStats().maxValues().getInt(3) == bucket + && dataFileMeta.valueStats().minValues().getInt(3) == bucket) { + splits.add(split); + } + } + } + return readBuilder.newRead().createReader(splits).toCloseableIterator(); + } + + private LakeWriter createLakeWriter( + TablePath tablePath, int bucket, @Nullable String partition) throws IOException { + return paimonLakeTieringFactory.createLakeWriter( + new WriterInitContext() { + @Override + public TablePath tablePath() { + return tablePath; + } + + @Override + public TableBucket tableBucket() { + // don't care about tableId & partitionId + return new TableBucket(0, 0L, bucket); + } + + @Nullable + @Override + public String partition() { + return partition; + } + }); + } + + private LakeCommitter createLakeCommitter( + TablePath tablePath) throws IOException { + return paimonLakeTieringFactory.createLakeCommitter(() -> tablePath); + } + + private void createTable( + TablePath tablePath, + boolean isPrimaryTable, + boolean isPartitioned, + @Nullable Integer numBuckets) + throws Exception { + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()); + if (isPartitioned) { + builder.partitionKeys("c3"); + } + + if (isPrimaryTable) { + if (isPartitioned) { + builder.primaryKey("c1", "c3"); + } else { + builder.primaryKey("c1"); + } + } + + builder.column(BUCKET_COLUMN_NAME, DataTypes.INT()); + builder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT()); + builder.column(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); + + if (numBuckets != null) { + builder.option(CoreOptions.BUCKET.key(), String.valueOf(numBuckets)); + } + + paimonCatalog.createDatabase(tablePath.getDatabaseName(), true); + paimonCatalog.createTable(toPaimon(tablePath), builder.build(), true); + } +} diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index e3c2df840b..176d5cacad 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -48,7 +48,7 @@ public LakeStorage createLakeStorage(Configuration configuration) { public static class TestingPaimonLakeStorage implements LakeStorage { @Override - public LakeTieringFactory createLakeTieringFactory() { + public LakeTieringFactory createLakeTieringFactory() { throw new UnsupportedOperationException("createLakeTieringFactory is not supported."); }