diff --git a/java/vector/src/main/codegen/templates/FixedValueVectors.java b/java/vector/src/main/codegen/templates/FixedValueVectors.java index 7958222f5c1..be385d146db 100644 --- a/java/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/java/vector/src/main/codegen/templates/FixedValueVectors.java @@ -45,6 +45,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class); + public static final int TYPE_WIDTH = ${type.width}; + private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java index 2c4274c13ee..6a9ce65392f 100644 --- a/java/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java @@ -37,7 +37,7 @@ import org.apache.arrow.flatbuf.Precision; /** - * Nullable${minor.class} implements a vector of values which could be null. Elements in the vector + * ${className} implements a vector of values which could be null. Elements in the vector * are first checked against a fixed length vector of boolean values. Then the element is retrieved * from the base class (if not null). * @@ -47,7 +47,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidthVector, NullableVector, FieldVector { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class); - private final FieldReader reader = new ${minor.class}ReaderImpl(Nullable${minor.class}Vector.this); + private final FieldReader reader = new ${minor.class}ReaderImpl(${className}.this); private final String bitsField = "$bits$"; private final String valuesField = "$values$"; @@ -67,7 +67,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type public ${className}(String name, BufferAllocator allocator, int precision, int scale) { super(name, allocator); - values = new ${minor.class}Vector(valuesField, allocator, precision, scale); + values = new ${valuesName}(valuesField, allocator, precision, scale); this.precision = precision; this.scale = scale; mutator = new Mutator(); @@ -81,7 +81,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type <#else> public ${className}(String name, BufferAllocator allocator) { super(name, allocator); - values = new ${minor.class}Vector(valuesField, allocator); + values = new ${valuesName}(valuesField, allocator); mutator = new Mutator(); accessor = new Accessor(); <#if minor.class == "TinyInt" || @@ -144,6 +144,13 @@ public List getChildrenFromFields() { @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { + <#if type.major = "VarLen"> + // variable width values: truncate offset vector buffer to size (#1) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.offsetVector.getBufferSizeFor(fieldNode.getLength() + 1)); + <#else> + // fixed width values truncate value vector to size (#1) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.getBufferSizeFor(fieldNode.getLength())); + org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); bits.valueCount = fieldNode.getLength(); } @@ -229,13 +236,6 @@ public void setInitialCapacity(int numRecords) { values.setInitialCapacity(numRecords); } -// @Override -// public SerializedField.Builder getMetadataBuilder() { -// return super.getMetadataBuilder() -// .addChild(bits.getMetadata()) -// .addChild(values.getMetadata()); -// } - @Override public void allocateNew() { if(!allocateNewSafe()){ @@ -329,20 +329,6 @@ public void zeroVector() { } - -// @Override -// public void load(SerializedField metadata, ArrowBuf buffer) { -// clear(); - // the bits vector is the first child (the order in which the children are added in getMetadataBuilder is significant) -// final SerializedField bitsField = metadata.getChild(0); -// bits.load(bitsField, buffer); -// -// final int capacity = buffer.capacity(); -// final int bitsLength = bitsField.getBufferLength(); -// final SerializedField valuesField = metadata.getChild(1); -// values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength)); -// } - @Override public TransferPair getTransferPair(BufferAllocator allocator){ return new TransferImpl(name, allocator); @@ -356,10 +342,10 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator){ @Override public TransferPair makeTransferPair(ValueVector to) { - return new TransferImpl((Nullable${minor.class}Vector) to); + return new TransferImpl((${className}) to); } - public void transferTo(Nullable${minor.class}Vector target){ + public void transferTo(${className} target){ bits.transferTo(target.bits); values.transferTo(target.values); <#if type.major == "VarLen"> @@ -368,7 +354,7 @@ public void transferTo(Nullable${minor.class}Vector target){ clear(); } - public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class}Vector target) { + public void splitAndTransferTo(int startIndex, int length, ${className} target) { bits.splitAndTransferTo(startIndex, length, target.bits); values.splitAndTransferTo(startIndex, length, target.values); <#if type.major == "VarLen"> @@ -377,22 +363,22 @@ public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class } private class TransferImpl implements TransferPair { - Nullable${minor.class}Vector to; + ${className} to; public TransferImpl(String name, BufferAllocator allocator){ <#if minor.class == "Decimal"> - to = new Nullable${minor.class}Vector(name, allocator, precision, scale); + to = new ${className}(name, allocator, precision, scale); <#else> - to = new Nullable${minor.class}Vector(name, allocator); + to = new ${className}(name, allocator); } - public TransferImpl(Nullable${minor.class}Vector to){ + public TransferImpl(${className} to){ this.to = to; } @Override - public Nullable${minor.class}Vector getTo(){ + public ${className} getTo(){ return to; } @@ -408,7 +394,7 @@ public void splitAndTransfer(int startIndex, int length) { @Override public void copyValueSafe(int fromIndex, int toIndex) { - to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this); + to.copyFromSafe(fromIndex, toIndex, ${className}.this); } } @@ -422,14 +408,14 @@ public Mutator getMutator(){ return mutator; } - public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){ + public void copyFrom(int fromIndex, int thisIndex, ${className} from){ final Accessor fromAccessor = from.getAccessor(); if (!fromAccessor.isNull(fromIndex)) { mutator.set(thisIndex, fromAccessor.get(fromIndex)); } } - public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){ + public void copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from){ <#if type.major == "VarLen"> mutator.fillEmpties(thisIndex); @@ -437,7 +423,7 @@ public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from bits.getMutator().setSafe(thisIndex, 1); } - public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){ + public void copyFromSafe(int fromIndex, int thisIndex, ${className} from){ <#if type.major == "VarLen"> mutator.fillEmpties(thisIndex); @@ -640,7 +626,7 @@ public void set(int index, ${minor.class}Holder holder){ } public boolean isSafe(int outIndex) { - return outIndex < Nullable${minor.class}Vector.this.getValueCapacity(); + return outIndex < ${className}.this.getValueCapacity(); } <#assign fields = minor.fields!type.fields /> diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index ea1fdf6bd60..af041a93d7e 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -103,6 +103,8 @@ public List getChildrenFromFields() { @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { + // truncate types vector buffer to size (#0) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 0, typeVector.getBufferSizeFor(fieldNode.getLength())); BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); this.valueCount = fieldNode.getLength(); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java index 4c6d363f21c..b7df8d13ee6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java @@ -30,6 +30,9 @@ public abstract class BaseDataValueVector extends BaseValueVector implements Buf protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this + /** maximum extra size at the end of the buffer */ + private static final int MAX_BUFFER_PADDING = 64; + public static void load(ArrowFieldNode fieldNode, List vectors, List buffers) { int expectedSize = vectors.size(); if (buffers.size() != expectedSize) { @@ -40,6 +43,20 @@ public static void load(ArrowFieldNode fieldNode, List vectors, Li } } + public static void truncateBufferBasedOnSize(List buffers, int bufferIndex, int byteSize) { + if (bufferIndex >= buffers.size()) { + throw new IllegalArgumentException("no buffer at index " + bufferIndex + ": " + buffers); + } + ArrowBuf buffer = buffers.get(bufferIndex); + if (buffer.writerIndex() < byteSize) { + throw new IllegalArgumentException("can not truncate buffer to a larger size " + byteSize + ": " + buffer.writerIndex()); + } + if (buffer.writerIndex() - byteSize > MAX_BUFFER_PADDING) { + throw new IllegalArgumentException("Buffer too large to resize to " + byteSize + ": " + buffer.writerIndex()); + } + buffer.writerIndex(byteSize); + } + public static List unload(List vectors) { List result = new ArrayList<>(vectors.size()); for (BufferBacked vector : vectors) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index 7ce1236b2ec..48da8e77d68 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -68,7 +68,7 @@ public void load(ArrowFieldNode fieldNode, ArrowBuf data) { int remainder = count % 8; // set remaining bits if (remainder > 0) { - byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7));; + byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7)); this.data.setByte(fullBytesCount, bitMask); } } else if (fieldNode.getNullCount() == fieldNode.getLength()) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 757f061dd5a..5c1176cf95d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -82,7 +82,7 @@ private void loadBuffers(FieldVector vector, Field field, Iterator buf vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { throw new IllegalArgumentException("Could not load buffers for field " + - field + " error message" + e.getMessage(), e); + field + ". error message: " + e.getMessage(), e); } List children = field.getChildren(); if (children.size() > 0) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index e18f99f95d7..461bdbcda1b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -93,6 +93,8 @@ public List getChildrenFromFields() { @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { + // variable width values: truncate offset vector buffer to size (#1) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, offsets.getBufferSizeFor(fieldNode.getLength() + 1)); BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index 9dfe8d840e4..7a70ffd9047 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.IntWriter; @@ -99,6 +101,79 @@ public void testUnloadLoad() throws IOException { } } + @Test + public void testUnloadLoadAddPadding() throws IOException { + int count = 10000; + Schema schema; + try ( + BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) { + + // write some data + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + ListWriter list = rootWriter.list("list"); + IntWriter intWriter = list.integer(); + for (int i = 0; i < count; i++) { + list.setPosition(i); + list.startList(); + for (int j = 0; j < i % 4 + 1; j++) { + intWriter.writeInt(i); + } + list.endList(); + } + writer.setValueCount(count); + + // unload it + FieldVector root = parent.getChild("root"); + schema = new Schema(root.getField().getChildren()); + VectorUnloader vectorUnloader = newVectorUnloader(root); + try ( + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); + ) { + List oldBuffers = recordBatch.getBuffers(); + List newBuffers = new ArrayList<>(); + for (ArrowBuf oldBuffer : oldBuffers) { + int l = oldBuffer.readableBytes(); + if (l % 64 != 0) { + // pad + l = l + 64 - l % 64; + } + ArrowBuf newBuffer = allocator.buffer(l); + for (int i = oldBuffer.readerIndex(); i < oldBuffer.writerIndex(); i++) { + newBuffer.setByte(i - oldBuffer.readerIndex(), oldBuffer.getByte(i)); + } + newBuffer.readerIndex(0); + newBuffer.writerIndex(l); + newBuffers.add(newBuffer); + } + + try (ArrowRecordBatch newBatch = new ArrowRecordBatch(recordBatch.getLength(), recordBatch.getNodes(), newBuffers);) { + // load it + VectorLoader vectorLoader = new VectorLoader(newRoot); + + vectorLoader.load(newBatch); + + FieldReader reader = newRoot.getVector("list").getReader(); + for (int i = 0; i < count; i++) { + reader.setPosition(i); + List expected = new ArrayList<>(); + for (int j = 0; j < i % 4 + 1; j++) { + expected.add(i); + } + Assert.assertEquals(expected, reader.readObject()); + } + } + + for (ArrowBuf newBuf : newBuffers) { + newBuf.release(); + } + } + } + } + /** * The validity buffer can be empty if: * - all values are defined @@ -113,12 +188,17 @@ public void testLoadEmptyValidityBuffer() throws IOException { )); int count = 10; ArrowBuf validity = allocator.getEmpty(); - ArrowBuf values = allocator.buffer(count * 4); // integers - for (int i = 0; i < count; i++) { - values.setInt(i * 4, i); + ArrowBuf[] values = new ArrowBuf[2]; + for (int i = 0; i < values.length; i++) { + ArrowBuf arrowBuf = allocator.buffer(count * 4); // integers + values[i] = arrowBuf; + for (int j = 0; j < count; j++) { + arrowBuf.setInt(j * 4, j); + } + arrowBuf.writerIndex(count * 4); } try ( - ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values, validity, values)); + ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1])); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); ) { @@ -153,7 +233,9 @@ public void testLoadEmptyValidityBuffer() throws IOException { assertFalse(intDefinedVector.getAccessor().isNull(count + 10)); assertEquals(1234, intDefinedVector.getAccessor().get(count + 10)); } finally { - values.release(); + for (ArrowBuf arrowBuf : values) { + arrowBuf.release(); + } } }