Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lakehouse.committer;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.metadata.TablePath;

/**
* The CommitterInitContext interface provides the context needed to create a LakeCommitter.
* Currently the only information it exposes is the table path.
*
* @since 0.7
*/
@PublicEvolving
public interface CommitterInitContext {

/**
* Returns the table path carried by this context.
*
* @return the table path
*/
TablePath tablePath();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable
* Commits the given committable object.
*
* @param committable the committable object
* @return the committed snapshot ID
* @throws IOException if an I/O error occurs
*/
void commit(CommittableT committable) throws IOException;
long commit(CommittableT committable) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do all lake formats use a long for the snapshot id? Otherwise, a generic type is recommended.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In the previous investigation, all known lake formats (Paimon, Iceberg, Delta, Hudi) can use a long to represent the snapshot or a similar concept, such as the timeline in Hudi.

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface LakeStorage {
*
* @return the lake tiering factory
*/
LakeTieringFactory createLakeTieringFactory();
LakeTieringFactory<?, ?> createLakeTieringFactory();

/** Create lake catalog. */
LakeCatalog createLakeCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package com.alibaba.fluss.lakehouse.writer;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.lakehouse.committer.CommitterInitContext;
import com.alibaba.fluss.lakehouse.committer.LakeCommitter;
import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer;

import java.io.IOException;
import java.io.Serializable;

/**
* The LakeTieringFactory interface defines how to create lake writers and committers. It provides
Expand All @@ -32,7 +34,7 @@
* @since 0.7
*/
@PublicEvolving
public interface LakeTieringFactory<WriteResult, CommitableT> {
public interface LakeTieringFactory<WriteResult, CommitableT> extends Serializable {

/**
* Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows.
Expand All @@ -57,7 +59,8 @@ LakeWriter<WriteResult> createLakeWriter(WriterInitContext writerInitContext)
* @return the lake committer
* @throws IOException if an I/O error occurs
*/
LakeCommitter<WriteResult, CommitableT> createLakeCommitter() throws IOException;
LakeCommitter<WriteResult, CommitableT> createLakeCommitter(
CommitterInitContext committerInitContext) throws IOException;

/**
* Returns the serializer for committable objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.alibaba.fluss.lake.paimon;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable;
import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory;
import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult;
import com.alibaba.fluss.lakehouse.lakestorage.LakeStorage;
import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory;

Expand All @@ -30,8 +33,8 @@ public PaimonLakeStorage(Configuration configuration) {
}

@Override
public LakeTieringFactory createLakeTieringFactory() {
throw new UnsupportedOperationException("createLakeTieringFactory is not supported yet");
public LakeTieringFactory<PaimonWriteResult, PaimonCommittable> createLakeTieringFactory() {
return new PaimonLakeTieringFactory(paimonConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.paimon.tiering;

import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.row.TimestampLtz;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.RowKind;

import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;

/**
 * Adapts a Fluss {@link LogRecord} so that it can be read as a paimon {@link InternalRow}.
 *
 * <p>The exposed row consists of all fields of the wrapped Fluss row, followed by three system
 * fields in this order: bucket, offset, timestamp. The instance is reusable: call {@link
 * #setFlussRecord(LogRecord)} to point it at another record.
 */
public class FlussRecordAsPaimonRow implements InternalRow {

    /** Number of system fields appended after the Fluss row: bucket, offset, timestamp. */
    private static final int SYSTEM_FIELD_COUNT = 3;

    private final int bucket;
    private LogRecord logRecord;
    private int originRowFieldCount;
    private com.alibaba.fluss.row.InternalRow flussRow;

    /**
     * Creates a wrapper whose bucket system field always reports the given bucket.
     *
     * @param bucket the value returned for the bucket system column
     */
    public FlussRecordAsPaimonRow(int bucket) {
        this.bucket = bucket;
    }

    /**
     * Points this wrapper at the given record and caches its row and field count.
     *
     * @param logRecord the Fluss record to expose as a paimon row
     */
    public void setFlussRecord(LogRecord logRecord) {
        this.logRecord = logRecord;
        com.alibaba.fluss.row.InternalRow row = logRecord.getRow();
        this.flussRow = row;
        this.originRowFieldCount = row.getFieldCount();
    }

    /** Position of the bucket system column, right after the last Fluss field. */
    private int bucketPos() {
        return originRowFieldCount;
    }

    /** Position of the offset system column. */
    private int offsetPos() {
        return originRowFieldCount + 1;
    }

    /** Position of the timestamp system column. */
    private int timestampPos() {
        return originRowFieldCount + 2;
    }

    /** Returns the field count of the Fluss row plus the three system fields. */
    @Override
    public int getFieldCount() {
        return originRowFieldCount + SYSTEM_FIELD_COUNT;
    }

    /** Returns the row kind converted from the change type of the current record. */
    @Override
    public RowKind getRowKind() {
        return toRowKind(logRecord.getChangeType());
    }

    /** No-op: the row kind is always derived from the wrapped record. */
    @Override
    public void setRowKind(RowKind rowKind) {
        // intentionally ignored
    }

    /**
     * Delegates to the Fluss row for its own fields; the trailing system fields (bucket, offset,
     * timestamp) are never null.
     */
    @Override
    public boolean isNullAt(int pos) {
        return pos < originRowFieldCount && flussRow.isNullAt(pos);
    }

    @Override
    public boolean getBoolean(int pos) {
        return flussRow.getBoolean(pos);
    }

    @Override
    public byte getByte(int pos) {
        return flussRow.getByte(pos);
    }

    @Override
    public short getShort(int pos) {
        return flussRow.getShort(pos);
    }

    /** Returns the bucket for the bucket system column, otherwise reads from the Fluss row. */
    @Override
    public int getInt(int pos) {
        return pos == bucketPos() ? bucket : flussRow.getInt(pos);
    }

    /**
     * Returns the log offset or the record timestamp for the respective system columns, otherwise
     * reads from the Fluss row.
     */
    @Override
    public long getLong(int pos) {
        if (pos == offsetPos()) {
            return logRecord.logOffset();
        }
        if (pos == timestampPos()) {
            return logRecord.timestamp();
        }
        return flussRow.getLong(pos);
    }

    @Override
    public float getFloat(int pos) {
        return flussRow.getFloat(pos);
    }

    @Override
    public double getDouble(int pos) {
        return flussRow.getDouble(pos);
    }

    /** Copies the bytes of the Fluss string into a paimon {@link BinaryString}. */
    @Override
    public BinaryString getString(int pos) {
        byte[] bytes = flussRow.getString(pos).toBytes();
        return BinaryString.fromBytes(bytes);
    }

    /**
     * Converts the Fluss decimal to a paimon {@link Decimal}, using the unscaled long form when
     * the Fluss decimal is compact and the {@code BigDecimal} form otherwise.
     */
    @Override
    public Decimal getDecimal(int pos, int precision, int scale) {
        com.alibaba.fluss.row.Decimal value = flussRow.getDecimal(pos, precision, scale);
        return value.isCompact()
                ? Decimal.fromUnscaledLong(value.toUnscaledLong(), precision, scale)
                : Decimal.fromBigDecimal(value.toBigDecimal(), precision, scale);
    }

    /**
     * Returns the record timestamp for the timestamp system column; otherwise converts the Fluss
     * {@link TimestampLtz} at {@code pos} to a paimon {@link Timestamp}.
     *
     * <p>NOTE(review): non-system columns are always read through {@code getTimestampLtz};
     * confirm whether timestamp columns without local time zone need separate handling.
     */
    @Override
    public Timestamp getTimestamp(int pos, int precision) {
        if (pos == timestampPos()) {
            return Timestamp.fromEpochMillis(logRecord.timestamp());
        }
        boolean compact = TimestampLtz.isCompact(precision);
        TimestampLtz value = flussRow.getTimestampLtz(pos, precision);
        if (compact) {
            // compact precision: only the millisecond part is carried over
            return Timestamp.fromEpochMillis(value.getEpochMillisecond());
        }
        return Timestamp.fromEpochMillis(
                value.getEpochMillisecond(), value.getNanoOfMillisecond());
    }

    @Override
    public byte[] getBinary(int pos) {
        return flussRow.getBytes(pos);
    }

    /** Always throws: array fields are not supported. */
    @Override
    public InternalArray getArray(int pos) {
        throw new UnsupportedOperationException(
                "getArray is not support for Fluss record currently.");
    }

    /** Always throws: map fields are not supported. */
    @Override
    public InternalMap getMap(int pos) {
        throw new UnsupportedOperationException(
                "getMap is not support for Fluss record currently.");
    }

    /** Always throws: nested row fields are not supported. */
    @Override
    public InternalRow getRow(int pos, int pos1) {
        throw new UnsupportedOperationException(
                "getRow is not support for Fluss record currently.");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.paimon.tiering;

import com.alibaba.fluss.config.Configuration;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

import java.io.Serializable;

/**
 * A serializable provider for Paimon {@link Catalog}. It holds the Paimon configuration and
 * builds a catalog from it on demand.
 */
public class PaimonCatalogProvider implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Configuration paimonConfig;

    /**
     * Creates a provider backed by the given configuration.
     *
     * @param paimonConfig the configuration whose entries are passed to Paimon as options
     */
    public PaimonCatalogProvider(Configuration paimonConfig) {
        this.paimonConfig = paimonConfig;
    }

    /**
     * Builds a Paimon catalog from the held configuration. Every call goes through {@link
     * CatalogFactory#createCatalog(CatalogContext)}; this class does not cache the result.
     *
     * @return the catalog created by the Paimon catalog factory
     */
    public Catalog get() {
        Options options = Options.fromMap(paimonConfig.toMap());
        CatalogContext context = CatalogContext.create(options);
        return CatalogFactory.createCatalog(context);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.paimon.tiering;

import org.apache.paimon.manifest.ManifestCommittable;

/**
* The committable that is derived from {@link PaimonWriteResult} to commit to Paimon. It is a
* plain holder around a Paimon {@link ManifestCommittable}.
*/
public class PaimonCommittable {

// the wrapped Paimon committable; set once in the constructor
private final ManifestCommittable manifestCommittable;

/**
* Creates a committable wrapping the given Paimon manifest committable.
*
* @param manifestCommittable the Paimon manifest committable to wrap
*/
public PaimonCommittable(ManifestCommittable manifestCommittable) {
this.manifestCommittable = manifestCommittable;
}

/**
* Returns the wrapped Paimon manifest committable.
*
* @return the manifest committable passed to the constructor
*/
public ManifestCommittable manifestCommittable() {
return manifestCommittable;
}
}
Loading