Skip to content

Commit

Permalink
ARROW-10962: [FlightRPC][Java] fill in empty body buffer if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Dec 18, 2020
1 parent 519e9da commit bcaa599
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,34 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
// ignore unknown fields.
}
}

// Protobuf implementations can omit empty fields, such as body; for some message types, like RecordBatch,
// this will fail later as we still expect an empty buffer. In those cases only, fill in an empty buffer here -
// in other cases, like Schema, having an unexpected empty buffer will also cause failures.
// We don't fill in defaults for fields like header, for which there is no reasonable default, or for appMetadata
// or descriptor, which are intended to be empty in some cases.
if (header != null) {
switch (HeaderType.getHeader(header.headerType())) {
case SCHEMA:
// Ignore 0-length buffers in case a Protobuf implementation wrote it out
if (body != null && body.capacity() == 0) {
body.close();
body = null;
}
break;
case DICTIONARY_BATCH:
case RECORD_BATCH:
// A Protobuf implementation can skip 0-length bodies, so ensure we fill it in here
if (body == null) {
body = allocator.getEmpty();
}
break;
case NONE:
case TENSOR:
default:
// Do nothing
break;
}
}
return new ArrowMessage(descriptor, header, appMetadata, body);
} catch (Exception ioe) {
throw new RuntimeException(ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.arrow.flight;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
Expand All @@ -39,6 +42,9 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -48,6 +54,8 @@
import com.google.common.base.Charsets;
import com.google.protobuf.ByteString;

import io.grpc.MethodDescriptor;

/**
* Test the operations of a basic flight service.
*/
Expand Down Expand Up @@ -317,6 +325,39 @@ private void test(BiConsumer<FlightClient, BufferAllocator> consumer) throws Exc
}
}

/** ARROW-10939: accept FlightData messages generated by Protobuf (which can omit empty fields). */
@Test
public void testProtobufCompat() throws Exception {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true))));
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
final VectorUnloader unloader = new VectorUnloader(root);
root.setRowCount(0);
final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) {
Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType());
try (final InputStream serialized = marshaller.stream(message)) {
final byte[] buf = new byte[1024];
while (true) {
int read = serialized.read(buf);
if (read < 0) {
break;
}
baos.write(buf, 0, read);
}
}
}
final byte[] serializedMessage = baos.toByteArray();
final Flight.FlightData protobufData = Flight.FlightData.parseFrom(serializedMessage);
Assert.assertEquals(0, protobufData.getDataBody().size());
// Should not throw
final ArrowRecordBatch rb =
marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray())).asRecordBatch();
Assert.assertEquals(rb.computeBodyLength(), 0);
}
}

/**
* An example FlightProducer for test purposes.
*/
Expand Down

0 comments on commit bcaa599

Please sign in to comment.