From 2fb244216d60d880699113f4e80518c5d5644943 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine <58790826+elasticsearchmachine@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:49:36 +1100 Subject: [PATCH 1/7] Mute org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT test {categorize.Categorize SYNC} #113054 --- muted-tests.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index 70f8df86f3cb6..e12c6128df4b7 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -286,6 +286,9 @@ tests: - class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT method: testDeprecatedSettingsReturnWarnings issue: https://github.com/elastic/elasticsearch/issues/108628 +- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT + method: test {categorize.Categorize SYNC} + issue: https://github.com/elastic/elasticsearch/issues/113054 # Examples: # From 567429ea0d7659c7725b35d3616959e37a99ec81 Mon Sep 17 00:00:00 2001 From: Liam Thompson <32779855+leemthompo@users.noreply.github.com> Date: Thu, 7 Nov 2024 12:45:31 +0100 Subject: [PATCH 2/7] [DOCS] Fix boolean for native connectors (#116394) (#116396) (cherry picked from commit 22c55fa1ca9f58eb98532e754ea62c646e03885e) --- .../reference/connector/docs/_connectors-create-native.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/connector/docs/_connectors-create-native.asciidoc b/docs/reference/connector/docs/_connectors-create-native.asciidoc index b247047584690..8023fbbe61136 100644 --- a/docs/reference/connector/docs/_connectors-create-native.asciidoc +++ b/docs/reference/connector/docs/_connectors-create-native.asciidoc @@ -22,7 +22,7 @@ PUT _connector/my-{service-name-stub}-connector "index_name": "my-elasticsearch-index", "name": "Content synced from {service-name}", "service_type": "{service-name-stub}", - "is_native": "true" + "is_native": true } ---- // TEST[skip:can't test in isolation] From 544f8f9f4c900458dbe4b2ecd814b573e220290a Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 7 Nov 2024 15:48:34 +0100 Subject: [PATCH 3/7] Merge (#116407) --- .../SecurityIndexManagerIntegTests.java | 110 +++++++++++++++++- .../support/SecurityIndexManager.java | 86 ++++++++++---- 2 files changed, 168 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerIntegTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerIntegTests.java index 32337f0d66896..44cbf03f220a1 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerIntegTests.java @@ -26,18 +26,24 @@ import org.elasticsearch.xpack.core.security.action.user.PutUserResponse; import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore; import org.hamcrest.Matchers; +import org.junit.After; import org.junit.Before; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS; import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -45,6 +51,14 @@ public class SecurityIndexManagerIntegTests extends SecurityIntegTestCase { + private final int concurrentCallsToOnAvailable = 6; + private final ExecutorService executor = Executors.newFixedThreadPool(concurrentCallsToOnAvailable); + + @After + public void shutdownExecutor() { + executor.shutdown(); + } + public void testConcurrentOperationsTryingToCreateSecurityIndexAndAlias() throws Exception { final int processors = Runtime.getRuntime().availableProcessors(); final int numThreads = Math.min(50, scaledRandomIntBetween((processors + 1) / 2, 4 * processors)); // up to 50 threads @@ -110,6 +124,12 @@ public void testOnIndexAvailableForSearchIndexCompletesWithinTimeout() throws Ex // pick longer wait than in the assertBusy that waits for below to ensure index has had enough time to initialize securityIndexManager.onIndexAvailableForSearch((ActionListener) future, TimeValue.timeValueSeconds(40)); + // check listener added + assertThat( + securityIndexManager.getStateChangeListeners(), + hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)) + ); + createSecurityIndexWithWaitForActiveShards(); assertBusy( @@ -121,6 +141,12 @@ public void testOnIndexAvailableForSearchIndexCompletesWithinTimeout() throws Ex // security index creation is complete and index is available for search; therefore whenIndexAvailableForSearch should report // success in time future.actionGet(); + + // check no remaining listeners + assertThat( + securityIndexManager.getStateChangeListeners(), + not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class))) + ); } @SuppressWarnings("unchecked") @@ -152,6 +178,69 @@ public void testOnIndexAvailableForSearchIndexAlreadyAvailable() throws Exceptio securityIndexManager.onIndexAvailableForSearch((ActionListener) future, TimeValue.timeValueSeconds(10)); future.actionGet(); } + + // check no remaining listeners + assertThat( + securityIndexManager.getStateChangeListeners(), + not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class))) + ); + } + + @SuppressWarnings("unchecked") + public void testOnIndexAvailableForSearchIndexUnderConcurrentLoad() throws Exception { + final SecurityIndexManager securityIndexManager = internalCluster().getInstances(NativePrivilegeStore.class) + .iterator() + .next() + .getSecurityIndexManager(); + // Long time out calls should all succeed + final List> futures = new ArrayList<>(); + for (int i = 0; i < concurrentCallsToOnAvailable / 2; i++) { + final Future future = executor.submit(() -> { + try { + final ActionFuture f = new PlainActionFuture<>(); + securityIndexManager.onIndexAvailableForSearch((ActionListener) f, TimeValue.timeValueSeconds(40)); + f.actionGet(); + } catch (Exception ex) { + fail(ex, "should not have encountered exception"); + } + return null; + }); + futures.add(future); + } + + // short time-out tasks should all time out + for (int i = 0; i < concurrentCallsToOnAvailable / 2; i++) { + final Future future = executor.submit(() -> { + expectThrows(ElasticsearchTimeoutException.class, () -> { + final ActionFuture f = new PlainActionFuture<>(); + securityIndexManager.onIndexAvailableForSearch((ActionListener) f, TimeValue.timeValueMillis(10)); + f.actionGet(); + }); + return null; + }); + futures.add(future); + } + + // Sleep a second for short-running calls to timeout + Thread.sleep(1000); + + createSecurityIndexWithWaitForActiveShards(); + // ensure security index manager state is fully in the expected precondition state for this test (ready for search) + assertBusy( + () -> assertThat(securityIndexManager.isAvailable(SecurityIndexManager.Availability.SEARCH_SHARDS), is(true)), + 30, + TimeUnit.SECONDS + ); + + for (var future : futures) { + future.get(10, TimeUnit.SECONDS); + } + + // check no remaining listeners + assertThat( + securityIndexManager.getStateChangeListeners(), + not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class))) + ); } @SuppressWarnings("unchecked") @@ -163,9 +252,24 @@ public void testOnIndexAvailableForSearchIndexWaitTimeOut() { .next() .getSecurityIndexManager(); - final ActionFuture future = new PlainActionFuture<>(); - securityIndexManager.onIndexAvailableForSearch((ActionListener) future, TimeValue.timeValueMillis(100)); - expectThrows(ElasticsearchTimeoutException.class, future::actionGet); + { + final ActionFuture future = new PlainActionFuture<>(); + securityIndexManager.onIndexAvailableForSearch((ActionListener) future, TimeValue.timeValueMillis(100)); + expectThrows(ElasticsearchTimeoutException.class, future::actionGet); + } + + // Also works with 0 timeout + { + final ActionFuture future = new PlainActionFuture<>(); + securityIndexManager.onIndexAvailableForSearch((ActionListener) future, TimeValue.timeValueMillis(0)); + expectThrows(ElasticsearchTimeoutException.class, future::actionGet); + } + + // check no remaining listeners + assertThat( + securityIndexManager.getStateChangeListeners(), + not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class))) + ); } public void testSecurityIndexSettingsCannotBeChanged() throws Exception { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java index 3520973f8feff..12ef800a7aae7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java @@ -57,6 +57,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -420,45 +421,80 @@ public void accept(State previousState, State nextState) { * Notifies {@code listener} once the security index is available, or calls {@code onFailure} on {@code timeout}. */ public void onIndexAvailableForSearch(ActionListener listener, TimeValue timeout) { - logger.info("Will wait for security index [{}] to become available for search", getConcreteIndexName()); + logger.info("Will wait for security index [{}] for [{}] to become available for search", getConcreteIndexName(), timeout); - final ActionListener notifyOnceListener = ActionListener.notifyOnce(listener); + if (state.indexAvailableForSearch) { + logger.debug("Security index [{}] is already available", getConcreteIndexName()); + listener.onResponse(null); + return; + } + final AtomicBoolean isDone = new AtomicBoolean(false); final var indexAvailableForSearchListener = new StateConsumerWithCancellable() { @Override public void accept(SecurityIndexManager.State previousState, SecurityIndexManager.State nextState) { if (nextState.indexAvailableForSearch) { - assert cancellable != null; - // cancel and removeStateListener are idempotent - cancellable.cancel(); - removeStateListener(this); - notifyOnceListener.onResponse(null); + if (isDone.compareAndSet(false, true)) { + cancel(); + removeStateListener(this); + listener.onResponse(null); + } } } }; + // add listener _before_ registering timeout -- this way we are guaranteed it gets removed (either by timeout below, or successful + // completion above) + addStateListener(indexAvailableForSearchListener); + // schedule failure handling on timeout -- keep reference to cancellable so a successful completion can cancel the timeout - indexAvailableForSearchListener.cancellable = client.threadPool().schedule(() -> { - removeStateListener(indexAvailableForSearchListener); - notifyOnceListener.onFailure( - new ElasticsearchTimeoutException( - "timed out waiting for security index [" + getConcreteIndexName() + "] to become available for search" - ) - ); - }, timeout, client.threadPool().generic()); + indexAvailableForSearchListener.setCancellable(client.threadPool().schedule(() -> { + if (isDone.compareAndSet(false, true)) { + removeStateListener(indexAvailableForSearchListener); + listener.onFailure( + new ElasticsearchTimeoutException( + "timed out waiting for security index [" + getConcreteIndexName() + "] to become available for search" + ) + ); + } + }, timeout, client.threadPool().generic())); + } - // in case the state has meanwhile changed to available, return immediately - if (state.indexAvailableForSearch) { - indexAvailableForSearchListener.cancellable.cancel(); - notifyOnceListener.onResponse(null); - } else { - addStateListener(indexAvailableForSearchListener); - } + // pkg-private for testing + List> getStateChangeListeners() { + return stateChangeListeners; } - private abstract static class StateConsumerWithCancellable + /** + * This class ensures that if cancel() is called _before_ setCancellable(), the passed-in cancellable is still correctly cancelled on + * a subsequent setCancellable() call. + */ + // pkg-private for testing + abstract static class StateConsumerWithCancellable implements - BiConsumer { - volatile Scheduler.ScheduledCancellable cancellable; + BiConsumer, + Scheduler.Cancellable { + private volatile Scheduler.ScheduledCancellable cancellable; + private volatile boolean cancelled = false; + + void setCancellable(Scheduler.ScheduledCancellable cancellable) { + this.cancellable = cancellable; + if (cancelled) { + cancel(); + } + } + + public boolean cancel() { + cancelled = true; + if (cancellable != null) { + // cancellable is idempotent, so it's fine to potentially call it multiple times + return cancellable.cancel(); + } + return isCancelled(); + } + + public boolean isCancelled() { + return cancelled; + } } private Tuple checkIndexAvailable(ClusterState state) { From 022be89607147cb201f86c0ae3ee65d4ec1b8736 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 7 Nov 2024 15:52:58 +0100 Subject: [PATCH 4/7] [8.16] Fix race conditions in file settings service tests (#116309) (#116403) # Backport This will backport the following commits from `main` to `8.16`: - [Fix race conditions in file settings service tests (#116309)](https://github.com/elastic/elasticsearch/pull/116309) --- muted-tests.yml | 3 - .../service/FileSettingsServiceTests.java | 121 ++++++++++-------- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index e12c6128df4b7..aaee081a1567a 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -263,9 +263,6 @@ tests: - class: org.elasticsearch.xpack.inference.rest.ServerSentEventsRestActionListenerTests method: testNoStream issue: https://github.com/elastic/elasticsearch/issues/114788 -- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests - method: testProcessFileChanges - issue: https://github.com/elastic/elasticsearch/issues/115280 - class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT issue: https://github.com/elastic/elasticsearch/issues/111319 - class: org.elasticsearch.upgrades.FullClusterRestartIT diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java index c0657b5888ad2..1c8568a9d1a92 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java @@ -9,6 +9,9 @@ package org.elasticsearch.reservedstate.service; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -23,6 +26,8 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.file.AbstractFileWatchingService; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -39,9 +44,14 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.FileTime; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.hasEntry; @@ -62,6 +73,7 @@ import static org.mockito.Mockito.verify; public class FileSettingsServiceTests extends ESTestCase { + private static final Logger logger = LogManager.getLogger(FileSettingsServiceTests.class); private Environment env; private ClusterService clusterService; private ReservedClusterStateService controller; @@ -71,6 +83,8 @@ public class FileSettingsServiceTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); + // TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed + Loggers.setLevel(LogManager.getLogger(AbstractFileWatchingService.class), Level.DEBUG); threadpool = new TestThreadPool("file_settings_service_tests"); @@ -115,16 +129,23 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - if (fileSettingsService.lifecycleState() == Lifecycle.State.STARTED) { - fileSettingsService.stop(); - } - if (fileSettingsService.lifecycleState() == Lifecycle.State.STOPPED) { - fileSettingsService.close(); - } + try { + if (fileSettingsService.lifecycleState() == Lifecycle.State.STARTED) { + logger.info("Stopping file settings service"); + fileSettingsService.stop(); + } + if (fileSettingsService.lifecycleState() == Lifecycle.State.STOPPED) { + logger.info("Closing file settings service"); + fileSettingsService.close(); + } - super.tearDown(); - clusterService.close(); - threadpool.shutdownNow(); + super.tearDown(); + clusterService.close(); + threadpool.shutdownNow(); + } finally { + // TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed + Loggers.setLevel(LogManager.getLogger(AbstractFileWatchingService.class), Level.INFO); + } } public void testStartStop() { @@ -190,27 +211,17 @@ public void testInitialFileWorks() throws Exception { return null; }).when(controller).process(any(), any(XContentParser.class), any(), any()); - CountDownLatch latch = new CountDownLatch(1); - - fileSettingsService.addFileChangedListener(latch::countDown); + CountDownLatch processFileLatch = new CountDownLatch(1); + fileSettingsService.addFileChangedListener(processFileLatch::countDown); Files.createDirectories(fileSettingsService.watchedFileDir()); // contents of the JSON don't matter, we just need a file to exist writeTestFile(fileSettingsService.watchedFile(), "{}"); - doAnswer((Answer) invocation -> { - try { - return invocation.callRealMethod(); - } finally { - latch.countDown(); - } - }).when(fileSettingsService).processFileOnServiceStart(); - fileSettingsService.start(); fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE)); - // wait for listener to be called - assertTrue(latch.await(20, TimeUnit.SECONDS)); + longAwait(processFileLatch); verify(fileSettingsService, times(1)).processFileOnServiceStart(); verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), any()); @@ -223,40 +234,30 @@ public void testProcessFileChanges() throws Exception { return null; }).when(controller).process(any(), any(XContentParser.class), any(), any()); - // we get three events: initial clusterChanged event, first write, second write - CountDownLatch latch = new CountDownLatch(3); - - fileSettingsService.addFileChangedListener(latch::countDown); + CountDownLatch processFileCreationLatch = new CountDownLatch(1); + fileSettingsService.addFileChangedListener(processFileCreationLatch::countDown); Files.createDirectories(fileSettingsService.watchedFileDir()); // contents of the JSON don't matter, we just need a file to exist writeTestFile(fileSettingsService.watchedFile(), "{}"); - doAnswer((Answer) invocation -> { - try { - return invocation.callRealMethod(); - } finally { - latch.countDown(); - } - }).when(fileSettingsService).processFileOnServiceStart(); - doAnswer((Answer) invocation -> { - try { - return invocation.callRealMethod(); - } finally { - latch.countDown(); - } - }).when(fileSettingsService).processFileChanges(); - fileSettingsService.start(); fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE)); - // second file change; contents still don't matter - overwriteTestFile(fileSettingsService.watchedFile(), "{}"); - // wait for listener to be called (once for initial processing, once for subsequent update) - assertTrue(latch.await(20, TimeUnit.SECONDS)); + longAwait(processFileCreationLatch); + + CountDownLatch processFileChangeLatch = new CountDownLatch(1); + fileSettingsService.addFileChangedListener(processFileChangeLatch::countDown); verify(fileSettingsService, times(1)).processFileOnServiceStart(); verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), any()); + + // Touch the file to get an update + Instant now = LocalDateTime.now(ZoneId.systemDefault()).toInstant(ZoneOffset.ofHours(0)); + Files.setLastModifiedTime(fileSettingsService.watchedFile(), FileTime.from(now)); + + longAwait(processFileChangeLatch); + verify(fileSettingsService, times(1)).processFileChanges(); verify(controller, times(1)).process(any(), any(XContentParser.class), eq(ReservedStateVersionCheck.HIGHER_VERSION_ONLY), any()); } @@ -295,9 +296,7 @@ public void testStopWorksInMiddleOfProcessing() throws Exception { // Make some fake settings file to cause the file settings service to process it writeTestFile(fileSettingsService.watchedFile(), "{}"); - // we need to wait a bit, on MacOS it may take up to 10 seconds for the Java watcher service to notice the file, - // on Linux is instantaneous. Windows is instantaneous too. - assertTrue(processFileLatch.await(30, TimeUnit.SECONDS)); + longAwait(processFileLatch); // Stopping the service should interrupt the watcher thread, we should be able to stop fileSettingsService.stop(); @@ -352,15 +351,27 @@ public void testHandleSnapshotRestoreResetsMetadata() throws Exception { } // helpers - private void writeTestFile(Path path, String contents) throws IOException { + private static void writeTestFile(Path path, String contents) throws IOException { + logger.info("Writing settings file under [{}]", path.toAbsolutePath()); Path tempFilePath = createTempFile(); Files.writeString(tempFilePath, contents); - Files.move(tempFilePath, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + try { + Files.move(tempFilePath, path, ATOMIC_MOVE); + } catch (AtomicMoveNotSupportedException e) { + logger.info("Atomic move not available. Falling back on non-atomic move to write [{}]", path.toAbsolutePath()); + Files.move(tempFilePath, path); + } } - private void overwriteTestFile(Path path, String contents) throws IOException { - Path tempFilePath = createTempFile(); - Files.writeString(tempFilePath, contents); - Files.move(tempFilePath, path, StandardCopyOption.REPLACE_EXISTING); + // this waits for up to 20 seconds to account for watcher service differences between OSes + // on MacOS it may take up to 10 seconds for the Java watcher service to notice the file, + // on Linux is instantaneous. Windows is instantaneous too. + private static void longAwait(CountDownLatch latch) { + try { + assertTrue("longAwait: CountDownLatch did not reach zero within the timeout", latch.await(20, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e, "longAwait: interrupted waiting for CountDownLatch to reach zero"); + } } } From 84087bda719ae837e3e2afb83bc4361591389f4d Mon Sep 17 00:00:00 2001 From: Alexey Ivanov Date: Thu, 7 Nov 2024 17:40:11 +0000 Subject: [PATCH 5/7] [CI] JvmStatsTests testJvmStats failing (#116197) (#116427) Fix test JvmStatsTests.testJvmStats (backport from main) Fixes #116197 --- .../test/java/org/elasticsearch/monitor/jvm/JvmStatsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/monitor/jvm/JvmStatsTests.java b/server/src/test/java/org/elasticsearch/monitor/jvm/JvmStatsTests.java index 28976d803ff53..7956d67c83c3b 100644 --- a/server/src/test/java/org/elasticsearch/monitor/jvm/JvmStatsTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/jvm/JvmStatsTests.java @@ -53,7 +53,7 @@ public void testJvmStats() { assertThat(memoryPools, hasKey("Metaspace")); assertThat(memoryPools.keySet(), hasSize(greaterThan(3))); for (JvmStats.MemoryPool memoryPool : memoryPools.values()) { - assertThat(memoryPool.getUsed().getBytes(), greaterThan(0L)); + assertThat("Memory pool: " + memoryPool.getName(), memoryPool.getUsed().getBytes(), greaterThanOrEqualTo(0L)); } // Threads From fca2f43756a9281ee6ac37dfd40b36621cbc6221 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 7 Nov 2024 11:06:38 -0800 Subject: [PATCH 6/7] Fallback to field-caps (#115977) (#116428) This change falls back to the old field-caps action if the remote cluster has not been updated to 8.16 or later. --- .../TransportFieldCapabilitiesAction.java | 26 ++++++++++++++----- .../qa/server/multi-clusters/build.gradle | 4 +-- .../xpack/esql/ccq/MultiClusterSpecIT.java | 1 - .../xpack/esql/ccq/MultiClustersIT.java | 1 - .../esql/action/EsqlResolveFieldsAction.java | 19 ++++++++++++-- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 8e222e7197180..37af070effc9f 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -113,23 +114,28 @@ public TransportFieldCapabilitiesAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - executeRequest(task, request, REMOTE_TYPE, listener); + executeRequest( + task, + request, + (remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener), + listener + ); } public void executeRequest( Task task, FieldCapabilitiesRequest request, - RemoteClusterActionType remoteAction, + RemoteRequestExecutor remoteRequestExecutor, ActionListener listener ) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteAction, l))); + searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l))); } private void doExecuteForked( Task task, FieldCapabilitiesRequest request, - RemoteClusterActionType remoteAction, + RemoteRequestExecutor remoteRequestExecutor, ActionListener listener ) { if (ccsCheckCompatibility) { @@ -282,8 +288,8 @@ private void doExecuteForked( handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); } }); - remoteClusterClient.execute( - remoteAction, + remoteRequestExecutor.executeRemoteRequest( + remoteClusterClient, remoteRequest, // The underlying transport service may call onFailure with a thread pool other than search_coordinator. // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator. @@ -298,6 +304,14 @@ private void doExecuteForked( } } + public interface RemoteRequestExecutor { + void executeRemoteRequest( + RemoteClusterClient remoteClient, + FieldCapabilitiesRequest remoteRequest, + ActionListener remoteListener + ); + } + private static void checkIndexBlocks(ClusterState clusterState, String[] concreteIndices) { var blocks = clusterState.blocks(); if (blocks.global().isEmpty() && blocks.indices().isEmpty()) { diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle b/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle index 676729573b69d..aa19371685ce1 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle +++ b/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle @@ -18,8 +18,8 @@ dependencies { } def supportedVersion = bwcVersion -> { - // ESQL requires its own resolve_fields API - return bwcVersion.onOrAfter(Version.fromString("8.16.0")); + // CCS in ES|QL available in 8.13 + return bwcVersion.onOrAfter(Version.fromString("8.13.0")); } BuildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName -> diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 3e799730f7269..3cd70026c4133 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -104,7 +104,6 @@ public MultiClusterSpecIT( protected void shouldSkipTest(String testName) throws IOException { super.shouldSkipTest(testName); checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase); - assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api")); assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query)); assumeTrue( "Test " + testName + " is skipped on " + Clusters.oldVersion(), diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 1f72827057c5b..dbeaed1596eff 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -67,7 +67,6 @@ record Doc(int id, String color, long data) { @Before public void setUpIndices() throws Exception { - assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api")); final String mapping = """ "properties": { "data": { "type": "long" }, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java index 2161efca1d2b4..f7e6793fc4fb3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; @@ -14,6 +15,7 @@ import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -27,7 +29,7 @@ public class EsqlResolveFieldsAction extends HandledTransportAction { public static final String NAME = "indices:data/read/esql/resolve_fields"; public static final ActionType TYPE = new ActionType<>(NAME); - public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( + public static final RemoteClusterActionType RESOLVE_REMOTE_TYPE = new RemoteClusterActionType<>( NAME, FieldCapabilitiesResponse::new ); @@ -47,6 +49,19 @@ public EsqlResolveFieldsAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - fieldCapsAction.executeRequest(task, request, REMOTE_TYPE, listener); + fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener); + } + + void executeRemoteRequest( + RemoteClusterClient remoteClient, + FieldCapabilitiesRequest remoteRequest, + ActionListener remoteListener + ) { + remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> { + var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES) + ? RESOLVE_REMOTE_TYPE + : TransportFieldCapabilitiesAction.REMOTE_TYPE; + remoteClient.execute(conn, remoteAction, remoteRequest, l); + })); } } From 10934b398158073255416417a616ecd7bc490836 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 7 Nov 2024 14:09:36 -0600 Subject: [PATCH 7/7] Fix missing remote_cluster docs (#116366) (#116439) Documentation for the remote_cluster in the role was added in #111682 and #108840, but a few places were missed. This commit fill the gaps in the documentation. --- .../cluster/remote-clusters-migration.asciidoc | 9 +++++---- .../remote-clusters-troubleshooting.asciidoc | 2 +- .../rest-api/security/bulk-create-roles.asciidoc | 14 +++++++++++++- .../rest-api/security/create-roles.asciidoc | 11 +++++++++++ .../remote-clusters-privileges-api-key.asciidoc | 5 +++-- 5 files changed, 33 insertions(+), 8 deletions(-) diff --git a/docs/reference/modules/cluster/remote-clusters-migration.asciidoc b/docs/reference/modules/cluster/remote-clusters-migration.asciidoc index e205d7cb141fe..e84304ce9ef94 100644 --- a/docs/reference/modules/cluster/remote-clusters-migration.asciidoc +++ b/docs/reference/modules/cluster/remote-clusters-migration.asciidoc @@ -66,10 +66,11 @@ indices that were created from the auto-follow pattern. On the local cluster: . Enhance any roles used by local cluster users with the required -<> for {ccr} and {ccs}. +<> or +<> for {ccr} and {ccs}. Refer to <>. Note: -** You only need to assign additional `remote_indices` privileges to existing +** You only need to assign additional `remote_indices` or `remote_cluster` privileges to existing roles used for cross-cluster operations. You should be able to copy these privileges from the original roles on the remote cluster, where they are defined under the certification based security model. @@ -197,7 +198,7 @@ authentication. Resume any persistent tasks that you stopped earlier. Tasks should be restarted by the same user or API key that created the task before the migration. Ensure the roles of this user or API key have been updated with the required -`remote_indices` privileges. For users, tasks capture the caller's credentials +`remote_indices` or `remote_cluster` privileges. For users, tasks capture the caller's credentials when started and run in that user's security context. For API keys, restarting a task will update the task with the updated API key. @@ -246,7 +247,7 @@ If you need to roll back, follow these steps on the local cluster: . Remove the remote cluster definition by setting the remote cluster settings to `null`. -. Remove the `remote_indices` privileges from any roles that were updated during +. Remove the `remote_indices` or 'remote_cluster' privileges from any roles that were updated during the migration. . On each node, remove the `remote_cluster_client.ssl.*` settings from diff --git a/docs/reference/modules/cluster/remote-clusters-troubleshooting.asciidoc b/docs/reference/modules/cluster/remote-clusters-troubleshooting.asciidoc index df3c54794dc06..e21f93d81afc7 100644 --- a/docs/reference/modules/cluster/remote-clusters-troubleshooting.asciidoc +++ b/docs/reference/modules/cluster/remote-clusters-troubleshooting.asciidoc @@ -399,7 +399,7 @@ This does not show up in any logs. ====== Resolution -. Check that the local user has the necessary `remote_indices` privileges. Grant sufficient `remote_indices` privileges if necessary. +. Check that the local user has the necessary `remote_indices` or `remote_cluster` privileges. Grant sufficient `remote_indices` or `remote_cluster` privileges if necessary. . If permission is not an issue locally, ask the remote cluster administrator to create and distribute a <>. Replace the diff --git a/docs/reference/rest-api/security/bulk-create-roles.asciidoc b/docs/reference/rest-api/security/bulk-create-roles.asciidoc index e4b6ef7f765c2..a1fe998c08146 100644 --- a/docs/reference/rest-api/security/bulk-create-roles.asciidoc +++ b/docs/reference/rest-api/security/bulk-create-roles.asciidoc @@ -75,7 +75,7 @@ that begin with `_` are reserved for system usage. For more information, see <>. -`remote_indices`:: beta:[] (list) A list of remote indices permissions entries. +`remote_indices`:: (list) A list of remote indices permissions entries. + -- NOTE: Remote indices are effective for <>. @@ -94,6 +94,18 @@ have on the specified indices. read access to. A document within the specified indices must match this query in order for it to be accessible by the owners of the role. +`remote_cluster`:: (list) A list of remote cluster permissions entries. ++ +-- +NOTE: Remote cluster permissions are effective for <>. +They have no effect for remote clusters configured with the <>. +-- +`clusters` (required)::: (list) A list of cluster aliases to which the permissions +in this entry apply. +`privileges`(required)::: (list) The cluster level privileges that the owners of the role +have in the specified clusters. + + For more information, see <>. ==== diff --git a/docs/reference/rest-api/security/create-roles.asciidoc b/docs/reference/rest-api/security/create-roles.asciidoc index 75f1d7c799187..a1ab892330e67 100644 --- a/docs/reference/rest-api/security/create-roles.asciidoc +++ b/docs/reference/rest-api/security/create-roles.asciidoc @@ -96,6 +96,17 @@ have on the specified indices. read access to. A document within the specified indices must match this query in order for it to be accessible by the owners of the role. +`remote_cluster`:: (list) A list of remote cluster permissions entries. ++ +-- +NOTE: Remote cluster permissions are effective for <>. +They have no effect for remote clusters configured with the <>. +-- +`clusters` (required)::: (list) A list of cluster aliases to which the permissions +in this entry apply. +`privileges`(required)::: (list) The cluster level privileges that the owners of the role +have in the specified clusters. + For more information, see <>. [[security-api-put-role-example]] diff --git a/docs/reference/security/authentication/remote-clusters-privileges-api-key.asciidoc b/docs/reference/security/authentication/remote-clusters-privileges-api-key.asciidoc index 1d31c7b6b9345..9b51a58725f39 100644 --- a/docs/reference/security/authentication/remote-clusters-privileges-api-key.asciidoc +++ b/docs/reference/security/authentication/remote-clusters-privileges-api-key.asciidoc @@ -2,7 +2,8 @@ === Configure roles and users To use a remote cluster for {ccr} or {ccs}, you need to create user roles with -<> on the local cluster. +<> or +<> on the local cluster. You can manage users and roles from Stack Management in {kib} by selecting *Security > Roles* from the side navigation. You can also use the @@ -80,7 +81,7 @@ POST /_security/role/remote-search "privileges": [ "read", "read_cross_cluster", - "view_index_metadata" + "view_index_metadata" ] } ]