Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hbase-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -173,8 +174,10 @@ public void stop() {
}
}

String getWatcherThreadName() {
return watcherThread.getName();
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
Thread getWatcherThread() {
return watcherThread;
}

private static void handleException(Thread thread, Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,38 @@
*/
package org.apache.hadoop.hbase.io;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.endsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,31 +58,51 @@
* "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
* revision</a>
*/
@Category({ IOTests.class, SmallTests.class })
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestFileChangeWatcher {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFileChangeWatcher.class);

private static File tempFile;

private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();

private static final long FS_TIMEOUT = 30000L;
private static File dir;

private static final Duration POLL_INTERVAL = Duration.ofMillis(100);

@BeforeClass
public static void createTempFile() throws IOException {
tempFile = File.createTempFile("zk_test_", "");
private File tempFile;

private FileChangeWatcher watcher;

@BeforeAll
public static void setUpBeforeAll() throws IOException {
dir = new File(UTIL.getDataTestDir().toString()).getAbsoluteFile();
if (!dir.mkdirs()) {
throw new IOException("can not mkdir " + dir);
}
}

@AfterClass
@AfterAll
public static void cleanupTempDir() {
UTIL.cleanupTestDir();
}

@BeforeEach
public void setUp(TestInfo testInfo) throws IOException {
tempFile = new File(dir, "file_change_test_" + testInfo.getDisplayName());
if (!tempFile.createNewFile()) {
throw new IOException("failed to create new empty file " + tempFile);
}
}

@AfterEach
public void tearDown() throws InterruptedException {
if (watcher != null) {
watcher.stop();
watcher.waitForState(FileChangeWatcher.State.STOPPED);
watcher = null;
}
}

@Test
public void testEnableCertFileReloading() throws IOException {
Configuration myConf = new Configuration();
Expand All @@ -91,7 +114,7 @@ public void testEnableCertFileReloading() throws IOException {
X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
});
assertNotNull(keystoreWatcher.get());
assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks"));
assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks"));
assertNull(truststoreWatcher.get());

keystoreWatcher.getAndSet(null).stop();
Expand All @@ -103,149 +126,97 @@ public void testEnableCertFileReloading() throws IOException {
});

assertNotNull(keystoreWatcher.get());
assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks"));
assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks"));
assertNotNull(truststoreWatcher.get());
assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("bar.jks"));
assertThat(truststoreWatcher.get().getWatcherThread().getName(), endsWith("bar.jks"));

keystoreWatcher.getAndSet(null).stop();
truststoreWatcher.getAndSet(null).stop();
}

// wait until watcher thread finish loading the last modified time, we check this by checking
// whether the watcher thread has been in TIMED_WAITING state, i.e, waiting for the next runLoop
private void awaitWatcherThreadInitialized() throws InterruptedException {
watcher.waitForState(FileChangeWatcher.State.RUNNING);
await().atMost(Duration.ofSeconds(2)).pollInSameThread().pollInterval(Duration.ofMillis(10))
.until(() -> watcher.getWatcherThread().getState() == Thread.State.TIMED_WAITING);
}

@Test
public void testNoFalseNotifications() throws IOException, InterruptedException {
FileChangeWatcher watcher = null;
try {
final List<Path> notifiedPaths = new ArrayList<>();
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update on path {}", path);
synchronized (notifiedPaths) {
notifiedPaths.add(path);
notifiedPaths.notifyAll();
}
});
watcher.start();
watcher.waitForState(FileChangeWatcher.State.RUNNING);
Thread.sleep(1000L); // TODO hack
assertEquals("Should not have been notified", 0, notifiedPaths.size());
} finally {
if (watcher != null) {
watcher.stop();
watcher.waitForState(FileChangeWatcher.State.STOPPED);
public void testNoFalseNotifications() throws Exception {
final List<Path> notifiedPaths = new ArrayList<>();
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update on path {}", path);
synchronized (notifiedPaths) {
notifiedPaths.add(path);
notifiedPaths.notifyAll();
}
}
});
watcher.start();
awaitWatcherThreadInitialized();
await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3))
.untilAsserted(() -> assertEquals("Should not have been notified", 0, notifiedPaths.size()));
}

