Skip to content

Commit

Permalink
[8.16] Fix race conditions in file settings service tests (elastic#11…
Browse files Browse the repository at this point in the history
…6309) (elastic#116403)

# Backport

This will backport the following commits from `main` to `8.16`:
 - [Fix race conditions in file settings service tests (elastic#116309)](elastic#116309)
  • Loading branch information
n1v0lg authored Nov 7, 2024
1 parent 544f8f9 commit 022be89
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 58 deletions.
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ tests:
- class: org.elasticsearch.xpack.inference.rest.ServerSentEventsRestActionListenerTests
method: testNoStream
issue: https://github.com/elastic/elasticsearch/issues/114788
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
method: testProcessFileChanges
issue: https://github.com/elastic/elasticsearch/issues/115280
- class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT
issue: https://github.com/elastic/elasticsearch/issues/111319
- class: org.elasticsearch.upgrades.FullClusterRestartIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

package org.elasticsearch.reservedstate.service;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -23,6 +26,8 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.file.AbstractFileWatchingService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -39,9 +44,14 @@
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -50,6 +60,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.hasEntry;
Expand All @@ -62,6 +73,7 @@
import static org.mockito.Mockito.verify;

public class FileSettingsServiceTests extends ESTestCase {
private static final Logger logger = LogManager.getLogger(FileSettingsServiceTests.class);
private Environment env;
private ClusterService clusterService;
private ReservedClusterStateService controller;
Expand All @@ -71,6 +83,8 @@ public class FileSettingsServiceTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
// TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed
Loggers.setLevel(LogManager.getLogger(AbstractFileWatchingService.class), Level.DEBUG);

threadpool = new TestThreadPool("file_settings_service_tests");

Expand Down Expand Up @@ -115,16 +129,23 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
if (fileSettingsService.lifecycleState() == Lifecycle.State.STARTED) {
fileSettingsService.stop();
}
if (fileSettingsService.lifecycleState() == Lifecycle.State.STOPPED) {
fileSettingsService.close();
}
try {
if (fileSettingsService.lifecycleState() == Lifecycle.State.STARTED) {
logger.info("Stopping file settings service");
fileSettingsService.stop();
}
if (fileSettingsService.lifecycleState() == Lifecycle.State.STOPPED) {
logger.info("Closing file settings service");
fileSettingsService.close();
}

super.tearDown();
clusterService.close();
threadpool.shutdownNow();
super.tearDown();
clusterService.close();
threadpool.shutdownNow();
} finally {
// TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed
Loggers.setLevel(LogManager.getLogger(AbstractFileWatchingService.class), Level.INFO);
}
}

public void testStartStop() {
Expand Down Expand Up @@ -190,27 +211,17 @@ public void testInitialFileWorks() throws Exception {
return null;
}).when(controller).process(any(), any(XContentParser.class), any(), any());

CountDownLatch latch = new CountDownLatch(1);

fileSettingsService.addFileChangedListener(latch::countDown);
CountDownLatch processFileLatch = new CountDownLatch(1);
fileSettingsService.addFileChangedListener(processFileLatch::countDown);

Files.createDirectories(fileSettingsService.watchedFileDir());
// contents of the JSON don't matter, we just need a file to exist
writeTestFile(fileSettingsService.watchedFile(), "{}");

doAnswer((Answer<?>) invocation -> {
try {
return invocation.callRealMethod();
} finally {
latch.countDown();
}
}).when(fileSettingsService).processFileOnServiceStart();

fileSettingsService.start();
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));

// wait for listener to be called
assertTrue(latch.await(20, TimeUnit.SECONDS));
longAwait(processFileLatch);

verify(fileSettingsService, times(1)).processFileOnServiceStart();
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), any());
Expand All @@ -223,40 +234,30 @@ public void testProcessFileChanges() throws Exception {
return null;
}).when(controller).process(any(), any(XContentParser.class), any(), any());

