Skip to content

Commit

Permalink
Close clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Brent Gardner committed Aug 17, 2022
1 parent 5e4c385 commit 9f23ae9
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 506 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
/**
* The in-memory representation of FlightData used to manage a stream of Arrow messages.
*/
class ArrowMessage implements AutoCloseable {
public class ArrowMessage implements AutoCloseable {

// If true, deserialize Arrow data by giving Arrow a reference to the underlying gRPC buffer
// instead of copying the data. Defaults to true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public SchemaResult getSchema(FlightDescriptor descriptor, CallOption... options
public FlightStream getStream(Ticket ticket, CallOption... options) {
final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
ClientCall<Flight.Ticket, ArrowMessage> call = interceptedChannel.newCall(doGetDescriptor, callOptions);
FlightStream stream = new FlightStream(
FlightStream stream = new FlightStreamImpl(
allocator,
PENDING_REQUESTS,
(String message, Throwable cause) -> call.cancel(message, cause),
Expand Down Expand Up @@ -352,14 +352,14 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..

try {
final ClientCall<ArrowMessage, ArrowMessage> call = interceptedChannel.newCall(doExchangeDescriptor, callOptions);
final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request);
final FlightStream stream = new FlightStreamImpl(allocator, PENDING_REQUESTS, call::cancel, call::request);
final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
final ClientStreamListener writer = new PutObserver(
descriptor, observer, stream.cancelled::isDone,
descriptor, observer, stream.cancelled()::isDone,
() -> {
try {
stream.completed.get();
stream.completed().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw CallStatus.INTERNAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public StreamObserver<ArrowMessage> doPutCustom(final StreamObserver<Flight.PutR

final StreamPipe<PutResult, Flight.PutResult> ackStream = StreamPipe
.wrap(responseObserver, PutResult::toProtocol, this::handleExceptionWithMiddleware);
final FlightStream fs = new FlightStream(
final FlightStream fs = new FlightStreamImpl(
allocator,
PENDING_REQUESTS,
/* server-upload streams are not cancellable */null,
Expand Down Expand Up @@ -351,7 +351,7 @@ public StreamObserver<ArrowMessage> doExchangeCustom(StreamObserver<ArrowMessage
final ExchangeListener listener = new ExchangeListener(
responseObserver,
this::handleExceptionWithMiddleware);
final FlightStream fs = new FlightStream(
final FlightStream fs = new FlightStreamImpl(
allocator,
PENDING_REQUESTS,
/* server-upload streams are not cancellable */null,
Expand Down
Loading

0 comments on commit 9f23ae9

Please sign in to comment.