diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/batch/ArrowRecordBatch.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/batch/ArrowRecordBatch.java new file mode 100644 index 0000000000..20bc4d0fd1 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/batch/ArrowRecordBatch.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.batch; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * The Arrow implementation of the RecordBatch interface. + * + * @since 0.7 + */ +@PublicEvolving +public class ArrowRecordBatch implements RecordBatch {} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/batch/RecordBatch.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/batch/RecordBatch.java new file mode 100644 index 0000000000..dd9e46b8e4 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/batch/RecordBatch.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.batch; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * The RecordBatch interface represents a batch of records. + * + * @since 0.7 + */ +@PublicEvolving +public interface RecordBatch {} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java new file mode 100644 index 0000000000..48525e2b6c --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/committer/LakeCommitter.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.committer; + +import com.alibaba.fluss.annotation.PublicEvolving; + +import java.io.IOException; +import java.util.List; + +/** + * The LakeCommitter interface for committing write results. It extends the AutoCloseable interface + * to ensure resources are released after use. 
+ * + * @param the type of the write result + * @param the type of the committable object + * @since 0.7 + */ +@PublicEvolving +public interface LakeCommitter extends AutoCloseable { + + /** + * Converts a list of write results to a committable object. + * + * @param writeResults the list of write results + * @return the committable object + * @throws IOException if an I/O error occurs + */ + CommittableT toCommitable(List writeResults) throws IOException; + + /** + * Commits the given committable object. + * + * @param committable the committable object + * @throws IOException if an I/O error occurs + */ + void commit(CommittableT committable) throws IOException; +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/serializer/SimpleVersionedSerializer.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/serializer/SimpleVersionedSerializer.java new file mode 100644 index 0000000000..991a03f41e --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/serializer/SimpleVersionedSerializer.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.serializer; + +import com.alibaba.fluss.annotation.PublicEvolving; + +import java.io.IOException; + +/** + * The SimpleVersionedSerializer interface for serializing and deserializing objects with + * versioning support. 
It provides methods to get the version, serialize an object to a byte array, + * and deserialize a byte array to an object. + * + * @param the type of the object to be serialized and deserialized + * @since 0.7 + */ +@PublicEvolving +public interface SimpleVersionedSerializer { + + /** + * Returns the version of the serializer. + * + * @return the version of the serializer + */ + int getVersion(); + + /** + * Serializes the given object to a byte array. + * + * @param obj the object to serialize + * @return the serialized byte array + * @throws IOException if an I/O error occurs during serialization + */ + byte[] serialize(E obj) throws IOException; + + /** + * Deserializes the given byte array to an object. + * + * @param version the version of the serialized data + * @param serialized the byte array to deserialize + * @return the deserialized object + * @throws IOException if an I/O error occurs during deserialization + */ + E deserialize(int version, byte[] serialized) throws IOException; +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java new file mode 100644 index 0000000000..3842d8904e --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeTieringFactory.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lakehouse.writer; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.lakehouse.committer.LakeCommitter; +import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer; + +import java.io.IOException; + +/** + * The LakeTieringFactory interface defines how to create lake writers and committers. It provides + * methods to create writers and committers for Fluss's rows to Paimon/Iceberg rows, and to obtain + * serializers for write results and committable objects. + * + * @param the type of the write result + * @param the type of the committable object + * @since 0.7 + */ +@PublicEvolving +public interface LakeTieringFactory { + + /** + * Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows. + * + * @param writerInitContext the context for initializing the writer + * @return the lake writer + * @throws IOException if an I/O error occurs + */ + LakeWriter createLakeWriter(WriterInitContext writerInitContext) + throws IOException; + + /** + * Returns the serializer for write results. + * + * @return the serializer for write results + */ + SimpleVersionedSerializer getWriteResultSerializer(); + + /** + * Creates a lake committer to commit to Paimon/Iceberg. + * + * @return the lake committer + * @throws IOException if an I/O error occurs + */ + LakeCommitter createLakeCommitter() throws IOException; + + /** + * Returns the serializer for committable objects. + * + * @return the serializer for committable objects + */ + SimpleVersionedSerializer getCommitableSerializer(); +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeWriter.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeWriter.java new file mode 100644 index 0000000000..8b4696162b --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/LakeWriter.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.writer; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.record.LogRecord; + +import java.io.Closeable; +import java.io.IOException; + +/** + * The LakeWriter interface for writing records to a lake. It extends the Closeable interface to + * ensure resources are released after use. + * + * @param the type of the write result + * @since 0.7 + */ +@PublicEvolving +public interface LakeWriter extends Closeable { + /** + * Writes a record to the lake. + * + * @param record the record to write + * @throws IOException if an I/O error occurs + */ + void write(LogRecord record) throws IOException; + + /** + * Completes the writing process and returns the write result. + * + * @return the write result + * @throws IOException if an I/O error occurs + */ + WriteResult complete() throws IOException; +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/SupportsRecordBatchWrite.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/SupportsRecordBatchWrite.java new file mode 100644 index 0000000000..f86480158e --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/SupportsRecordBatchWrite.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.writer; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.lakehouse.batch.RecordBatch; + +import java.io.IOException; + +/** + * The SupportsRecordBatchWrite interface for writing batches of records. It provides a method to + * write a batch of records to the underlying storage. + * + * @since 0.7 + */ +@PublicEvolving +public interface SupportsRecordBatchWrite { + + /** + * Writes a batch of records. + * + * @param recordBatch the batch of records to write + * @throws IOException if an I/O error occurs + */ + void write(RecordBatch recordBatch) throws IOException; +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/WriterInitContext.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/WriterInitContext.java new file mode 100644 index 0000000000..1fcfff460a --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/writer/WriterInitContext.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse.writer; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; + +import javax.annotation.Nullable; + +/** + * The WriterInitContext interface provides the context needed to create a LakeWriter. It includes + * methods to obtain the table path, table bucket, and an optional partition. + * + * @since 0.7 + */ +@PublicEvolving +public interface WriterInitContext { + + /** + * Returns the table path. + * + * @return the table path + */ + TablePath tablePath(); + + /** + * Returns the table bucket. + * + * @return the table bucket + */ + TableBucket tableBucket(); + + /** + * Returns the partition, or null if there is no partition. + * + * @return the partition, or null if there is no partition + */ + @Nullable + String partition(); +} diff --git a/fluss-lake/fluss-lake-common/pom.xml b/fluss-lake/fluss-lake-common/pom.xml deleted file mode 100644 index 66e3d51cb5..0000000000 --- a/fluss-lake/fluss-lake-common/pom.xml +++ /dev/null @@ -1,31 +0,0 @@ - - - - 4.0.0 - - - com.alibaba.fluss - fluss-lake - 0.6-SNAPSHOT - - - fluss-lake-common - Fluss : Lake : Common - - \ No newline at end of file diff --git a/fluss-lake/pom.xml b/fluss-lake/pom.xml index 5d5fb37a01..cb371cc1c7 100644 --- a/fluss-lake/pom.xml +++ b/fluss-lake/pom.xml @@ -28,7 +28,6 @@ fluss-lake Fluss : Lake : - fluss-lake-common pom \ No newline at end of file