Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator #6867

Merged
merged 6 commits into from
Jun 8, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,11 @@
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,13 +34,10 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;

public class GrpcServerBuilder {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
Expand All @@ -63,12 +50,7 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
serverBuilder = NettyServerBuilder.forPort(port);

try {
configSslContext(serverBuilder);
} catch (Exception e) {
log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
throw new RuntimeException("grpc tls set failed: " + e.getMessage());
}
serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());

// build server
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
Expand Down Expand Up @@ -116,45 +98,6 @@ public GrpcServer build() {
return new GrpcServer(this.serverBuilder.build());
}

protected void configSslContext(NettyServerBuilder serverBuilder) throws IOException, CertificateException {
if (null == serverBuilder) {
return;
}

TlsMode tlsMode = TlsSystemConfig.tlsMode;
if (!TlsMode.DISABLED.equals(tlsMode)) {
SslContext sslContext = loadSslContext();
if (TlsMode.PERMISSIVE.equals(tlsMode)) {
serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator(sslContext));
} else {
serverBuilder.sslContext(sslContext);
}
}
}

protected SslContext loadSslContext() throws CertificateException, IOException {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isTlsTestModeEnable()) {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return GrpcSslContexts.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
} else {
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
try (InputStream serverKeyInputStream = Files.newInputStream(Paths.get(tlsKeyPath));
InputStream serverCertificateStream = Files.newInputStream(Paths.get(tlsCertPath))) {
SslContext res = GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
log.info("load TLS configured OK");
return res;
}
}
}

public GrpcServerBuilder configInterceptor() {
// grpc interceptors, including acl, logging etc.
List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,44 @@
package org.apache.rocketmq.proxy.grpc;

import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;

public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final SslContext sslContext;
protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

/**
* the length of the ssl record header (in bytes)
*/
private static final int SSL_RECORD_HEADER_LENGTH = 5;

public OptionalSSLProtocolNegotiator(SslContext sslContext) {
this.sslContext = sslContext;
private static SslContext sslContext;

public OptionalSSLProtocolNegotiator() {
sslContext = loadSslContext();
}

@Override
Expand All @@ -50,43 +63,81 @@ public AsciiString scheme() {
}

@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHttp2ConnectionHandler) {
ChannelHandler plaintext =
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
ChannelHandler ssl =
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
return new PortUnificationServerHandler(ssl, plaintext);
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
return new PortUnificationServerHandler(grpcHandler);
}

@Override
public void close() {}

private static SslContext loadSslContext() {
try {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isTlsTestModeEnable()) {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
} else {
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
try (InputStream serverKeyInputStream = Files.newInputStream(
Paths.get(tlsKeyPath));
InputStream serverCertificateStream = Files.newInputStream(
Paths.get(tlsCertPath))) {
SslContext res = GrpcSslContexts.forServer(serverCertificateStream,
serverKeyInputStream)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
log.info("grpc load TLS configured OK");
return res;
}
}
} catch (Exception e) {
log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
throw new RuntimeException("grpc tls set failed: " + e.getMessage());
}
}

public static class PortUnificationServerHandler extends ByteToMessageDecoder {

private final ChannelHandler ssl;
private final ChannelHandler plaintext;

public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler plaintext) {
this.ssl = ssl;
this.plaintext = plaintext;
public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) {
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
.newHandler(grpcHandler);
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
.newHandler(grpcHandler);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
throws Exception {
try {
// in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not
if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
return;
}
if (SslHandler.isEncrypted(in)) {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
if (TlsMode.ENFORCING.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
} else {
} else if (TlsMode.DISABLED.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
} else {
// in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not
if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
return;
}
if (SslHandler.isEncrypted(in)) {
ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
} else {
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
}
}
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
ctx.pipeline().remove(this);
} catch (Exception e) {
log.error("process protocol negotiator failed.", e);
log.error("process ssl protocol negotiator failed.", e);
throw e;
}
}
Expand Down