@Test
public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
FileChangeWatcher watcher = null;
try {
final List<Path> notifiedPaths = new ArrayList<>();
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update on path {}", path);
synchronized (notifiedPaths) {
notifiedPaths.add(path);
notifiedPaths.notifyAll();
}
});
watcher.start();
watcher.waitForState(FileChangeWatcher.State.RUNNING);
Thread.sleep(1000L); // TODO hack
for (int i = 0; i < 3; i++) {
LOG.info("Modifying file, attempt {}", (i + 1));
FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
true);
synchronized (notifiedPaths) {
if (notifiedPaths.size() < i + 1) {
notifiedPaths.wait(FS_TIMEOUT);
}
assertEquals("Wrong number of notifications", i + 1, notifiedPaths.size());
Path path = notifiedPaths.get(i);
assertEquals(tempFile.getPath(), path.toString());
}
}
} finally {
if (watcher != null) {
watcher.stop();
watcher.waitForState(FileChangeWatcher.State.STOPPED);
}
final List<Path> notifiedPaths = Collections.synchronizedList(new ArrayList<>());
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update on path {}", path);
notifiedPaths.add(path);
});
watcher.start();
awaitWatcherThreadInitialized();
for (int i = 0; i < 3; i++) {
final int index = i;
LOG.info("Modifying file, attempt {}", (index + 1));
FileUtils.writeStringToFile(tempFile, "Hello world " + index + "\n", StandardCharsets.UTF_8,
true);
await().atMost(Duration.ofSeconds(2)).untilAsserted(
() -> assertEquals("Wrong number of notifications", index + 1, notifiedPaths.size()));
Path path = notifiedPaths.get(index);
assertEquals(tempFile.getPath(), path.toString());
}
}

@Test
public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
FileChangeWatcher watcher = null;
try {
final List<Path> notifiedPaths = new ArrayList<>();
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update on path {}", path);
synchronized (notifiedPaths) {
notifiedPaths.add(path);
notifiedPaths.notifyAll();
}
});
watcher.start();
watcher.waitForState(FileChangeWatcher.State.RUNNING);
Thread.sleep(1000L); // TODO hack
LOG.info("Touching file");
FileUtils.touch(tempFile);
synchronized (notifiedPaths) {
if (notifiedPaths.isEmpty()) {
notifiedPaths.wait(FS_TIMEOUT);
}
assertFalse(notifiedPaths.isEmpty());
Path path = notifiedPaths.get(0);
assertEquals(tempFile.getPath(), path.toString());
}
} finally {
if (watcher != null) {
watcher.stop();
watcher.waitForState(FileChangeWatcher.State.STOPPED);
}
}
final List<Path> notifiedPaths = Collections.synchronizedList(new ArrayList<>());
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update on path {}", path);
notifiedPaths.add(path);
});
watcher.start();
awaitWatcherThreadInitialized();
LOG.info("Touching file");
FileUtils.touch(tempFile);
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertFalse(notifiedPaths.isEmpty()));
Path path = notifiedPaths.get(0);
assertEquals(tempFile.getPath(), path.toString());
}

@Test
public void testCallbackErrorDoesNotCrashWatcherThread()
throws IOException, InterruptedException {
FileChangeWatcher watcher = null;
try {
final AtomicInteger callCount = new AtomicInteger(0);
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update for path {}", path);
int oldValue;
synchronized (callCount) {
oldValue = callCount.getAndIncrement();
callCount.notifyAll();
}
if (oldValue == 0) {
throw new RuntimeException("This error should not crash the watcher thread");
}
});
watcher.start();
watcher.waitForState(FileChangeWatcher.State.RUNNING);
Thread.sleep(1000L); // TODO hack
LOG.info("Modifying file");
FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
synchronized (callCount) {
while (callCount.get() == 0) {
callCount.wait(FS_TIMEOUT);
}
}
LOG.info("Modifying file again");
FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
synchronized (callCount) {
if (callCount.get() == 1) {
callCount.wait(FS_TIMEOUT);
}
}
// The value of callCount can exceed 1 only if the callback thread
// survives the exception thrown by the first callback.
assertTrue(callCount.get() > 1);
} finally {
if (watcher != null) {
watcher.stop();
watcher.waitForState(FileChangeWatcher.State.STOPPED);
}
}
public void testCallbackErrorDoesNotCrashWatcherThread() throws Exception {
final AtomicInteger callCount = new AtomicInteger(0);
watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
LOG.info("Got an update for path {}", path);
callCount.incrementAndGet();
throw new RuntimeException("This error should not crash the watcher thread");
});
watcher.start();
awaitWatcherThreadInitialized();

LOG.info("Modifying file");
FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(1, callCount.get()));

// make sure we can still receive the update event, which means the watcher thread is still
// alive
LOG.info("Modifying file again");
FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(2, callCount.get()));

// also make sure that the thread is not terminated
assertNotEquals(Thread.State.TERMINATED, watcher.getWatcherThread());
}
}