Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,13 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {

this.hudiColumnNames = params.get("hudi_column_names");
this.hudiColumnTypes = params.get("hudi_column_types").split("#");
this.requiredFields = params.get("required_fields").split(",");

// Required fields will be empty when only partition fields are selected
// This is because partition fields are not stored in the data files
if (!params.get("required_fields").equals("")) {
this.requiredFields = params.get("required_fields").split(",");
} else {
this.requiredFields = new String[0];
}
this.fieldInspectors = new ObjectInspector[requiredFields.length];
this.structFields = new StructField[requiredFields.length];
this.fsOptionsProps = Maps.newHashMap();
Expand Down Expand Up @@ -166,14 +171,20 @@ public int getNext() throws IOException {
if (!reader.next(key, value)) {
break;
}
Object rowData = deserializer.deserialize(value);
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
columnValue.setField(types[i], fieldInspectors[i]);
appendData(i, columnValue);
if (fields.length > 0) {
Object rowData = deserializer.deserialize(value);
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
columnValue.setField(types[i], fieldInspectors[i]);
appendData(i, columnValue);
}
}
}
// vectorTable is virtual
if (fields.length == 0) {
vectorTable.appendVirtualData(numRows);
}
return numRows;
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class VectorTable {
private final VectorColumn meta;
private final boolean onlyReadable;
private final int numRowsOfReadable;
// this will be true if fields is empty
private final boolean isVirtual;
// count rows when isVirtual is true
private int numVirtualRows = 0;

// Create writable vector table
private VectorTable(ColumnType[] types, String[] fields, int capacity) {
Expand All @@ -51,6 +55,7 @@ private VectorTable(ColumnType[] types, String[] fields, int capacity) {
this.meta = VectorColumn.createWritableColumn(new ColumnType("#meta", Type.BIGINT), metaSize);
this.onlyReadable = false;
numRowsOfReadable = -1;
this.isVirtual = columns.length == 0;
}

// Create readable vector table
Expand All @@ -72,6 +77,7 @@ private VectorTable(ColumnType[] types, String[] fields, long metaAddress) {
this.meta = VectorColumn.createReadableColumn(metaAddress, metaSize, new ColumnType("#meta", Type.BIGINT));
this.onlyReadable = true;
numRowsOfReadable = numRows;
this.isVirtual = columns.length == 0;
}

public static VectorTable createWritableTable(ColumnType[] types, String[] fields, int capacity) {
Expand Down Expand Up @@ -114,16 +120,24 @@ public static VectorTable createReadableTable(Map<String, String> params) {

public void appendNativeData(int fieldId, NativeColumnValue o) {
assert (!onlyReadable);
assert (!isVirtual);
columns[fieldId].appendNativeValue(o);
}

public void appendData(int fieldId, ColumnValue o) {
assert (!onlyReadable);
assert (!isVirtual);
columns[fieldId].appendValue(o);
}

public void appendVirtualData(int numRows) {
assert (isVirtual);
numVirtualRows += numRows;
}

public void appendData(int fieldId, Object[] batch, ColumnValueConverter converter, boolean isNullable) {
assert (!onlyReadable);
assert (!isVirtual);
if (converter != null) {
columns[fieldId].appendObjectColumn(converter.convert(batch), isNullable);
} else {
Expand Down Expand Up @@ -193,6 +207,8 @@ public void releaseColumn(int fieldId) {
public int getNumRows() {
if (onlyReadable) {
return numRowsOfReadable;
} else if (isVirtual) {
return numVirtualRows;
} else {
return columns[0].numRows();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test_only_partition_columns --
2023-11-15

Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,15 @@ suite("test_hudi_catalog", "p2,external,hudi,external_remote,external_remote_hud
sql """ set enable_fallback_to_original_planner=false """
def tables = sql """ show tables; """
assertTrue(tables.size() > 0)
try {
sql """ set force_jni_scanner = true; """
qt_test_only_partition_columns """
select signup_date from user_activity_log_mor_partition order by signup_date limit 1;
"""
} catch (Exception e) {
logger.error("Error occurred while executing query", e)
} finally {
sql """ set force_jni_scanner = false; """
}
sql """drop catalog if exists ${catalog_name};"""
}