// we get three events: initial clusterChanged event, first write, second write
CountDownLatch latch = new CountDownLatch(3);

fileSettingsService.addFileChangedListener(latch::countDown);
CountDownLatch processFileCreationLatch = new CountDownLatch(1);
fileSettingsService.addFileChangedListener(processFileCreationLatch::countDown);

Files.createDirectories(fileSettingsService.watchedFileDir());
// contents of the JSON don't matter, we just need a file to exist
writeTestFile(fileSettingsService.watchedFile(), "{}");

doAnswer((Answer<?>) invocation -> {
try {
return invocation.callRealMethod();
} finally {
latch.countDown();
}
}).when(fileSettingsService).processFileOnServiceStart();
doAnswer((Answer<?>) invocation -> {
try {
return invocation.callRealMethod();
} finally {
latch.countDown();
}
}).when(fileSettingsService).processFileChanges();

fileSettingsService.start();
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
// second file change; contents still don't matter
overwriteTestFile(fileSettingsService.watchedFile(), "{}");

// wait for listener to be called (once for initial processing, once for subsequent update)
assertTrue(latch.await(20, TimeUnit.SECONDS));
longAwait(processFileCreationLatch);

CountDownLatch processFileChangeLatch = new CountDownLatch(1);
fileSettingsService.addFileChangedListener(processFileChangeLatch::countDown);

verify(fileSettingsService, times(1)).processFileOnServiceStart();
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), any());

// Touch the file to get an update
Instant now = LocalDateTime.now(ZoneId.systemDefault()).toInstant(ZoneOffset.ofHours(0));
Files.setLastModifiedTime(fileSettingsService.watchedFile(), FileTime.from(now));

longAwait(processFileChangeLatch);

verify(fileSettingsService, times(1)).processFileChanges();
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_VERSION_ONLY), any());
}
Expand Down Expand Up @@ -295,9 +296,7 @@ public void testStopWorksInMiddleOfProcessing() throws Exception {
// Make some fake settings file to cause the file settings service to process it
writeTestFile(fileSettingsService.watchedFile(), "{}");

// we need to wait a bit, on MacOS it may take up to 10 seconds for the Java watcher service to notice the file,
// on Linux is instantaneous. Windows is instantaneous too.
assertTrue(processFileLatch.await(30, TimeUnit.SECONDS));
longAwait(processFileLatch);

// Stopping the service should interrupt the watcher thread, we should be able to stop
fileSettingsService.stop();
Expand Down Expand Up @@ -352,15 +351,27 @@ public void testHandleSnapshotRestoreResetsMetadata() throws Exception {
}

// helpers
private void writeTestFile(Path path, String contents) throws IOException {
private static void writeTestFile(Path path, String contents) throws IOException {
logger.info("Writing settings file under [{}]", path.toAbsolutePath());
Path tempFilePath = createTempFile();
Files.writeString(tempFilePath, contents);
Files.move(tempFilePath, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
try {
Files.move(tempFilePath, path, ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
logger.info("Atomic move not available. Falling back on non-atomic move to write [{}]", path.toAbsolutePath());
Files.move(tempFilePath, path);
}
}

private void overwriteTestFile(Path path, String contents) throws IOException {
Path tempFilePath = createTempFile();
Files.writeString(tempFilePath, contents);
Files.move(tempFilePath, path, StandardCopyOption.REPLACE_EXISTING);
// this waits for up to 20 seconds to account for watcher service differences between OSes
// on MacOS it may take up to 10 seconds for the Java watcher service to notice the file,
// on Linux is instantaneous. Windows is instantaneous too.
private static void longAwait(CountDownLatch latch) {
try {
assertTrue("longAwait: CountDownLatch did not reach zero within the timeout", latch.await(20, TimeUnit.SECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(e, "longAwait: interrupted waiting for CountDownLatch to reach zero");
}
}
}

0 comments on commit 022be89

Please sign in to comment.