Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create helidon context in gRPC server implementation #769

Merged
merged 3 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void main(String[] args) throws Exception {

Tracer tracer = TracerBuilder.create(config.get("tracing")).build();

TracingConfiguration tracingConfig = new TracingConfiguration.Builder()
TracingConfiguration tracingConfig = TracingConfiguration.builder()
.withStreaming()
.withVerbosity()
.withTracedAttributes(ServerRequestAttribute.CALL_ATTRIBUTES,
Expand Down
8 changes: 8 additions & 0 deletions grpc/core/src/main/java/io/helidon/grpc/core/ContextKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.helidon.grpc.core;

import io.grpc.Context;
import io.grpc.Metadata;

/**
Expand All @@ -28,6 +29,13 @@ public final class ContextKeys {
public static final Metadata.Key<String> AUTHORIZATION =
Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);

/**
* The gRPC context key to use to obtain the Helidon {@link io.helidon.common.context.Context}
* from the gRPC {@link Context}.
*/
public static final Context.Key<io.helidon.common.context.Context> HELIDON_CONTEXT =
Context.key(io.helidon.common.context.Context.class.getCanonicalName());

/**
* Private constructor for utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import io.helidon.common.context.Context;
import io.helidon.grpc.core.PriorityBag;

import io.grpc.ServerInterceptor;
Expand All @@ -45,6 +46,13 @@ public interface GrpcServer {
*/
GrpcServerConfiguration configuration();

/**
* Gets a {@link GrpcServer} context.
*
* @return a server context
*/
Context context();

/**
* Starts the server. Has no effect if server is running.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package io.helidon.grpc.server;

import io.helidon.common.context.Context;

import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;

/**
* Configuration class for the {@link GrpcServer} implementations.
Expand All @@ -39,33 +40,23 @@ public class GrpcServerBasicConfig

private final SslConfiguration sslConfig;

private final Context context;

/**
* Construct {@link GrpcServerBasicConfig} instance.
*
* @param name the server name
* @param port the port to listen on
* @param workers a count of threads in a pool used to tryProcess HTTP requests
* @param nativeTransport {@code true} to enable native transport for
* the server
* @param tracer the tracer to use
* @param tracingConfig the tracing configuration
* @param sslConfig the SSL configuration
* @param builder the {@link GrpcServerConfiguration.Builder} to use to configure
* this {@link GrpcServerBasicConfig}.
*/
public GrpcServerBasicConfig(String name,
int port,
int workers,
boolean nativeTransport,
Tracer tracer,
TracingConfiguration tracingConfig,
SslConfiguration sslConfig) {

this.name = name == null || name.trim().isEmpty() ? DEFAULT_NAME : name.trim();
this.port = port <= 0 ? 0 : port;
this.nativeTransport = nativeTransport;
this.tracer = tracer == null ? GlobalTracer.get() : tracer;
this.tracingConfig = tracingConfig == null ? new TracingConfiguration.Builder().build() : tracingConfig;
this.workers = workers > 0 ? workers : DEFAULT_WORKER_COUNT;
this.sslConfig = sslConfig;
GrpcServerBasicConfig(GrpcServerConfiguration.Builder builder) {
this.name = builder.name();
this.port = builder.port();
this.context = builder.context();
this.nativeTransport = builder.useNativeTransport();
this.tracer = builder.tracer();
this.tracingConfig = builder.tracingConfig();
this.workers = builder.workers();
this.sslConfig = builder.sslConfig();
}

// ---- accessors ---------------------------------------------------
Expand All @@ -90,6 +81,11 @@ public int port() {
return port;
}

@Override
public Context context() {
return context;
}

/**
* Determine whether use native transport if possible.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.function.Supplier;

import io.helidon.common.context.Context;
import io.helidon.config.Config;

import io.opentracing.Tracer;
Expand Down Expand Up @@ -55,6 +56,12 @@ public interface GrpcServerConfiguration {
*/
int port();

/**
* The top level {@link io.helidon.common.context.Context} to be used by the server.
* @return a context instance with registered application scoped instances
*/
Context context();

/**
* Determine whether use native transport if possible.
* <p>
Expand Down Expand Up @@ -153,6 +160,8 @@ final class Builder implements io.helidon.common.Builder<GrpcServerConfiguration

private SslConfiguration sslConfig = null;

private Context context;

private Builder() {
}

Expand All @@ -179,7 +188,7 @@ public GrpcServerConfiguration.Builder config(Config config) {
* @return an updated builder
*/
public Builder name(String name) {
this.name = name;
this.name = name == null ? null : name.trim();
return this;
}

Expand All @@ -196,6 +205,16 @@ public Builder port(int port) {
return this;
}

/**
* Configure the application scoped context to be used as a parent for webserver request contexts.
* @param context top level context
* @return an updated builder
*/
public Builder context(Context context) {
this.context = context;
return this;
}

/**
* Sets an <a href="http://opentracing.io">opentracing.io</a> tracer. (Default is {@link GlobalTracer}.)
*
Expand Down Expand Up @@ -255,15 +274,69 @@ public Builder sslConfig(SslConfiguration sslConfig) {
return this;
}

String name() {
return name;
}

int port() {
return port;
}

public Context context() {
return context;
}

Tracer tracer() {
return tracer;
}

TracingConfiguration tracingConfig() {
return tracingConfig;
}

SslConfiguration sslConfig() {
return sslConfig;
}

boolean useNativeTransport() {
return useNativeTransport;
}

int workers() {
return workers;
}

@Override
public GrpcServerConfiguration build() {
return new GrpcServerBasicConfig(name,
port,
workers,
useNativeTransport,
tracer,
tracingConfig,
sslConfig);
if (name == null || name.isEmpty()) {
name = DEFAULT_NAME;
}

if (port < 0) {
port = 0;
}

if (context == null) {
context = Context.create();
}

if (tracer == null) {
tracer = GlobalTracer.get();
}

if (tracingConfig == null) {
tracingConfig = TracingConfiguration.create();
}

if (!context.get(Tracer.class).isPresent()) {
context.register(tracer);
}

if (workers <= 0) {
workers = DEFAULT_WORKER_COUNT;
}

return new GrpcServerBasicConfig(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,29 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Priority;
import javax.net.ssl.SSLContext;

import io.helidon.common.configurable.Resource;
import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.pki.KeyConfig;
import io.helidon.grpc.core.ContextKeys;
import io.helidon.grpc.core.InterceptorPriorities;
import io.helidon.grpc.core.PriorityBag;

import io.grpc.BindableService;
import io.grpc.HandlerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
Expand All @@ -63,6 +72,8 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import org.eclipse.microprofile.health.HealthCheck;

import static java.lang.String.format;
Expand Down Expand Up @@ -116,6 +127,8 @@ public class GrpcServerImpl implements GrpcServer {
*/
private Map<String, ServiceDescriptor> services = new ConcurrentHashMap<>();

private final Context context;

// ---- constructors ----------------------------------------------------

/**
Expand All @@ -125,6 +138,8 @@ public class GrpcServerImpl implements GrpcServer {
*/
GrpcServerImpl(GrpcServerConfiguration config) {
this.config = config;
this.context = config.context();

}

// ---- GrpcServer interface --------------------------------------------
Expand Down Expand Up @@ -213,6 +228,11 @@ public GrpcServerConfiguration configuration() {
return config;
}

@Override
public Context context() {
return context;
}

@Override
public CompletionStage<GrpcServer> whenShutdown() {
return shutdownFuture;
Expand Down Expand Up @@ -270,7 +290,10 @@ private NettyServerBuilder configureNetty(NettyServerBuilder builder) {
LOGGER.log(Level.FINE, () -> "Using NIO transport");
channelType = NioServerSocketChannel.class;
boss = new NioEventLoopGroup(1);
workers = workersCount <= 0 ? new NioEventLoopGroup() : new NioEventLoopGroup(workersCount);
Executor executor = new ThreadPerTaskExecutor(new ContextAwareThreadFactory(NioEventLoopGroup.class));
workers = workersCount <= 0
? new NioEventLoopGroup(0, executor)
: new NioEventLoopGroup(workersCount, executor);
}

return builder
Expand All @@ -284,10 +307,13 @@ private NettyServerBuilder configureNetty(NettyServerBuilder builder) {
*
* @param serviceDescriptor the service to deploy
* @param globalInterceptors the global {@link io.grpc.ServerInterceptor}s to wrap all services with
* @throws NullPointerException if {@code serviceDescriptor} is {@code null}
* @throws NullPointerException if any of the parameters is {@code null}
*/
public void deploy(ServiceDescriptor serviceDescriptor, PriorityBag<ServerInterceptor> globalInterceptors) {
Objects.requireNonNull(serviceDescriptor);
Objects.requireNonNull(globalInterceptors);

globalInterceptors.add(new ContextAwareServerInterceptor());

String serverName = config.name();
BindableService service = serviceDescriptor.bindableService(globalInterceptors);
Expand Down Expand Up @@ -439,4 +465,43 @@ private static X509Certificate[] loadX509Cert(File... aFile)

return aCerts;
}

/**
* A {@link ServerInterceptor} that will set the Helidon {@link io.helidon.common.context.Context}
* into the gRPC {@link io.grpc.Context}.
*/
@Priority(InterceptorPriorities.CONTEXT - 1)
private class ContextAwareServerInterceptor
implements ServerInterceptor {

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {

Context context = Context.create(context());
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(ContextKeys.HELIDON_CONTEXT, context);
return io.grpc.Contexts.interceptCall(grpcContext, call, headers, next);
}
}

/**
* An extension to {@link DefaultThreadFactory} that ensures threads have
* a {@link io.helidon.common.context.Context} set.
*/
private class ContextAwareThreadFactory
extends DefaultThreadFactory {

private ContextAwareThreadFactory(Class<?> poolType) {
super(poolType);
}

@Override
public Thread newThread(Runnable runnable) {
return super.newThread(() -> {
Context context = Context.create(context());
Contexts.runInContext(context, runnable);
});
}
}
}
Loading