diff --git a/CHANGELOG.md b/CHANGELOG.md index 20187e0291b50..3b0faf1996450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix pull-based ingestion pause state initialization during replica promotion ([#19212](https://github.com/opensearch-project/OpenSearch/pull/19212)) - Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231)) - Fix the `scaled_float` precision issue ([#19188](https://github.com/opensearch-project/OpenSearch/pull/19188)) +- Fix JVM OutOfMemoryError on the coordinator caused by an excessively large reindex slice count ([#18964](https://github.com/opensearch-project/OpenSearch/pull/18964)) ### Dependencies - Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019)) diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/BulkByScrollParallelizationHelper.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/BulkByScrollParallelizationHelper.java index 9423edb3e0ade..529d4c6a28794 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/BulkByScrollParallelizationHelper.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/BulkByScrollParallelizationHelper.java @@ -36,10 +36,13 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.opensearch.action.search.SearchRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; import org.opensearch.core.tasks.TaskId; +import org.opensearch.index.IndexSettings; import 
org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.slice.SliceBuilder; @@ -74,6 +77,7 @@ private BulkByScrollParallelizationHelper() {} * This method is equivalent to calling {@link #initTaskState} followed by {@link #executeSlicedAction} */ static > void startSlicedAction( + Metadata metadata, Request request, BulkByScrollTask task, ActionType action, @@ -85,7 +89,7 @@ static > void startSlicedAc initTaskState(task, request, client, new ActionListener() { @Override public void onResponse(Void aVoid) { - executeSlicedAction(task, request, action, listener, client, node, workerAction); + executeSlicedAction(metadata, task, request, action, listener, client, node, workerAction); } @Override @@ -106,6 +110,7 @@ public void onFailure(Exception e) { * This method can only be called after the task state is initialized {@link #initTaskState}. */ static > void executeSlicedAction( + Metadata metadata, BulkByScrollTask task, Request request, ActionType action, @@ -115,7 +120,7 @@ static > void executeSliced Runnable workerAction ) { if (task.isLeader()) { - sendSubRequests(client, action, node.getId(), task, request, listener); + sendSubRequests(metadata, client, action, node.getId(), task, request, listener); } else if (task.isWorker()) { workerAction.run(); } else { @@ -182,6 +187,7 @@ private static int countSlicesBasedOnShards(ClusterSearchShardsResponse response } private static > void sendSubRequests( + Metadata metadata, Client client, ActionType action, String localNodeId, @@ -192,6 +198,24 @@ private static > void sendS LeaderBulkByScrollTaskState worker = task.getLeaderState(); int totalSlices = worker.getSlices(); + for (String index : request.getSearchRequest().indices()) { + IndexMetadata indexMetadata = metadata.index(index); + if (indexMetadata != null && IndexSettings.MAX_SLICES_PER_SCROLL.get(indexMetadata.getSettings()) < totalSlices) { + throw new IllegalArgumentException( + 
"The number of slices [" + + totalSlices + + "] is too large. It must " + + "be less than [" + + IndexSettings.MAX_SLICES_PER_SCROLL.get(indexMetadata.getSettings()) + + "]. " + + "This limit can be set by changing the [" + + IndexSettings.MAX_SLICES_PER_SCROLL.getKey() + + "] index" + + " level setting." + ); + } + } + TaskId parentTaskId = new TaskId(localNodeId, task.getId()); for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), IdFieldMapper.NAME, totalSlices)) { // TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general.... diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java index d303ab1c741af..fcbe12b3af9aa 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java @@ -133,6 +133,7 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener listener) { ActionListener remoteReindexActionListener = getRemoteReindexWrapperListener(listener, request); BulkByScrollParallelizationHelper.executeSlicedAction( + clusterService.state().metadata(), task, request, ReindexAction.INSTANCE, diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportDeleteByQueryAction.java index f50680777fcb8..76e3cc42697ff 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportDeleteByQueryAction.java @@ -77,6 +77,7 @@ public TransportDeleteByQueryAction( public void doExecute(Task task, DeleteByQueryRequest request, ActionListener listener) { 
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; BulkByScrollParallelizationHelper.startSlicedAction( + clusterService.state().metadata(), request, bulkByScrollTask, DeleteByQueryAction.INSTANCE, diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportUpdateByQueryAction.java index 0039002c23f07..1688c7873990a 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportUpdateByQueryAction.java @@ -87,6 +87,7 @@ public TransportUpdateByQueryAction( protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener listener) { BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; BulkByScrollParallelizationHelper.startSlicedAction( + clusterService.state().metadata(), request, bulkByScrollTask, UpdateByQueryAction.INSTANCE, diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java index 1fcf79aec1b26..316ab14fbb7aa 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java @@ -52,6 +52,7 @@ import static org.opensearch.index.query.QueryBuilders.termQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -483,4 +484,21 @@ private void verifyTransformedContent(String indexName, int expectedCount) { assertNotNull(source.get("date_field")); } } + 
+ public void testTooMuchSlices() throws InterruptedException { + indexRandom( + true, + client().prepareIndex("source").setId("1").setSource("foo", "a"), + client().prepareIndex("source").setId("2").setSource("foo", "a"), + client().prepareIndex("source").setId("3").setSource("foo", "b"), + client().prepareIndex("source").setId("4").setSource("foo", "c") + ); + assertHitCount(client().prepareSearch("source").setSize(0).get(), 4); + + int slices = 2000; + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { + reindex().source("source").destination("dest").refresh(true).setSlices(slices).get(); + }); + assertThat(e.getMessage(), containsString("is too large")); + } }