From 27b390928b5895f5f4f895878826acb911bc2f0f Mon Sep 17 00:00:00 2001 From: Nong Li Date: Thu, 19 Jan 2017 14:57:42 -0800 Subject: [PATCH 1/7] [ARROW-499]: [Java] Update file serialization to use the streaming serialization format. This removes the old serialization code and unifies the code paths. This also changes the serialization format to match the alignment requirements from the previous file format. --- format/File.fbs | 5 +- .../apache/arrow/vector/file/ArrowBlock.java | 25 ++---- .../apache/arrow/vector/file/ArrowFooter.java | 10 ++- .../apache/arrow/vector/file/ArrowReader.java | 63 +++------------ .../apache/arrow/vector/file/ArrowWriter.java | 45 ++--------- .../apache/arrow/vector/file/ReadChannel.java | 11 ++- .../vector/stream/MessageSerializer.java | 81 ++++++++++++++++--- .../arrow/vector/file/TestArrowFile.java | 4 - .../arrow/vector/file/TestArrowFooter.java | 8 ++ 9 files changed, 119 insertions(+), 133 deletions(-) diff --git a/format/File.fbs b/format/File.fbs index f28dc204d58d9..a2ed01eb2b22d 100644 --- a/format/File.fbs +++ b/format/File.fbs @@ -37,10 +37,7 @@ struct Block { offset: long; - metaDataLength: int; - - bodyLength: long; - + length: int; } root_type Footer; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java index 90fb02b059707..c853de5826ab7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java @@ -25,39 +25,32 @@ public class ArrowBlock implements FBSerializable { private final long offset; - private final int metadataLength; - private final long bodyLength; + private final int length; - public ArrowBlock(long offset, int metadataLength, long bodyLength) { + public ArrowBlock(long offset, int length) { super(); this.offset = offset; - this.metadataLength = metadataLength; - this.bodyLength = bodyLength; + this.length = length; } public long getOffset() { return offset; } - public int getMetadataLength() { - return metadataLength; - } - - public long getBodyLength() { - return bodyLength; + public int getLength() { + return length; } @Override public int writeTo(FlatBufferBuilder builder) { - return Block.createBlock(builder, offset, metadataLength, bodyLength); + return Block.createBlock(builder, offset, length); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + (int) (bodyLength ^ (bodyLength >>> 32)); - result = prime * result + metadataLength; + result = prime * result + (length ^ (length >>> 32)); result = prime * result + (int) (offset ^ (offset >>> 32)); return result; } @@ -71,9 +64,7 @@ public boolean equals(Object obj) { if (getClass() != obj.getClass()) return false; ArrowBlock other = (ArrowBlock) obj; - if (bodyLength != other.bodyLength) - return false; - if (metadataLength != other.metadataLength) + if (length != other.length) return false; if (offset != other.offset) return false; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java index 3be19296cb56d..b45ef2bf37654 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java @@ -55,21 +55,23 @@ public ArrowFooter(Footer footer) { private static List recordBatches(Footer footer) { List recordBatches = new ArrayList<>(); Block tempBlock = new Block(); + int recordBatchesLength = footer.recordBatchesLength(); for (int i = 0; i < recordBatchesLength; i++) { Block block = footer.recordBatches(tempBlock, i); - recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength())); + recordBatches.add(new ArrowBlock(block.offset(), block.length())); } return recordBatches; } private static List dictionaries(Footer footer) { List dictionaries = new ArrayList<>(); - Block tempBLock = new Block(); + Block tempBlock = new Block(); + int dictionariesLength = footer.dictionariesLength(); for (int i = 0; i < dictionariesLength; i++) { - Block block = footer.dictionaries(tempBLock, i); - dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength())); + Block block = footer.dictionaries(tempBlock, i); + dictionaries.add(new ArrowBlock(block.offset(), block.length())); } return dictionaries; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index 58c51605c5600..3c65af0a38cd6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -20,23 +20,15 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import org.apache.arrow.flatbuf.Buffer; -import org.apache.arrow.flatbuf.FieldNode; import org.apache.arrow.flatbuf.Footer; -import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.stream.MessageSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ArrowBuf; - public class ArrowReader implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class); @@ -54,15 +46,6 @@ public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) { this.allocator = allocator; } - private int readFully(ArrowBuf buffer, int l) throws IOException { - int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l)); - buffer.writerIndex(n); - if (n != l) { - throw new IllegalStateException(n + " != " + l); - } - return n; - } - private int readFully(ByteBuffer buffer) throws IOException { int total = 0; int n; @@ -104,46 +87,20 @@ public ArrowFooter readFooter() throws IOException { // TODO: read dictionaries - public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException { - LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength())); - int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength()); - if (l < 0) { - throw new InvalidArrowFileException("block invalid: " + recordBatchBlock); - } - final ArrowBuf buffer = allocator.buffer(l); - LOGGER.debug("allocated buffer " + buffer); - in.position(recordBatchBlock.getOffset()); - int n = readFully(buffer, l); - if (n != l) { - throw new IllegalStateException(n + " != " + l); - } - - // Record batch flatbuffer is prefixed by its size as int32le - final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4); - RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer()); - - int nodesLength = recordBatchFB.nodesLength(); - final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength()); - List nodes = new ArrayList<>(); - for (int i = 0; i < nodesLength; ++i) { - FieldNode node = recordBatchFB.nodes(i); - nodes.add(new ArrowFieldNode(node.length(), node.nullCount())); + public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException { + LOGGER.debug(String.format("RecordBatch at offset %d len: %d", + block.getOffset(), block.getLength())); + in.position(block.getOffset()); + ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch( + new ReadChannel(in, block.getOffset()), (int)block.getLength(), allocator); + if (batch == null) { + throw new IOException("Invalid file. No batch at offset: " + block.getOffset()); } - List buffers = new ArrayList<>(); - for (int i = 0; i < recordBatchFB.buffersLength(); ++i) { - Buffer bufferFB = recordBatchFB.buffers(i); - LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length())); - ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length()); - buffers.add(vectorBuffer); - } - ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers); - LOGGER.debug("released buffer " + buffer); - buffer.release(); - return arrowRecordBatch; + return batch; } + @Override public void close() throws IOException { in.close(); } - } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java index 3febd11f4c76a..d5b92320c2c7e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java @@ -23,14 +23,12 @@ import java.util.Collections; import java.util.List; -import org.apache.arrow.vector.schema.ArrowBuffer; import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.stream.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ArrowBuf; - public class ArrowWriter implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class); @@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable { private final Schema schema; private final List recordBatches = new ArrayList<>(); - private boolean started = false; public ArrowWriter(WritableByteChannel out, Schema schema) { @@ -49,47 +46,17 @@ public ArrowWriter(WritableByteChannel out, Schema schema) { private void start() throws IOException { writeMagic(); + MessageSerializer.serialize(out, schema); } - // TODO: write dictionaries public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { checkStarted(); - out.align(); - - // write metadata header with int32 size prefix - long offset = out.getCurrentPosition(); - out.write(recordBatch, true); - out.align(); - // write body - long bodyOffset = out.getCurrentPosition(); - List buffers = recordBatch.getBuffers(); - List buffersLayout = recordBatch.getBuffersLayout(); - if (buffers.size() != buffersLayout.size()) { - throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size()); - } - for (int i = 0; i < buffers.size(); i++) { - ArrowBuf buffer = buffers.get(i); - ArrowBuffer layout = buffersLayout.get(i); - long startPosition = bodyOffset + layout.getOffset(); - if (startPosition != out.getCurrentPosition()) { - out.writeZeros((int)(startPosition - out.getCurrentPosition())); - } - - out.write(buffer); - if (out.getCurrentPosition() != startPosition + layout.getSize()) { - throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize()); - } - } - int metadataLength = (int)(bodyOffset - offset); - if (metadataLength <= 0) { - throw new InvalidArrowFileException("invalid recordBatch"); - } - long bodyLength = out.getCurrentPosition() - bodyOffset; - LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength)); - // add metadata to footer - recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength)); + ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch); + LOGGER.debug(String.format("RecordBatch at offset: %d len: %d", + batchDesc.getOffset(), batchDesc.getLength())); + recordBatches.add(batchDesc); } private void checkStarted() throws IOException { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java index b062f3826eab3..a9dc1293b8193 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java @@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable { private ReadableByteChannel in; private long bytesRead = 0; + // The starting byte offset into 'in'. + private final long startByteOffset; - public ReadChannel(ReadableByteChannel in) { + public ReadChannel(ReadableByteChannel in, long startByteOffset) { this.in = in; + this.startByteOffset = startByteOffset; + } + + public ReadChannel(ReadableByteChannel in) { + this(in, 0); } public long bytesRead() { return bytesRead; } @@ -65,6 +72,8 @@ public int readFully(ArrowBuf buffer, int l) throws IOException { return n; } + public long getCurrentPositiion() { return startByteOffset + bytesRead; } + @Override public void close() throws IOException { if (this.in != null) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 22c46e2817b1e..a93d8a8c31417 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -29,6 +29,7 @@ import org.apache.arrow.flatbuf.MetadataVersion; import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.file.ArrowBlock; import org.apache.arrow.vector.file.ReadChannel; import org.apache.arrow.vector.file.WriteChannel; import org.apache.arrow.vector.schema.ArrowBuffer; @@ -52,7 +53,8 @@ * For RecordBatch messages the serialization is: * 1. 4 byte little endian batch metadata header * 2. FB serialized RowBatch - * 3. serialized RowBatch buffers. + * 3. Padding to align to 8 byte boundary. + * 4. serialized RowBatch buffers. */ public class MessageSerializer { @@ -98,26 +100,41 @@ public static Schema deserializeSchema(ReadChannel in) throws IOException { } /** - * Serializes an ArrowRecordBatch. + * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch. */ - public static long serialize(WriteChannel out, ArrowRecordBatch batch) + public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException { long start = out.getCurrentPosition(); int bodyLength = batch.computeBodyLength(); - ByteBuffer metadata = WriteChannel.serialize(batch); + + int messageLength = 4 + metadata.remaining() + bodyLength; ByteBuffer serializedHeader = - serializeHeader(MessageHeader.RecordBatch, bodyLength + metadata.remaining() + 4); + serializeHeader(MessageHeader.RecordBatch, messageLength); + + // Compute the required alignment. This is not a great way to do it. The issue is + // that we need to know the message size to serialize the message header but the + // size depends on the alignment, which depends on the message header. + // This will serialize the header again with the updated size alignment adjusted. + // TODO: We really just want sizeof(MessageHeader) from the serializeHeader() above. + // Is there a way to do this? + long bufferOffset = start + 4 + serializedHeader.remaining() + 4 + metadata.remaining(); + if (bufferOffset % 8 != 0) { + messageLength += 8 - bufferOffset % 8; + serializedHeader = serializeHeader(MessageHeader.RecordBatch, messageLength); + } // Write message header. out.writeIntLittleEndian(serializedHeader.remaining()); out.write(serializedHeader); - // Write the metadata, with the 4 byte little endian prefix + // Write batch header. with the 4 byte little endian prefix out.writeIntLittleEndian(metadata.remaining()); out.write(metadata); - // Write batch header. + // Align the output to 8 byte boundary. + out.align(); + long offset = out.getCurrentPosition(); List buffers = batch.getBuffers(); List buffersLayout = batch.getBuffersLayout(); @@ -135,7 +152,7 @@ public static long serialize(WriteChannel out, ArrowRecordBatch batch) " != " + startPosition + layout.getSize()); } } - return out.getCurrentPosition() - start; + return new ArrowBlock(start, (int)(out.getCurrentPosition() - start)); } /** @@ -149,17 +166,59 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, int messageLen = (int)header.bodyLength(); // Now read the buffer. This has the metadata followed by the data. ArrowBuf buffer = alloc.buffer(messageLen); + long readPosition = in.getCurrentPositiion(); + if (in.readFully(buffer, messageLen) != messageLen) { + throw new IOException("Unexpected end of input trying to read batch."); + } + return deserializeRecordBatch(buffer, readPosition, messageLen); + } + + /** + * Deserializes a RecordBatch knowing the size of the entire message up front. This + * minimizes the number of reads to the underlying stream. + */ + public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, int messageLen, + BufferAllocator alloc) throws IOException { + ArrowBuf buffer = alloc.buffer(messageLen); + long readPosition = in.getCurrentPositiion(); if (in.readFully(buffer, messageLen) != messageLen) { throw new IOException("Unexpected end of input trying to read batch."); } + byte[] headerLenBytes = new byte[4]; + buffer.getBytes(0, headerLenBytes); + int headerLen = bytesToInt(headerLenBytes); + buffer = buffer.slice(4, messageLen - 4); + messageLen -=4; + readPosition += 4; + + Message header = Message.getRootAsMessage(buffer.nioBuffer()); + if (header.headerType() != MessageHeader.RecordBatch) { + throw new IOException("Invalid message: expecting " + MessageHeader.RecordBatch + + ". Message contained: " + header.headerType()); + } + + buffer = buffer.slice(headerLen, messageLen - headerLen); + messageLen -= headerLen; + readPosition += headerLen; + return deserializeRecordBatch(buffer, readPosition, messageLen); + } + + private static ArrowRecordBatch deserializeRecordBatch( + ArrowBuf buffer, long readPosition, int bufferLen) { // Read the metadata. It starts with the 4 byte size of the metadata. int metadataSize = buffer.readInt(); RecordBatch recordBatchFB = - RecordBatch.getRootAsRecordBatch( buffer.nioBuffer().asReadOnlyBuffer()); + RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer()); + + int bufferOffset = 4 + metadataSize; + readPosition += bufferOffset; + if (readPosition % 8 != 0) { + bufferOffset += (int)(8 - readPosition % 8); + } - // No read the body - final ArrowBuf body = buffer.slice(4 + metadataSize, messageLen - metadataSize - 4); + // Now read the body + final ArrowBuf body = buffer.slice(bufferOffset, bufferLen - bufferOffset); int nodesLength = recordBatchFB.nodesLength(); List nodes = new ArrayList<>(); for (int i = 0; i < nodesLength; ++i) { diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java index bf635fb39f5b8..9b9914480bad0 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -109,8 +109,6 @@ public void testWriteRead() throws IOException { List recordBatches = footer.getRecordBatches(); for (ArrowBlock rbBlock : recordBatches) { - Assert.assertEquals(0, rbBlock.getOffset() % 8); - Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { List buffersLayout = recordBatch.getBuffersLayout(); for (ArrowBuffer arrowBuffer : buffersLayout) { @@ -271,8 +269,6 @@ public void testWriteReadMultipleRBs() throws IOException { for (ArrowBlock rbBlock : recordBatches) { Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset); previousOffset = rbBlock.getOffset(); - Assert.assertEquals(0, rbBlock.getOffset() % 8); - Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength()); List buffersLayout = recordBatch.getBuffersLayout(); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java index 707dba2af9898..1b67ac2a5e7d7 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java @@ -21,7 +21,9 @@ import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import org.apache.arrow.flatbuf.Footer; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -41,6 +43,12 @@ public void test() { ArrowFooter footer = new ArrowFooter(schema, Collections.emptyList(), Collections.emptyList()); ArrowFooter newFooter = roundTrip(footer); assertEquals(footer, newFooter); + + List ids = new ArrayList<>(); + ids.add(new ArrowBlock(0, 1)); + ids.add(new ArrowBlock(4, 5)); + footer = new ArrowFooter(schema, ids, ids); + assertEquals(footer, roundTrip(footer)); } From 7c6f7efe1277586fb20de3330e8f29f9bf8ab962 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Fri, 20 Jan 2017 00:55:12 -0800 Subject: [PATCH 2/7] Update to restore Block behavior --- format/File.fbs | 8 ++- .../apache/arrow/vector/file/ArrowBlock.java | 25 ++++++--- .../apache/arrow/vector/file/ArrowFooter.java | 5 +- .../apache/arrow/vector/file/ArrowReader.java | 7 +-- .../apache/arrow/vector/file/ArrowWriter.java | 6 ++- .../vector/stream/MessageSerializer.java | 53 +++++++++---------- .../arrow/vector/file/TestArrowFooter.java | 4 +- .../vector/file/TestArrowReaderWriter.java | 14 +++++ 8 files changed, 76 insertions(+), 46 deletions(-) diff --git a/format/File.fbs b/format/File.fbs index a2ed01eb2b22d..86b4b22a92d57 100644 --- a/format/File.fbs +++ b/format/File.fbs @@ -35,9 +35,15 @@ table Footer { struct Block { + /// Index to the start of the RecordBlock (note this is past the Message header) offset: long; - length: int; + /// Length of the metadata + metaDataLength: int; + + /// Length of the data (this is aligned so there can be a gap between this and + /// the metatdata). + bodyLength: int; } root_type Footer; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java index c853de5826ab7..a55c283f40b55 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java @@ -25,32 +25,39 @@ public class ArrowBlock implements FBSerializable { private final long offset; - private final int length; + private final int metadataLength; + private final int bodyLength; - public ArrowBlock(long offset, int length) { + public ArrowBlock(long offset, int metadataLength, int bodyLength) { super(); this.offset = offset; - this.length = length; + this.metadataLength = metadataLength; + this.bodyLength = bodyLength; } public long getOffset() { return offset; } - public int getLength() { - return length; + public int getMetadataLength() { + return metadataLength; + } + + public int getBodyLength() { + return bodyLength; } @Override public int writeTo(FlatBufferBuilder builder) { - return Block.createBlock(builder, offset, length); + return Block.createBlock(builder, offset, metadataLength, bodyLength); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + (length ^ (length >>> 32)); + result = prime * result + bodyLength; + result = prime * result + metadataLength; result = prime * result + (int) (offset ^ (offset >>> 32)); return result; } @@ -64,7 +71,9 @@ public boolean equals(Object obj) { if (getClass() != obj.getClass()) return false; ArrowBlock other = (ArrowBlock) obj; - if (length != other.length) + if (bodyLength != other.bodyLength) + return false; + if (metadataLength != other.metadataLength) return false; if (offset != other.offset) return false; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java index b45ef2bf37654..38903068570c7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java @@ -55,11 +55,10 @@ public ArrowFooter(Footer footer) { private static List recordBatches(Footer footer) { List recordBatches = new ArrayList<>(); Block tempBlock = new Block(); - int recordBatchesLength = footer.recordBatchesLength(); for (int i = 0; i < recordBatchesLength; i++) { Block block = footer.recordBatches(tempBlock, i); - recordBatches.add(new ArrowBlock(block.offset(), block.length())); + recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength())); } return recordBatches; } @@ -71,7 +70,7 @@ private static List dictionaries(Footer footer) { int dictionariesLength = footer.dictionariesLength(); for (int i = 0; i < dictionariesLength; i++) { Block block = footer.dictionaries(tempBlock, i); - dictionaries.add(new ArrowBlock(block.offset(), block.length())); + dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength())); } return dictionaries; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index 3c65af0a38cd6..8f4f4978d66cf 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -88,11 +88,12 @@ public ArrowFooter readFooter() throws IOException { // TODO: read dictionaries public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException { - LOGGER.debug(String.format("RecordBatch at offset %d len: %d", - block.getOffset(), block.getLength())); + LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", + block.getOffset(), block.getMetadataLength(), + block.getBodyLength())); in.position(block.getOffset()); ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch( - new ReadChannel(in, block.getOffset()), (int)block.getLength(), allocator); + new ReadChannel(in, block.getOffset()), block, allocator); if (batch == null) { throw new IOException("Invalid file. No batch at offset: " + block.getOffset()); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java index d5b92320c2c7e..24c667e67d98d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java @@ -54,8 +54,10 @@ private void start() throws IOException { public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { checkStarted(); ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch); - LOGGER.debug(String.format("RecordBatch at offset: %d len: %d", - batchDesc.getOffset(), batchDesc.getLength())); + LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", + batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength())); + + // add metadata to footer recordBatches.add(batchDesc); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index a93d8a8c31417..61f59ae39e7ff 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -130,19 +130,21 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) // Write batch header. with the 4 byte little endian prefix out.writeIntLittleEndian(metadata.remaining()); + int metadataSize = metadata.remaining(); + long batchStart = out.getCurrentPosition(); out.write(metadata); // Align the output to 8 byte boundary. out.align(); - long offset = out.getCurrentPosition(); + long bufferStart = out.getCurrentPosition(); List buffers = batch.getBuffers(); List buffersLayout = batch.getBuffersLayout(); for (int i = 0; i < buffers.size(); i++) { ArrowBuf buffer = buffers.get(i); ArrowBuffer layout = buffersLayout.get(i); - long startPosition = offset + layout.getOffset(); + long startPosition = bufferStart + layout.getOffset(); if (startPosition != out.getCurrentPosition()) { out.writeZeros((int)(startPosition - out.getCurrentPosition())); } @@ -152,7 +154,7 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) " != " + startPosition + layout.getSize()); } } - return new ArrowBlock(start, (int)(out.getCurrentPosition() - start)); + return new ArrowBlock(batchStart, metadataSize, (int)(out.getCurrentPosition() - bufferStart)); } /** @@ -170,48 +172,45 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, if (in.readFully(buffer, messageLen) != messageLen) { throw new IOException("Unexpected end of input trying to read batch."); } - return deserializeRecordBatch(buffer, readPosition, messageLen); + + // Read the length of the metadata. + int metadataLen = buffer.readInt(); + buffer = buffer.slice(4, messageLen - 4); + readPosition += 4; + messageLen -= 4; + return deserializeRecordBatch(buffer, readPosition, metadataLen, messageLen); } /** * Deserializes a RecordBatch knowing the size of the entire message up front. This * minimizes the number of reads to the underlying stream. */ - public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, int messageLen, + public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc) throws IOException { - ArrowBuf buffer = alloc.buffer(messageLen); long readPosition = in.getCurrentPositiion(); - if (in.readFully(buffer, messageLen) != messageLen) { - throw new IOException("Unexpected end of input trying to read batch."); + int totalLen = block.getMetadataLength() + block.getBodyLength(); + if ((readPosition + block.getMetadataLength()) % 8 != 0) { + // Compute padded size. + totalLen += (8 - (readPosition + block.getMetadataLength()) % 8); } - byte[] headerLenBytes = new byte[4]; - buffer.getBytes(0, headerLenBytes); - int headerLen = bytesToInt(headerLenBytes); - buffer = buffer.slice(4, messageLen - 4); - messageLen -=4; - readPosition += 4; - - Message header = Message.getRootAsMessage(buffer.nioBuffer()); - if (header.headerType() != MessageHeader.RecordBatch) { - throw new IOException("Invalid message: expecting " + MessageHeader.RecordBatch + - ". Message contained: " + header.headerType()); + ArrowBuf buffer = alloc.buffer(totalLen); + if (in.readFully(buffer, totalLen) != totalLen) { + throw new IOException("Unexpected end of input trying to read batch."); } - buffer = buffer.slice(headerLen, messageLen - headerLen); - messageLen -= headerLen; - readPosition += headerLen; - return deserializeRecordBatch(buffer, readPosition, messageLen); + return deserializeRecordBatch(buffer, readPosition, block.getMetadataLength(), totalLen); } + // Deserializes a record batch. Buffer should start at the RecordBatch and include + // all the bytes for the metadata and then data buffers. private static ArrowRecordBatch deserializeRecordBatch( - ArrowBuf buffer, long readPosition, int bufferLen) { - // Read the metadata. It starts with the 4 byte size of the metadata. - int metadataSize = buffer.readInt(); + ArrowBuf buffer, long readPosition, int metadataLen, int bufferLen) { + // Read the metadata. RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer()); - int bufferOffset = 4 + metadataSize; + int bufferOffset = metadataLen; readPosition += bufferOffset; if (readPosition % 8 != 0) { bufferOffset += (int)(8 - readPosition % 8); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java index 1b67ac2a5e7d7..1e514585e502f 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java @@ -45,8 +45,8 @@ public void test() { assertEquals(footer, newFooter); List ids = new ArrayList<>(); - ids.add(new ArrowBlock(0, 1)); - ids.add(new ArrowBlock(4, 5)); + ids.add(new ArrowBlock(0, 1, 2)); + ids.add(new ArrowBlock(4, 5, 6)); footer = new ArrowFooter(schema, ids, ids); assertEquals(footer, roundTrip(footer)); } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java index 8ed89fa347b3b..f41866435f8d6 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java @@ -24,10 +24,13 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.util.Collections; import java.util.List; +import org.apache.arrow.flatbuf.FieldNode; +import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.schema.ArrowFieldNode; @@ -96,6 +99,17 @@ public void test() throws IOException { assertArrayEquals(validity, array(buffers.get(0))); assertArrayEquals(values, array(buffers.get(1))); + // Read just the header. This demonstrates being able to read without need to + // deserialize the buffer. + ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength()); + headerBuffer.put(byteArray, (int)recordBatches.get(0).getOffset(), headerBuffer.capacity()); + headerBuffer.rewind(); + RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(headerBuffer); + assertEquals(2, recordBatchFB.buffersLength()); + assertEquals(1, recordBatchFB.nodesLength()); + FieldNode nodeFB = recordBatchFB.nodes(0); + assertEquals(16, nodeFB.length()); + assertEquals(8, nodeFB.nullCount()); } } From 21854ccdeb7ca068dbe65542bd01e0ef9016c55c Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Jan 2017 09:17:14 -0500 Subject: [PATCH 3/7] Restore Block.bodyLength to long Change-Id: I4bae06d5833ffd24f94522ad23ea2dfcc459d86b --- format/File.fbs | 2 +- .../org/apache/arrow/vector/file/ArrowBlock.java | 8 ++++---- .../arrow/vector/stream/MessageSerializer.java | 15 ++++++++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/format/File.fbs b/format/File.fbs index 86b4b22a92d57..e8d6da4f848ff 100644 --- a/format/File.fbs +++ b/format/File.fbs @@ -43,7 +43,7 @@ struct Block { /// Length of the data (this is aligned so there can be a gap between this and /// the metatdata). - bodyLength: int; + bodyLength: long; } root_type Footer; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java index a55c283f40b55..90fb02b059707 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java @@ -26,9 +26,9 @@ public class ArrowBlock implements FBSerializable { private final long offset; private final int metadataLength; - private final int bodyLength; + private final long bodyLength; - public ArrowBlock(long offset, int metadataLength, int bodyLength) { + public ArrowBlock(long offset, int metadataLength, long bodyLength) { super(); this.offset = offset; this.metadataLength = metadataLength; @@ -43,7 +43,7 @@ public int getMetadataLength() { return metadataLength; } - public int getBodyLength() { + public long getBodyLength() { return bodyLength; } @@ -56,7 +56,7 @@ public int writeTo(FlatBufferBuilder builder) { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + bodyLength; + result = prime * result + (int) (bodyLength ^ (bodyLength >>> 32)); result = prime * result + metadataLength; result = prime * result + (int) (offset ^ (offset >>> 32)); return result; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 61f59ae39e7ff..41164d6a348bc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -154,7 +154,7 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) " != " + startPosition + layout.getSize()); } } - return new ArrowBlock(batchStart, metadataSize, (int)(out.getCurrentPosition() - bufferStart)); + return new ArrowBlock(batchStart, metadataSize, out.getCurrentPosition() - bufferStart); } /** @@ -188,18 +188,23 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc) throws IOException { long readPosition = in.getCurrentPositiion(); - int totalLen = block.getMetadataLength() + block.getBodyLength(); + long totalLen = block.getMetadataLength() + block.getBodyLength(); if ((readPosition + block.getMetadataLength()) % 8 != 0) { // Compute padded size. totalLen += (8 - (readPosition + block.getMetadataLength()) % 8); } - ArrowBuf buffer = alloc.buffer(totalLen); - if (in.readFully(buffer, totalLen) != totalLen) { + if (totalLen > Integer.MAX_VALUE) { + throw new IOException("Cannot currently deserialize record batches over 2GB"); + } + + + ArrowBuf buffer = alloc.buffer((int) totalLen); + if (in.readFully(buffer, (int) totalLen) != totalLen) { throw new IOException("Unexpected end of input trying to read batch."); } - return deserializeRecordBatch(buffer, readPosition, block.getMetadataLength(), totalLen); + return deserializeRecordBatch(buffer, readPosition, block.getMetadataLength(), (int) totalLen); } // Deserializes a record batch. Buffer should start at the RecordBatch and include From ba8db914f00a717dfb334b4c02692d7977684d1f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Jan 2017 10:43:19 -0500 Subject: [PATCH 4/7] Redo MessageSerializer with unions. Still has bugs Change-Id: Ib8beb014310219a7ab8263802ec94d2ea5af6805 --- cpp/src/arrow/ipc/adapter.cc | 11 +- cpp/src/arrow/ipc/metadata-internal.cc | 21 +-- .../vector/stream/MessageSerializer.java | 153 ++++++++---------- 3 files changed, 75 insertions(+), 110 deletions(-) diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 2b5ef11f861af..7b4d18c267d43 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -129,13 +129,12 @@ class RecordBatchWriter : public ArrayVisitor { num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb)); // Need to write 4 bytes (metadata size), the metadata, plus padding to - // fall on a 64-byte offset - int64_t padded_metadata_length = - BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4); + // fall on an 8-byte offset + int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4); // The returned metadata size includes the length prefix, the flatbuffer, // plus padding - *metadata_length = padded_metadata_length; + *metadata_length = static_cast(padded_metadata_length); // Write the flatbuffer size prefix int32_t flatbuffer_size = metadata_fb->size(); @@ -604,7 +603,9 @@ Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, return Status::Invalid(ss.str()); } - *metadata = std::make_shared(buffer, sizeof(int32_t)); + std::shared_ptr message; + RETURN_NOT_OK(Message::Open(buffer, 4, &message)); + *metadata = std::make_shared(message); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 16069a8f9dcf0..cc160c42ec9ef 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -320,23 +320,10 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length, Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, const std::vector& nodes, const std::vector& buffers, std::shared_ptr* out) { - flatbuffers::FlatBufferBuilder fbb; - - auto batch = flatbuf::CreateRecordBatch( - fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers)); - - fbb.Finish(batch); - - int32_t size = fbb.GetSize(); - - auto result = std::make_shared(); - RETURN_NOT_OK(result->Resize(size)); - - uint8_t* dst = result->mutable_data(); - memcpy(dst, fbb.GetBufferPointer(), size); - - *out = result; - return Status::OK(); + MessageBuilder builder; + RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers)); + RETURN_NOT_OK(builder.Finish()); + return builder.GetBuffer(out); } Status MessageBuilder::Finish() { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 41164d6a348bc..4f487b9ac586b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -70,14 +70,10 @@ public static int bytesToInt(byte[] bytes) { */ public static long serialize(WriteChannel out, Schema schema) throws IOException { FlatBufferBuilder builder = new FlatBufferBuilder(); - builder.finish(schema.getSchema(builder)); - ByteBuffer serializedBody = builder.dataBuffer(); - ByteBuffer serializedHeader = - serializeHeader(MessageHeader.Schema, serializedBody.remaining()); - - long size = out.writeIntLittleEndian(serializedHeader.remaining()); - size += out.write(serializedHeader); - size += out.write(serializedBody); + int schemaOffset = schema.getSchema(builder); + ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0); + long size = out.writeIntLittleEndian(serializedMessage.remaining()); + size += out.write(serializedMessage); return size; } @@ -85,18 +81,13 @@ public static long serialize(WriteChannel out, Schema schema) throws IOException * Deserializes a schema object. Format is from serialize(). */ public static Schema deserializeSchema(ReadChannel in) throws IOException { - Message header = deserializeHeader(in, MessageHeader.Schema); - if (header == null) { + Message message = deserializeMessage(in, MessageHeader.Schema); + if (message == null) { throw new IOException("Unexpected end of input. Missing schema."); } - // Now read the schema. - ByteBuffer buffer = ByteBuffer.allocate((int)header.bodyLength()); - if (in.readFully(buffer) != header.bodyLength()) { - throw new IOException("Unexpected end of input trying to read schema."); - } - buffer.rewind(); - return Schema.deserialize(buffer); + return Schema.convertSchema((org.apache.arrow.flatbuf.Schema) + message.header(new org.apache.arrow.flatbuf.Schema())); } /** @@ -106,37 +97,22 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException { long start = out.getCurrentPosition(); int bodyLength = batch.computeBodyLength(); - ByteBuffer metadata = WriteChannel.serialize(batch); - - int messageLength = 4 + metadata.remaining() + bodyLength; - ByteBuffer serializedHeader = - serializeHeader(MessageHeader.RecordBatch, messageLength); - - // Compute the required alignment. This is not a great way to do it. The issue is - // that we need to know the message size to serialize the message header but the - // size depends on the alignment, which depends on the message header. - // This will serialize the header again with the updated size alignment adjusted. - // TODO: We really just want sizeof(MessageHeader) from the serializeHeader() above. - // Is there a way to do this? - long bufferOffset = start + 4 + serializedHeader.remaining() + 4 + metadata.remaining(); - if (bufferOffset % 8 != 0) { - messageLength += 8 - bufferOffset % 8; - serializedHeader = serializeHeader(MessageHeader.RecordBatch, messageLength); - } - // Write message header. - out.writeIntLittleEndian(serializedHeader.remaining()); - out.write(serializedHeader); + FlatBufferBuilder builder = new FlatBufferBuilder(); + int batchOffset = batch.writeTo(builder); + + ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch, + batchOffset, bodyLength); - // Write batch header. with the 4 byte little endian prefix - out.writeIntLittleEndian(metadata.remaining()); - int metadataSize = metadata.remaining(); - long batchStart = out.getCurrentPosition(); - out.write(metadata); + long metadataStart = out.getCurrentPosition(); + out.writeIntLittleEndian(serializedMessage.remaining()); + out.write(serializedMessage); // Align the output to 8 byte boundary. out.align(); + long metadataSize = out.getCurrentPosition() - metadataStart; + long bufferStart = out.getCurrentPosition(); List buffers = batch.getBuffers(); List buffersLayout = batch.getBuffersLayout(); @@ -154,7 +130,7 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) " != " + startPosition + layout.getSize()); } } - return new ArrowBlock(batchStart, metadataSize, out.getCurrentPosition() - bufferStart); + return new ArrowBlock(start, (int) metadataSize, out.getCurrentPosition() - bufferStart); } /** @@ -162,23 +138,23 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) */ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, BufferAllocator alloc) throws IOException { - Message header = deserializeHeader(in, MessageHeader.RecordBatch); - if (header == null) return null; + Message message = deserializeMessage(in, MessageHeader.RecordBatch); + if (message == null) return null; - int messageLen = (int)header.bodyLength(); - // Now read the buffer. This has the metadata followed by the data. - ArrowBuf buffer = alloc.buffer(messageLen); - long readPosition = in.getCurrentPositiion(); - if (in.readFully(buffer, messageLen) != messageLen) { - throw new IOException("Unexpected end of input trying to read batch."); + if (message.bodyLength() > Integer.MAX_VALUE) { + throw new IOException("Cannot currently deserialize record batches over 2GB"); } - // Read the length of the metadata. - int metadataLen = buffer.readInt(); - buffer = buffer.slice(4, messageLen - 4); - readPosition += 4; - messageLen -= 4; - return deserializeRecordBatch(buffer, readPosition, metadataLen, messageLen); + RecordBatch recordBatchFB = (RecordBatch) message.header(new RecordBatch()); + + int bodyLength = (int) message.bodyLength(); + + // Now read the record batch body + ArrowBuf buffer = alloc.buffer(bodyLength); + if (in.readFully(buffer, bodyLength) != bodyLength) { + throw new IOException("Unexpected end of input trying to read batch."); + } + return deserializeRecordBatch(recordBatchFB, buffer); } /** @@ -188,41 +164,41 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc) throws IOException { long readPosition = in.getCurrentPositiion(); + + // Metadata length contains byte padding long totalLen = block.getMetadataLength() + block.getBodyLength(); - if ((readPosition + block.getMetadataLength()) % 8 != 0) { - // Compute padded size. - totalLen += (8 - (readPosition + block.getMetadataLength()) % 8); - } if (totalLen > Integer.MAX_VALUE) { throw new IOException("Cannot currently deserialize record batches over 2GB"); } - ArrowBuf buffer = alloc.buffer((int) totalLen); if (in.readFully(buffer, (int) totalLen) != totalLen) { throw new IOException("Unexpected end of input trying to read batch."); } - return deserializeRecordBatch(buffer, readPosition, block.getMetadataLength(), (int) totalLen); + return deserializeRecordBatch(buffer, block.getMetadataLength(), (int) totalLen); } // Deserializes a record batch. Buffer should start at the RecordBatch and include // all the bytes for the metadata and then data buffers. - private static ArrowRecordBatch deserializeRecordBatch( - ArrowBuf buffer, long readPosition, int metadataLen, int bufferLen) { + private static ArrowRecordBatch deserializeRecordBatch(ArrowBuf buffer, int metadataLen, + int bufferLen) { // Read the metadata. RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer()); int bufferOffset = metadataLen; - readPosition += bufferOffset; - if (readPosition % 8 != 0) { - bufferOffset += (int)(8 - readPosition % 8); - } // Now read the body final ArrowBuf body = buffer.slice(bufferOffset, bufferLen - bufferOffset); + return deserializeRecordBatch(recordBatchFB, body); + } + + // Deserializes a record batch given the Flatbuffer metadata and in-memory body + private static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, + ArrowBuf body) { + // Now read the body int nodesLength = recordBatchFB.nodesLength(); List nodes = new ArrayList<>(); for (int i = 0; i < nodesLength; ++i) { @@ -237,43 +213,44 @@ private static ArrowRecordBatch deserializeRecordBatch( } ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers); - buffer.release(); + body.release(); return arrowRecordBatch; } /** * Serializes a message header. */ - private static ByteBuffer serializeHeader(byte headerType, int bodyLength) { - FlatBufferBuilder headerBuilder = new FlatBufferBuilder(); - Message.startMessage(headerBuilder); - Message.addHeaderType(headerBuilder, headerType); - Message.addVersion(headerBuilder, MetadataVersion.V1); - Message.addBodyLength(headerBuilder, bodyLength); - headerBuilder.finish(Message.endMessage(headerBuilder)); - return headerBuilder.dataBuffer(); + private static ByteBuffer serializeMessage(FlatBufferBuilder builder, byte headerType, + int headerOffset, int bodyLength) { + Message.startMessage(builder); + Message.addHeaderType(builder, headerType); + Message.addHeader(builder, headerOffset); + Message.addVersion(builder, MetadataVersion.V1); + Message.addBodyLength(builder, bodyLength); + builder.finish(Message.endMessage(builder)); + return builder.dataBuffer(); } - private static Message deserializeHeader(ReadChannel in, byte headerType) throws IOException { - // Read the header size. There is an i32 little endian prefix. + private static Message deserializeMessage(ReadChannel in, byte headerType) throws IOException { + // Read the message size. There is an i32 little endian prefix. ByteBuffer buffer = ByteBuffer.allocate(4); if (in.readFully(buffer) != 4) { return null; } - int headerLength = bytesToInt(buffer.array()); - buffer = ByteBuffer.allocate(headerLength); - if (in.readFully(buffer) != headerLength) { + int messageLength = bytesToInt(buffer.array()); + buffer = ByteBuffer.allocate(messageLength); + if (in.readFully(buffer) != messageLength) { throw new IOException( - "Unexpected end of stream trying to read header."); + "Unexpected end of stream trying to read message."); } buffer.rewind(); - Message header = Message.getRootAsMessage(buffer); - if (header.headerType() != headerType) { + Message message = Message.getRootAsMessage(buffer); + if (message.headerType() != headerType) { throw new IOException("Invalid message: expecting " + headerType + - ". Message contained: " + header.headerType()); + ". Message contained: " + message.headerType()); } - return header; + return message; } } From 664d5bea52fa4d69f42c401e1140eb5463774ee2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Jan 2017 11:27:15 -0500 Subject: [PATCH 5/7] Fixes, stream tests pass again Change-Id: I2571b4ec6b753a4e207c7dbbd2059b7c2bfc0be2 --- .../vector/stream/MessageSerializer.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 4f487b9ac586b..0302f0d022342 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -104,15 +104,20 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch, batchOffset, bodyLength); - long metadataStart = out.getCurrentPosition(); - out.writeIntLittleEndian(serializedMessage.remaining()); + int metadataLength = serializedMessage.remaining(); + + // Add extra padding bytes so that length prefix + metadata is a multiple + // of 8 after alignment + if ((metadataLength + 4) % 8 != 0) { + metadataLength += 8 - (metadataLength + 4) % 8; + } + + out.writeIntLittleEndian(metadataLength); out.write(serializedMessage); // Align the output to 8 byte boundary. out.align(); - long metadataSize = out.getCurrentPosition() - metadataStart; - long bufferStart = out.getCurrentPosition(); List buffers = batch.getBuffers(); List buffersLayout = batch.getBuffersLayout(); @@ -130,7 +135,8 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) " != " + startPosition + layout.getSize()); } } - return new ArrowBlock(start, (int) metadataSize, out.getCurrentPosition() - bufferStart); + // Metadata size in the Block account for the size prefix + return new ArrowBlock(start, metadataLength + 4, out.getCurrentPosition() - bufferStart); } /** @@ -165,7 +171,7 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock BufferAllocator alloc) throws IOException { long readPosition = in.getCurrentPositiion(); - // Metadata length contains byte padding + // Metadata length contains integer prefix plus byte padding long totalLen = block.getMetadataLength() + block.getBodyLength(); if (totalLen > Integer.MAX_VALUE) { @@ -177,22 +183,21 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock throw new IOException("Unexpected end of input trying to read batch."); } - return deserializeRecordBatch(buffer, block.getMetadataLength(), (int) totalLen); - } + ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4); - // Deserializes a record batch. Buffer should start at the RecordBatch and include - // all the bytes for the metadata and then data buffers. - private static ArrowRecordBatch deserializeRecordBatch(ArrowBuf buffer, int metadataLen, - int bufferLen) { // Read the metadata. RecordBatch recordBatchFB = - RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer()); - - int bufferOffset = metadataLen; + RecordBatch.getRootAsRecordBatch(metadataBuffer.nioBuffer().asReadOnlyBuffer()); // Now read the body - final ArrowBuf body = buffer.slice(bufferOffset, bufferLen - bufferOffset); - return deserializeRecordBatch(recordBatchFB, body); + final ArrowBuf body = buffer.slice(block.getMetadataLength(), + (int) totalLen - block.getMetadataLength()); + ArrowRecordBatch result = deserializeRecordBatch(recordBatchFB, body); + + metadataBuffer.release(); + buffer.release(); + + return result; } // Deserializes a record batch given the Flatbuffer metadata and in-memory body From e3af4347f099a6f7e5d472c1e3bcf3d5492f2be9 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Jan 2017 11:30:32 -0500 Subject: [PATCH 6/7] Remove unused variable Change-Id: I2ca87b9e944ce9613f63cee7af81f5556a67b5e8 --- .../java/org/apache/arrow/vector/stream/MessageSerializer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 0302f0d022342..02bfd6b09755c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -169,8 +169,6 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, */ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc) throws IOException { - long readPosition = in.getCurrentPositiion(); - // Metadata length contains integer prefix plus byte padding long totalLen = block.getMetadataLength() + block.getBodyLength(); From 18890a9abbb70e776f7c987f84bf38d6b4458730 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Jan 2017 14:26:32 -0500 Subject: [PATCH 7/7] Message fixes. Fix Java test suite. Integration tests pass --- integration/integration_test.py | 2 +- .../arrow/vector/stream/MessageSerializer.java | 14 ++++++-------- .../arrow/vector/file/TestArrowReaderWriter.java | 6 ++++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/integration/integration_test.py b/integration/integration_test.py index 417354bc83d9e..77510daecc0b4 100644 --- a/integration/integration_test.py +++ b/integration/integration_test.py @@ -648,7 +648,7 @@ def get_static_json_files(): def run_all_tests(debug=False): - testers = [JavaTester(debug=debug), CPPTester(debug=debug)] + testers = [CPPTester(debug=debug), JavaTester(debug=debug)] static_json_files = get_static_json_files() generated_json_files = get_generated_json_files() json_files = static_json_files + generated_json_files diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 02bfd6b09755c..6e22dbd164d6e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -108,8 +108,8 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) // Add extra padding bytes so that length prefix + metadata is a multiple // of 8 after alignment - if ((metadataLength + 4) % 8 != 0) { - metadataLength += 8 - (metadataLength + 4) % 8; + if ((start + metadataLength + 4) % 8 != 0) { + metadataLength += 8 - (start + metadataLength + 4) % 8; } out.writeIntLittleEndian(metadataLength); @@ -183,18 +183,16 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4); - // Read the metadata. - RecordBatch recordBatchFB = - RecordBatch.getRootAsRecordBatch(metadataBuffer.nioBuffer().asReadOnlyBuffer()); + Message messageFB = + Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer()); + + RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch()); // Now read the body final ArrowBuf body = buffer.slice(block.getMetadataLength(), (int) totalLen - block.getMetadataLength()); ArrowRecordBatch result = deserializeRecordBatch(recordBatchFB, body); - metadataBuffer.release(); - buffer.release(); - return result; } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java index f41866435f8d6..96bcbb1dae71c 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.arrow.flatbuf.FieldNode; +import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -103,8 +104,9 @@ public void test() throws IOException { // deserialize the buffer. ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength()); headerBuffer.put(byteArray, (int)recordBatches.get(0).getOffset(), headerBuffer.capacity()); - headerBuffer.rewind(); - RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(headerBuffer); + headerBuffer.position(4); + Message messageFB = Message.getRootAsMessage(headerBuffer); + RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch()); assertEquals(2, recordBatchFB.buffersLength()); assertEquals(1, recordBatchFB.nodesLength()); FieldNode nodeFB = recordBatchFB.nodes(0);