Skip to content

Commit

Permalink
Add optional content checking to ResourceWatcher (#79423)
Browse files Browse the repository at this point in the history
In some cloud environments, there may be frequent synchronization of
configuration files from the orchestration layer to the ES container.

This can trigger frequent, unnecessary reloading of files.

Previously, code that used the ResourceWatcherService / FileWatcher
would need to detect "no-op" file changes itself. With the addition of
this content checking support, it can be handled efficiently by the
Resource Watcher Service.
  • Loading branch information
tvernum committed Oct 20, 2021
1 parent ca0d435 commit f7454b8
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Objects;
Expand All @@ -24,6 +25,8 @@
*/
public final class MessageDigests {

static final int STREAM_DIGEST_BLOCK_SIZE = 1024;

private static ThreadLocal<MessageDigest> createThreadLocalMessageDigest(String digest) {
return ThreadLocal.withInitial(() -> {
try {
Expand Down Expand Up @@ -142,4 +145,18 @@ public static byte[] digest(BytesReference bytesReference, MessageDigest digest)
return digest.digest();
}

/**
* Reads bytes from the stream and updates the given digest. Returns the result of the digest.
* @return digest result
*/
public static byte[] digest(InputStream stream, MessageDigest digest) throws IOException {
byte[] block = new byte[STREAM_DIGEST_BLOCK_SIZE];
int len = stream.read(block);
while (len > 0) {
digest.update(block, 0, len);
len = stream.read(block);
}
return digest.digest();
}

}
42 changes: 41 additions & 1 deletion server/src/main/java/org/elasticsearch/watcher/FileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.util.CollectionUtils;

Expand All @@ -27,6 +28,7 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {

private FileObserver rootFileObserver;
private final Path path;
private final boolean checkFileContents;

private static final Logger logger = LogManager.getLogger(FileWatcher.class);

Expand All @@ -35,7 +37,19 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
* @param path the directory to watch
*/
public FileWatcher(Path path) {
this(path, false);
}

/**
* Creates new file watcher on the given directory
* @param path the directory to watch
* @param checkFileContents whether to inspect the content of the file for changes (via a message digest)
* - this is a "best efforts" check and will err on the side of sending extra change notifications if the file
* <em>might</em> have changed.
*/
public FileWatcher(Path path, boolean checkFileContents) {
this.path = path;
this.checkFileContents = checkFileContents;
rootFileObserver = new FileObserver(path);
}

Expand Down Expand Up @@ -65,11 +79,13 @@ protected void doCheckAndNotify() throws IOException {

private class FileObserver {
private final Path path;

private boolean exists;
private long length;
private long lastModified;
private boolean isDirectory;
private FileObserver[] children;
private byte[] digest;

FileObserver(Path path) {
this.path = path;
Expand All @@ -80,6 +96,7 @@ public void checkAndNotify() throws IOException {
boolean prevIsDirectory = isDirectory;
long prevLength = length;
long prevLastModified = lastModified;
byte[] prevDigest = digest;

exists = Files.exists(path);
// TODO we might use the new NIO2 API to get real notification?
Expand Down Expand Up @@ -119,7 +136,14 @@ public void checkAndNotify() throws IOException {
} else {
// Remained file
if (prevLastModified != lastModified || prevLength != length) {
onFileChanged();
if (checkFileContents) {
digest = calculateDigest();
if (digest == null || Arrays.equals(prevDigest, digest) == false) {
onFileChanged();
}
} else {
onFileChanged();
}
}
}
}
Expand All @@ -144,6 +168,19 @@ public void checkAndNotify() throws IOException {

}

private byte[] calculateDigest() {
try (var in = Files.newInputStream(path)) {
return MessageDigests.digest(in, MessageDigests.md5());
} catch (IOException e) {
logger.warn(
"failed to read file [{}] while checking for file changes [{}], will assuming file has been modified",
path,
e.toString()
);
return null;
}
}

private void init(boolean initial) throws IOException {
exists = Files.exists(path);
if (exists) {
Expand All @@ -154,6 +191,9 @@ private void init(boolean initial) throws IOException {
} else {
length = attributes.size();
lastModified = attributes.lastModifiedTime().toMillis();
if (checkFileContents) {
digest = calculateDigest();
}
onFileCreated(initial);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,35 @@

package org.elasticsearch.common.hash;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESTestCase;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasLength;

public class MessageDigestsTests extends ESTestCase {
private void assertHash(String expected, String test, MessageDigest messageDigest) {
String actual = MessageDigests.toHexString(messageDigest.digest(test.getBytes(StandardCharsets.UTF_8)));
assertEquals(expected, actual);

private void assertHexString(String expected, byte[] bytes) {
final String actualDirect = MessageDigests.toHexString(bytes);
assertThat(actualDirect, equalTo(expected));
}

private void assertHash(String expected, String test, MessageDigest messageDigest) throws IOException {
final byte[] testBytes = test.getBytes(StandardCharsets.UTF_8);

assertHexString(expected, messageDigest.digest(testBytes));
assertHexString(expected, MessageDigests.digest(new BytesArray(testBytes), messageDigest));
try (var in = new ByteArrayInputStream(testBytes)) {
assertHexString(expected, MessageDigests.digest(in, messageDigest));
}
}

public void testMd5() throws Exception {
Expand Down Expand Up @@ -66,4 +85,50 @@ public void testToHexString() throws Exception {
BigInteger actual = new BigInteger(hex, 16);
assertEquals(expected, actual);
}

public void testDigestFromStreamWithMultipleBlocks() throws Exception {
final String longString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".repeat(1000);
assertThat(longString, hasLength(26_000));

try (InputStream in = getInputStream(longString)) {
final byte[] md5 = MessageDigests.digest(in, MessageDigests.md5());
assertThat(MessageDigests.toHexString(md5), equalTo("5c48e92239a655cfe1762851c6708ddb"));
}
try (InputStream in = getInputStream(longString)) {
final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha1());
assertThat(MessageDigests.toHexString(md5), equalTo("e363dfc35f4d170906aafcbb6b1f6fd1ae854808"));
}
try (InputStream in = getInputStream(longString)) {
final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha256());
assertThat(MessageDigests.toHexString(md5), equalTo("e59a4d700410ce60f912bd6e5b24f77230cbc68b27838c5a9c06daef94737a8a"));
}
}

public void testDigestFromStreamWithExactlyOneBlock() throws Exception {
final String blockString = "ABCDEFGHIJKLMNOP".repeat(64);
assertThat(blockString, hasLength(MessageDigests.STREAM_DIGEST_BLOCK_SIZE));

try (InputStream in = getInputStream(blockString)) {
final byte[] md5 = MessageDigests.digest(in, MessageDigests.md5());
assertThat(MessageDigests.toHexString(md5), equalTo("2eda00073add15c6ee5c848797f8c0f4"));
}
try (InputStream in = getInputStream(blockString)) {
final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha1());
assertThat(MessageDigests.toHexString(md5), equalTo("bb8275d97cb190cb02fd2c03e9bba2279955ace3"));
}
try (InputStream in = getInputStream(blockString)) {
final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha256());
assertThat(MessageDigests.toHexString(md5), equalTo("36350546f9cc3cbd56d3b655ecae0e4281909d510687635b900ea7650976eb3b"));
}
}

private InputStream getInputStream(String str) {
InputStream in = randomBoolean()
? new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))
: new BytesArray(str).streamInput();
if (randomBoolean()) {
in = new BufferedInputStream(in);
}
return in;
}
}
Loading

0 comments on commit f7454b8

Please sign in to comment.