Add scroll parameter to _reindex API #28041

Merged
merged 3 commits into from
Jan 11, 2018
@@ -28,6 +28,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
@@ -42,7 +43,7 @@
public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScrollRequest<Self>> extends ActionRequest {

public static final int SIZE_ALL_MATCHES = -1;
private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
static final int DEFAULT_SCROLL_SIZE = 1000;

public static final int AUTO_SLICES = 0;
@@ -341,6 +342,21 @@ public boolean getShouldStoreResult() {
        return shouldStoreResult;
    }

    /**
     * Set scroll timeout for {@link SearchRequest}
     */
    public Self setScroll(TimeValue keepAlive) {
        searchRequest.scroll(new Scroll(keepAlive));
        return self();
    }

    /**
     * Get scroll timeout
     */
    public TimeValue getScrollTime() {
        return searchRequest.scroll().keepAlive();
    }

    /**
     * The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
     */
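The new `setScroll` returns `Self` rather than the base type, which is what lets concrete requests keep chaining their own setters after it. A minimal sketch of that self-typed pattern follows. The class names `BulkByScrollRequest` and `ReindexLikeRequest` are hypothetical stand-ins, `java.time.Duration` replaces `TimeValue`, and the scroll is stored in a plain field instead of a wrapped `SearchRequest`, so this illustrates the shape of the API rather than the actual Elasticsearch classes.

```java
import java.time.Duration;

public class SelfTypedRequestSketch {

    // Hypothetical, simplified stand-in for AbstractBulkByScrollRequest.
    abstract static class BulkByScrollRequest<Self extends BulkByScrollRequest<Self>> {
        static final Duration DEFAULT_SCROLL_TIMEOUT = Duration.ofMinutes(5);

        // Simplification: the real class keeps the scroll on its SearchRequest.
        private Duration scroll = DEFAULT_SCROLL_TIMEOUT;

        // Each concrete subclass returns itself so chained calls keep the concrete type.
        protected abstract Self self();

        public Self setScroll(Duration keepAlive) {
            this.scroll = keepAlive;
            return self();
        }

        public Duration getScrollTime() {
            return scroll;
        }
    }

    // Hypothetical concrete request, standing in for ReindexRequest.
    static final class ReindexLikeRequest extends BulkByScrollRequest<ReindexLikeRequest> {
        private String destIndex;

        @Override
        protected ReindexLikeRequest self() {
            return this;
        }

        public ReindexLikeRequest setDestIndex(String destIndex) {
            this.destIndex = destIndex;
            return this;
        }

        public String getDestIndex() {
            return destIndex;
        }
    }

    public static void main(String[] args) {
        // setScroll returns ReindexLikeRequest, so setDestIndex is still reachable.
        ReindexLikeRequest request = new ReindexLikeRequest()
                .setScroll(Duration.ofMinutes(10))
                .setDestIndex("new_twitter");
        System.out.println(request.getScrollTime().toMinutes() + "m -> " + request.getDestIndex());
    }
}
```

If `setScroll` returned the base type instead, the chained `setDestIndex` call above would not compile without a cast.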
7 changes: 5 additions & 2 deletions docs/reference/docs/delete-by-query.asciidoc
@@ -142,7 +142,8 @@ POST twitter/_delete_by_query?scroll_size=5000
=== URL Parameters

In addition to the standard parameters like `pretty`, the Delete By Query API
also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, and `timeout`.
also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`
and `scroll`.

Sending the `refresh` will refresh all shards involved in the delete by query
once the request completes. This is different than the Delete API's `refresh`
@@ -161,7 +162,9 @@ Elasticsearch can reclaim the space it uses.
before proceeding with the request. See <<index-wait-for-active-shards,here>>
for details. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
<<docs-bulk,Bulk API>>.
<<docs-bulk,Bulk API>>. Because `_delete_by_query` uses scroll search, you can also specify
the `scroll` parameter to control how long it keeps the "search context" alive,
e.g. `?scroll=10m`. The default is 5 minutes.

`requests_per_second` can be set to any positive decimal number (`1.4`, `6`,
`1000`, etc) and throttles rate at which `_delete_by_query` issues batches of
8 changes: 5 additions & 3 deletions docs/reference/docs/reindex.asciidoc
@@ -512,8 +512,8 @@ POST _reindex
=== URL Parameters

In addition to the standard parameters like `pretty`, the Reindex API also
supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`, and
`requests_per_second`.
supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`,
`scroll` and `requests_per_second`.

Sending the `refresh` url parameter will cause all indexes to which the request
wrote to be refreshed. This is different than the Index API's `refresh`
@@ -531,7 +531,9 @@ Elasticsearch can reclaim the space it uses.
before proceeding with the reindexing. See <<index-wait-for-active-shards,here>>
for details. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
<<docs-bulk,Bulk API>>.
<<docs-bulk,Bulk API>>. Because `_reindex` uses scroll search, you can also specify
the `scroll` parameter to control how long it keeps the "search context" alive,
e.g. `?scroll=10m`. The default is 5 minutes.

`requests_per_second` can be set to any positive decimal number (`1.4`, `6`,
`1000`, etc) and throttles rate at which reindex issues batches of index
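As a rough illustration of the `scroll` URL parameter described in the reindex.asciidoc change above, the sketch below builds (but does not send) a `POST _reindex?scroll=10m` request with the JDK's `java.net.http` API. The cluster address `http://localhost:9200`, the index names, and the helper `buildReindexRequest` are assumptions made for the example; they do not come from this pull request.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ReindexScrollRequestSketch {

    // Assumed cluster address; adjust for your environment.
    static final String BASE_URL = "http://localhost:9200";

    // Hypothetical helper: builds a _reindex request that keeps the
    // search context alive for the given scroll duration (e.g. "10m").
    static HttpRequest buildReindexRequest(String scroll, String sourceIndex, String destIndex) {
        String body = "{\"source\":{\"index\":\"" + sourceIndex + "\"},"
                + "\"dest\":{\"index\":\"" + destIndex + "\"}}";
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/_reindex?scroll=" + scroll))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Index names are illustrative only.
        HttpRequest request = buildReindexRequest("10m", "twitter", "new_twitter");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Omitting the `scroll` query parameter leaves the 5 minute default in place, as the documentation change states.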
7 changes: 5 additions & 2 deletions docs/reference/docs/update-by-query.asciidoc
@@ -200,7 +200,8 @@ POST twitter/_update_by_query?pipeline=set-foo
=== URL Parameters

In addition to the standard parameters like `pretty`, the Update By Query API
also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, and `timeout`.
also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`
and `scroll`.

Sending the `refresh` will update all shards in the index being updated when
the request completes. This is different than the Index API's `refresh`
@@ -218,7 +219,9 @@ Elasticsearch can reclaim the space it uses.
before proceeding with the request. See <<index-wait-for-active-shards,here>>
for details. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
<<docs-bulk,Bulk API>>.
<<docs-bulk,Bulk API>>. Because `_update_by_query` uses scroll search, you can also specify
the `scroll` parameter to control how long it keeps the "search context" alive,
e.g. `?scroll=10m`. The default is 5 minutes.

`requests_per_second` can be set to any positive decimal number (`1.4`, `6`,
`1000`, etc) and throttles rate at which `_update_by_query` issues batches of
@@ -119,6 +119,9 @@ protected ReindexRequest buildRequest(RestRequest request) throws IOException {
        try (XContentParser parser = request.contentParser()) {
            PARSER.parse(parser, internal, null);
        }
        if (request.hasParam("scroll")) {
Contributor

Since you changed AbstractBulkByScrollRequest to accept a scroll parameter, you could parse this option directly in AbstractBaseReindexRestHandler#setCommonOptions. That way all reindex actions (_delete_by_query, _update_by_query) would be able to set this option.

Contributor Author

Thanks for the quick review. In fact, for _delete_by_query and _update_by_query the REST request is already parsed in their superclass https://github.com/elastic/elasticsearch/blob/master/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java#L54 by calling RestSearchAction#parseSearchRequest, which also picks up the scroll parameter. I'm not sure whether we should do the same thing for RestReindexAction (since _delete_by_query and _update_by_query have a superclass of their own), so I just added it in RestReindexAction.

Contributor

You're right, it's not needed for _delete_by_query and _update_by_query since they parse the request with RestSearchAction. I missed that part. I think this PR is ready and fixes the main issue: adding support for changing the default scroll for _reindex requests.

            internal.setScroll(parseTimeValue(request.param("scroll"), "scroll"));
        }
        return internal;
    }
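
The logic of this hunk (override the scroll keep-alive only when the `scroll` parameter is present, otherwise keep the 5 minute default) can be sketched in isolation. The code below is a simplified stand-in, not the Elasticsearch implementation: `parseTimeValue` here is a hypothetical mini-parser that only understands whole numbers with an `s`, `m`, or `h` suffix, `java.time.Duration` replaces `TimeValue`, and a plain `Map` replaces `RestRequest`.

```java
import java.time.Duration;
import java.util.Map;

public class ScrollParamSketch {

    static final Duration DEFAULT_SCROLL_TIMEOUT = Duration.ofMinutes(5);

    // Hypothetical, much reduced time value parser: accepts values such as
    // "30s", "10m", or "1h" and rejects everything else.
    static Duration parseTimeValue(String value, String settingName) {
        if (value == null || value.length() < 2) {
            throw badValue(value, settingName);
        }
        long amount;
        try {
            amount = Long.parseLong(value.substring(0, value.length() - 1));
        } catch (NumberFormatException e) {
            throw badValue(value, settingName);
        }
        switch (value.charAt(value.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw badValue(value, settingName);
        }
    }

    private static IllegalArgumentException badValue(String value, String settingName) {
        return new IllegalArgumentException("failed to parse [" + settingName + "] value [" + value + "]");
    }

    // Same shape as the hunk: only replace the default when the parameter is present.
    static Duration scrollFor(Map<String, String> params) {
        if (params.containsKey("scroll")) {
            return parseTimeValue(params.get("scroll"), "scroll");
        }
        return DEFAULT_SCROLL_TIMEOUT;
    }

    public static void main(String[] args) {
        System.out.println(scrollFor(Map.of()).toMinutes());                 // no parameter: default applies
        System.out.println(scrollFor(Map.of("scroll", "10m")).toMinutes());  // explicit parameter wins
    }
}
```

The two calls in `main` correspond to the two cases exercised by `testSetScrollTimeout` in this pull request.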

@@ -21,10 +21,12 @@

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
@@ -150,6 +152,24 @@ public void testPipelineQueryParameterIsError() throws IOException {
        assertEquals("_reindex doesn't support [pipeline] as a query parmaeter. Specify it in the [dest] object instead.", e.getMessage());
    }

    public void testSetScrollTimeout() throws IOException {
        {
            RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class));
            FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
            requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
            ReindexRequest request = action.buildRequest(requestBuilder.build());
            assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime());
        }
        {
            RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class));
            FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
            requestBuilder.withParams(singletonMap("scroll", "10m"));
            requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
            ReindexRequest request = action.buildRequest(requestBuilder.build());
            assertEquals("10m", request.getScrollTime().toString());
        }
    }

    private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException {
        Map<String, Object> remote = new HashMap<>();
        remote.put("host", hostInRest);