Skip to content

Commit

Permalink
[arrow] Enable upper-case field name in arrow format writer (#4263)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Sep 26, 2024
1 parent 40f53fd commit d80f514
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {

private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
private final boolean onlyLowerCase;

public ArrowBundleRecords(VectorSchemaRoot vectorSchemaRoot, RowType rowType) {
public ArrowBundleRecords(
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean onlyLowerCase) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
this.onlyLowerCase = onlyLowerCase;
}

public VectorSchemaRoot getVectorSchemaRoot() {
Expand All @@ -49,7 +52,7 @@ public long rowCount() {

@Override
public Iterator<InternalRow> iterator() {
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, onlyLowerCase);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public Timestamp getTimestamp(int i, int precision) {
return Timestamp.fromMicros(value);
} else {
return Timestamp.fromEpochMillis(
value / 1_000_000, (int) value % 1_000_000);
value / 1_000_000, (int) (value % 1_000_000));
}
}
};
Expand All @@ -417,7 +417,7 @@ public Timestamp getTimestamp(int i, int precision) {
return Timestamp.fromMicros(value);
} else {
return Timestamp.fromEpochMillis(
value / 1_000_000, (int) value % 1_000_000);
value / 1_000_000, (int) (value % 1_000_000));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public class ArrowBatchReader {
private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;
private final boolean toLowerCase;

public ArrowBatchReader(RowType rowType) {
public ArrowBatchReader(RowType rowType, boolean toLowerCase) {
this.internalRowSerializer = new InternalRowSerializer(rowType);
ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()];
this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()];
Expand All @@ -52,6 +53,7 @@ public ArrowBatchReader(RowType rowType) {
for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
this.toLowerCase = toLowerCase;
}

public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
Expand All @@ -60,7 +62,9 @@ public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
List<DataField> dataFields = projectedRowType.getFields();
for (int i = 0; i < dataFields.size(); ++i) {
try {
Field field = arrowSchema.findField(dataFields.get(i).name().toLowerCase());
String fieldName = dataFields.get(i).name();
Field field =
arrowSchema.findField(toLowerCase ? fieldName.toLowerCase() : fieldName);
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.OversizedAllocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
public class ArrowFormatWriter implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ArrowFormatWriter.class);

private final VectorSchemaRoot vectorSchemaRoot;
private final ArrowFieldWriter[] fieldWriters;

Expand Down Expand Up @@ -72,6 +76,7 @@ public boolean write(InternalRow currentRow) {
fieldWriters[i].write(rowId, currentRow, i);
} catch (OversizedAllocationException | IndexOutOfBoundsException e) {
// maybe out of memory
LOG.warn("Arrow field writer failed while writing", e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testWrite() {
writer.flush();
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);

Iterator<InternalRow> iterator = rows.iterator();
Expand Down Expand Up @@ -142,7 +142,7 @@ public void testReadWithSchemaMessUp() {
}
vectors.set(vectors.size() - 1, vector0);

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(new VectorSchemaRoot(vectors));

Iterator<InternalRow> iterator = rows.iterator();
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testArrowBundleRecords() {
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

Iterator<InternalRow> iterator =
new ArrowBundleRecords(vectorSchemaRoot, PRIMITIVE_TYPE).iterator();
new ArrowBundleRecords(vectorSchemaRoot, PRIMITIVE_TYPE, true).iterator();
for (int i = 0; i < 1000; i++) {
InternalRow actual = iterator.next();
InternalRow expectec = list.get(i);
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testCWriter() {
writer.flush();
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);

Iterator<InternalRow> iterator = rows.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testFunction() {
new OneElementFieldVectorGenerator(rootAllocator, dataField, value);
try (FieldVector fieldVector = oneElementFieldVectorGenerator.get(10000)) {
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
new ArrowBatchReader(new RowType(Arrays.asList(dataField)), true);
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(
Expand All @@ -68,7 +68,7 @@ public void testFunction() {
new OneElementFieldVectorGenerator(rootAllocator, dataField, value)) {
FieldVector fieldVector = oneElementFieldVectorGenerator.get(10000);
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
new ArrowBatchReader(new RowType(Arrays.asList(dataField)), true);
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(i -> Assertions.assertThat(i.getInt(0)).isEqualTo(genericRow.getInt(0)));
Expand All @@ -85,7 +85,7 @@ public void testFunction() {
try (FieldVector fieldVector = oneElementFieldVectorGenerator.get(100000)) {
Assertions.assertThat(fieldVector.getValueCount()).isEqualTo(100000);
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
new ArrowBatchReader(new RowType(Arrays.asList(dataField)), true);
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(
Expand Down

0 comments on commit d80f514

Please sign in to comment.