diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index 5e72f4bda157b..cddd129d2753d 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -8,6 +8,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -345,7 +346,7 @@ public void testUpdateRunningKeepAlive() throws Exception { assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); - response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10)); + response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(6)); assertThat(response.getExpirationTime(), greaterThan(expirationTime)); assertTrue(response.isRunning()); @@ -364,8 +365,13 @@ public void testUpdateRunningKeepAlive() throws Exception { assertEquals(0, statusResponse.getSkippedShards()); assertEquals(null, statusResponse.getCompletionStatus()); - response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1)); - assertThat(response.getExpirationTime(), lessThan(expirationTime)); + expirationTime = response.getExpirationTime(); + response = getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(between(1, 24 * 60))); + assertThat(response.getExpirationTime(), equalTo(expirationTime)); + response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10)); + assertThat(response.getExpirationTime(), greaterThan(expirationTime)); + + deleteAsyncSearch(response.getId()); ensureTaskNotRunning(response.getId()); ensureTaskRemoval(response.getId()); } @@ -391,8 +397,9 @@ public void testUpdateStoreKeepAlive() throws Exception { assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards)); assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); - response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10)); + response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(8)); assertThat(response.getExpirationTime(), greaterThan(expirationTime)); + expirationTime = response.getExpirationTime(); assertFalse(response.isRunning()); assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); @@ -400,7 +407,11 @@ public void testUpdateStoreKeepAlive() throws Exception { assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1)); - assertThat(response.getExpirationTime(), lessThan(expirationTime)); + assertThat(response.getExpirationTime(), equalTo(expirationTime)); + response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10)); + assertThat(response.getExpirationTime(), greaterThan(expirationTime)); + + deleteAsyncSearch(response.getId()); ensureTaskNotRunning(response.getId()); ensureTaskRemoval(response.getId()); } @@ -427,22 +438,24 @@ public void testRemoveAsyncIndex() throws Exception { ExceptionsHelper.unwrapCause(exc.getCause()) : ExceptionsHelper.unwrapCause(exc); assertThat(ExceptionsHelper.status(cause).getStatus(), equalTo(404)); - SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName); + SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName) { + @Override + public ActionRequestValidationException validate() { + return null; // to use a small keep_alive + } + }; newReq.getSearchRequest().source( new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test", randomLong())) ); - newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)).setKeepAlive(TimeValue.timeValueSeconds(5)); AsyncSearchResponse newResp = submitAsyncSearch(newReq); assertNotNull(newResp.getSearchResponse()); assertTrue(newResp.isRunning()); assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards)); assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0)); assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0)); - long expirationTime = newResp.getExpirationTime(); // check garbage collection - newResp = getAsyncSearch(newResp.getId(), TimeValue.timeValueMillis(1)); - assertThat(newResp.getExpirationTime(), lessThan(expirationTime)); ensureTaskNotRunning(newResp.getId()); ensureTaskRemoval(newResp.getId()); } diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 5b9bc802aa1a4..7e767b23c12fc 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -57,11 +58,15 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -93,6 +98,44 @@ public List getAggregations() { } } + public static class ExpirationTimeScriptPlugin extends MockScriptPlugin { + @Override + public String pluginScriptLang() { + return "painless"; + } + + @Override + @SuppressWarnings("unchecked") + protected Map, Object>> pluginScripts() { + final String fieldName = "expiration_time"; + final String script = + " if (ctx._source.expiration_time < params.expiration_time) { " + + " ctx._source.expiration_time = params.expiration_time; " + + " } else { " + + " ctx.op = \"noop\"; " + + " }"; + return Map.of( + script, vars -> { + Map params = (Map) vars.get("params"); + assertNotNull(params); + assertThat(params.keySet(), contains(fieldName)); + long updatingValue = (long) params.get(fieldName); + + Map ctx = (Map) vars.get("ctx"); + assertNotNull(ctx); + Map source = (Map) ctx.get("_source"); + long currentValue = (long) source.get(fieldName); + if (currentValue < updatingValue) { + source.put(fieldName, updatingValue); + } else { + ctx.put("op", "noop"); + } + return ctx; + } + ); + } + } + @Before public void startMaintenanceService() { for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) { @@ -120,7 +163,7 @@ public void releaseQueryLatch() { @Override protected Collection> nodePlugins() { return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, AsyncResultsIndexPlugin.class, IndexLifecycle.class, - SearchTestPlugin.class, ReindexPlugin.class); + SearchTestPlugin.class, ReindexPlugin.class, ExpirationTimeScriptPlugin.class); } @Override @@ -189,7 +232,7 @@ protected void ensureTaskNotRunning(String id) throws Exception { throw exc; } } - }); + }, 30, TimeUnit.SECONDS); } /** @@ -207,7 +250,7 @@ protected void ensureTaskCompletion(String id) throws Exception { throw exc; } } - }); + }, 30, TimeUnit.SECONDS); } /** diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 9f74c2949ee48..d03f12eb8e61d 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -62,7 +63,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { private final List initListeners = new ArrayList<>(); private final Map> completionListeners = new HashMap<>(); - private volatile long expirationTimeMillis; + private final AtomicLong expirationTimeMillis; private final AtomicBoolean isCancelling = new AtomicBoolean(false); private final AtomicReference searchResponse = new AtomicReference<>(); @@ -93,7 +94,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { ThreadPool threadPool, Supplier aggReduceContextSupplier) { super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders); - this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); + this.expirationTimeMillis = new AtomicLong(getStartTime() + keepAlive.getMillis()); this.originHeaders = originHeaders; this.searchId = searchId; this.client = client; @@ -127,8 +128,8 @@ Listener getSearchProgressActionListener() { * Update the expiration time of the (partial) response. */ @Override - public void setExpirationTime(long expirationTimeMillis) { - this.expirationTimeMillis = expirationTimeMillis; + public void extendExpirationTime(long newExpirationTimeMillis) { + this.expirationTimeMillis.updateAndGet(curr -> Math.max(curr, newExpirationTimeMillis)); } @Override @@ -330,11 +331,11 @@ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) { checkCancellation(); AsyncSearchResponse asyncSearchResponse; try { - asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders); + asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis.get(), restoreResponseHeaders); } catch(Exception e) { ElasticsearchException exception = new ElasticsearchStatusException("Async search: error while reducing partial results", ExceptionsHelper.status(e), e); - asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception); + asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis.get(), exception); } return asyncSearchResponse; } @@ -342,7 +343,7 @@ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) { // checks if the search task should be cancelled private synchronized void checkCancellation() { long now = System.currentTimeMillis(); - if (hasCompleted == false && expirationTimeMillis < now) { + if (hasCompleted == false && expirationTimeMillis.get() < now) { // we cancel expired search task even if they are still running cancelTask(() -> {}, "async search has expired"); } @@ -354,7 +355,7 @@ private synchronized void checkCancellation() { public AsyncStatusResponse getStatusResponse() { MutableSearchResponse mutableSearchResponse = searchResponse.get(); assert mutableSearchResponse != null; - return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis); + return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis.get()); } class Listener extends SearchProgressActionListener { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index d94dc9bf9c95e..810c3f42f8847 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -81,7 +81,7 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener 0) { - store.updateExpirationTime(searchId.getDocId(), expirationTime, + store.extendExpirationTime(searchId.getDocId(), expirationTime, ActionListener.wrap( p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { @@ -123,7 +123,7 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId, } if (expirationTimeMillis != -1) { - task.setExpirationTime(expirationTimeMillis); + task.extendExpirationTime(expirationTimeMillis); } addCompletionListener.apply(task, new ActionListener<>() { @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java index db8393e74a493..9c8536fe1f427 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java @@ -30,9 +30,9 @@ public interface AsyncTask { boolean isCancelled(); /** - * Update the expiration time of the (partial) response. + * Extends the expiration time of the (partial) response if needed */ - void setExpirationTime(long expirationTimeMillis); + void extendExpirationTime(long newExpirationTimeMillis); /** * Performs necessary checks, cancels the task and calls the runnable upon completion diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index bc3da1e3015a6..08c319686c16e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -32,6 +32,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.XPackPlugin; @@ -45,7 +47,6 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.Base64; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,6 +65,13 @@ public final class AsyncTaskIndexService> { public static final String HEADERS_FIELD = "headers"; public static final String RESPONSE_HEADERS_FIELD = "response_headers"; public static final String EXPIRATION_TIME_FIELD = "expiration_time"; + public static final String EXPIRATION_TIME_SCRIPT = + " if (ctx._source.expiration_time < params.expiration_time) { " + + " ctx._source.expiration_time = params.expiration_time; " + + " } else { " + + " ctx.op = \"noop\"; " + + " }"; + public static final String RESULT_FIELD = "result"; // Usually the settings, mappings and system index descriptor below @@ -196,16 +204,15 @@ public void updateResponse(String docId, } /** - * Updates the expiration time of the provided docId if the place-holder - * document is still present (update). + * Extends the expiration time of the provided docId if the place-holder document is still present (update). */ - public void updateExpirationTime(String docId, - long expirationTimeMillis, - ActionListener listener) { - Map source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis); - UpdateRequest request = new UpdateRequest().index(index) + public void extendExpirationTime(String docId, long expirationTimeMillis, ActionListener listener) { + Script script = new Script(ScriptType.INLINE, "painless", EXPIRATION_TIME_SCRIPT, + Map.of(EXPIRATION_TIME_FIELD, expirationTimeMillis)); + UpdateRequest request = new UpdateRequest() + .index(index) .id(docId) - .doc(source, XContentType.JSON) + .script(script) .retryOnConflict(5); client.update(request, listener); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index ea9920be84ae0..efd1c67b8b07b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse; import org.junit.Before; +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -68,8 +70,8 @@ public AsyncExecutionId getExecutionId() { } @Override - public void setExpirationTime(long expirationTimeMillis) { - this.expirationTimeMillis = expirationTimeMillis; + public void extendExpirationTime(long newExpirationTimeMillis) { + this.expirationTimeMillis = newExpirationTimeMillis; } @Override @@ -155,7 +157,7 @@ public void testRetrieveFromMemoryWithExpiration() throws Exception { try { boolean shouldExpire = randomBoolean(); long expirationTime = System.currentTimeMillis() + randomLongBetween(100000, 1000000) * (shouldExpire ? -1 : 1); - task.setExpirationTime(expirationTime); + task.extendExpirationTime(expirationTime); if (updateInitialResultsInStore) { // we need to store initial result @@ -197,7 +199,7 @@ public void testAssertExpirationPropagation() throws Exception { TestTask task = (TestTask) taskManager.register("test", "test", request); try { long startTime = System.currentTimeMillis(); - task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + task.extendExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); if (updateInitialResultsInStore) { // we need to store initial result @@ -235,7 +237,7 @@ public void testRetrieveFromDisk() throws Exception { TestTask task = (TestTask) taskManager.register("test", "test", request); try { long startTime = System.currentTimeMillis(); - task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + task.extendExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); if (updateInitialResultsInStore) { // we need to store initial result @@ -273,4 +275,9 @@ public void testRetrieveFromDisk() throws Exception { deleteService.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); assertFutureThrows(deleteListener, ResourceNotFoundException.class); } + + @Override + protected Collection> getPlugins() { + return pluginList(ExpirationTimeScriptPlugin.class); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 66fa5da060072..647bbc09e2218 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -51,6 +51,7 @@ public void setup() { protected Collection> getPlugins() { List> plugins = new ArrayList<>(super.getPlugins()); plugins.add(TestPlugin.class); + plugins.add(ExpirationTimeScriptPlugin.class); return plugins; } @@ -154,7 +155,7 @@ public void testAutoCreateIndex() throws Exception { // And so does updating the expiration time { PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.updateExpirationTime("0", 10L, future); + indexService.extendExpirationTime("0", 10L, future); expectThrows(Exception.class, future::get); assertSettings(); } @@ -175,4 +176,6 @@ private void assertSettings() { Settings expected = AsyncTaskIndexService.settings(); assertEquals(expected, settings.filter(expected::hasValue)); } + + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/ExpirationTimeScriptPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/ExpirationTimeScriptPlugin.java new file mode 100644 index 0000000000000..902dc7a95be7b --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/ExpirationTimeScriptPlugin.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.script.MockScriptPlugin; +import org.junit.Assert; + +import java.util.Map; +import java.util.function.Function; + +import static org.hamcrest.Matchers.contains; + +public class ExpirationTimeScriptPlugin extends MockScriptPlugin { + @Override + public String pluginScriptLang() { + return "painless"; + } + + @Override + @SuppressWarnings("unchecked") + protected Map, Object>> pluginScripts() { + final String fieldName = "expiration_time"; + final String script = + " if (ctx._source.expiration_time < params.expiration_time) { " + + " ctx._source.expiration_time = params.expiration_time; " + + " } else { " + + " ctx.op = \"noop\"; " + + " }"; + return Map.of( + script, vars -> { + Map params = (Map) vars.get("params"); + Assert.assertNotNull(params); + Assert.assertThat(params.keySet(), contains(fieldName)); + long updatingValue = (long) params.get(fieldName); + + Map ctx = (Map) vars.get("ctx"); + Assert.assertNotNull(ctx); + Map source = (Map) ctx.get("_source"); + long currentValue = (long) source.get(fieldName); + if (currentValue < updatingValue) { + source.put(fieldName, updatingValue); + } else { + ctx.put("op", "noop"); + } + return ctx; + } + ); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java index 92ed8876ad866..577aa50b22838 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java @@ -18,13 +18,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public abstract class StoredAsyncTask extends CancellableTask implements AsyncTask { private final AsyncExecutionId asyncExecutionId; private final Map originHeaders; - private volatile long expirationTimeMillis; + private final AtomicLong expirationTimeMillis; private final List> completionListeners; public StoredAsyncTask(long id, String type, String action, String description, TaskId parentTaskId, @@ -33,7 +34,7 @@ public StoredAsyncTask(long id, String type, String action, String description, super(id, type, action, description, parentTaskId, headers); this.asyncExecutionId = asyncExecutionId; this.originHeaders = originHeaders; - this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); + this.expirationTimeMillis = new AtomicLong(getStartTime() + keepAlive.getMillis()); this.completionListeners = new ArrayList<>(); } @@ -51,12 +52,12 @@ public AsyncExecutionId getExecutionId() { * Update the expiration time of the (partial) response. */ @Override - public void setExpirationTime(long expirationTimeMillis) { - this.expirationTimeMillis = expirationTimeMillis; + public void extendExpirationTime(long newExpirationTimeMillis) { + this.expirationTimeMillis.updateAndGet(curr -> Math.max(curr, newExpirationTimeMillis)); } public long getExpirationTimeMillis() { - return expirationTimeMillis; + return expirationTimeMillis.get(); } public synchronized void addCompletionListener(ActionListener listener) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml index 70c8edc5226c6..3f041ee326a17 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml @@ -77,6 +77,43 @@ - match: { response.hits.hits.0._source.max: 1 } - match: { response.aggregations.max.value: 3.0 } + - do: + async_search.submit: + index: test-* + batched_reduce_size: 2 + wait_for_completion_timeout: 10s + keep_alive: 10m + keep_on_completion: true + body: + aggs: + max: + max: + field: max + sort: max + + - set: { id: id } + - set: { expiration_time_in_millis: expiration_time_in_millis } + + - do: + async_search.get: + id: "$id" + keep_alive: 5m + + - match: { expiration_time_in_millis: $expiration_time_in_millis } + + - do: + async_search.get: + id: "$id" + keep_alive: 30m + + - gt: { expiration_time_in_millis: $expiration_time_in_millis } + + - do: + async_search.delete: + id: "$id" + + - match: { acknowledged: true } + # test with typed_keys: - do: async_search.submit: