-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: allow per-service/method executor #8266
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2021 The gRPC Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.grpc; | ||
|
||
import java.util.concurrent.Executor; | ||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* Defines what executor the server call is handled, based on each RPC call information at runtime. | ||
* */ | ||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274") | ||
public interface ServerCallExecutorSupplier { | ||
|
||
/** | ||
* Returns an executor to handle the server call. | ||
* It should never throw. It should return null to fallback to the default executor. | ||
* */ | ||
@Nullable | ||
<ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> call, Metadata metadata); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() { | |
private static final int RUNNING = -1; | ||
|
||
/** Underlying executor that all submitted Runnable objects are run on. */ | ||
private final Executor executor; | ||
private Executor executor; | ||
|
||
/** A list of Runnables to be run in order. */ | ||
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>(); | ||
|
@@ -76,6 +76,11 @@ public SerializingExecutor(Executor executor) { | |
this.executor = executor; | ||
} | ||
|
||
public void setExecutor(Executor executor) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Document that this can only be called from a Runnable running within the SerializingExecutor. |
||
Preconditions.checkNotNull(executor, "'executor' must not be null."); | ||
this.executor = executor; | ||
} | ||
|
||
/** | ||
* Runs the given runnable strictly after all Runnables that were submitted | ||
* before it, and using the {@code executor} passed to the constructor. . | ||
|
@@ -118,7 +123,8 @@ private void schedule(@Nullable Runnable removable) { | |
public void run() { | ||
Runnable r; | ||
try { | ||
while ((r = runQueue.poll()) != null) { | ||
Executor oldExecutor = executor; | ||
while (oldExecutor == executor && (r = runQueue.poll()) != null ) { | ||
try { | ||
r.run(); | ||
} catch (RuntimeException e) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ | |
import io.grpc.InternalServerInterceptors; | ||
import io.grpc.Metadata; | ||
import io.grpc.ServerCall; | ||
import io.grpc.ServerCallExecutorSupplier; | ||
import io.grpc.ServerCallHandler; | ||
import io.grpc.ServerInterceptor; | ||
import io.grpc.ServerMethodDefinition; | ||
|
@@ -125,6 +126,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume | |
private final InternalChannelz channelz; | ||
private final CallTracer serverCallTracer; | ||
private final Deadline.Ticker ticker; | ||
private final ServerCallExecutorSupplier executorSupplier; | ||
|
||
/** | ||
* Construct a server. | ||
|
@@ -159,6 +161,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume | |
this.serverCallTracer = builder.callTracerFactory.create(); | ||
this.ticker = checkNotNull(builder.ticker, "ticker"); | ||
channelz.addServer(this); | ||
this.executorSupplier = builder.executorSupplier; | ||
} | ||
|
||
/** | ||
|
@@ -466,14 +469,14 @@ public void streamCreated(ServerStream stream, String methodName, Metadata heade | |
|
||
private void streamCreatedInternal( | ||
final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) { | ||
final Executor wrappedExecutor; | ||
final Executor wrapExecutor; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/wrapExecutor/wrappedExecutor/ to keep the name? |
||
// This is a performance optimization that avoids the synchronization and queuing overhead | ||
// that comes with SerializingExecutor. | ||
if (executor == directExecutor()) { | ||
wrappedExecutor = new SerializeReentrantCallsDirectExecutor(); | ||
stream.optimizeForDirectExecutor(); | ||
if (executorSupplier != null || executor != directExecutor()) { | ||
wrapExecutor = new SerializingExecutor(executor); | ||
} else { | ||
wrappedExecutor = new SerializingExecutor(executor); | ||
wrapExecutor = new SerializeReentrantCallsDirectExecutor(); | ||
stream.optimizeForDirectExecutor(); | ||
} | ||
|
||
if (headers.containsKey(MESSAGE_ENCODING_KEY)) { | ||
|
@@ -499,52 +502,123 @@ private void streamCreatedInternal( | |
|
||
final JumpToApplicationThreadServerStreamListener jumpListener | ||
= new JumpToApplicationThreadServerStreamListener( | ||
wrappedExecutor, executor, stream, context, tag); | ||
wrapExecutor, executor, stream, context, tag); | ||
stream.setListener(jumpListener); | ||
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks | ||
// are delivered, including any errors. Callbacks can still be triggered, but they will be | ||
// queued. | ||
|
||
final class StreamCreated extends ContextRunnable { | ||
StreamCreated() { | ||
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create(); | ||
// Run in serializing executor so jumpListener.setListener() is called before any callbacks | ||
// are delivered, including any errors. Callbacks can still be triggered. | ||
// If callExecutor needs no executor switch, they will be queued at serializing executor. | ||
// If callExecutor needs a switch due to executorSupplier, they will be queued at jumpListener | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks out-of-date. I guess the old comment isn't inaccurate now, although don't know if we want to describe more. |
||
// first then delivered to serializing executor for the second queueing. | ||
|
||
final class MethodLookup extends ContextRunnable { | ||
MethodLookup() { | ||
super(context); | ||
} | ||
|
||
@Override | ||
public void runInContext() { | ||
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag); | ||
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag); | ||
PerfMark.linkIn(link); | ||
try { | ||
runInternal(); | ||
} finally { | ||
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag); | ||
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag); | ||
} | ||
} | ||
|
||
private void runInternal() { | ||
ServerStreamListener listener = NOOP_LISTENER; | ||
ServerMethodDefinition<?, ?> wrapMethod; | ||
ServerCallParameters<?, ?> callParams; | ||
try { | ||
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName); | ||
if (method == null) { | ||
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); | ||
} | ||
if (method == null) { | ||
Status status = Status.UNIMPLEMENTED.withDescription( | ||
"Method not found: " + methodName); | ||
"Method not found: " + methodName); | ||
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in | ||
// memory as a map whose key is the method name, this would allow a misbehaving | ||
// client to blow up the server in-memory stats storage by sending large number of | ||
// distinct unimplemented method | ||
// names. (https://github.com/grpc/grpc-java/issues/2285) | ||
stream.close(status, new Metadata()); | ||
context.cancel(null); | ||
future.cancel(false); | ||
return; | ||
} | ||
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag); | ||
wrapMethod = wrapMethod(stream, method, statsTraceCtx); | ||
callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag); | ||
future.set(callParams); | ||
} catch (Throwable t) { | ||
stream.close(Status.fromThrowable(t), new Metadata()); | ||
context.cancel(null); | ||
future.cancel(false); | ||
throw t; | ||
} | ||
} | ||
|
||
private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor( | ||
final ServerMethodDefinition<ReqT, RespT> methodDef, | ||
final ServerStream stream, | ||
final Metadata headers, | ||
final Context.CancellableContext context, | ||
final Tag tag) { | ||
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>( | ||
stream, | ||
methodDef.getMethodDescriptor(), | ||
headers, | ||
context, | ||
decompressorRegistry, | ||
compressorRegistry, | ||
serverCallTracer, | ||
tag); | ||
if (executorSupplier != null) { | ||
Executor switchingExecutor = executorSupplier.getExecutor(call, headers); | ||
if (switchingExecutor != null) { | ||
((SerializingExecutor)wrapExecutor).setExecutor(switchingExecutor); | ||
} | ||
} | ||
return new ServerCallParameters<>(call, methodDef); | ||
} | ||
} | ||
|
||
final class ServerCallHandled extends ContextRunnable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why "handled" in the name, since this handles the RPC, it isn't run after the RPC is handled. Maybe |
||
ServerCallHandled() { | ||
super(context); | ||
} | ||
|
||
@Override | ||
public void runInContext() { | ||
PerfMark.startTask("ServerTransportListener$ServerCallHandled.startCall", tag); | ||
PerfMark.linkIn(link); | ||
try { | ||
runInternal(); | ||
} finally { | ||
PerfMark.stopTask("ServerTransportListener$ServerCallHandled.startCall", tag); | ||
} | ||
} | ||
|
||
private void runInternal() { | ||
ServerStreamListener listener = NOOP_LISTENER; | ||
ServerCallParameters<?,?> callParameters; | ||
try { | ||
if (future.isCancelled()) { | ||
return; | ||
} | ||
if (!future.isDone() || (callParameters = future.get()) == null) { | ||
Status status = Status.INTERNAL.withDescription( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this should throw this as an exception? If it's an exception then it will be logged (and the catch will handle it). Seems that would make it more likely we'd learn about the bug. |
||
"Fail to start server call."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make the message a bit more clear there's a bug in grpc. |
||
stream.close(status, new Metadata()); | ||
context.cancel(null); | ||
return; | ||
} | ||
listener = startWrappedCall(methodName, callParameters, headers); | ||
} catch (Throwable ex) { | ||
stream.close(Status.fromThrowable(ex), new Metadata()); | ||
context.cancel(null); | ||
throw new IllegalStateException(ex); | ||
} finally { | ||
jumpListener.setListener(listener); | ||
} | ||
|
@@ -568,7 +642,8 @@ public void cancelled(Context context) { | |
} | ||
} | ||
|
||
wrappedExecutor.execute(new StreamCreated()); | ||
wrapExecutor.execute(new MethodLookup()); | ||
wrapExecutor.execute(new ServerCallHandled()); | ||
} | ||
|
||
private Context.CancellableContext createContext( | ||
|
@@ -593,9 +668,8 @@ private Context.CancellableContext createContext( | |
} | ||
|
||
/** Never returns {@code null}. */ | ||
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName, | ||
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers, | ||
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) { | ||
private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream, | ||
ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) { | ||
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method? | ||
statsTraceCtx.serverCallStarted( | ||
new ServerCallInfoImpl<>( | ||
|
@@ -609,34 +683,31 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String | |
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler); | ||
ServerMethodDefinition<?, ?> wMethodDef = binlog == null | ||
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef); | ||
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag); | ||
return wMethodDef; | ||
} | ||
|
||
private final class ServerCallParameters<ReqT, RespT> { | ||
ServerCallImpl<ReqT, RespT> call; | ||
ServerMethodDefinition<ReqT, RespT> methodDef; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use ServerCallHandler here to be more clear that the MethodDescriptor within the definition isn't used? |
||
|
||
public ServerCallParameters(ServerCallImpl<ReqT, RespT> call, | ||
ServerMethodDefinition<ReqT, RespT> methodDef) { | ||
this.call = call; | ||
this.methodDef = methodDef; | ||
} | ||
} | ||
|
||
private <WReqT, WRespT> ServerStreamListener startWrappedCall( | ||
String fullMethodName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are interested: Looks like we can remove this argument by using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh looks this is only for logging the original fullMethodName. And this is an existing code path. |
||
ServerMethodDefinition<WReqT, WRespT> methodDef, | ||
ServerStream stream, | ||
Metadata headers, | ||
Context.CancellableContext context, | ||
Tag tag) { | ||
|
||
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>( | ||
stream, | ||
methodDef.getMethodDescriptor(), | ||
headers, | ||
context, | ||
decompressorRegistry, | ||
compressorRegistry, | ||
serverCallTracer, | ||
tag); | ||
|
||
ServerCall.Listener<WReqT> listener = | ||
methodDef.getServerCallHandler().startCall(call, headers); | ||
if (listener == null) { | ||
ServerCallParameters<WReqT, WRespT> params, | ||
Metadata headers) { | ||
ServerCall.Listener<WReqT> callListener = | ||
params.methodDef.getServerCallHandler().startCall(params.call, headers); | ||
if (callListener == null) { | ||
throw new NullPointerException( | ||
"startCall() returned a null listener for method " + fullMethodName); | ||
"startCall() returned a null listener for method " + fullMethodName); | ||
} | ||
return call.newServerStreamListener(listener); | ||
return params.call.newServerStreamListener(callListener); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"handles the server call"