1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix pull-based ingestion pause state initialization during replica promotion ([#19212](https://github.com/opensearch-project/OpenSearch/pull/19212))
- Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231))
- Fix the `scaled_float` precision issue ([#19188](https://github.com/opensearch-project/OpenSearch/pull/19188))
- Fix JVM OutOfMemoryError on the coordinator when an excessively large reindex slice count is requested ([#18964](https://github.com/opensearch-project/OpenSearch/pull/18964))

### Dependencies
- Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019))
@@ -36,10 +36,13 @@
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.slice.SliceBuilder;
@@ -74,6 +77,7 @@ private BulkByScrollParallelizationHelper() {}
* This method is equivalent to calling {@link #initTaskState} followed by {@link #executeSlicedAction}
*/
static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAction(
Metadata metadata,
Request request,
BulkByScrollTask task,
ActionType<BulkByScrollResponse> action,
@@ -85,7 +89,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAction(
initTaskState(task, request, client, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
executeSlicedAction(task, request, action, listener, client, node, workerAction);
executeSlicedAction(metadata, task, request, action, listener, client, node, workerAction);
}

@Override
@@ -106,6 +110,7 @@ public void onFailure(Exception e) {
* This method can only be called after the task state is initialized {@link #initTaskState}.
*/
static <Request extends AbstractBulkByScrollRequest<Request>> void executeSlicedAction(
Metadata metadata,
BulkByScrollTask task,
Request request,
ActionType<BulkByScrollResponse> action,
@@ -115,7 +120,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSlicedAction(
Runnable workerAction
) {
if (task.isLeader()) {
sendSubRequests(client, action, node.getId(), task, request, listener);
sendSubRequests(metadata, client, action, node.getId(), task, request, listener);
} else if (task.isWorker()) {
workerAction.run();
} else {
@@ -182,6 +187,7 @@ private static int countSlicesBasedOnShards(ClusterSearchShardsResponse response
}

private static <Request extends AbstractBulkByScrollRequest<Request>> void sendSubRequests(
Metadata metadata,
Client client,
ActionType<BulkByScrollResponse> action,
String localNodeId,
@@ -192,6 +198,24 @@ private static <Request extends AbstractBulkByScrollRequest<Request>> void sendSubRequests(

LeaderBulkByScrollTaskState worker = task.getLeaderState();
int totalSlices = worker.getSlices();
for (String index : request.getSearchRequest().indices()) {
IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata != null && IndexSettings.MAX_SLICES_PER_SCROLL.get(indexMetadata.getSettings()) < totalSlices) {
throw new IllegalArgumentException(
"The number of slices ["
+ totalSlices
+ "] is too large. It must "
+ "be less than ["
+ IndexSettings.MAX_SLICES_PER_SCROLL.get(indexMetadata.getSettings())
+ "]. "
+ "This limit can be set by changing the ["
+ IndexSettings.MAX_SLICES_PER_SCROLL.getKey()
+ "] index"
+ " level setting."
);
}
}

TaskId parentTaskId = new TaskId(localNodeId, task.getId());
for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), IdFieldMapper.NAME, totalSlices)) {
// TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general....
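
The new block in sendSubRequests reads each target index's max_slices_per_scroll setting (via IndexSettings.MAX_SLICES_PER_SCROLL) and rejects an oversized slice count on the coordinator before any sub-requests are created, rather than letting the fan-out exhaust the heap. Below is a minimal sketch, not part of this PR, of raising that limit for one index; the client handle, the index name "source", and the value 4096 are illustrative assumptions, and it relies on the setting being dynamic (its default is 1024).

// Sketch only (not in this PR): raise index.max_slices_per_scroll so a larger
// slice count passes the new validation. "source" and 4096 are illustrative;
// client is an OpenSearch Client handle like the one the helper already receives.
// (Assumed imports: org.opensearch.common.settings.Settings, org.opensearch.index.IndexSettings)
client.admin()
    .indices()
    .prepareUpdateSettings("source")
    .setSettings(Settings.builder().put(IndexSettings.MAX_SLICES_PER_SCROLL.getKey(), 4096))
    .get();

The remaining hunks thread clusterService.state().metadata() from the transport actions into the helper so this check can see each index's settings.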
@@ -133,6 +133,7 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener
public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
ActionListener<BulkByScrollResponse> remoteReindexActionListener = getRemoteReindexWrapperListener(listener, request);
BulkByScrollParallelizationHelper.executeSlicedAction(
clusterService.state().metadata(),
task,
request,
ReindexAction.INSTANCE,
@@ -77,6 +77,7 @@ public TransportDeleteByQueryAction(
public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
BulkByScrollParallelizationHelper.startSlicedAction(
clusterService.state().metadata(),
request,
bulkByScrollTask,
DeleteByQueryAction.INSTANCE,
@@ -87,6 +87,7 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
BulkByScrollParallelizationHelper.startSlicedAction(
clusterService.state().metadata(),
request,
bulkByScrollTask,
UpdateByQueryAction.INSTANCE,
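
Because TransportDeleteByQueryAction and TransportUpdateByQueryAction now hand the cluster metadata to startSlicedAction as well, the same per-index ceiling guards delete-by-query and update-by-query fan-out. A hedged sketch of exercising it for update-by-query follows; it is not part of this PR and assumes the updateByQuery() request-builder helper available to the reindex integration tests, mirroring the reindex test below.

// Sketch only (not in this PR): an oversized slice count should now be rejected
// for update-by-query too. updateByQuery() is assumed to come from the shared
// reindex test helpers; 2000 exceeds the default limit of 1024.
int slices = 2000;
IllegalArgumentException e = expectThrows(
    IllegalArgumentException.class,
    () -> updateByQuery().source("source").refresh(true).setSlices(slices).get()
);
assertThat(e.getMessage(), containsString("is too large"));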
@@ -52,6 +52,7 @@
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -483,4 +484,21 @@ private void verifyTransformedContent(String indexName, int expectedCount) {
assertNotNull(source.get("date_field"));
}
}

public void testTooMuchSlices() throws InterruptedException {
indexRandom(
true,
client().prepareIndex("source").setId("1").setSource("foo", "a"),
client().prepareIndex("source").setId("2").setSource("foo", "a"),
client().prepareIndex("source").setId("3").setSource("foo", "b"),
client().prepareIndex("source").setId("4").setSource("foo", "c")
);
assertHitCount(client().prepareSearch("source").setSize(0).get(), 4);

int slices = 2000;
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
reindex().source("source").destination("dest").refresh(true).setSlices(slices).get();
});
assertThat(e.getMessage(), containsString("is too large"));
}
}
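
For contrast, here is a sketch of the accepted path, not part of this PR: a slice count at or below the index's limit (index.max_slices_per_scroll defaults to 1024) should still go through. The method name, document set, and success assertions are illustrative, written in the style of the test above.

// Sketch only: a slice count within the limit is still accepted and the
// reindexed documents land in the destination index.
public void testSlicesWithinLimit() throws InterruptedException {
    indexRandom(
        true,
        client().prepareIndex("source").setId("1").setSource("foo", "a"),
        client().prepareIndex("source").setId("2").setSource("foo", "b")
    );
    assertHitCount(client().prepareSearch("source").setSize(0).get(), 2);

    BulkByScrollResponse response = reindex().source("source").destination("dest").refresh(true).setSlices(2).get();
    assertThat(response.getBulkFailures(), hasSize(0));
    assertHitCount(client().prepareSearch("dest").setSize(0).get(), 2);
}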