Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option for cancelling queries when closing client #3276

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ abstract class AbstractReadContext

abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
private boolean cancelQueryWhenClientIsClosed;
private SpannerRpc rpc;
private ISpan span;
private TraceWrapper tracer;
Expand All @@ -91,6 +92,11 @@ B setSession(SessionImpl session) {
return self();
}

B setCancelQueryWhenClientIsClosed(boolean cancelQueryWhenClientIsClosed) {
this.cancelQueryWhenClientIsClosed = cancelQueryWhenClientIsClosed;
return self();
}

B setRpc(SpannerRpc rpc) {
this.rpc = rpc;
return self();
Expand Down Expand Up @@ -440,6 +446,7 @@ void initTransaction() {

final Object lock = new Object();
final SessionImpl session;
final boolean cancelQueryWhenClientIsClosed;
final SpannerRpc rpc;
final ExecutorProvider executorProvider;
ISpan span;
Expand Down Expand Up @@ -469,6 +476,7 @@ void initTransaction() {

AbstractReadContext(Builder<?, ?> builder) {
this.session = builder.session;
this.cancelQueryWhenClientIsClosed = builder.cancelQueryWhenClientIsClosed;
this.rpc = builder.rpc;
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
Expand Down Expand Up @@ -749,7 +757,8 @@ ResultSet executeQueryInternalWithOptions(
rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
GrpcStreamIterator stream =
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
Expand Down Expand Up @@ -922,7 +931,8 @@ ResultSet readInternalWithOptions(
rpc.getReadRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
GrpcStreamIterator stream =
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
return new BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
.setCancelQueryWhenClientIsClosed(true)
.setRpc(sessionClient.getSpanner().getRpc())
.setTimestampBound(bound)
.setDefaultQueryOptions(
Expand All @@ -75,6 +76,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
return new BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
.setCancelQueryWhenClientIsClosed(true)
.setRpc(sessionClient.getSpanner().getRpc())
.setTransactionId(batchTransactionId.getTransactionId())
.setTimestamp(batchTransactionId.getTimestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
private static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();

private final ConsumerImpl consumer = new ConsumerImpl();
private final ConsumerImpl consumer;
private final BlockingQueue<PartialResultSet> stream;
private final Statement statement;

Expand All @@ -49,13 +49,15 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
private SpannerException error;

@VisibleForTesting
GrpcStreamIterator(int prefetchChunks) {
this(null, prefetchChunks);
GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
this(null, prefetchChunks, cancelQueryWhenClientIsClosed);
}

@VisibleForTesting
GrpcStreamIterator(Statement statement, int prefetchChunks) {
GrpcStreamIterator(
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
this.statement = statement;
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
// One extra to allow for END_OF_STREAM message.
this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1);
}
Expand Down Expand Up @@ -136,6 +138,12 @@ private void addToStream(PartialResultSet results) {
}

private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer {
private final boolean cancelQueryWhenClientIsClosed;

ConsumerImpl(boolean cancelQueryWhenClientIsClosed) {
this.cancelQueryWhenClientIsClosed = cancelQueryWhenClientIsClosed;
}

@Override
public void onPartialResultSet(PartialResultSet results) {
addToStream(results);
Expand Down Expand Up @@ -168,5 +176,10 @@ public void onError(SpannerException e) {
error = e;
addToStream(END_OF_STREAM);
}

@Override
public boolean cancelQueryWhenClientIsClosed() {
return this.cancelQueryWhenClientIsClosed;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.rpc.UnavailableException;
import com.google.common.base.Predicate;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;

/**
* Predicate that checks whether an exception is a ChannelShutdownException. This exception is
* thrown by gRPC if the underlying gRPC stub has been shut down and uses the UNAVAILABLE error
* code. This means that it would normally be retried by the Spanner client, but this specific
* UNAVAILABLE error should not be retried, as it would otherwise directly return the same error.
*/
class IsChannelShutdownException implements Predicate<Throwable> {

@Override
public boolean apply(Throwable input) {
Throwable cause = input;
do {
if (isUnavailableError(cause)
&& (cause.getMessage().contains("Channel shutdown invoked")
|| cause.getMessage().contains("Channel shutdownNow invoked"))) {
return true;
}
} while ((cause = cause.getCause()) != null);
return false;
}

private boolean isUnavailableError(Throwable cause) {
return (cause instanceof UnavailableException)
|| (cause instanceof StatusRuntimeException
&& ((StatusRuntimeException) cause).getStatus().getCode() == Code.UNAVAILABLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ private static boolean isRetryable(ErrorCode code, @Nullable Throwable cause) {
case UNAVAILABLE:
// SSLHandshakeException is (probably) not retryable, as it is an indication that the server
// certificate was not accepted by the client.
return !hasCauseMatching(cause, Matchers.isSSLHandshakeException);
// Channel shutdown is also not a retryable exception.
return !(hasCauseMatching(cause, Matchers.isSSLHandshakeException)
|| hasCauseMatching(cause, Matchers.IS_CHANNEL_SHUTDOWN_EXCEPTION));
case RESOURCE_EXHAUSTED:
return SpannerException.extractRetryDelay(cause) > 0;
default:
Expand All @@ -345,5 +347,8 @@ private static class Matchers {

static final Predicate<Throwable> isRetryableInternalError = new IsRetryableInternalError();
static final Predicate<Throwable> isSSLHandshakeException = new IsSslHandshakeException();

static final Predicate<Throwable> IS_CHANNEL_SHUTDOWN_EXCEPTION =
new IsChannelShutdownException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -262,6 +263,9 @@ public class GapicSpannerRpc implements SpannerRpc {

private final ScheduledExecutorService spannerWatchdog;

private final ConcurrentLinkedDeque<SpannerResponseObserver> responseObservers =
new ConcurrentLinkedDeque<>();

private final boolean throttleAdministrativeRequests;
private final RetrySettings retryAdministrativeRequestsSettings;
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D;
Expand Down Expand Up @@ -2004,9 +2008,29 @@ <ReqT, RespT> GrpcCallContext newCallContext(
return (GrpcCallContext) context.merge(apiCallContextFromContext);
}

void registerResponseObserver(SpannerResponseObserver responseObserver) {
responseObservers.add(responseObserver);
}

void unregisterResponseObserver(SpannerResponseObserver responseObserver) {
responseObservers.remove(responseObserver);
}

void closeResponseObservers() {
responseObservers.forEach(SpannerResponseObserver::close);
responseObservers.clear();
}

@InternalApi
@VisibleForTesting
public int getNumActiveResponseObservers() {
return responseObservers.size();
}

@Override
public void shutdown() {
this.rpcIsClosed = true;
closeResponseObservers();
if (this.spannerStub != null) {
this.spannerStub.close();
this.partitionedDmlStub.close();
Expand All @@ -2028,6 +2052,7 @@ public void shutdown() {

public void shutdownNow() {
this.rpcIsClosed = true;
closeResponseObservers();
this.spannerStub.close();
this.partitionedDmlStub.close();
this.instanceAdminStub.close();
Expand Down Expand Up @@ -2085,7 +2110,7 @@ public void cancel(@Nullable String message) {
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
*/
private static class SpannerResponseObserver implements ResponseObserver<PartialResultSet> {
private class SpannerResponseObserver implements ResponseObserver<PartialResultSet> {

private StreamController controller;
private final ResultStreamConsumer consumer;
Expand All @@ -2094,13 +2119,21 @@ public SpannerResponseObserver(ResultStreamConsumer consumer) {
this.consumer = consumer;
}

void close() {
if (this.controller != null) {
this.controller.cancel();
}
}

@Override
public void onStart(StreamController controller) {

// Disable the auto flow control to allow client library
// set the number of messages it prefers to request
controller.disableAutoInboundFlowControl();
this.controller = controller;
if (this.consumer.cancelQueryWhenClientIsClosed()) {
registerResponseObserver(this);
}
}

@Override
Expand All @@ -2110,11 +2143,19 @@ public void onResponse(PartialResultSet response) {

@Override
public void onError(Throwable t) {
// Unregister the response observer when the query has completed with an error.
if (this.consumer.cancelQueryWhenClientIsClosed()) {
unregisterResponseObserver(this);
}
consumer.onError(newSpannerException(t));
}

@Override
public void onComplete() {
// Unregister the response observer when the query has completed normally.
if (this.consumer.cancelQueryWhenClientIsClosed()) {
unregisterResponseObserver(this);
}
consumer.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ interface ResultStreamConsumer {
void onCompleted();

void onError(SpannerException e);

/**
* Returns true if the stream should be cancelled when the Spanner client is closed. This
* returns true for {@link com.google.cloud.spanner.BatchReadOnlyTransaction}, as these use a
* non-pooled session. Pooled sessions are deleted when the Spanner client is closed, and this
* automatically also cancels any query that uses the session, which means that we don't need to
* explicitly cancel those queries when the Spanner client is closed.
*/
boolean cancelQueryWhenClientIsClosed();
}

/** Handle for cancellation of a streaming read or query call. */
Expand Down
Loading
Loading