diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml index 7477d5358c9b..9a30ae406d09 100644 --- a/hbase-common/pom.xml +++ b/hbase-common/pom.xml @@ -130,6 +130,11 @@ junit-vintage-engine test + + org.awaitility + awaitility + test + com.github.stephenc.findbugs findbugs-annotations diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java index 3ac4076766b8..fbcdbf41f816 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -173,8 +174,10 @@ public void stop() { } } - String getWatcherThreadName() { - return watcherThread.getName(); + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + Thread getWatcherThread() { + return watcherThread; } private static void handleException(Thread thread, Throwable e) { diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java index 106c077e22d8..358ea172e143 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hbase.io; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import java.io.File; import java.io.IOException; @@ -30,22 +32,23 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtil; import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.hamcrest.Matchers; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,31 +58,51 @@ * "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base * revision */ -@Category({ IOTests.class, SmallTests.class }) +@Tag(IOTests.TAG) +@Tag(SmallTests.TAG) public class TestFileChangeWatcher { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestFileChangeWatcher.class); - - private static File tempFile; - private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class); private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); - private static final long FS_TIMEOUT = 30000L; + private static File dir; + private static final Duration POLL_INTERVAL = Duration.ofMillis(100); - @BeforeClass - public static void createTempFile() throws IOException { - tempFile = File.createTempFile("zk_test_", ""); + private File tempFile; + + private FileChangeWatcher watcher; + + @BeforeAll + public static void setUpBeforeAll() throws IOException { + dir = new File(UTIL.getDataTestDir().toString()).getAbsoluteFile(); + if (!dir.mkdirs()) { + throw new IOException("can not mkdir " + dir); + } } - @AfterClass + @AfterAll public static void cleanupTempDir() { UTIL.cleanupTestDir(); } + @BeforeEach + public void setUp(TestInfo testInfo) throws IOException { + tempFile = new File(dir, "file_change_test_" + testInfo.getDisplayName()); + if (!tempFile.createNewFile()) { + throw new IOException("failed to create new empty file " + tempFile); + } + } + + @AfterEach + public void tearDown() throws InterruptedException { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + watcher = null; + } + } + @Test public void testEnableCertFileReloading() throws IOException { Configuration myConf = new Configuration(); @@ -91,7 +114,7 @@ public void testEnableCertFileReloading() throws IOException { X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { }); assertNotNull(keystoreWatcher.get()); - assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks")); + assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks")); assertNull(truststoreWatcher.get()); keystoreWatcher.getAndSet(null).stop(); @@ -103,149 +126,97 @@ public void testEnableCertFileReloading() throws IOException { }); assertNotNull(keystoreWatcher.get()); - assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks")); + assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks")); assertNotNull(truststoreWatcher.get()); - assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("bar.jks")); + assertThat(truststoreWatcher.get().getWatcherThread().getName(), endsWith("bar.jks")); keystoreWatcher.getAndSet(null).stop(); truststoreWatcher.getAndSet(null).stop(); } + // wait until watcher thread finish loading the last modified time, we check this by checking + // whether the watcher thread has been in TIMED_WAITING state, i.e, waiting for the next runLoop + private void awaitWatcherThreadInitialized() throws InterruptedException { + watcher.waitForState(FileChangeWatcher.State.RUNNING); + await().atMost(Duration.ofSeconds(2)).pollInSameThread().pollInterval(Duration.ofMillis(10)) + .until(() -> watcher.getWatcherThread().getState() == Thread.State.TIMED_WAITING); + } + @Test - public void testNoFalseNotifications() throws IOException, InterruptedException { - FileChangeWatcher watcher = null; - try { - final List notifiedPaths = new ArrayList<>(); - watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { - LOG.info("Got an update on path {}", path); - synchronized (notifiedPaths) { - notifiedPaths.add(path); - notifiedPaths.notifyAll(); - } - }); - watcher.start(); - watcher.waitForState(FileChangeWatcher.State.RUNNING); - Thread.sleep(1000L); // TODO hack - assertEquals("Should not have been notified", 0, notifiedPaths.size()); - } finally { - if (watcher != null) { - watcher.stop(); - watcher.waitForState(FileChangeWatcher.State.STOPPED); + public void testNoFalseNotifications() throws Exception { + final List notifiedPaths = new ArrayList<>(); + watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { + LOG.info("Got an update on path {}", path); + synchronized (notifiedPaths) { + notifiedPaths.add(path); + notifiedPaths.notifyAll(); } - } + }); + watcher.start(); + awaitWatcherThreadInitialized(); + await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertEquals("Should not have been notified", 0, notifiedPaths.size())); } @Test public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { - FileChangeWatcher watcher = null; - try { - final List notifiedPaths = new ArrayList<>(); - watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { - LOG.info("Got an update on path {}", path); - synchronized (notifiedPaths) { - notifiedPaths.add(path); - notifiedPaths.notifyAll(); - } - }); - watcher.start(); - watcher.waitForState(FileChangeWatcher.State.RUNNING); - Thread.sleep(1000L); // TODO hack - for (int i = 0; i < 3; i++) { - LOG.info("Modifying file, attempt {}", (i + 1)); - FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8, - true); - synchronized (notifiedPaths) { - if (notifiedPaths.size() < i + 1) { - notifiedPaths.wait(FS_TIMEOUT); - } - assertEquals("Wrong number of notifications", i + 1, notifiedPaths.size()); - Path path = notifiedPaths.get(i); - assertEquals(tempFile.getPath(), path.toString()); - } - } - } finally { - if (watcher != null) { - watcher.stop(); - watcher.waitForState(FileChangeWatcher.State.STOPPED); - } + final List notifiedPaths = Collections.synchronizedList(new ArrayList<>()); + watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { + LOG.info("Got an update on path {}", path); + notifiedPaths.add(path); + }); + watcher.start(); + awaitWatcherThreadInitialized(); + for (int i = 0; i < 3; i++) { + final int index = i; + LOG.info("Modifying file, attempt {}", (index + 1)); + FileUtils.writeStringToFile(tempFile, "Hello world " + index + "\n", StandardCharsets.UTF_8, + true); + await().atMost(Duration.ofSeconds(2)).untilAsserted( + () -> assertEquals("Wrong number of notifications", index + 1, notifiedPaths.size())); + Path path = notifiedPaths.get(index); + assertEquals(tempFile.getPath(), path.toString()); } } @Test public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { - FileChangeWatcher watcher = null; - try { - final List notifiedPaths = new ArrayList<>(); - watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { - LOG.info("Got an update on path {}", path); - synchronized (notifiedPaths) { - notifiedPaths.add(path); - notifiedPaths.notifyAll(); - } - }); - watcher.start(); - watcher.waitForState(FileChangeWatcher.State.RUNNING); - Thread.sleep(1000L); // TODO hack - LOG.info("Touching file"); - FileUtils.touch(tempFile); - synchronized (notifiedPaths) { - if (notifiedPaths.isEmpty()) { - notifiedPaths.wait(FS_TIMEOUT); - } - assertFalse(notifiedPaths.isEmpty()); - Path path = notifiedPaths.get(0); - assertEquals(tempFile.getPath(), path.toString()); - } - } finally { - if (watcher != null) { - watcher.stop(); - watcher.waitForState(FileChangeWatcher.State.STOPPED); - } - } + final List notifiedPaths = Collections.synchronizedList(new ArrayList<>()); + watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { + LOG.info("Got an update on path {}", path); + notifiedPaths.add(path); + }); + watcher.start(); + awaitWatcherThreadInitialized(); + LOG.info("Touching file"); + FileUtils.touch(tempFile); + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertFalse(notifiedPaths.isEmpty())); + Path path = notifiedPaths.get(0); + assertEquals(tempFile.getPath(), path.toString()); } @Test - public void testCallbackErrorDoesNotCrashWatcherThread() - throws IOException, InterruptedException { - FileChangeWatcher watcher = null; - try { - final AtomicInteger callCount = new AtomicInteger(0); - watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { - LOG.info("Got an update for path {}", path); - int oldValue; - synchronized (callCount) { - oldValue = callCount.getAndIncrement(); - callCount.notifyAll(); - } - if (oldValue == 0) { - throw new RuntimeException("This error should not crash the watcher thread"); - } - }); - watcher.start(); - watcher.waitForState(FileChangeWatcher.State.RUNNING); - Thread.sleep(1000L); // TODO hack - LOG.info("Modifying file"); - FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); - synchronized (callCount) { - while (callCount.get() == 0) { - callCount.wait(FS_TIMEOUT); - } - } - LOG.info("Modifying file again"); - FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); - synchronized (callCount) { - if (callCount.get() == 1) { - callCount.wait(FS_TIMEOUT); - } - } - // The value of callCount can exceed 1 only if the callback thread - // survives the exception thrown by the first callback. - assertTrue(callCount.get() > 1); - } finally { - if (watcher != null) { - watcher.stop(); - watcher.waitForState(FileChangeWatcher.State.STOPPED); - } - } + public void testCallbackErrorDoesNotCrashWatcherThread() throws Exception { + final AtomicInteger callCount = new AtomicInteger(0); + watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { + LOG.info("Got an update for path {}", path); + callCount.incrementAndGet(); + throw new RuntimeException("This error should not crash the watcher thread"); + }); + watcher.start(); + awaitWatcherThreadInitialized(); + + LOG.info("Modifying file"); + FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(1, callCount.get())); + + // make sure we can still receive the update event, which means the watcher thread is still + // alive + LOG.info("Modifying file again"); + FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(2, callCount.get())); + + // also make sure that the thread is not terminated + assertNotEquals(Thread.State.TERMINATED, watcher.getWatcherThread()); } }