Skip to content

Commit

Permalink
apacheGH-38022: [Java][FlightRPC] Expose app_metadata on FlightInfo a…
Browse files Browse the repository at this point in the history
…nd FlightEndpoint (apache#38331)

Making necessary changes in Java to expose the newly added app_metadata.
* Closes: apache#38022

Authored-by: Diego Fernandez <aiguo.fernandez@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
aiguofer authored Oct 26, 2023
1 parent c46fd24 commit 57f643c
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 10 deletions.
2 changes: 1 addition & 1 deletion dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
Scenario(
"app_metadata_flight_info_endpoint",
description="Ensure support FlightInfo and Endpoint app_metadata",
skip_testers={"JS", "C#", "Rust", "Java"}
skip_testers={"JS", "C#", "Rust"}
),
Scenario(
"flight_sql",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.arrow.flight.impl.Flight;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;

/**
Expand All @@ -39,6 +41,7 @@ public class FlightEndpoint {
private final List<Location> locations;
private final Ticket ticket;
private final Instant expirationTime;
private final byte[] appMetadata;

/**
* Constructs a new endpoint with no expiration time.
Expand All @@ -54,13 +57,22 @@ public FlightEndpoint(Ticket ticket, Location... locations) {
* Constructs a new endpoint with an expiration time.
*
* @param ticket A ticket that describe the key of a data stream.
* @param expirationTime (optional) When this endpoint expires.
* @param locations The possible locations the stream can be retrieved from.
*/
public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) {
this(ticket, expirationTime, null, Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))));
}

/**
* Private constructor with all parameters. Should only be called by Builder.
*/
private FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, List<Location> locations) {
Objects.requireNonNull(ticket);
this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)));
this.locations = locations;
this.expirationTime = expirationTime;
this.ticket = ticket;
this.appMetadata = appMetadata;
}

/**
Expand All @@ -77,6 +89,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locatio
} else {
this.expirationTime = null;
}
this.appMetadata = (flt.getAppMetadata().size() == 0 ? null : flt.getAppMetadata().toByteArray());
this.ticket = new Ticket(flt.getTicket());
}

Expand All @@ -92,6 +105,10 @@ public Optional<Instant> getExpirationTime() {
return Optional.ofNullable(expirationTime);
}

public byte[] getAppMetadata() {
return appMetadata;
}

/**
* Converts to the protocol buffer representation.
*/
Expand All @@ -111,6 +128,10 @@ Flight.FlightEndpoint toProtocol() {
.build());
}

if (appMetadata != null) {
b.setAppMetadata(ByteString.copyFrom(appMetadata));
}

return b.build();
}

Expand Down Expand Up @@ -148,12 +169,13 @@ public boolean equals(Object o) {
FlightEndpoint that = (FlightEndpoint) o;
return locations.equals(that.locations) &&
ticket.equals(that.ticket) &&
Objects.equals(expirationTime, that.expirationTime);
Objects.equals(expirationTime, that.expirationTime) &&
Arrays.equals(appMetadata, that.appMetadata);
}

@Override
public int hashCode() {
return Objects.hash(locations, ticket, expirationTime);
return Objects.hash(locations, ticket, expirationTime, Arrays.hashCode(appMetadata));
}

@Override
Expand All @@ -162,6 +184,59 @@ public String toString() {
"locations=" + locations +
", ticket=" + ticket +
", expirationTime=" + (expirationTime == null ? "(none)" : expirationTime.toString()) +
", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) +
'}';
}

/**
* Create a builder for FlightEndpoint.
*
* @param ticket A ticket that describe the key of a data stream.
* @param locations The possible locations the stream can be retrieved from.
*/
public static Builder builder(Ticket ticket, Location... locations) {
return new Builder(ticket, locations);
}

/**
* Builder for FlightEndpoint.
*/
public static final class Builder {
private final Ticket ticket;
private final List<Location> locations;
private Instant expirationTime = null;
private byte[] appMetadata = null;

private Builder(Ticket ticket, Location... locations) {
this.ticket = ticket;
this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)));
}

/**
* Set expiration time for the endpoint. Default is null, which means don't expire.
*
* @param expirationTime (optional) When this endpoint expires.
*/
public Builder setExpirationTime(Instant expirationTime) {
this.expirationTime = expirationTime;
return this;
}

/**
* Set the app metadata to send along with the flight. Default is null;
*
* @param appMetadata Metadata to send along with the flight
*/
public Builder setAppMetadata(byte[] appMetadata) {
this.appMetadata = appMetadata;
return this;
}

