Skip to content

Commit

Permalink
Cancel subscribeToLogs and messageStream RPCs when the Session closes (
Browse files Browse the repository at this point in the history
…#5433)

This PR fixes subscribeToLogs and messageStream RPCs; previously, they were staying open when a Session closed. Session closing behavior for a number of RPCs was specifically added as a tests here. I'll note that we are inconsistent in how we handle this case - some RPCs will complete successfully, others will pass along UNAUTHENTICATED or CANCELLED. We should aim to clean this up, but I'm proposing we do that as a follow-up PR to avoid PR bloat.

The fix involves hooking into SessionServiceCallListener, which is a centralized place where we maintain Session state information. We should strongly consider migrating other RPC Session close behavior to here to simplify the implementations and maintain consistency. Again, I'd suggest doing that as a separate PR.

Fixes #5415
  • Loading branch information
devinrsmith authored May 15, 2024
1 parent 3b97aaf commit 5e0964e
Show file tree
Hide file tree
Showing 7 changed files with 477 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public void subscribeToLogs(
GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
return;
}
// Session close logic implicitly handled in
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
final LogsClient client =
new LogsClient(request, (ServerCallStreamObserver<LogSubscriptionData>) responseObserver);
client.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ public void onCompleted() {
@Override
public StreamObserver<StreamRequest> messageStream(StreamObserver<StreamResponse> responseObserver) {
SessionState session = sessionService.getCurrentSession();
// Session close logic implicitly handled in
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
return new SendMessageObserver(session, responseObserver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.*;
import io.deephaven.proto.backplane.script.grpc.ConsoleServiceGrpc;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingRunnable;
import io.grpc.Context;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
Expand All @@ -36,9 +36,11 @@

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.Closeable;
import java.lang.Object;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class SessionServiceGrpcImpl extends SessionServiceGrpc.SessionServiceImplBase {
Expand Down Expand Up @@ -310,10 +312,19 @@ private void addHeaders(final Metadata md) {

@Singleton
public static class SessionServiceInterceptor implements ServerInterceptor {
private static final Status AUTHENTICATION_DETAILS_INVALID =
Status.UNAUTHENTICATED.withDescription("Authentication details invalid");

// We can't use just io.grpc.MethodDescriptor (unless we chose provide and inject the named method descriptors),
// some of our methods are overridden from stock gRPC; for example,
// io.deephaven.server.object.ObjectServiceGrpcBinding.bindService.
// The goal should be to migrate all of the existing RPC Session close management logic to here if possible.
private static final Set<String> CANCEL_RPC_ON_SESSION_CLOSE = Set.of(
ConsoleServiceGrpc.getSubscribeToLogsMethod().getFullMethodName(),
ObjectServiceGrpc.getMessageStreamMethod().getFullMethodName());

private final SessionService service;
private final SessionService.ErrorTransformer errorTransformer;
private static final Status authenticationDetailsInvalid =
Status.UNAUTHENTICATED.withDescription("Authentication details invalid");

@Inject
public SessionServiceInterceptor(
Expand Down Expand Up @@ -344,12 +355,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
try {
session = service.getSessionForAuthToken(token);
} catch (AuthenticationException e) {
try {
call.close(authenticationDetailsInvalid, new Metadata());
} catch (IllegalStateException ignored) {
// could be thrown if the call was already closed. As an interceptor, we can't throw,
// so ignoring this and just returning the no-op listener.
}
// As an interceptor, we can't throw, so ignoring this and just returning the no-op listener.
safeClose(call, AUTHENTICATION_DETAILS_INVALID, new Metadata(), false);
return new ServerCall.Listener<>() {};
}
}
Expand All @@ -363,33 +370,61 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re

final MutableObject<SessionServiceCallListener<ReqT, RespT>> listener = new MutableObject<>();
rpcWrapper(serverCall, context, finalSession, errorTransformer, () -> listener.setValue(
new SessionServiceCallListener<>(serverCallHandler.startCall(serverCall, metadata), serverCall,
context, finalSession, errorTransformer)));
listener(serverCall, metadata, serverCallHandler, context, finalSession)));
if (listener.getValue() == null) {
return new ServerCall.Listener<>() {};
}
return listener.getValue();
}

private <ReqT, RespT> @NotNull SessionServiceCallListener<ReqT, RespT> listener(
InterceptedCall<ReqT, RespT> serverCall,
Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler,
Context context,
SessionState session) {
return new SessionServiceCallListener<>(
serverCallHandler.startCall(serverCall, metadata),
serverCall,
context,
session,
errorTransformer,
CANCEL_RPC_ON_SESSION_CLOSE.contains(serverCall.getMethodDescriptor().getFullMethodName()));
}
}

private static class SessionServiceCallListener<ReqT, RespT> extends
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> implements Closeable {
private static final Status SESSION_CLOSED = Status.CANCELLED.withDescription("Session closed");

private final ServerCall<ReqT, RespT> call;
private final Context context;
private final SessionState session;
private final SessionService.ErrorTransformer errorTransformer;
private final boolean autoCancelOnSessionClose;

public SessionServiceCallListener(
SessionServiceCallListener(
ServerCall.Listener<ReqT> delegate,
ServerCall<ReqT, RespT> call,
Context context,
SessionState session,
SessionService.ErrorTransformer errorTransformer) {
SessionService.ErrorTransformer errorTransformer,
boolean autoCancelOnSessionClose) {
super(delegate);
this.call = call;
this.context = context;
this.session = session;
this.errorTransformer = errorTransformer;
this.autoCancelOnSessionClose = autoCancelOnSessionClose;
if (autoCancelOnSessionClose && session != null) {
session.addOnCloseCallback(this);
}
}

@Override
public void close() {
// session.addOnCloseCallback
safeClose(call, SESSION_CLOSED, new Metadata(), false);
}

@Override
Expand All @@ -405,11 +440,17 @@ public void onHalfClose() {
@Override
public void onCancel() {
rpcWrapper(call, context, session, errorTransformer, super::onCancel);
if (autoCancelOnSessionClose && session != null) {
session.removeOnCloseCallback(this);
}
}

@Override
public void onComplete() {
rpcWrapper(call, context, session, errorTransformer, super::onComplete);
if (autoCancelOnSessionClose && session != null) {
session.removeOnCloseCallback(this);
}
}

@Override
Expand All @@ -432,34 +473,44 @@ private static <ReqT, RespT> void rpcWrapper(
@NotNull final Context context,
@Nullable final SessionState session,
@NotNull final SessionService.ErrorTransformer errorTransformer,
@NotNull final ThrowingRunnable<InterruptedException> lambda) {
@NotNull final Runnable lambda) {
Context previous = context.attach();
// note: we'll open the execution context here so that it may be used by the error transformer
try (final SafeCloseable ignored1 = session == null ? null : session.getExecutionContext().open()) {
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
lambda.run();
} catch (final InterruptedException err) {
Thread.currentThread().interrupt();
closeWithError(call, errorTransformer.transform(err));
} catch (final Throwable err) {
closeWithError(call, errorTransformer.transform(err));
} catch (final RuntimeException err) {
safeClose(call, errorTransformer.transform(err));
} catch (final Error error) {
// Indicates a very serious failure; debateable whether we should even try to send close.
safeClose(call, Status.INTERNAL, new Metadata(), false);
throw error;
} finally {
context.detach(previous);
}
}
}

private static <ReqT, RespT> void closeWithError(
@NotNull final ServerCall<ReqT, RespT> call,
private static void safeClose(
@NotNull final ServerCall<?, ?> call,
@NotNull final StatusRuntimeException err) {
Metadata metadata = Status.trailersFromThrowable(err);
if (metadata == null) {
metadata = new Metadata();
}
safeClose(call, Status.fromThrowable(err), metadata, true);
}

private static void safeClose(ServerCall<?, ?> call, Status status, Metadata trailers, boolean logOnError) {
try {
Metadata metadata = Status.trailersFromThrowable(err);
if (metadata == null) {
metadata = new Metadata();
call.close(status, trailers);
} catch (IllegalStateException e) {
// IllegalStateException is explicitly documented as thrown if the call is already closed. It might be nice
// if there was a more explicit exception type, but this should suffice. We _could_ try and check the text
// "call already closed", but that is an undocumented implementation detail we should probably not rely on.
if (logOnError && log.isDebugEnabled()) {
log.debug().append("call.close error: ").append(e).endl();
}
call.close(Status.fromThrowable(err), metadata);
} catch (final Exception unexpectedErr) {
log.debug().append("Unanticipated gRPC Error: ").append(unexpectedErr).endl();
}
}
}
Loading

0 comments on commit 5e0964e

Please sign in to comment.