diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index faffb66961f..0ca6a1fcbd5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -19,21 +19,11 @@ import io.grpc.BindableService; import io.grpc.ServerInterceptor; import io.grpc.ServerServiceDefinition; -import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel; import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; -import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; -import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; -import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.cert.CertificateException; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -44,13 +34,10 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor; -import org.apache.rocketmq.remoting.common.TlsMode; -import org.apache.rocketmq.remoting.netty.TlsSystemConfig; public class GrpcServerBuilder { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -63,12 +50,7 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { serverBuilder = NettyServerBuilder.forPort(port); - try { - configSslContext(serverBuilder); - } catch (Exception e) { - log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e); - throw new RuntimeException("grpc tls set failed: " + e.getMessage()); - } + serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator()); // build server int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum(); @@ -116,45 +98,6 @@ public GrpcServer build() { return new GrpcServer(this.serverBuilder.build()); } - protected void configSslContext(NettyServerBuilder serverBuilder) throws IOException, CertificateException { - if (null == serverBuilder) { - return; - } - - TlsMode tlsMode = TlsSystemConfig.tlsMode; - if (!TlsMode.DISABLED.equals(tlsMode)) { - SslContext sslContext = loadSslContext(); - if (TlsMode.PERMISSIVE.equals(tlsMode)) { - serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator(sslContext)); - } else { - serverBuilder.sslContext(sslContext); - } - } - } - - protected SslContext loadSslContext() throws CertificateException, IOException { - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - if (proxyConfig.isTlsTestModeEnable()) { - SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); - return GrpcSslContexts.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()) - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .clientAuth(ClientAuth.NONE) - .build(); - } else { - String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath(); - String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath(); - try (InputStream serverKeyInputStream = Files.newInputStream(Paths.get(tlsKeyPath)); - InputStream serverCertificateStream = Files.newInputStream(Paths.get(tlsCertPath))) { - SslContext res = GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream) - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .clientAuth(ClientAuth.NONE) - .build(); - log.info("load TLS configured OK"); - return res; - } - } - } - public GrpcServerBuilder configInterceptor() { // grpc interceptors, including acl, logging etc. List accessValidators = ServiceProvider.load(AccessValidator.class); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java index bf19abf855f..670e1c1a212 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.proxy.grpc; import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators; @@ -24,24 +25,36 @@ import io.grpc.netty.shaded.io.netty.channel.ChannelHandler; import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext; import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder; +import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; import io.grpc.netty.shaded.io.netty.util.AsciiString; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.remoting.common.TlsMode; +import org.apache.rocketmq.remoting.netty.TlsSystemConfig; public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { - private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - private final SslContext sslContext; + protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + /** * the length of the ssl record header (in bytes) */ private static final int SSL_RECORD_HEADER_LENGTH = 5; - public OptionalSSLProtocolNegotiator(SslContext sslContext) { - this.sslContext = sslContext; + private static SslContext sslContext; + + public OptionalSSLProtocolNegotiator() { + sslContext = loadSslContext(); } @Override @@ -50,43 +63,81 @@ public AsciiString scheme() { } @Override - public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHttp2ConnectionHandler) { - ChannelHandler plaintext = - InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler); - ChannelHandler ssl = - InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler); - return new PortUnificationServerHandler(ssl, plaintext); + public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { + return new PortUnificationServerHandler(grpcHandler); } @Override public void close() {} + private static SslContext loadSslContext() { + try { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + if (proxyConfig.isTlsTestModeEnable()) { + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + return GrpcSslContexts.forServer(selfSignedCertificate.certificate(), + selfSignedCertificate.privateKey()) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .clientAuth(ClientAuth.NONE) + .build(); + } else { + String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath(); + String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath(); + try (InputStream serverKeyInputStream = Files.newInputStream( + Paths.get(tlsKeyPath)); + InputStream serverCertificateStream = Files.newInputStream( + Paths.get(tlsCertPath))) { + SslContext res = GrpcSslContexts.forServer(serverCertificateStream, + serverKeyInputStream) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .clientAuth(ClientAuth.NONE) + .build(); + log.info("grpc load TLS configured OK"); + return res; + } + } + } catch (Exception e) { + log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e); + throw new RuntimeException("grpc tls set failed: " + e.getMessage()); + } + } + public static class PortUnificationServerHandler extends ByteToMessageDecoder { + private final ChannelHandler ssl; private final ChannelHandler plaintext; - public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler plaintext) { - this.ssl = ssl; - this.plaintext = plaintext; + public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) { + this.ssl = InternalProtocolNegotiators.serverTls(sslContext) + .newHandler(grpcHandler); + this.plaintext = InternalProtocolNegotiators.serverPlaintext() + .newHandler(grpcHandler); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) - throws Exception { + throws Exception { try { - // in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not - if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) { - return; - } - if (SslHandler.isEncrypted(in)) { + TlsMode tlsMode = TlsSystemConfig.tlsMode; + if (TlsMode.ENFORCING.equals(tlsMode)) { ctx.pipeline().addAfter(ctx.name(), null, this.ssl); - } else { + } else if (TlsMode.DISABLED.equals(tlsMode)) { ctx.pipeline().addAfter(ctx.name(), null, this.plaintext); + } else { + // in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not + if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) { + return; + } + if (SslHandler.isEncrypted(in)) { + ctx.pipeline().addAfter(ctx.name(), null, this.ssl); + } else { + ctx.pipeline().addAfter(ctx.name(), null, this.plaintext); + } } ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); ctx.pipeline().remove(this); } catch (Exception e) { - log.error("process protocol negotiator failed.", e); + log.error("process ssl protocol negotiator failed.", e); throw e; } }