Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-499: Update file serialization to use the streaming serialization format. #292

Closed
wants to merge 8 commits into from
5 changes: 1 addition & 4 deletions format/File.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ struct Block {

offset: long;

metaDataLength: int;

bodyLength: long;

length: int;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with combining the metadata and body length is that it makes it difficult / impossible to do partial reads / field projections in a record batch. If you can inspect the metadata without reading the body, you can determine the byte ranges you need to read only a certain subset of fields.

}

root_type Footer;
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,32 @@
public class ArrowBlock implements FBSerializable {

private final long offset;
private final int metadataLength;
private final long bodyLength;
private final int length;

public ArrowBlock(long offset, int metadataLength, long bodyLength) {
public ArrowBlock(long offset, int length) {
super();
this.offset = offset;
this.metadataLength = metadataLength;
this.bodyLength = bodyLength;
this.length = length;
}

/** Returns the absolute byte offset of this block within the file. */
public long getOffset() {
return offset;
}

public int getMetadataLength() {
return metadataLength;
}

public long getBodyLength() {
return bodyLength;
public int getLength() {
return length;
}

@Override
public int writeTo(FlatBufferBuilder builder) {
return Block.createBlock(builder, offset, metadataLength, bodyLength);
return Block.createBlock(builder, offset, length);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (bodyLength ^ (bodyLength >>> 32));
result = prime * result + metadataLength;
result = prime * result + (length ^ (length >>> 32));
result = prime * result + (int) (offset ^ (offset >>> 32));
return result;
}
Expand All @@ -71,9 +64,7 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
ArrowBlock other = (ArrowBlock) obj;
if (bodyLength != other.bodyLength)
return false;
if (metadataLength != other.metadataLength)
if (length != other.length)
return false;
if (offset != other.offset)
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,23 @@ public ArrowFooter(Footer footer) {
private static List<ArrowBlock> recordBatches(Footer footer) {
List<ArrowBlock> recordBatches = new ArrayList<>();
Block tempBlock = new Block();

int recordBatchesLength = footer.recordBatchesLength();
for (int i = 0; i < recordBatchesLength; i++) {
Block block = footer.recordBatches(tempBlock, i);
recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
recordBatches.add(new ArrowBlock(block.offset(), block.length()));
}
return recordBatches;
}

private static List<ArrowBlock> dictionaries(Footer footer) {
List<ArrowBlock> dictionaries = new ArrayList<>();
Block tempBLock = new Block();
Block tempBlock = new Block();

int dictionariesLength = footer.dictionariesLength();
for (int i = 0; i < dictionariesLength; i++) {
Block block = footer.dictionaries(tempBLock, i);
dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
Block block = footer.dictionaries(tempBlock, i);
dictionaries.add(new ArrowBlock(block.offset(), block.length()));
}
return dictionaries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,15 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.arrow.flatbuf.Buffer;
import org.apache.arrow.flatbuf.FieldNode;
import org.apache.arrow.flatbuf.Footer;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ArrowBuf;

public class ArrowReader implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);

Expand All @@ -54,15 +46,6 @@ public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
this.allocator = allocator;
}

private int readFully(ArrowBuf buffer, int l) throws IOException {
int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
buffer.writerIndex(n);
if (n != l) {
throw new IllegalStateException(n + " != " + l);
}
return n;
}

private int readFully(ByteBuffer buffer) throws IOException {
int total = 0;
int n;
Expand Down Expand Up @@ -104,46 +87,20 @@ public ArrowFooter readFooter() throws IOException {

// TODO: read dictionaries

public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength()));
int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength());
if (l < 0) {
throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
}
final ArrowBuf buffer = allocator.buffer(l);
LOGGER.debug("allocated buffer " + buffer);
in.position(recordBatchBlock.getOffset());
int n = readFully(buffer, l);
if (n != l) {
throw new IllegalStateException(n + " != " + l);
}

// Record batch flatbuffer is prefixed by its size as int32le
final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());

int nodesLength = recordBatchFB.nodesLength();
final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
List<ArrowFieldNode> nodes = new ArrayList<>();
for (int i = 0; i < nodesLength; ++i) {
FieldNode node = recordBatchFB.nodes(i);
nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
LOGGER.debug(String.format("RecordBatch at offset %d len: %d",
block.getOffset(), block.getLength()));
in.position(block.getOffset());
ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
new ReadChannel(in, block.getOffset()), (int)block.getLength(), allocator);
if (batch == null) {
throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
List<ArrowBuf> buffers = new ArrayList<>();
for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
Buffer bufferFB = recordBatchFB.buffers(i);
LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
buffers.add(vectorBuffer);
}
ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
LOGGER.debug("released buffer " + buffer);
buffer.release();
return arrowRecordBatch;
return batch;
}

@Override
public void close() throws IOException {
in.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import java.util.Collections;
import java.util.List;

import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ArrowBuf;

public class ArrowWriter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);

Expand All @@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable {
private final Schema schema;

private final List<ArrowBlock> recordBatches = new ArrayList<>();

private boolean started = false;

public ArrowWriter(WritableByteChannel out, Schema schema) {
Expand All @@ -49,47 +46,17 @@ public ArrowWriter(WritableByteChannel out, Schema schema) {

private void start() throws IOException {
writeMagic();
MessageSerializer.serialize(out, schema);
}


// TODO: write dictionaries

public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
checkStarted();
out.align();

// write metadata header with int32 size prefix
long offset = out.getCurrentPosition();
out.write(recordBatch, true);
out.align();
// write body
long bodyOffset = out.getCurrentPosition();
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
if (buffers.size() != buffersLayout.size()) {
throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
}
for (int i = 0; i < buffers.size(); i++) {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
long startPosition = bodyOffset + layout.getOffset();
if (startPosition != out.getCurrentPosition()) {
out.writeZeros((int)(startPosition - out.getCurrentPosition()));
}

out.write(buffer);
if (out.getCurrentPosition() != startPosition + layout.getSize()) {
throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
}
}
int metadataLength = (int)(bodyOffset - offset);
if (metadataLength <= 0) {
throw new InvalidArrowFileException("invalid recordBatch");
}
long bodyLength = out.getCurrentPosition() - bodyOffset;
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
// add metadata to footer
recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
LOGGER.debug(String.format("RecordBatch at offset: %d len: %d",
batchDesc.getOffset(), batchDesc.getLength()));
recordBatches.add(batchDesc);
}

private void checkStarted() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable {

private ReadableByteChannel in;
private long bytesRead = 0;
// The starting byte offset into 'in'.
private final long startByteOffset;

public ReadChannel(ReadableByteChannel in) {
public ReadChannel(ReadableByteChannel in, long startByteOffset) {
this.in = in;
this.startByteOffset = startByteOffset;
}

public ReadChannel(ReadableByteChannel in) {
this(in, 0);
}

public long bytesRead() { return bytesRead; }
Expand Down Expand Up @@ -65,6 +72,8 @@ public int readFully(ArrowBuf buffer, int l) throws IOException {
return n;
}

public long getCurrentPositiion() { return startByteOffset + bytesRead; }

@Override
public void close() throws IOException {
if (this.in != null) {
Expand Down
Loading