Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Complete the Data Frame task on stop #41752

Merged
merged 10 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ private void indexData(String indexName) throws IOException {
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down Expand Up @@ -265,7 +266,7 @@ public void testStartStop() throws IOException {
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformResponse stopResponse =
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
assertTrue(stopResponse.isStopped());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`Boolean.TRUE` should probably just be `true`.

}

for (String transformId : transformsToClean) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,18 @@

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransformAction.Response> {
public class DeleteDataFrameTransformAction extends Action<AcknowledgedResponse> {

public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction();
public static final String NAME = "cluster:admin/data_frame/delete";
Expand All @@ -35,17 +28,21 @@ private DeleteDataFrameTransformAction() {
}

@Override
public Response newResponse() {
public AcknowledgedResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
public Writeable.Reader<AcknowledgedResponse> getResponseReader() {
return in -> {
AcknowledgedResponse response = new AcknowledgedResponse();
response.readFrom(in);
return response;
};
}

public static class Request extends BaseTasksRequest<Request> {
private final String id;
public static class Request extends MasterNodeRequest<Request> {
private String id;

public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
Expand All @@ -60,11 +57,6 @@ public String getId() {
return id;
}

@Override
public boolean match(Task task) {
return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -94,59 +86,4 @@ public boolean equals(Object obj) {
return Objects.equals(id, other.id);
}
}

/**
 * Response carrying a single {@code acknowledged} flag on top of whatever the
 * {@link BaseTasksResponse} superclass holds (task and node failures are passed
 * straight through to it).
 */
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {

    private final boolean acknowledged;

    /**
     * Wire constructor: the superclass consumes its part of the stream first,
     * then the flag is read as a single boolean.
     */
    public Response(StreamInput in) throws IOException {
        super(in);
        this.acknowledged = in.readBoolean();
    }

    /**
     * @param acknowledged value later reported by {@link #isDeleted()}
     * @param taskFailures forwarded unchanged to the superclass
     * @param nodeFailures forwarded unchanged to the superclass
     */
    public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
        super(taskFailures, nodeFailures);
        this.acknowledged = acknowledged;
    }

    /** Convenience constructor for a response with no task or node failures. */
    public Response(boolean acknowledged) {
        this(acknowledged, Collections.emptyList(), Collections.emptyList());
    }

    /** @return the {@code acknowledged} flag this response was built with */
    public boolean isDeleted() {
        return acknowledged;
    }

    /** Mirrors the wire constructor: superclass fields first, then the flag. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeBoolean(acknowledged);
    }

    /**
     * Renders one object containing the common fields written by
     * {@code toXContentCommon} followed by the {@code acknowledged} field.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        toXContentCommon(builder, params);
        builder.field("acknowledged", acknowledged);
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final Response that = (Response) o;
        // Superclass state is compared first, then the flag.
        return super.equals(o) && this.acknowledged == that.acknowledged;
    }

    @Override
    public int hashCode() {
        final int baseHash = super.hashCode();
        return Objects.hash(baseHash, acknowledged);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
* finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
* to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
* is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()})
* to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
* {@link #stop()} can be used to stop the background job without aborting the indexer.
*
* In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
* indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
Expand Down Expand Up @@ -84,8 +86,10 @@ public synchronized IndexerState start() {

/**
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background. If there is no job running when this function is
* called, the state is directly set to {@link IndexerState#STOPPED}.
* running in the background; {@link #onStop()} will then be called when the background job
* detects that the indexer is stopped.
* If there is no job running when this function is called,
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} is called directly.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
Expand All @@ -94,6 +98,7 @@ public synchronized IndexerState stop() {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
return IndexerState.STOPPED;
} else {
return previousState;
Expand Down Expand Up @@ -251,6 +256,14 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
*/
protected abstract void onFinish(ActionListener<Void> listener);

/**
 * Hook invoked when the indexer is stopped via {@link #stop()}, as opposed to
 * {@link #onFinish(ActionListener)}, which is called when the indexer's work is done.
 *
 * The default implementation does nothing; subclasses override it to react to a stop.
 *
 * NOTE(review): in the visible code this is called directly from {@link #stop()} when the
 * indexer was merely started, and from the state transition that handles a pending stop
 * once a running job notices it. {@link #stop()} is declared {@code synchronized}, so in
 * that first case an override presumably runs while the indexer's monitor is held;
 * confirm before doing blocking work here.
 */
protected void onStop() {
}

/**
* Called when a background job detects that the indexer is aborted causing the
* async execution to stop.
Expand All @@ -276,6 +289,7 @@ private IndexerState finishAndSetState() {

case STOPPING:
// must be started again
onStop();
return IndexerState.STOPPED;

case ABORTING:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -34,17 +35,26 @@
public class AsyncTwoPhaseIndexerTests extends ESTestCase {

AtomicBoolean isFinished = new AtomicBoolean(false);
AtomicBoolean isStopped = new AtomicBoolean(false);

/**
 * Clears both shared flags before every test so that no test observes a value
 * left behind by a previous one.
 */
@Before
public void reset() {
    // The two flags are independent of each other; both start each test as false.
    isStopped.set(false);
    isFinished.set(false);
}

private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {

private final CountDownLatch latch;
// test the execution order
private volatile int step;
private final boolean stoppedBeforeFinished;

protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
CountDownLatch latch) {
CountDownLatch latch, boolean stoppedBeforeFinished) {
super(executor, initialState, initialPosition, new MockJobStats());
this.latch = latch;
this.stoppedBeforeFinished = stoppedBeforeFinished;
}

@Override
Expand All @@ -57,7 +67,7 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
awaitForLatch();
assertThat(step, equalTo(3));
++step;
return new IterationResult<Integer>(Collections.emptyList(), 3, true);
return new IterationResult<>(Collections.emptyList(), 3, true);
}

private void awaitForLatch() {
Expand Down Expand Up @@ -99,7 +109,8 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next

@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(5));
int expectedStep = stoppedBeforeFinished ? 3 : 5;
assertThat(step, equalTo(expectedStep));
++step;
next.run();
}
Expand All @@ -114,7 +125,12 @@ protected void onFinish(ActionListener<Void> listener) {
assertThat(step, equalTo(4));
++step;
listener.onResponse(null);
isFinished.set(true);
assertTrue(isFinished.compareAndSet(false, true));
}

/**
 * Records the stop notification and fails the test if it arrives more than once:
 * the flag must flip from {@code false} to {@code true} exactly here.
 */
@Override
protected void onStop() {
    final boolean firstNotification = isStopped.compareAndSet(false, true);
    assertTrue(firstNotification);
}

@Override
Expand Down Expand Up @@ -180,7 +196,7 @@ protected void doSaveState(IndexerState state, Integer position, Runnable next)
protected void onFailure(Exception exc) {
assertThat(step, equalTo(2));
++step;
isFinished.set(true);
assertTrue(isFinished.compareAndSet(false, true));
}

@Override
Expand Down Expand Up @@ -209,18 +225,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void testStateMachine() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
countDownLatch.countDown();

assertThat(indexer.getPosition(), equalTo(2));
ESTestCase.awaitBusy(() -> isFinished.get());
assertTrue(awaitBusy(() -> isFinished.get()));
assertFalse(isStopped.get());
assertThat(indexer.getStep(), equalTo(6));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
Expand All @@ -234,18 +250,57 @@ public void testStateMachine() throws Exception {
public void testStateMachineBrokenSearch() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {

MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertThat(indexer.getStep(), equalTo(3));

} finally {
executor.shutdownNow();
}
}

/**
 * Lets one job run to completion, then stops the indexer and checks that the stop
 * hook has fired by the time {@code stop()} returns and that the indexer reports
 * {@link IndexerState#STOPPED}.
 */
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
    final AtomicReference<IndexerState> indexerState = new AtomicReference<>(IndexerState.STOPPED);
    final ExecutorService threadPool = Executors.newFixedThreadPool(1);
    try {
        final CountDownLatch searchGate = new CountDownLatch(1);
        // stoppedBeforeFinished == false: the mock expects the full step sequence.
        final MockIndexer mockIndexer = new MockIndexer(threadPool, indexerState, 2, searchGate, false);

        mockIndexer.start();
        final boolean jobTriggered = mockIndexer.maybeTriggerAsyncJob(System.currentTimeMillis());
        assertTrue(jobTriggered);

        // Open the gate the mock waits on, then wait for onFinish to be reported.
        searchGate.countDown();
        assertTrue(awaitBusy(isFinished::get));

        mockIndexer.stop();
        assertTrue(isStopped.get());
        assertThat(mockIndexer.getState(), equalTo(IndexerState.STOPPED));
    } finally {
        threadPool.shutdownNow();
    }
}

/**
 * Requests a stop while a job is in the {@link IndexerState#INDEXING} state and checks
 * that the stop hook eventually fires while the finish hook never does.
 */
public void testStop_WhileIndexing() throws InterruptedException {
    final AtomicReference<IndexerState> indexerState = new AtomicReference<>(IndexerState.STOPPED);
    final ExecutorService threadPool = Executors.newFixedThreadPool(1);
    try {
        final CountDownLatch searchGate = new CountDownLatch(1);
        // stoppedBeforeFinished == true: the mock expects state to be saved at step 3.
        final MockIndexer mockIndexer = new MockIndexer(threadPool, indexerState, 2, searchGate, true);

        mockIndexer.start();
        assertThat(mockIndexer.getState(), equalTo(IndexerState.STARTED));

        final boolean jobTriggered = mockIndexer.maybeTriggerAsyncJob(System.currentTimeMillis());
        assertTrue(jobTriggered);
        assertThat(mockIndexer.getState(), equalTo(IndexerState.INDEXING));

        // Ask for the stop first, and only then open the gate the mock waits on.
        mockIndexer.stop();
        searchGate.countDown();

        assertThat(mockIndexer.getPosition(), equalTo(2));
        assertTrue(awaitBusy(isStopped::get));
        assertFalse(isFinished.get());
    } finally {
        threadPool.shutdownNow();
    }
}
}
Loading