/**
* Build FlightEndpoint object.
*/
public FlightEndpoint build() {
return new FlightEndpoint(ticket, expirationTime, appMetadata, locations);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class FlightInfo {
private final long records;
private final boolean ordered;
private final IpcOption option;
private final byte[] appMetadata;

/**
* Constructs a new instance.
Expand Down Expand Up @@ -94,6 +97,23 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
long records, boolean ordered, IpcOption option) {
this(schema, descriptor, endpoints, bytes, records, ordered, option, null);
}

/**
* Constructs a new instance.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
* @param bytes The number of bytes in the flight
* @param records The number of records in the flight.
* @param ordered Whether the endpoints in this flight are ordered.
* @param option IPC write options.
* @param appMetadata Metadata to send along with the flight
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
long records, boolean ordered, IpcOption option, byte[] appMetadata) {
Objects.requireNonNull(descriptor);
Objects.requireNonNull(endpoints);
if (schema != null) {
Expand All @@ -106,6 +126,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
this.records = records;
this.ordered = ordered;
this.option = option;
this.appMetadata = appMetadata;
}

/**
Expand All @@ -131,6 +152,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
ordered = pbFlightInfo.getOrdered();
appMetadata = (pbFlightInfo.getAppMetadata().size() == 0 ? null : pbFlightInfo.getAppMetadata().toByteArray());
option = IpcOption.DEFAULT;
}

Expand Down Expand Up @@ -167,6 +189,10 @@ public boolean getOrdered() {
return ordered;
}

public byte[] getAppMetadata() {
return appMetadata;
}

/**
* Converts to the protocol buffer representation.
*/
Expand All @@ -189,6 +215,9 @@ Flight.FlightInfo toProtocol() {
throw new RuntimeException(e);
}
}
if (appMetadata != null) {
builder.setAppMetadata(ByteString.copyFrom(appMetadata));
}
return builder.build();
}

Expand Down Expand Up @@ -229,12 +258,13 @@ public boolean equals(Object o) {
schema.equals(that.schema) &&
descriptor.equals(that.descriptor) &&
endpoints.equals(that.endpoints) &&
ordered == that.ordered;
ordered == that.ordered &&
Arrays.equals(appMetadata, that.appMetadata);
}

@Override
public int hashCode() {
return Objects.hash(schema, descriptor, endpoints, bytes, records, ordered);
return Objects.hash(schema, descriptor, endpoints, bytes, records, ordered, Arrays.hashCode(appMetadata));
}

@Override
Expand All @@ -246,6 +276,95 @@ public String toString() {
", bytes=" + bytes +
", records=" + records +
", ordered=" + ordered +
", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) +
'}';
}

/**
* Create a builder for FlightInfo.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
*/
public static Builder builder(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints) {
return new Builder(schema, descriptor, endpoints);
}

/**
* Builder for FlightInfo.
*/
public static final class Builder {
private final Schema schema;
private final FlightDescriptor descriptor;
private final List<FlightEndpoint> endpoints;
private long bytes = -1;
private long records = -1;
private boolean ordered = false;
private IpcOption option = IpcOption.DEFAULT;
private byte[] appMetadata = null;

private Builder(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints) {
this.schema = schema;
this.descriptor = descriptor;
this.endpoints = endpoints;
}

/**
* Set the number of bytes for the flight. Default to -1 for unknown.
*
* @param bytes The number of bytes in the flight
*/
public Builder setBytes(long bytes) {
this.bytes = bytes;
return this;
}

/**
* Set the number of records for the flight. Default to -1 for unknown.
*
* @param records The number of records in the flight.
*/
public Builder setRecords(long records) {
this.records = records;
return this;
}

/**
* Set whether the flight endpoints are ordered. Default is false.
*
* @param ordered Whether the endpoints in this flight are ordered.
*/
public Builder setOrdered(boolean ordered) {
this.ordered = ordered;
return this;
}

/**
* Set IPC write options. Default is IpcOption.DEFAULT
*
* @param option IPC write options.
*/
public Builder setOption(IpcOption option) {
this.option = option;
return this;
}

/**
* Set the app metadata to send along with the flight. Default is null.
*
* @param appMetadata Metadata to send along with the flight
*/
public Builder setAppMetadata(byte[] appMetadata) {
this.appMetadata = appMetadata;
return this;
}

/**
* Build FlightInfo object.
*/
public FlightInfo build() {
return new FlightInfo(schema, descriptor, endpoints, bytes, records, ordered, option, appMetadata);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ public void roundTripInfo() throws Exception {
Field.nullable("a", new ArrowType.Int(32, true)),
Field.nullable("b", new ArrowType.FixedSizeBinary(32))
), metadata);
final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), Collections.emptyList(), -1, -1);
final FlightInfo info1 = FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList())
.setAppMetadata("foo".getBytes()).build();
final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]),
Collections.singletonList(new FlightEndpoint(
new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500);
Collections.singletonList(
FlightEndpoint.builder(new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"))
.setAppMetadata("bar".getBytes()).build()
), 200, 500);
final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", "b"),
Arrays.asList(new FlightEndpoint(
new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -137,7 +138,8 @@ public void supportsNullSchemas() throws Exception
@Override
public FlightInfo getFlightInfo(CallContext context,
FlightDescriptor descriptor) {
return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0);
return new FlightInfo(null, descriptor, Collections.emptyList(),
0, 0, false, IpcOption.DEFAULT, "foo".getBytes());
}
};

Expand All @@ -147,6 +149,7 @@ public FlightInfo getFlightInfo(CallContext context,
FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test"));
Assertions.assertEquals(Optional.empty(), flightInfo.getSchemaOptional());
Assertions.assertEquals(new Schema(Collections.emptyList()), flightInfo.getSchema());
Assertions.assertArrayEquals(flightInfo.getAppMetadata(), "foo".getBytes());

Exception e = Assertions.assertThrows(
FlightRuntimeException.class,
Expand Down
Loading

0 comments on commit 57f643c

Please sign in to comment.