diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index ba387d2bf320..606749315f41 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.X509Exception; +import org.apache.hadoop.hbase.io.FileChangeWatcher; import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.util.NettyFutureUtils; import org.apache.hadoop.hbase.util.Pair; @@ -50,6 +51,8 @@ public class NettyRpcClient extends AbstractRpcClient { private final boolean shutdownGroupWhenClose; private final AtomicReference sslContextForClient = new AtomicReference<>(); + private final AtomicReference keyStoreWatcher = new AtomicReference<>(); + private final AtomicReference trustStoreWatcher = new AtomicReference<>(); public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { @@ -86,6 +89,14 @@ protected void closeInternal() { if (shutdownGroupWhenClose) { NettyFutureUtils.consume(group.shutdownGracefully()); } + FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); + if (ks != null) { + ks.stop(); + } + FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); + if (ts != null) { + ts.stop(); + } } SslContext getSslContext() throws X509Exception, IOException { @@ -95,6 +106,12 @@ SslContext getSslContext() throws X509Exception, IOException { if (!sslContextForClient.compareAndSet(null, result)) { // lost the race, another thread already set the value result = sslContextForClient.get(); + } else if ( + keyStoreWatcher.get() == null && trustStoreWatcher.get() == null + && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) + ) { + X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, + () -> sslContextForClient.set(null)); } } return result; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java new file mode 100644 index 000000000000..77e0e4e750ce --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.function.Consumer; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.server.ZooKeeperThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Instances of this class can be used to watch a directory for file changes. When a file is added + * to, deleted from, or is modified in the given directory, the callback provided by the user will + * be called from a background thread. Some things to keep in mind: + *
    + *
  • The callback should be thread-safe.
  • + *
  • Changes that happen around the time the thread is started may be missed.
  • + *
  • There is a delay between a file changing and the callback firing.
  • + *
  • The watch is not recursive - changes to subdirectories will not trigger a callback.
  • + *
+ *

+ * This file has been copied from the Apache ZooKeeper project. + * @see Base + * revision + */ +@InterfaceAudience.Private +public final class FileChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); + + public enum State { + NEW, // object created but start() not called yet + STARTING, // start() called but background thread has not entered main loop + RUNNING, // background thread is running + STOPPING, // stop() called but background thread has not exited main loop + STOPPED // stop() called and background thread has exited, or background thread crashed + } + + private final WatcherThread watcherThread; + private State state; // protected by synchronized(this) + + /** + * Creates a watcher that watches dirPath and invokes callback on + * changes. + * @param dirPath the directory to watch. + * @param callback the callback to invoke with events. event.kind() will return the + * type of event, and event.context() will return the filename + * relative to dirPath. + * @throws IOException if there is an error creating the WatchService. + */ + public FileChangeWatcher(Path dirPath, Consumer> callback) throws IOException { + FileSystem fs = dirPath.getFileSystem(); + WatchService watchService = fs.newWatchService(); + + LOG.debug("Registering with watch service: {}", dirPath); + + dirPath.register(watchService, + new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.OVERFLOW }); + state = State.NEW; + this.watcherThread = new WatcherThread(watchService, callback); + this.watcherThread.setDaemon(true); + } + + /** + * Returns the current {@link FileChangeWatcher.State}. + * @return the current state. + */ + public synchronized State getState() { + return state; + } + + /** + * Blocks until the current state becomes desiredState. Currently only used by tests, + * thus package-private. + * @param desiredState the desired state. + * @throws InterruptedException if the current thread gets interrupted. + */ + synchronized void waitForState(State desiredState) throws InterruptedException { + while (this.state != desiredState) { + this.wait(); + } + } + + /** + * Sets the state to newState. + * @param newState the new state. + */ + private synchronized void setState(State newState) { + state = newState; + this.notifyAll(); + } + + /** + * Atomically sets the state to update if and only if the state is currently + * expected. + * @param expected the expected state. + * @param update the new state. + * @return true if the update succeeds, or false if the current state does not equal + * expected. + */ + private synchronized boolean compareAndSetState(State expected, State update) { + if (state == expected) { + setState(update); + return true; + } else { + return false; + } + } + + /** + * Atomically sets the state to update if and only if the state is currently one of + * expectedStates. + * @param expectedStates the expected states. + * @param update the new state. + * @return true if the update succeeds, or false if the current state does not equal any of the + * expectedStates. + */ + private synchronized boolean compareAndSetState(State[] expectedStates, State update) { + for (State expected : expectedStates) { + if (state == expected) { + setState(update); + return true; + } + } + return false; + } + + /** + * Tells the background thread to start. Does not wait for it to be running. Calling this method + * more than once has no effect. + */ + public void start() { + if (!compareAndSetState(State.NEW, State.STARTING)) { + // If previous state was not NEW, start() has already been called. + return; + } + this.watcherThread.start(); + } + + /** + * Tells the background thread to stop. Does not wait for it to exit. + */ + public void stop() { + if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) { + watcherThread.interrupt(); + } + } + + /** + * Inner class that implements the watcher thread logic. + */ + private class WatcherThread extends ZooKeeperThread { + + private static final String THREAD_NAME = "FileChangeWatcher"; + + final WatchService watchService; + final Consumer> callback; + + WatcherThread(WatchService watchService, Consumer> callback) { + super(THREAD_NAME); + this.watchService = watchService; + this.callback = callback; + } + + @Override + public void run() { + try { + LOG.info("{} thread started", getName()); + if ( + !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING) + ) { + // stop() called shortly after start(), before + // this thread started running. + FileChangeWatcher.State state = FileChangeWatcher.this.getState(); + if (state != FileChangeWatcher.State.STOPPING) { + throw new IllegalStateException("Unexpected state: " + state); + } + return; + } + runLoop(); + } catch (Exception e) { + LOG.warn("Error in runLoop()", e); + throw e; + } finally { + try { + watchService.close(); + } catch (IOException e) { + LOG.warn("Error closing watch service", e); + } + LOG.info("{} thread finished", getName()); + FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED); + } + } + + private void runLoop() { + while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) { + WatchKey key; + try { + key = watchService.take(); + } catch (InterruptedException | ClosedWatchServiceException e) { + LOG.debug("{} was interrupted and is shutting down...", getName()); + break; + } + for (WatchEvent event : key.pollEvents()) { + LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context()); + try { + callback.accept(event); + } catch (Throwable e) { + LOG.error("Error from callback", e); + } + } + boolean isKeyValid = key.reset(); + if (!isKeyValid) { + // This is likely a problem, it means that file reloading is broken, probably because the + // directory we are watching was deleted or otherwise became inaccessible (unmounted, + // permissions + // changed, ???). + // For now, we log an error and exit the watcher thread. + LOG.error("Watch key no longer valid, maybe the directory is inaccessible?"); + break; + } + } + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java index 96aa66364bec..00a59acf41ab 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.io.crypto.tls; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.Security; @@ -25,6 +29,7 @@ import java.security.cert.X509CertSelector; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.CertPathTrustManagerParameters; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -38,6 +43,7 @@ import org.apache.hadoop.hbase.exceptions.SSLContextException; import org.apache.hadoop.hbase.exceptions.TrustManagerException; import org.apache.hadoop.hbase.exceptions.X509Exception; +import org.apache.hadoop.hbase.io.FileChangeWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +85,7 @@ public final class X509Util { CONFIG_PREFIX + "host-verification.reverse-dns.enabled"; private static final String TLS_ENABLED_PROTOCOLS = CONFIG_PREFIX + "enabledProtocols"; private static final String TLS_CIPHER_SUITES = CONFIG_PREFIX + "ciphersuites"; + public static final String TLS_CERT_RELOAD = CONFIG_PREFIX + "certReload"; public static final String DEFAULT_PROTOCOL = "TLSv1.2"; // @@ -244,7 +251,6 @@ public static SslContext createSslContextForServer(Configuration config) } SslContextBuilder sslContextBuilder; - sslContextBuilder = SslContextBuilder .forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType)); @@ -395,4 +401,74 @@ private static String[] getCipherSuites(Configuration config) { return cipherSuitesInput.split(","); } } + + /** + * Enable certificate file reloading by creating FileWatchers for keystore and truststore. + * AtomicReferences will be set with the new instances. resetContext - if not null - will be + * called when the file has been modified. + * @param keystoreWatcher Reference to keystoreFileWatcher. + * @param trustStoreWatcher Reference to truststoreFileWatcher. + * @param resetContext Callback for file changes. + */ + public static void enableCertFileReloading(Configuration config, + AtomicReference keystoreWatcher, + AtomicReference trustStoreWatcher, Runnable resetContext) + throws IOException { + String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); + keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext)); + String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); + trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext)); + } + + private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext) + throws IOException { + if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) { + return null; + } + final Path filePath = Paths.get(fileLocation).toAbsolutePath(); + Path parentPath = filePath.getParent(); + if (parentPath == null) { + throw new IOException("Key/trust store path does not have a parent: " + filePath); + } + FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> { + handleWatchEvent(filePath, watchEvent, resetContext); + }); + fileChangeWatcher.start(); + return fileChangeWatcher; + } + + /** + * Handler for watch events that let us know a file we may care about has changed on disk. + * @param filePath the path to the file we are watching for changes. + * @param event the WatchEvent. + */ + private static void handleWatchEvent(Path filePath, WatchEvent event, Runnable resetContext) { + boolean shouldResetContext = false; + Path dirPath = filePath.getParent(); + if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { + // If we get notified about possibly missed events, reload the key store / trust store just to + // be sure. + shouldResetContext = true; + } else if ( + event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) + || event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE) + ) { + Path eventFilePath = dirPath.resolve((Path) event.context()); + if (filePath.equals(eventFilePath)) { + shouldResetContext = true; + } + } + // Note: we don't care about delete events + if (shouldResetContext) { + LOG.info( + "Attempting to reset default SSL context after receiving watch event: {} with context: {}", + event.kind(), event.context()); + resetContext.run(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring watch event and keeping previous default SSL context. " + + "Event kind: {} with context: {}", event.kind(), event.context()); + } + } + } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java new file mode 100644 index 000000000000..0d94a550d792 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This file has been copied from the Apache ZooKeeper project. + * @see Base + * revision + */ +@Category({ IOTests.class, SmallTests.class }) +public class TestFileChangeWatcher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFileChangeWatcher.class); + + private static File tempDir; + private static File tempFile; + + private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class); + private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private static final long FS_TIMEOUT = 30000L; + + @BeforeClass + public static void createTempFile() throws IOException { + tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString()) + .getCanonicalFile(); + FileUtils.forceMkdir(tempDir); + tempFile = File.createTempFile("zk_test_", "", tempDir); + } + + @AfterClass + public static void cleanupTempDir() { + UTIL.cleanupTestDir(); + } + + @Test + public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + for (int i = 0; i < 3; i++) { + LOG.info("Modifying file, attempt {}", (i + 1)); + FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8, + true); + synchronized (events) { + if (events.size() < i + 1) { + events.wait(FS_TIMEOUT); + } + assertEquals("Wrong number of events", i + 1, events.size()); + WatchEvent event = events.get(i); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + LOG.info("Touching file"); + FileUtils.touch(tempFile); + synchronized (events) { + if (events.isEmpty()) { + events.wait(FS_TIMEOUT); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + File tempFile2 = File.createTempFile("zk_test_", "", tempDir); + tempFile2.deleteOnExit(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(FS_TIMEOUT); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind()); + assertEquals(tempFile2.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + tempFile.delete(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(FS_TIMEOUT); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackErrorDoesNotCrashWatcherThread() + throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final AtomicInteger callCount = new AtomicInteger(0); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + int oldValue; + synchronized (callCount) { + oldValue = callCount.getAndIncrement(); + callCount.notifyAll(); + } + if (oldValue == 0) { + throw new RuntimeException("This error should not crash the watcher thread"); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + LOG.info("Modifying file"); + FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + while (callCount.get() == 0) { + callCount.wait(FS_TIMEOUT); + } + } + LOG.info("Modifying file again"); + FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + if (callCount.get() == 1) { + callCount.wait(FS_TIMEOUT); + } + } + // The value of callCount can exceed 1 only if the callback thread + // survives the exception thrown by the first callback. + assertTrue(callCount.get() > 1); + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java index 583e7efcfa99..dd2975070f29 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java @@ -57,16 +57,16 @@ public final class X509TestContext { private final File tempDir; private final Configuration conf; - private final X509Certificate trustStoreCertificate; + private X509Certificate trustStoreCertificate; private final char[] trustStorePassword; - private final KeyPair trustStoreKeyPair; + private KeyPair trustStoreKeyPair; private File trustStoreJksFile; private File trustStorePemFile; private File trustStorePkcs12File; private File trustStoreBcfksFile; - private final KeyPair keyStoreKeyPair; - private final X509Certificate keyStoreCertificate; + private KeyPair keyStoreKeyPair; + private X509Certificate keyStoreCertificate; private final char[] keyStorePassword; private File keyStoreJksFile; private File keyStorePemFile; @@ -101,16 +101,8 @@ private X509TestContext(Configuration conf, File tempDir, KeyPair trustStoreKeyP this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair); this.keyStorePassword = requireNonNull(keyStorePassword); - X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE); - caNameBuilder.addRDN(BCStyle.CN, - MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA"); - trustStoreCertificate = - X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair); + createCertificates(); - X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE); - nameBuilder.addRDN(BCStyle.CN, - MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test"); - keyStoreCertificate = newCert(nameBuilder.build()); trustStorePkcs12File = null; trustStorePemFile = null; trustStoreJksFile = null; @@ -197,74 +189,85 @@ public File getTrustStoreFile(KeyStoreFileType storeFileType) throws IOException private File getTrustStoreJksFile() throws IOException { if (trustStoreJksFile == null) { - File trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX, + trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir); trustStoreJksFile.deleteOnExit(); - try ( - final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) { - byte[] bytes = - X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword); - trustStoreOutputStream.write(bytes); - trustStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.trustStoreJksFile = trustStoreJksFile; + generateTrustStoreJksFile(); } return trustStoreJksFile; } + private void generateTrustStoreJksFile() throws IOException { + try (final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) { + byte[] bytes = + X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword); + trustStoreOutputStream.write(bytes); + trustStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getTrustStorePemFile() throws IOException { if (trustStorePemFile == null) { - File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX, + trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir); trustStorePemFile.deleteOnExit(); - FileUtils.writeStringToFile(trustStorePemFile, - X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII, - false); - this.trustStorePemFile = trustStorePemFile; + generateTrustStorePemFile(); } return trustStorePemFile; } + private void generateTrustStorePemFile() throws IOException { + FileUtils.writeStringToFile(trustStorePemFile, + X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII, + false); + } + private File getTrustStorePkcs12File() throws IOException { if (trustStorePkcs12File == null) { - File trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX, + trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir); trustStorePkcs12File.deleteOnExit(); - try (final FileOutputStream trustStoreOutputStream = - new FileOutputStream(trustStorePkcs12File)) { - byte[] bytes = - X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword); - trustStoreOutputStream.write(bytes); - trustStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.trustStorePkcs12File = trustStorePkcs12File; + generateTrustStorePkcs12File(); } return trustStorePkcs12File; } + private void generateTrustStorePkcs12File() throws IOException { + try ( + final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStorePkcs12File)) { + byte[] bytes = + X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword); + trustStoreOutputStream.write(bytes); + trustStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getTrustStoreBcfksFile() throws IOException { if (trustStoreBcfksFile == null) { - File trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX, + trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir); trustStoreBcfksFile.deleteOnExit(); - try ( - final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) { - byte[] bytes = - X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword); - trustStoreOutputStream.write(bytes); - trustStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.trustStoreBcfksFile = trustStoreBcfksFile; + generateTrustStoreBcfksFile(); } return trustStoreBcfksFile; } + private void generateTrustStoreBcfksFile() throws IOException { + try ( + final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) { + byte[] bytes = + X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword); + trustStoreOutputStream.write(bytes); + trustStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + public X509Certificate getKeyStoreCertificate() { return keyStoreCertificate; } @@ -307,33 +310,32 @@ public File getKeyStoreFile(KeyStoreFileType storeFileType) throws IOException { private File getKeyStoreJksFile() throws IOException { if (keyStoreJksFile == null) { - File keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX, + keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir); keyStoreJksFile.deleteOnExit(); - try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) { - byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword); - keyStoreOutputStream.write(bytes); - keyStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.keyStoreJksFile = keyStoreJksFile; + generateKeyStoreJksFile(); } return keyStoreJksFile; } + private void generateKeyStoreJksFile() throws IOException { + try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) { + byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword); + keyStoreOutputStream.write(bytes); + keyStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getKeyStorePemFile() throws IOException { if (keyStorePemFile == null) { try { - File keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX, + keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir); keyStorePemFile.deleteOnExit(); - FileUtils.writeStringToFile(keyStorePemFile, - X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword), - StandardCharsets.US_ASCII, false); - this.keyStorePemFile = keyStorePemFile; + generateKeyStorePemFile(); } catch (OperatorCreationException e) { throw new IOException(e); } @@ -341,42 +343,55 @@ private File getKeyStorePemFile() throws IOException { return keyStorePemFile; } + private void generateKeyStorePemFile() throws IOException, OperatorCreationException { + FileUtils.writeStringToFile(keyStorePemFile, + X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, keyStoreKeyPair.getPrivate(), + keyStorePassword), + StandardCharsets.US_ASCII, false); + } + private File getKeyStorePkcs12File() throws IOException { if (keyStorePkcs12File == null) { - File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX, + keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir); keyStorePkcs12File.deleteOnExit(); - try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) { - byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword); - keyStoreOutputStream.write(bytes); - keyStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.keyStorePkcs12File = keyStorePkcs12File; + generateKeyStorePkcs12File(); } return keyStorePkcs12File; } + private void generateKeyStorePkcs12File() throws IOException { + try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) { + byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword); + keyStoreOutputStream.write(bytes); + keyStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getKeyStoreBcfksFile() throws IOException { if (keyStoreBcfksFile == null) { - File keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX, + keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir); keyStoreBcfksFile.deleteOnExit(); - try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) { - byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword); - keyStoreOutputStream.write(bytes); - keyStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.keyStoreBcfksFile = keyStoreBcfksFile; + generateKeyStoreBcfksFile(); } return keyStoreBcfksFile; } + private void generateKeyStoreBcfksFile() throws IOException { + try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) { + byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword); + keyStoreOutputStream.write(bytes); + keyStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + /** * Sets the SSL system properties such that the given X509Util object can be used to create SSL * Contexts that will use the trust store and key store files created by this test context. @@ -445,6 +460,59 @@ public X509TestContext cloneWithNewKeystoreCert(X509Certificate cert) { keyStoreKeyPair, keyStorePassword, cert); } + public void regenerateStores(X509KeyType keyStoreKeyType, X509KeyType trustStoreKeyType, + KeyStoreFileType keyStoreFileType, KeyStoreFileType trustStoreFileType) + throws GeneralSecurityException, IOException, OperatorCreationException { + + trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType); + keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType); + createCertificates(); + + switch (keyStoreFileType) { + case JKS: + generateKeyStoreJksFile(); + break; + case PEM: + generateKeyStorePemFile(); + break; + case BCFKS: + generateKeyStoreBcfksFile(); + break; + case PKCS12: + generateKeyStorePkcs12File(); + break; + } + + switch (trustStoreFileType) { + case JKS: + generateTrustStoreJksFile(); + break; + case PEM: + generateTrustStorePemFile(); + break; + case PKCS12: + generateTrustStorePkcs12File(); + break; + case BCFKS: + generateTrustStoreBcfksFile(); + break; + } + } + + private void createCertificates() + throws GeneralSecurityException, IOException, OperatorCreationException { + X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE); + caNameBuilder.addRDN(BCStyle.CN, + MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA"); + trustStoreCertificate = + X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair); + + X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE); + nameBuilder.addRDN(BCStyle.CN, + MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test"); + keyStoreCertificate = newCert(nameBuilder.build()); + } + /** * Builder class, used for creating new instances of X509TestContext. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index de39ace5d2d7..f3ead471fe61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -27,11 +27,13 @@ import java.net.SocketAddress; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.exceptions.X509Exception; +import org.apache.hadoop.hbase.io.FileChangeWatcher; import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -95,6 +97,9 @@ public class NettyRpcServer extends RpcServer { private final Channel serverChannel; final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); private final ByteBufAllocator channelAllocator; + private final AtomicReference sslContextForServer = new AtomicReference<>(); + private final AtomicReference keyStoreWatcher = new AtomicReference<>(); + private final AtomicReference trustStoreWatcher = new AtomicReference<>(); public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, @@ -202,6 +207,14 @@ public synchronized void stop() { return; } LOG.info("Stopping server on " + this.serverChannel.localAddress()); + FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); + if (ks != null) { + ks.stop(); + } + FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); + if (ts != null) { + ts.stop(); + } if (authTokenSecretMgr != null) { authTokenSecretMgr.stop(); authTokenSecretMgr = null; @@ -251,7 +264,7 @@ public Pair call(BlockingService service, MethodDescriptor private void initSSL(ChannelPipeline p, boolean supportPlaintext) throws X509Exception, IOException { - SslContext nettySslContext = X509Util.createSslContextForServer(conf); + SslContext nettySslContext = getSslContext(); if (supportPlaintext) { p.addLast("ssl", new OptionalSslHandler(nettySslContext)); @@ -285,4 +298,22 @@ private void initSSL(ChannelPipeline p, boolean supportPlaintext) LOG.debug("SSL handler added for channel: {}", p.channel()); } } + + SslContext getSslContext() throws X509Exception, IOException { + SslContext result = sslContextForServer.get(); + if (result == null) { + result = X509Util.createSslContextForServer(conf); + if (!sslContextForServer.compareAndSet(null, result)) { + // lost the race, another thread already set the value + result = sslContextForServer.get(); + } else if ( + keyStoreWatcher.get() == null && trustStoreWatcher.get() == null + && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) + ) { + X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, + () -> sslContextForServer.set(null)); + } + } + return result; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java new file mode 100644 index 000000000000..34c812a3bb90 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Security; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType; +import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType; +import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext; +import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider; +import org.apache.hadoop.hbase.io.crypto.tls.X509Util; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; +import org.apache.hadoop.hbase.ipc.NettyRpcClient; +import org.apache.hadoop.hbase.ipc.NettyRpcServer; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.OperatorCreationException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; + +@RunWith(Parameterized.class) +@Category({ RPCTests.class, MediumTests.class }) +public class TestNettyTLSIPCFileWatcher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestNettyTLSIPCFileWatcher.class); + + private static final Configuration CONF = HBaseConfiguration.create(); + private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(CONF); + private static HRegionServer SERVER; + private static X509TestContextProvider PROVIDER; + private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG; + + private X509TestContext x509TestContext; + + @Parameterized.Parameter(0) + public X509KeyType keyType; + + @Parameterized.Parameter(1) + public KeyStoreFileType storeFileType; + + @Parameterized.Parameters(name = "{index}: keyType={0}, storeFileType={1}") + public static List data() { + List params = new ArrayList<>(); + for (X509KeyType caKeyType : X509KeyType.values()) { + for (KeyStoreFileType ks : KeyStoreFileType.values()) { + params.add(new Object[] { caKeyType, ks }); + } + } + return params; + } + + @BeforeClass + public static void setUpBeforeClass() throws IOException { + Security.addProvider(new BouncyCastleProvider()); + File dir = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString()) + .getCanonicalFile(); + FileUtils.forceMkdir(dir); + // server must enable tls + CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true); + PROVIDER = new X509TestContextProvider(CONF, dir); + EVENT_LOOP_GROUP_CONFIG = + new NettyEventLoopGroupConfig(CONF, TestNettyTLSIPCFileWatcher.class.getSimpleName()); + SERVER = mock(HRegionServer.class); + when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG); + } + + @AfterClass + public static void tearDownAfterClass() throws InterruptedException { + Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME); + EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync(); + UTIL.cleanupTestDir(); + } + + @Before + public void setUp() throws IOException { + x509TestContext = PROVIDER.get(keyType, keyType, "keyPa$$word".toCharArray()); + x509TestContext.setConfigurations(storeFileType, storeFileType); + CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false); + CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true); + CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true); + } + + @After + public void tearDown() { + x509TestContext.clearConfigurations(); + x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP); + x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR); + x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL); + System.clearProperty("com.sun.net.ssl.checkRevocation"); + System.clearProperty("com.sun.security.enableCRLDP"); + Security.setProperty("ocsp.enable", Boolean.FALSE.toString()); + Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString()); + } + + @Test + public void testReplaceServerKeystore() + throws IOException, ServiceException, GeneralSecurityException, OperatorCreationException { + Configuration clientConf = new Configuration(CONF); + RpcServer rpcServer = createRpcServer("testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + + try { + rpcServer.start(); + + try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); + assertNull(pcrc.cellScanner()); + } + + // Replace keystore + x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType); + + try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); + assertNull(pcrc.cellScanner()); + } + + } finally { + rpcServer.stop(); + } + } + + @Test + public void testReplaceClientAndServerKeystore() + throws GeneralSecurityException, IOException, OperatorCreationException, ServiceException { + Configuration clientConf = new Configuration(CONF); + RpcServer rpcServer = createRpcServer("testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + + try { + rpcServer.start(); + + try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); + assertNull(pcrc.cellScanner()); + + // Replace keystore and cancel client connections + x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType); + client.cancelConnections( + ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L)); + + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); + assertNull(pcrc.cellScanner()); + } + } finally { + rpcServer.stop(); + } + } + + private RpcServer createRpcServer(String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true); + } +}