Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10962: [FlightRPC][Java] fill in empty body buffer if needed #8963

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,34 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
// ignore unknown fields.
}
}

// Protobuf implementations can omit empty fields, such as body; for some message types, like RecordBatch,
// this will fail later as we still expect an empty buffer. In those cases only, fill in an empty buffer here -
// in other cases, like Schema, having an unexpected empty buffer will also cause failures.
// We don't fill in defaults for fields like header, for which there is no reasonable default, or for appMetadata
// or descriptor, which are intended to be empty in some cases.
if (header != null) {
switch (HeaderType.getHeader(header.headerType())) {
case SCHEMA:
// Ignore 0-length buffers in case a Protobuf implementation wrote it out
if (body != null && body.capacity() == 0) {
body.close();
body = null;
}
break;
case DICTIONARY_BATCH:
case RECORD_BATCH:
// A Protobuf implementation can skip 0-length bodies, so ensure we fill it in here
if (body == null) {
body = allocator.getEmpty();
}
break;
case NONE:
case TENSOR:
default:
// Do nothing
break;
}
}
return new ArrowMessage(descriptor, header, appMetadata, body);
} catch (Exception ioe) {
throw new RuntimeException(ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.arrow.flight;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
Expand All @@ -33,12 +37,16 @@
import org.apache.arrow.flight.FlightClient.ClientStreamListener;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor.DescriptorType;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -48,6 +56,8 @@
import com.google.common.base.Charsets;
import com.google.protobuf.ByteString;

import io.grpc.MethodDescriptor;

/**
* Test the operations of a basic flight service.
*/
Expand Down Expand Up @@ -317,6 +327,85 @@ private void test(BiConsumer<FlightClient, BufferAllocator> consumer) throws Exc
}
}

/** Helper method to convert an ArrowMessage into a Protobuf message. */
private Flight.FlightData arrowMessageToProtobuf(
MethodDescriptor.Marshaller<ArrowMessage> marshaller, ArrowMessage message) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final InputStream serialized = marshaller.stream(message)) {
final byte[] buf = new byte[1024];
while (true) {
int read = serialized.read(buf);
if (read < 0) {
break;
}
baos.write(buf, 0, read);
}
}
final byte[] serializedMessage = baos.toByteArray();
return Flight.FlightData.parseFrom(serializedMessage);
}

/** ARROW-10962: accept FlightData messages generated by Protobuf (which can omit empty fields). */
@Test
public void testProtobufRecordBatchCompatibility() throws Exception {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true))));
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
final VectorUnloader unloader = new VectorUnloader(root);
root.setRowCount(0);
final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) {
Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType());
// Should have at least one empty body buffer (there may be multiple for e.g. data and validity)
Iterator<ArrowBuf> iterator = message.getBufs().iterator();
Assert.assertTrue(iterator.hasNext());
while (iterator.hasNext()) {
Assert.assertEquals(0, iterator.next().capacity());
}
final Flight.FlightData protobufData = arrowMessageToProtobuf(marshaller, message)
.toBuilder()
.clearDataBody()
.build();
Assert.assertEquals(0, protobufData.getDataBody().size());
ArrowMessage parsedMessage = marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray()));
// Should have an empty body buffer
Iterator<ArrowBuf> parsedIterator = parsedMessage.getBufs().iterator();
Assert.assertTrue(parsedIterator.hasNext());
Assert.assertEquals(0, parsedIterator.next().capacity());
// Should have only one (the parser synthesizes exactly one); in the case of empty buffers, this is equivalent
Assert.assertFalse(parsedIterator.hasNext());
// Should not throw
final ArrowRecordBatch rb = parsedMessage.asRecordBatch();
Assert.assertEquals(rb.computeBodyLength(), 0);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do you think we also need a test case for which a schema has an empty body?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I added a test for this as well.

}

/** ARROW-10962: accept FlightData messages generated by Protobuf (which can omit empty fields). */
@Test
public void testProtobufSchemaCompatibility() throws Exception {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true))));
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
Flight.FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]).toProtocol();
try (final ArrowMessage message = new ArrowMessage(descriptor, schema, new IpcOption())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check the message body here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added assertions here.

Assert.assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType());
// Should have no body buffers
Assert.assertFalse(message.getBufs().iterator().hasNext());
final Flight.FlightData protobufData = arrowMessageToProtobuf(marshaller, message)
.toBuilder()
.setDataBody(ByteString.EMPTY)
.build();
Assert.assertEquals(0, protobufData.getDataBody().size());
final ArrowMessage parsedMessage = marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray()));
// Should have no body buffers
Assert.assertFalse(parsedMessage.getBufs().iterator().hasNext());
// Should not throw
parsedMessage.asSchema();
}
}
}

/**
* An example FlightProducer for test purposes.
*/
Expand Down