Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[arrow] Enable upper-case field name in arrow format writer #4263

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {

private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
private final boolean onlyLowerCase;

public ArrowBundleRecords(VectorSchemaRoot vectorSchemaRoot, RowType rowType) {
public ArrowBundleRecords(
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean onlyLowerCase) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
this.onlyLowerCase = onlyLowerCase;
}

public VectorSchemaRoot getVectorSchemaRoot() {
Expand All @@ -49,7 +52,7 @@ public long rowCount() {

@Override
public Iterator<InternalRow> iterator() {
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, onlyLowerCase);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public Timestamp getTimestamp(int i, int precision) {
return Timestamp.fromMicros(value);
} else {
return Timestamp.fromEpochMillis(
value / 1_000_000, (int) value % 1_000_000);
value / 1_000_000, (int) (value % 1_000_000));
}
}
};
Expand All @@ -417,7 +417,7 @@ public Timestamp getTimestamp(int i, int precision) {
return Timestamp.fromMicros(value);
} else {
return Timestamp.fromEpochMillis(
value / 1_000_000, (int) value % 1_000_000);
value / 1_000_000, (int) (value % 1_000_000));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public class ArrowBatchReader {
private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;
private final boolean toLowerCase;

public ArrowBatchReader(RowType rowType) {
public ArrowBatchReader(RowType rowType, boolean toLowerCase) {
this.internalRowSerializer = new InternalRowSerializer(rowType);
ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()];
this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()];
Expand All @@ -52,6 +53,7 @@ public ArrowBatchReader(RowType rowType) {
for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
this.toLowerCase = toLowerCase;
}

public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
Expand All @@ -60,7 +62,9 @@ public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
List<DataField> dataFields = projectedRowType.getFields();
for (int i = 0; i < dataFields.size(); ++i) {
try {
Field field = arrowSchema.findField(dataFields.get(i).name().toLowerCase());
String fieldName = dataFields.get(i).name();
Field field =
arrowSchema.findField(toLowerCase ? fieldName.toLowerCase() : fieldName);
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.OversizedAllocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
public class ArrowFormatWriter implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ArrowFormatWriter.class);

private final VectorSchemaRoot vectorSchemaRoot;
private final ArrowFieldWriter[] fieldWriters;

Expand Down Expand Up @@ -72,6 +76,7 @@ public boolean write(InternalRow currentRow) {
fieldWriters[i].write(rowId, currentRow, i);
} catch (OversizedAllocationException | IndexOutOfBoundsException e) {
// maybe out of memory
LOG.warn("Arrow field writer failed while writing", e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testWrite() {
writer.flush();
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);

Iterator<InternalRow> iterator = rows.iterator();
Expand Down Expand Up @@ -142,7 +142,7 @@ public void testReadWithSchemaMessUp() {
}
vectors.set(vectors.size() - 1, vector0);

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(new VectorSchemaRoot(vectors));

Iterator<InternalRow> iterator = rows.iterator();
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testArrowBundleRecords() {
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

Iterator<InternalRow> iterator =
new ArrowBundleRecords(vectorSchemaRoot, PRIMITIVE_TYPE).iterator();
new ArrowBundleRecords(vectorSchemaRoot, PRIMITIVE_TYPE, true).iterator();
for (int i = 0; i < 1000; i++) {
InternalRow actual = iterator.next();
InternalRow expectec = list.get(i);
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testCWriter() {
writer.flush();
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);

Iterator<InternalRow> iterator = rows.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testFunction() {
new OneElementFieldVectorGenerator(rootAllocator, dataField, value);
try (FieldVector fieldVector = oneElementFieldVectorGenerator.get(10000)) {
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
new ArrowBatchReader(new RowType(Arrays.asList(dataField)), true);
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(
Expand All @@ -68,7 +68,7 @@ public void testFunction() {
new OneElementFieldVectorGenerator(rootAllocator, dataField, value)) {
FieldVector fieldVector = oneElementFieldVectorGenerator.get(10000);
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
new ArrowBatchReader(new RowType(Arrays.asList(dataField)), true);
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(i -> Assertions.assertThat(i.getInt(0)).isEqualTo(genericRow.getInt(0)));
Expand All @@ -85,7 +85,7 @@ public void testFunction() {
try (FieldVector fieldVector = oneElementFieldVectorGenerator.get(100000)) {
Assertions.assertThat(fieldVector.getValueCount()).isEqualTo(100000);
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
new ArrowBatchReader(new RowType(Arrays.asList(dataField)), true);
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(
Expand Down
Loading