Skip to content

Commit

Permalink
HBASE-26666 Another approach on not modifying NettyRpcConnection too …
Browse files Browse the repository at this point in the history
…much
  • Loading branch information
Apache9 committed Jul 27, 2022
1 parent a3eeab8 commit 5bbb300
Show file tree
Hide file tree
Showing 17 changed files with 2,508 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@InterfaceAudience.Private
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {

static final String NAME = "BufferCall";

private enum BufferCallAction {
FLUSH,
FAIL
Expand Down Expand Up @@ -77,6 +79,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// do not flush anything out
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof BufferCallEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
Expand All @@ -51,10 +54,12 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -156,11 +161,11 @@ public void cleanupConnection() {
private void established(Channel ch) throws IOException {
assert eventLoop.inEventLoop();
ChannelPipeline p = ch.pipeline();
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
p.addBefore(addBeforeHandler, null,
p.addBefore(BufferCallBeforeInitHandler.NAME, null,
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
p.addBefore(addBeforeHandler, null,
p.addBefore(BufferCallBeforeInitHandler.NAME, null,
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
p.addBefore(BufferCallBeforeInitHandler.NAME, null,
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
p.fireUserEventTriggered(BufferCallEvent.success());
}
Expand Down Expand Up @@ -216,7 +221,8 @@ private void saslNegotiate(final Channel ch) {
failInit(ch, e);
return;
}
ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder());
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
saslPromise.addListener(new FutureListener<Boolean>() {

@Override
Expand Down Expand Up @@ -274,17 +280,29 @@ private void connect() throws UnknownHostException {
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
.remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
.handler(new ChannelInitializer<Channel>() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel ch = future.channel();
if (!future.isSuccess()) {
failInit(ch, toIOE(future.cause()));
rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
return;
protected void initChannel(Channel ch) throws Exception {
if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
X509Util x509Util = new X509Util(conf);
SSLContext sslContext = x509Util.createSSLContextAndOptions().getSSLContext();
SSLEngine sslEngine = sslContext.createSSLEngine(remoteId.address.getHostName(),
remoteId.address.getPort());
sslEngine.setUseClientMode(true);
SslHandler sslHandler = new SslHandler(sslEngine);
sslHandler.setHandshakeTimeoutMillis(
conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT,
X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS));
ch.pipeline().addFirst(sslHandler);
}
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
new BufferCallBeforeInitHandler());
}
}).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
.addListener(new ChannelFutureListener() {

private void succeed(Channel ch) throws IOException {
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
if (useSasl) {
saslNegotiate(ch);
Expand All @@ -294,6 +312,32 @@ public void operationComplete(ChannelFuture future) throws Exception {
established(ch);
}
}

private void fail(Channel ch, Throwable error) {
failInit(ch, toIOE(error));
rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), error);
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel ch = future.channel();
if (!future.isSuccess()) {
fail(ch, future.cause());
return;
}
SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
if (sslHandler != null) {
sslHandler.handshakeFuture().addListener(f -> {
if (f.isSuccess()) {
succeed(ch);
} else {
fail(ch, f.cause());
}
});
} else {
succeed(ch);
}
}
}).channel();
}

Expand Down
10 changes: 10 additions & 0 deletions hbase-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@
<artifactId>kerb-simplekdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This file has been copied from the Apache ZooKeeper project.
* @see <a href=
* "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Exception.java">Base
* revision</a>
*/
@SuppressWarnings("serial")
@InterfaceAudience.Private
public class X509Exception extends Exception {

public X509Exception(String message) {
super(message);
}

public X509Exception(Throwable cause) {
super(cause);
}

public X509Exception(String message, Throwable cause) {
super(message, cause);
}

public static class KeyManagerException extends X509Exception {

public KeyManagerException(String message) {
super(message);
}

public KeyManagerException(Throwable cause) {
super(cause);
}

}

public static class TrustManagerException extends X509Exception {

public TrustManagerException(String message) {
super(message);
}

public TrustManagerException(Throwable cause) {
super(cause);
}

}

public static class SSLContextException extends X509Exception {

public SSLContextException(String message) {
super(message);
}

public SSLContextException(Throwable cause) {
super(cause);
}

public SSLContextException(String message, Throwable cause) {
super(message, cause);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.crypto.tls;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This enum represents the file type of a KeyStore or TrustStore. Currently, JKS (Java keystore),
* PEM, PKCS12, and BCFKS types are supported.
* <p/>
* This file has been copied from the Apache ZooKeeper project.
* @see <a href=
* "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/main/java/org/apache/zookeeper/common/KeyStoreFileType.java">Base
* revision</a>
*/
@InterfaceAudience.Private
public enum KeyStoreFileType {
JKS(".jks"),
PEM(".pem"),
PKCS12(".p12"),
BCFKS(".bcfks");

private final String defaultFileExtension;

KeyStoreFileType(String defaultFileExtension) {
this.defaultFileExtension = defaultFileExtension;
}

/**
* The property string that specifies that a key store or trust store should use this store file
* type.
*/
public String getPropertyValue() {
return this.name();
}

/**
* The file extension that is associated with this file type.
*/
public String getDefaultFileExtension() {
return defaultFileExtension;
}

/**
* Converts a property value to a StoreFileType enum. If the property value is <code>null</code>
* or an empty string, returns <code>null</code>.
* @param propertyValue the property value.
* @return the KeyStoreFileType, or <code>null</code> if <code>propertyValue</code> is
* <code>null</code> or empty.
* @throws IllegalArgumentException if <code>propertyValue</code> is not one of "JKS", "PEM",
* "BCFKS", "PKCS12", or empty/null.
*/
public static KeyStoreFileType fromPropertyValue(String propertyValue) {
if (propertyValue == null || propertyValue.length() == 0) {
return null;
}
return KeyStoreFileType.valueOf(propertyValue.toUpperCase());
}

/**
* Detects the type of KeyStore / TrustStore file from the file extension. If the file name ends
* with ".jks", returns <code>StoreFileType.JKS</code>. If the file name ends with ".pem", returns
* <code>StoreFileType.PEM</code>. If the file name ends with ".p12", returns
* <code>StoreFileType.PKCS12</code>. If the file name ends with ".bckfs", returns
* <code>StoreFileType.BCKFS</code>. Otherwise, throws an IllegalArgumentException.
* @param filename the filename of the key store or trust store file.
* @return a KeyStoreFileType.
* @throws IllegalArgumentException if the filename does not end with ".jks", ".pem", "p12" or
* "bcfks".
*/
public static KeyStoreFileType fromFilename(String filename) {
int i = filename.lastIndexOf('.');
if (i >= 0) {
String extension = filename.substring(i);
for (KeyStoreFileType storeFileType : KeyStoreFileType.values()) {
if (storeFileType.getDefaultFileExtension().equals(extension)) {
return storeFileType;
}
}
}
throw new IllegalArgumentException(
"Unable to auto-detect store file type from file name: " + filename);
}

/**
* If <code>propertyValue</code> is not null or empty, returns the result of
* <code>KeyStoreFileType.fromPropertyValue(propertyValue)</code>. Else, returns the result of
* <code>KeyStoreFileType.fromFileName(filename)</code>.
* @param propertyValue property value describing the KeyStoreFileType, or null/empty to
* auto-detect the type from the file name.
* @param filename file name of the key store file. The file extension is used to auto-detect
* the KeyStoreFileType when <code>propertyValue</code> is null or empty.
* @return a KeyStoreFileType.
* @throws IllegalArgumentException if <code>propertyValue</code> is not one of "JKS", "PEM",
* "PKCS12", "BCFKS", or empty/null.
* @throws IllegalArgumentException if <code>propertyValue</code>is empty or null and the type
* could not be determined from the file name.
*/
public static KeyStoreFileType fromPropertyValueOrFileName(String propertyValue,
String filename) {
KeyStoreFileType result = KeyStoreFileType.fromPropertyValue(propertyValue);
if (result == null) {
result = KeyStoreFileType.fromFilename(filename);
}
return result;
}
}
Loading

0 comments on commit 5bbb300

Please sign in to comment.