Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML Data Frame] Refactor stop logic #42644

Merged
merged 12 commits into from
May 31, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,21 @@ public synchronized IndexerState start() {
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background, {@link #onStop()} will be called when the background job
* detects that the indexer is stopped.
* If there is no job running when this function is called
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
* If there is no job running when this function is called the returned
* state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
IndexerState currentState = state.updateAndGet(previousState -> {
return state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});

if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}

/**
Expand Down Expand Up @@ -287,20 +280,22 @@ private void finishWithIndexingFailure(Exception exc) {
}

private IndexerState finishAndSetState() {
return state.updateAndGet(prev -> {
AtomicBoolean callOnStop = new AtomicBoolean(false);
AtomicBoolean callOnAbort = new AtomicBoolean(false);
IndexerState updatedState = state.updateAndGet(prev -> {
switch (prev) {
case INDEXING:
// ready for another job
return IndexerState.STARTED;

case STOPPING:
callOnStop.set(true);
// must be started again
onStop();
return IndexerState.STOPPED;

case ABORTING:
callOnAbort.set(true);
// abort and exit
onAbort();
return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first

case STOPPED:
Expand All @@ -315,6 +310,14 @@ private IndexerState finishAndSetState() {
throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
}
});

if (callOnStop.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this doesn't need the intermediate stopping state anymore, could we just check updatedState and avoid the atomic booleans?

I suppose it'd also trigger if was already stopped and that's not what you want?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@polyfractal stopping still provides a call to doSaveState when checkState is called against it.
But, changing these branches to

if(updatedState == IndexerState.STOPPED) {
   onStop();
else if(udpatedState == IndexerState.ABORTING){
   onAbort();
}

Maybe could work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidkyle what do you think? That would make it out of step with onAbort but, we could possibly remove at least that atomic boolean

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, if it requires jumping through hoops it's probably not worth it. Just was looking to see if there was an easy way to avoid the atomics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Carry on 👍

onStop();
} else if (callOnAbort.get()) {
onAbort();
}

return updatedState;
}

private void onSearchResponse(SearchResponse searchResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
}
}

public void testStop_AfterIndexerIsFinished() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));

indexer.stop();
assertTrue(isStopped.get());
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
} finally {
executor.shutdownNow();
}
}

public void testStop_WhileIndexing() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public void cleanTransforms() throws IOException {
cleanUp();
}

@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public void testDataFrameTransformCrud() throws Exception {
createReviewsIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
Expand All @@ -23,7 +22,6 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameAuditorIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_admin_plus_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
Expand All @@ -23,7 +22,6 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
Expand All @@ -22,7 +21,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -16,7 +15,6 @@
import java.io.IOException;
import java.util.Map;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameMetaDataIT extends DataFrameRestTestCase {

private boolean indicesCreated = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
Expand All @@ -22,7 +21,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFramePivotRestIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_admin_plus_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -20,7 +19,6 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {

public void testDummy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand All @@ -23,7 +22,6 @@
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameUsageIT extends DataFrameRestTestCase {
private boolean indicesCreated = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final Map<String, Object> initialPosition;
private final IndexerState initialIndexerState;

private final SetOnce<DataFrameIndexer> indexer = new SetOnce<>();
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();

private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
Expand Down Expand Up @@ -125,7 +125,7 @@ public Status getStatus() {
return getState();
}

private DataFrameIndexer getIndexer() {
private ClientDataFrameIndexer getIndexer() {
return indexer.get();
}

Expand Down Expand Up @@ -236,7 +236,10 @@ public synchronized void stop() {
return;
}

getIndexer().stop();
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());
}
}

@Override
Expand Down Expand Up @@ -530,40 +533,36 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
next.run();
return;
}
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
// OR we called `doSaveState` manually as the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
if (indexerState.equals(IndexerState.STOPPED)) {
transformTask.setTaskStateStopped();
}

final DataFrameTransformState state = new DataFrameTransformState(
transformTask.taskState.get(),
indexerState,
getPosition(),
position,
transformTask.currentCheckpoint.get(),
transformTask.stateReason.get(),
getProgress());
logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString());

// Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
next.run();
}
);

transformTask.persistStateToClusterState(state, updateClusterStateListener);
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
}

@Override
Expand Down Expand Up @@ -602,20 +601,7 @@ protected void onFinish(ActionListener<Void> listener) {
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());

transformTask.setTaskStateStopped();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
transformTask.shutdown();
},
statsExc -> {
transformTask.shutdown();
logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc);
}
));
transformTask.shutdown();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
wait_for_completion: true
- match: { acknowledged: true }


- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-later"
Expand All @@ -209,3 +211,46 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"

---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { acknowledged: true }

- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }

- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { acknowledged: true }

- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }

- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"