[ML] Complete the Data Frame task on stop (elastic#41752)
Wait for the indexer to stop, then complete the persistent task on stop.
If wait_for_completion is true, the request will not return until the transform has stopped.
davidkyle committed May 10, 2019
1 parent 2998c10 commit b79b77f
Showing 15 changed files with 249 additions and 277 deletions.
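
For reference (not part of the diff below), stopping a transform and waiting for it to finish looks roughly like this through the Java high-level REST client, using the three-argument StopDataFrameTransformRequest constructor that the updated tests exercise. The transform id "my_transform", the 30-second timeout, and the pre-built RestHighLevelClient named client are illustrative assumptions, and the package names are those of the 7.x high-level REST client.

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.common.unit.TimeValue;

// id, waitForCompletion, timeout
StopDataFrameTransformRequest stopRequest =
        new StopDataFrameTransformRequest("my_transform", Boolean.TRUE, TimeValue.timeValueSeconds(30));
// With waitForCompletion set to true this call only returns once the transform
// has stopped, or until the timeout elapses.
StopDataFrameTransformResponse stopResponse =
        client.dataFrame().stopDataFrameTransform(stopRequest, RequestOptions.DEFAULT);
assert stopResponse.isStopped();
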
@@ -141,7 +141,8 @@ private void indexData(String indexName) throws IOException {
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
@@ -265,7 +266,7 @@ public void testStartStop() throws IOException {
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformResponse stopResponse =
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
assertTrue(stopResponse.isStopped());
@@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
@@ -7,25 +7,18 @@

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransformAction.Response> {
public class DeleteDataFrameTransformAction extends Action<AcknowledgedResponse> {

public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction();
public static final String NAME = "cluster:admin/data_frame/delete";
@@ -35,17 +28,21 @@ private DeleteDataFrameTransformAction() {
}

@Override
public Response newResponse() {
public AcknowledgedResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
public Writeable.Reader<AcknowledgedResponse> getResponseReader() {
return in -> {
AcknowledgedResponse response = new AcknowledgedResponse();
response.readFrom(in);
return response;
};
}

public static class Request extends BaseTasksRequest<Request> {
private final String id;
public static class Request extends MasterNodeRequest<Request> {
private String id;

public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
@@ -60,11 +57,6 @@ public String getId() {
return id;
}

@Override
public boolean match(Task task) {
return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -94,59 +86,4 @@ public boolean equals(Object obj) {
return Objects.equals(id, other.id);
}
}

public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {

private final boolean acknowledged;

public Response(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
}

public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}

public Response(boolean acknowledged) {
this(acknowledged, Collections.emptyList(), Collections.emptyList());
}

public boolean isDeleted() {
return acknowledged;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DeleteDataFrameTransformAction.Response response = (DeleteDataFrameTransformAction.Response) o;
return super.equals(o) && acknowledged == response.acknowledged;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acknowledged);
}
}
}
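
With the dedicated Response class removed, callers of DeleteDataFrameTransformAction receive a plain AcknowledgedResponse. A rough, illustrative sketch of invoking the action from node-client code under this change; the client (an internal Client), the logger, and the transform id "my_transform" are placeholders and not taken from this commit:

client.execute(DeleteDataFrameTransformAction.INSTANCE,
        new DeleteDataFrameTransformAction.Request("my_transform"),
        ActionListener.wrap(
                // isAcknowledged() takes the place of the removed Response#isDeleted()
                acknowledged -> logger.info("transform deleted: {}", acknowledged.isAcknowledged()),
                e -> logger.error("delete failed", e)));
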
@@ -22,9 +22,11 @@
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
* finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
* to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
* is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()})
* to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
* {@link #stop()} can be used to stop the background job without aborting the indexer.
*
* In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
* indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
@@ -84,8 +86,10 @@ public synchronized IndexerState start() {

/**
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background. If there is no job running when this function is
* called, the state is directly set to {@link IndexerState#STOPPED}.
* running in the background, {@link #onStop()} will be called when the background job
* detects that the indexer is stopped.
* If there is no job running when this function is called,
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} is called directly.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
@@ -94,6 +98,7 @@ public synchronized IndexerState stop() {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
return IndexerState.STOPPED;
} else {
return previousState;
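
Taken together, the updated Javadoc and the stop() change above give onStop() a well-defined place on both stop paths. A minimal usage sketch (not part of the diff), assuming indexer is some concrete AsyncTwoPhaseIndexer subclass such as the MockIndexer defined in the tests further down:

// Path 1: stop while no job is running -> state goes straight to STOPPED
// and onStop() is invoked inline by stop().
indexer.start();                                            // STARTED
IndexerState stopped = indexer.stop();                      // returns STOPPED

// Path 2: stop while a background job is indexing -> stop() returns STOPPING;
// when the job finishes its current step it observes STOPPING,
// calls onStop() and moves the state to STOPPED.
indexer.start();
indexer.maybeTriggerAsyncJob(System.currentTimeMillis());   // now INDEXING
IndexerState stopping = indexer.stop();                     // returns STOPPING
// ... later, indexer.getState() reports STOPPED and onStop() has run.
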
@@ -251,6 +256,14 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
*/
protected abstract void onFinish(ActionListener<Void> listener);

/**
* Called when the indexer is stopped. This is only called when the indexer is stopped
* via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
* when the indexer's work is done.
*/
protected void onStop() {
}

/**
* Called when a background job detects that the indexer is aborted causing the
* async execution to stop.
@@ -276,6 +289,7 @@ private IndexerState finishAndSetState() {

case STOPPING:
// must be started again
onStop();
return IndexerState.STOPPED;

case ABORTING:

This file was deleted.

@@ -18,6 +18,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
@@ -34,17 +35,26 @@
public class AsyncTwoPhaseIndexerTests extends ESTestCase {

AtomicBoolean isFinished = new AtomicBoolean(false);
AtomicBoolean isStopped = new AtomicBoolean(false);

@Before
public void reset() {
isFinished.set(false);
isStopped.set(false);
}

private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {

private final CountDownLatch latch;
// test the execution order
private volatile int step;
private final boolean stoppedBeforeFinished;

protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
CountDownLatch latch) {
CountDownLatch latch, boolean stoppedBeforeFinished) {
super(executor, initialState, initialPosition, new MockJobStats());
this.latch = latch;
this.stoppedBeforeFinished = stoppedBeforeFinished;
}

@Override
@@ -57,7 +67,7 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
awaitForLatch();
assertThat(step, equalTo(3));
++step;
return new IterationResult<Integer>(Collections.emptyList(), 3, true);
return new IterationResult<>(Collections.emptyList(), 3, true);
}

private void awaitForLatch() {
@@ -99,7 +109,8 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next

@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(5));
int expectedStep = stoppedBeforeFinished ? 3 : 5;
assertThat(step, equalTo(expectedStep));
++step;
next.run();
}
@@ -114,7 +125,12 @@ protected void onFinish(ActionListener<Void> listener) {
assertThat(step, equalTo(4));
++step;
listener.onResponse(null);
isFinished.set(true);
assertTrue(isFinished.compareAndSet(false, true));
}

@Override
protected void onStop() {
assertTrue(isStopped.compareAndSet(false, true));
}

@Override
@@ -180,7 +196,7 @@ protected void doSaveState(IndexerState state, Integer position, Runnable next)
protected void onFailure(Exception exc) {
assertThat(step, equalTo(2));
++step;
isFinished.set(true);
assertTrue(isFinished.compareAndSet(false, true));
}

@Override
@@ -209,18 +225,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void testStateMachine() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
countDownLatch.countDown();

assertThat(indexer.getPosition(), equalTo(2));
ESTestCase.awaitBusy(() -> isFinished.get());
assertTrue(awaitBusy(() -> isFinished.get()));
assertFalse(isStopped.get());
assertThat(indexer.getStep(), equalTo(6));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@@ -234,18 +250,57 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
public void testStateMachineBrokenSearch() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {

MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertThat(indexer.getStep(), equalTo(3));

} finally {
executor.shutdownNow();
}
}

public void testStop_AfterIndexerIsFinished() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));

indexer.stop();
assertTrue(isStopped.get());
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
} finally {
executor.shutdownNow();
}
}

public void testStop_WhileIndexing() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
indexer.stop();
countDownLatch.countDown();

assertThat(indexer.getPosition(), equalTo(2));
assertTrue(awaitBusy(() -> isStopped.get()));
assertFalse(isFinished.get());
} finally {
executor.shutdownNow();
}
}
}