Skip to content

Commit

Permalink
Update Arrow Flight Module with Arrow 7.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
vfraga committed Nov 1, 2021
1 parent b5907de commit cc8bd8a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 19 deletions.
2 changes: 1 addition & 1 deletion services/arrow-flight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<version>6.0.0-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.FlightSqlUtils;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCrossReference;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.types.pojo.Schema;

Expand Down Expand Up @@ -129,7 +130,7 @@ private void getStreamLegacy(CallContext callContext, Ticket ticket, ServerStrea

@Override
public void getStreamPreparedStatement(CommandPreparedStatementQuery commandPreparedStatementQuery,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
UserProtos.PreparedStatementHandle preparedStatementHandle;
try {
Expand Down Expand Up @@ -286,7 +287,6 @@ public SchemaResult getSchemaStatement(
@Override
public void getStreamStatement(TicketStatementQuery ticketStatementQuery,
CallContext callContext,
Ticket ticket,
ServerStreamListener serverStreamListener) {
try {
final UserProtos.PreparedStatementHandle preparedStatementHandle =
Expand Down Expand Up @@ -333,7 +333,7 @@ public FlightInfo getFlightInfoSqlInfo(CommandGetSqlInfo commandGetSqlInfo,

@Override
public void getStreamSqlInfo(CommandGetSqlInfo commandGetSqlInfo,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
throw CallStatus.UNIMPLEMENTED.withDescription("CommandGetSqlInfo not supported.").toRuntimeException();
}
Expand All @@ -347,7 +347,7 @@ public FlightInfo getFlightInfoCatalogs(
}

@Override
public void getStreamCatalogs(CallContext callContext, Ticket ticket,
public void getStreamCatalogs(CallContext callContext,
ServerStreamListener serverStreamListener) {
final UserSession session = getUserSessionFromCallContext(callContext);

Expand All @@ -364,7 +364,7 @@ public FlightInfo getFlightInfoSchemas(CommandGetSchemas commandGetSchemas,

@Override
public void getStreamSchemas(CommandGetSchemas commandGetSchemas,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
final UserSession session = getUserSessionFromCallContext(callContext);

Expand Down Expand Up @@ -392,7 +392,7 @@ public FlightInfo getFlightInfoTables(CommandGetTables commandGetTables,

@Override
public void getStreamTables(CommandGetTables commandGetTables,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
final UserSession session = getUserSessionFromCallContext(callContext);

Expand All @@ -410,7 +410,7 @@ public FlightInfo getFlightInfoTableTypes(
}

@Override
public void getStreamTableTypes(CallContext callContext, Ticket ticket,
public void getStreamTableTypes(CallContext callContext,
ServerStreamListener serverStreamListener) {
flightWorkManager.runGetTablesTypes(serverStreamListener,
allocator);
Expand All @@ -426,7 +426,7 @@ public FlightInfo getFlightInfoPrimaryKeys(

@Override
public void getStreamPrimaryKeys(CommandGetPrimaryKeys commandGetPrimaryKeys,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
throw CallStatus.UNIMPLEMENTED.withDescription("CommandGetPrimaryKeys not supported.").toRuntimeException();
}
Expand All @@ -435,14 +435,14 @@ public void getStreamPrimaryKeys(CommandGetPrimaryKeys commandGetPrimaryKeys,
public FlightInfo getFlightInfoExportedKeys(
CommandGetExportedKeys commandGetExportedKeys,
CallContext callContext, FlightDescriptor flightDescriptor) {
final Schema schema = Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA;
final Schema schema = Schemas.GET_EXPORTED_KEYS_SCHEMA;
return new FlightInfo(schema, flightDescriptor, Collections.emptyList(), -1, -1);
}

@Override
public void getStreamExportedKeys(
CommandGetExportedKeys commandGetExportedKeys,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
throw CallStatus.UNIMPLEMENTED.withDescription("CommandGetExportedKeys not supported.").toRuntimeException();
}
Expand All @@ -451,18 +451,33 @@ public void getStreamExportedKeys(
public FlightInfo getFlightInfoImportedKeys(
CommandGetImportedKeys commandGetImportedKeys,
CallContext callContext, FlightDescriptor flightDescriptor) {
final Schema schema = Schemas.GET_IMPORTED_AND_EXPORTED_KEYS_SCHEMA;
final Schema schema = Schemas.GET_IMPORTED_KEYS_SCHEMA;
return new FlightInfo(schema, flightDescriptor, Collections.emptyList(), -1, -1);
}

@Override
public FlightInfo getFlightInfoCrossReference(
CommandGetCrossReference commandGetCrossReference,
CallContext callContext, FlightDescriptor flightDescriptor) {
final Schema schema = Schemas.GET_CROSS_REFERENCE_SCHEMA;
return new FlightInfo(schema, flightDescriptor, Collections.emptyList(), -1, -1);
}

@Override
public void getStreamImportedKeys(
CommandGetImportedKeys commandGetImportedKeys,
CallContext callContext, Ticket ticket,
CallContext callContext,
ServerStreamListener serverStreamListener) {
throw CallStatus.UNIMPLEMENTED.withDescription("CommandGetImportedKeys not supported.").toRuntimeException();
}

@Override
public void getStreamCrossReference(
CommandGetCrossReference commandGetCrossReference,
CallContext callContext, ServerStreamListener serverStreamListener) {
throw CallStatus.UNIMPLEMENTED.withDescription("CommandGetCrossReference not supported.").toRuntimeException();
}

@Override
public void close() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

import static org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;

import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;

Expand All @@ -41,7 +46,6 @@ public class FlightPreparedStatement {

private final String query;
private final CancellableUserResponseHandler<UserProtos.CreatePreparedStatementArrowResp> responseHandler;
private static final IpcOption DEFAULT_IPC = new IpcOption();

public FlightPreparedStatement(String query,
CancellableUserResponseHandler<UserProtos.CreatePreparedStatementArrowResp> responseHandler) {
Expand Down Expand Up @@ -80,9 +84,15 @@ public FlightInfo getFlightInfo(Location location) {
public ActionCreatePreparedStatementResult createAction() {
final UserProtos.CreatePreparedStatementArrowResp createPreparedStatementResp = responseHandler.get();
final Schema schema = buildSchema(createPreparedStatementResp.getPreparedStatement().getArrowSchema());
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(outputStream)), schema);
} catch (IOException e) {
throw new RuntimeException("Failed to serialize schema", e);
}

return ActionCreatePreparedStatementResult.newBuilder()
.setDatasetSchema(ByteString.copyFrom(MessageSerializer.serializeMetadata(schema, DEFAULT_IPC)))
.setDatasetSchema(ByteString.copyFrom(ByteBuffer.wrap(outputStream.toByteArray())))
.setParameterSchema(ByteString.EMPTY)
.setPreparedStatementHandle(getServerHandle().toByteString())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.dremio.service.flight.impl;

import static com.dremio.common.types.Types.getJdbcTypeCode;
import static com.google.protobuf.ByteString.copyFrom;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -42,7 +44,7 @@
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -285,7 +287,14 @@ public void runGetTables(FlightSql.CommandGetTables commandGetTables,

if (includeSchema) {
final Schema columnSchema = new Schema(tableToFields.get(tableName));
schemaVector.setSafe(i, copyFrom(MessageSerializer.serializeMetadata(columnSchema, IpcOption.DEFAULT)).toByteArray());
final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream();
try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), columnSchema);
} catch (final IOException e) {
throw new RuntimeException("Failed to serialize schema", e);
}

schemaVector.setSafe(i, columnOutputStream.toByteArray());
}
});

Expand Down

0 comments on commit cc8bd8a

Please sign in to comment.