@@ -23,6 +23,7 @@
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataTypeVisitor;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
@@ -41,6 +42,9 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.List;

/** Convert from Fluss's data type to Iceberg's data type. */
public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {

@@ -168,6 +172,28 @@ public Type visit(MapType mapType) {

@Override
public Type visit(RowType rowType) {
-        throw new UnsupportedOperationException("Unsupported row type");
+        List<Types.NestedField> fields = new ArrayList<>();
+
+        for (DataField field : rowType.getFields()) {
+            Type fieldType = field.getType().accept(this);
+
+            if (field.getType().isNullable()) {
+                fields.add(
+                        Types.NestedField.optional(
+                                getNextId(),
+                                field.getName(),
+                                fieldType,
+                                field.getDescription().orElse(null)));
+            } else {
+                fields.add(
+                        Types.NestedField.required(
+                                getNextId(),
+                                field.getName(),
+                                fieldType,
+                                field.getDescription().orElse(null)));
+            }
+        }
+
+        return Types.StructType.of(fields);
}
}
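
Taken together, visit(RowType) now recurses into nested fields and assembles an Iceberg struct, mapping Fluss nullability onto optional/required and assigning each nested field a fresh ID via getNextId(). A minimal usage sketch under those assumptions (the INSTANCE singleton is the one referenced by the source-side change below; the wrapper class here is hypothetical):

import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypes;
import org.apache.iceberg.types.Type;

class RowTypeConversionSketch {
    // Convert a two-field Fluss row type into the equivalent Iceberg struct.
    static Type example() {
        DataType flussRow =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()));
        // Nullable fields become Types.NestedField.optional(...), non-nullable
        // ones Types.NestedField.required(...); field IDs come from getNextId().
        return flussRow.accept(FlussDataTypeToIcebergDataType.INSTANCE);
    }
}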
@@ -17,6 +17,7 @@

package org.apache.fluss.lake.iceberg.source;

import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
@@ -179,6 +180,15 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType,
? null
: new FlussArrayAsIcebergList(array, arrayType.getElementType());
};
} else if (flussType instanceof RowType) {
RowType rowType = (RowType) flussType;
Types.StructType nestedStructType =
(Types.StructType) rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);

return row -> {
InternalRow nestedRow = row.getRow(pos, rowType.getFieldCount());
return new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow);
};
} else {
throw new UnsupportedOperationException(
"Unsupported data type conversion for Fluss type: "
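
The nested-row branch resolves the Iceberg struct schema once per field, outside the returned lambda, so the per-record work is just a lazy wrapper. A sketch of the equivalent per-record logic, assuming (as the constructor call above suggests) that FlussRowAsIcebergRecord adapts a Fluss InternalRow to Iceberg's Record interface:

// Once per field: derive the Iceberg struct type for the nested row.
Types.StructType structType =
        (Types.StructType) rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
// Per record: read the nested row and wrap it; field values are not copied
// eagerly but are read through on access.
InternalRow nestedRow = row.getRow(pos, rowType.getFieldCount());
Record asIceberg = new FlussRowAsIcebergRecord(structType, rowType, nestedRow);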
@@ -44,6 +44,10 @@ public class IcebergRecordAsFlussRow implements InternalRow {

public IcebergRecordAsFlussRow() {}

public IcebergRecordAsFlussRow(Record icebergRecord) {
this.icebergRecord = icebergRecord;
}

public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) {
this.icebergRecord = icebergRecord;
return this;
@@ -169,7 +173,18 @@ public InternalMap getMap(int pos) {

@Override
public InternalRow getRow(int pos, int numFields) {
-        // TODO: Support Row type conversion from Iceberg to Fluss
-        throw new UnsupportedOperationException();
+        Object value = icebergRecord.get(pos);
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Record) {
+            return new IcebergRecordAsFlussRow((Record) value);
+        } else {
+            throw new IllegalArgumentException(
+                    "Expected Iceberg Record for nested row at position "
+                            + pos
+                            + " but found: "
+                            + value.getClass().getName());
+        }
}
}
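
On the read path, a nested Iceberg Record comes back wrapped in this same adapter, so callers consume it like any other Fluss InternalRow. A self-contained sketch (the schema and values are hypothetical; GenericRecord is Iceberg's generic Record implementation, and Fluss's InternalRow is assumed to expose the usual primitive getters such as getInt):

import org.apache.fluss.row.InternalRow;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

class NestedReadSketch {
    static int example() {
        Types.StructType nestedType =
                Types.StructType.of(
                        Types.NestedField.required(1, "a", Types.IntegerType.get()),
                        Types.NestedField.optional(2, "b", Types.StringType.get()));
        Types.StructType outerType =
                Types.StructType.of(
                        Types.NestedField.required(3, "outer_id", Types.IntegerType.get()),
                        Types.NestedField.optional(4, "nested", nestedType));

        GenericRecord nested = GenericRecord.create(nestedType);
        nested.set(0, 42);
        nested.set(1, "hello");
        GenericRecord outer = GenericRecord.create(outerType);
        outer.set(0, 7);
        outer.set(1, nested);

        InternalRow row = new IcebergRecordAsFlussRow(outer);
        InternalRow nestedRow = row.getRow(1, 2); // wraps the nested Record
        return nestedRow.getInt(0);               // 42
    }
}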
@@ -21,6 +21,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
@@ -38,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

import static org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
@@ -128,7 +130,16 @@ private static DataType convertIcebergTypeToFlussType(Type icebergType) {
} else if (icebergType instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) icebergType;
return DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
} else if (icebergType.isStructType()) {
Types.StructType structType = icebergType.asStructType();
List<DataField> fields = new ArrayList<>();
for (Types.NestedField nestedField : structType.fields()) {
DataType fieldType = convertIcebergTypeToFlussType(nestedField.type());
fields.add(new DataField(nestedField.name(), fieldType));
}
return DataTypes.ROW(fields.toArray(new DataField[0]));
}
Copilot AI commented on lines +133 to +141 (Jan 7, 2026):

The struct type handling is added within a sequence of if-else statements, but the closing brace at line 139 appears disconnected from the opening at line 133. The control flow suggests this else-if block should close at line 141, but line 139 has a closing brace that doesn't align with the logic structure. Verify the brace placement is correct.

throw new UnsupportedOperationException(
"Unsupported data type conversion for Iceberg type: "
+ icebergType.getClass().getName());
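
The brace flagged above is consistent with the rest of the method: each else-if branch returns, so control only reaches the trailing throw for genuinely unmapped types. One caveat worth noting: as written, the branch builds each DataField from the converted field type alone and never consults nestedField.isOptional(), so Iceberg's required/optional flag is not reflected in the resulting Fluss type. A small sketch of the mapping, with the call shown as if the private helper were accessible:

// struct<1: a: required int, 2: b: optional string>  ->  ROW<a INT, b STRING>
Types.StructType struct =
        Types.StructType.of(
                Types.NestedField.required(1, "a", Types.IntegerType.get()),
                Types.NestedField.optional(2, "b", Types.StringType.get()));
DataType flussType = convertIcebergTypeToFlussType(struct);
// equivalent to:
// DataTypes.ROW(new DataField("a", DataTypes.INT()), new DataField("b", DataTypes.STRING()))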
@@ -126,12 +126,12 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
// check filter push down
assertThat(plan)
.contains("TableSourceScan(")
.contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + partition + "'")
.contains("LogicalFilter(condition=[=($16, _UTF-16LE'" + partition + "'")
.contains("filter=[=(p, _UTF-16LE'" + partition + "'");

List<Row> expectedFiltered =
writtenRows.stream()
-                        .filter(r -> partition.equals(r.getField(15)))
+                        .filter(r -> partition.equals(r.getField(16)))
.collect(Collectors.toList());

List<Row> actualFiltered =
@@ -295,7 +295,12 @@ protected long createFullTypeLogTable(
.column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
.column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
.column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
.column("f_binary", DataTypes.BINARY(4));
.column("f_binary", DataTypes.BINARY(4))
.column(
"f_row",
DataTypes.ROW(
DataTypes.FIELD("f_nested_int", DataTypes.INT()),
DataTypes.FIELD("f_nested_string", DataTypes.STRING())));

TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -342,7 +347,8 @@ private List<Row> writeFullTypeRows(
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
-                        new byte[] {5, 6, 7, 8}));
+                        new byte[] {5, 6, 7, 8},
+                        row(10, "nested_string")));

flinkRows.add(
Row.of(
@@ -364,7 +370,8 @@
Instant.ofEpochMilli(1698235273501L),
ZoneId.of("UTC"))
.plusNanos(8000),
-                        new byte[] {5, 6, 7, 8}));
+                        new byte[] {5, 6, 7, 8},
+                        Row.of(10, "nested_string")));
} else {
rows.add(
row(
@@ -383,6 +390,7 @@
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
row(10, "nested_string"),
partition));

flinkRows.add(
@@ -406,6 +414,7 @@
ZoneId.of("UTC"))
.plusNanos(8000),
new byte[] {5, 6, 7, 8},
Row.of(10, "nested_string"),
partition));
}
}
@@ -24,8 +24,10 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -100,7 +102,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
-                        partition));
+                        partition,
+                        Row.of(1, "nested_row1")));
expectedRows.add(
Row.of(
true,
@@ -119,7 +122,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
-                        partition));
+                        partition,
+                        Row.of(2, "nested_row2")));
}
} else {
expectedRows =
@@ -141,7 +145,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
-                        null),
+                        null,
+                        Row.of(1, "nested_row1")),
Row.of(
true,
(byte) 10,
Expand All @@ -159,7 +164,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
-                        null));
+                        null,
+                        Row.of(2, "nested_row2")));
}

String query = "select * from " + tableName;
@@ -202,7 +208,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
-                        partition));
+                        partition,
+                        Row.of(2, "nested_row2")));
expectedRows2.add(
Row.ofKind(
RowKind.UPDATE_AFTER,
@@ -222,7 +229,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new float[] {2.1f, 2.2f, 2.3f},
-                        partition));
+                        partition,
+                        Row.of(3, "nested_update")));
}
} else {
expectedRows2.add(
@@ -244,7 +252,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
-                        null));
+                        null,
+                        Row.of(2, "nested_row2")));
expectedRows2.add(
Row.ofKind(
RowKind.UPDATE_AFTER,
@@ -264,7 +273,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new float[] {2.1f, 2.2f, 2.3f},
-                        null));
+                        null,
+                        Row.of(3, "nested_update")));
}

if (isPartitioned) {
@@ -349,6 +359,10 @@ void testReadIcebergLakeTable(boolean isPartitioned) throws Exception {
}

private void writeFullTypeRow(TablePath tablePath, String partition) throws Exception {
GenericRow nestedRow = new GenericRow(2);
nestedRow.setField(0, 3);
nestedRow.setField(1, BinaryString.fromString("nested_update"));

List<InternalRow> rows =
Collections.singletonList(
row(
@@ -368,7 +382,8 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exception {
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new GenericArray(new float[] {2.1f, 2.2f, 2.3f}),
-                        partition));
+                        partition,
+                        nestedRow));
writeRows(tablePath, rows, false);
}

@@ -422,7 +437,12 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean
.column("c14", DataTypes.TIMESTAMP(6))
.column("c15", DataTypes.BINARY(4))
.column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
.column("c17", DataTypes.STRING());
.column("c17", DataTypes.STRING())
.column(
"c18",
DataTypes.ROW(
DataTypes.FIELD("a", DataTypes.INT()),
DataTypes.FIELD("b", DataTypes.STRING())));

TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
Expand All @@ -444,6 +464,14 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean
}

private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
GenericRow nestedRow1 = new GenericRow(2);
nestedRow1.setField(0, 1);
nestedRow1.setField(1, BinaryString.fromString("nested_row1"));

GenericRow nestedRow2 = new GenericRow(2);
nestedRow2.setField(0, 2);
nestedRow2.setField(1, BinaryString.fromString("nested_row2"));

return Arrays.asList(
row(
false,
@@ -462,7 +490,8 @@ private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
-                        partition),
+                        partition,
+                        nestedRow1),
row(
true,
(byte) 10,
Expand All @@ -480,7 +509,8 @@ private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
-                        partition));
+                        partition,
+                        nestedRow2));
}

private Map<TableBucket, Long> getBucketLogEndOffset(