Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.16] Fix race conditions in file settings service tests (#116309) #116403

Merged
merged 2 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ tests:
- class: org.elasticsearch.xpack.inference.rest.ServerSentEventsRestActionListenerTests
method: testNoStream
issue: https://github.com/elastic/elasticsearch/issues/114788
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
method: testProcessFileChanges
issue: https://github.com/elastic/elasticsearch/issues/115280
- class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT
issue: https://github.com/elastic/elasticsearch/issues/111319
- class: org.elasticsearch.upgrades.FullClusterRestartIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

package org.elasticsearch.reservedstate.service;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -23,6 +26,8 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.file.AbstractFileWatchingService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -39,9 +44,14 @@
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -50,6 +60,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.hasEntry;
Expand All @@ -62,6 +73,7 @@
import static org.mockito.Mockito.verify;

public class FileSettingsServiceTests extends ESTestCase {
private static final Logger logger = LogManager.getLogger(FileSettingsServiceTests.class);
private Environment env;
private ClusterService clusterService;
private ReservedClusterStateService controller;
Expand All @@ -71,6 +83,8 @@ public class FileSettingsServiceTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
// TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed
Loggers.setLevel(LogManager.getLogger(AbstractFileWatchingService.class), Level.DEBUG);

threadpool = new TestThreadPool("file_settings_service_tests");

Expand Down Expand Up @@ -115,16 +129,23 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
if (fileSettingsService.lifecycleState() == Lifecycle.State.STARTED) {
fileSettingsService.stop();
}
if (fileSettingsService.lifecycleState() == Lifecycle.State.STOPPED) {
fileSettingsService.close();
}
try {
if (fileSettingsService.lifecycleState() == Lifecycle.State.STARTED) {
logger.info("Stopping file settings service");
fileSettingsService.stop();
}
if (fileSettingsService.lifecycleState() == Lifecycle.State.STOPPED) {
logger.info("Closing file settings service");
fileSettingsService.close();
}

super.tearDown();
clusterService.close();
threadpool.shutdownNow();
super.tearDown();
clusterService.close();
threadpool.shutdownNow();
} finally {
// TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed
Loggers.setLevel(LogManager.getLogger(AbstractFileWatchingService.class), Level.INFO);
}
}

public void testStartStop() {
Expand Down Expand Up @@ -190,27 +211,17 @@ public void testInitialFileWorks() throws Exception {
return null;
}).when(controller).process(any(), any(XContentParser.class), any(), any());

CountDownLatch latch = new CountDownLatch(1);

fileSettingsService.addFileChangedListener(latch::countDown);
CountDownLatch processFileLatch = new CountDownLatch(1);
fileSettingsService.addFileChangedListener(processFileLatch::countDown);

Files.createDirectories(fileSettingsService.watchedFileDir());
// contents of the JSON don't matter, we just need a file to exist
writeTestFile(fileSettingsService.watchedFile(), "{}");

doAnswer((Answer<?>) invocation -> {
try {
return invocation.callRealMethod();
} finally {
latch.countDown();
}
}).when(fileSettingsService).processFileOnServiceStart();

fileSettingsService.start();
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));

// wait for listener to be called
assertTrue(latch.await(20, TimeUnit.SECONDS));
longAwait(processFileLatch);

verify(fileSettingsService, times(1)).processFileOnServiceStart();
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), any());
Expand All @@ -223,40 +234,30 @@ public void testProcessFileChanges() throws Exception {
return null;
}).when(controller).process(any(), any(XContentParser.class), any(), any());

// we get three events: initial clusterChanged event, first write, second write
CountDownLatch latch = new CountDownLatch(3);

fileSettingsService.addFileChangedListener(latch::countDown);
CountDownLatch processFileCreationLatch = new CountDownLatch(1);
fileSettingsService.addFileChangedListener(processFileCreationLatch::countDown);

Files.createDirectories(fileSettingsService.watchedFileDir());
// contents of the JSON don't matter, we just need a file to exist
writeTestFile(fileSettingsService.watchedFile(), "{}");

doAnswer((Answer<?>) invocation -> {
try {
return invocation.callRealMethod();
} finally {
latch.countDown();
}
}).when(fileSettingsService).processFileOnServiceStart();
doAnswer((Answer<?>) invocation -> {
try {
return invocation.callRealMethod();
} finally {
latch.countDown();
}
}).when(fileSettingsService).processFileChanges();

fileSettingsService.start();
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
// second file change; contents still don't matter
overwriteTestFile(fileSettingsService.watchedFile(), "{}");

// wait for listener to be called (once for initial processing, once for subsequent update)
assertTrue(latch.await(20, TimeUnit.SECONDS));
longAwait(processFileCreationLatch);

CountDownLatch processFileChangeLatch = new CountDownLatch(1);
fileSettingsService.addFileChangedListener(processFileChangeLatch::countDown);

verify(fileSettingsService, times(1)).processFileOnServiceStart();
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), any());

// Touch the file to get an update
Instant now = LocalDateTime.now(ZoneId.systemDefault()).toInstant(ZoneOffset.ofHours(0));
Files.setLastModifiedTime(fileSettingsService.watchedFile(), FileTime.from(now));

longAwait(processFileChangeLatch);

verify(fileSettingsService, times(1)).processFileChanges();
verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_VERSION_ONLY), any());
}
Expand Down Expand Up @@ -295,9 +296,7 @@ public void testStopWorksInMiddleOfProcessing() throws Exception {
// Make some fake settings file to cause the file settings service to process it
writeTestFile(fileSettingsService.watchedFile(), "{}");

// we need to wait a bit, on MacOS it may take up to 10 seconds for the Java watcher service to notice the file,
// on Linux is instantaneous. Windows is instantaneous too.
assertTrue(processFileLatch.await(30, TimeUnit.SECONDS));
longAwait(processFileLatch);

// Stopping the service should interrupt the watcher thread, we should be able to stop
fileSettingsService.stop();
Expand Down Expand Up @@ -352,15 +351,27 @@ public void testHandleSnapshotRestoreResetsMetadata() throws Exception {
}

// helpers
private void writeTestFile(Path path, String contents) throws IOException {
private static void writeTestFile(Path path, String contents) throws IOException {
logger.info("Writing settings file under [{}]", path.toAbsolutePath());
Path tempFilePath = createTempFile();
Files.writeString(tempFilePath, contents);
Files.move(tempFilePath, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
try {
Files.move(tempFilePath, path, ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
logger.info("Atomic move not available. Falling back on non-atomic move to write [{}]", path.toAbsolutePath());
Files.move(tempFilePath, path);
}
}

private void overwriteTestFile(Path path, String contents) throws IOException {
Path tempFilePath = createTempFile();
Files.writeString(tempFilePath, contents);
Files.move(tempFilePath, path, StandardCopyOption.REPLACE_EXISTING);
// Blocks until the given latch reaches zero, failing the test when that does not happen within 20 seconds.
// The generous timeout accounts for watcher service differences between OSes: on MacOS it may take up to
// 10 seconds for the Java watcher service to notice the file, while on Linux and Windows it is instantaneous.
// If the waiting thread is interrupted, its interrupt flag is restored before the test is failed.
private static void longAwait(CountDownLatch latch) {
    boolean reachedZero;
    try {
        reachedZero = latch.await(20, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // keep the interrupt visible to whoever runs this thread next
        Thread.currentThread().interrupt();
        fail(e, "longAwait: interrupted waiting for CountDownLatch to reach zero");
        return;
    }
    assertTrue("longAwait: CountDownLatch did not reach zero within the timeout", reachedZero);
}
}