Skip to content

Commit 80580d8

Browse files
author
Hendrik Muhs
authored
[7.12][Transform] make shouldStopAtCheckpoint more robust (#70461) (#71343)
shouldStopAtCheckpoint tells a transform to stop at the next checkpoint. If this API is called while a checkpoint is finishing, it can cause a race condition in state persistence. This is similar to #69551, but this time in a different place. With this change, _stop?shouldStopAtCheckpoint=true does not call doSaveState if the indexer is shutting down; it still ensures that the job stops after the indexer has shut down. Apart from that, the change fixes a logging problem and adds error handling in case of a timeout during _stop?shouldStopAtCheckpoint=true. Some logic has been moved from the task to the indexer. Fixes #70416
1 parent e30d3ce commit 80580d8

File tree

4 files changed

+153
-18
lines changed

4 files changed

+153
-18
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ long getCheckpoint() {
8484
return currentCheckpoint.get();
8585
}
8686

87-
long getAndIncrementCheckpoint() {
88-
return currentCheckpoint.getAndIncrement();
87+
long incrementAndGetCheckpoint() {
88+
return currentCheckpoint.incrementAndGet();
8989
}
9090

9191
void setNumFailureRetries(int numFailureRetries) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ private enum RunState {
125125
private volatile long lastCheckpointCleanup = 0L;
126126

127127
protected volatile boolean indexerThreadShuttingDown = false;
128-
protected volatile boolean stopCalledDuringIndexerThreadShutdown = false;
128+
protected volatile boolean saveStateRequestedDuringIndexerThreadShutdown = false;
129129

130130
public TransformIndexer(
131131
ThreadPool threadPool,
@@ -515,7 +515,7 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {
515515
changeCollector.clear();
516516
}
517517

518-
long checkpoint = context.getAndIncrementCheckpoint();
518+
long checkpoint = context.incrementAndGetCheckpoint();
519519
lastCheckpoint = getNextCheckpoint();
520520
nextCheckpoint = null;
521521
// Reset our failure count as we have finished and may start again with a new checkpoint
@@ -672,7 +672,7 @@ final void setStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Vo
672672
} catch (InterruptedException e) {
673673
logger.error(
674674
new ParameterizedMessage(
675-
"[{}] Timed out ({}s) waiting for transform state to be stored.",
675+
"[{}] Interrupt waiting ({}s) for transform state to be stored.",
676676
getJobId(),
677677
PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
678678
),
@@ -696,6 +696,14 @@ private synchronized boolean addSetStopAtCheckpointListener(
696696
boolean shouldStopAtCheckpoint,
697697
ActionListener<Void> shouldStopAtCheckpointListener
698698
) throws InterruptedException {
699+
700+
// in case the indexer is already shutting down
701+
if (indexerThreadShuttingDown) {
702+
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
703+
saveStateRequestedDuringIndexerThreadShutdown = true;
704+
return false;
705+
}
706+
699707
IndexerState state = getState();
700708

701709
// in case the indexer isn't running, respond immediately
@@ -705,9 +713,19 @@ private synchronized boolean addSetStopAtCheckpointListener(
705713
// because save state is async we need to block the call until state is persisted, so that the job can not
706714
// be triggered (ensured by synchronized)
707715
CountDownLatch latch = new CountDownLatch(1);
716+
logger.debug("[{}] persisiting stop at checkpoint", getJobId());
717+
708718
doSaveState(IndexerState.STARTED, getPosition(), () -> { latch.countDown(); });
709719

710-
latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS);
720+
if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
721+
logger.error(
722+
new ParameterizedMessage(
723+
"[{}] Timed out ({}s) waiting for transform state to be stored.",
724+
getJobId(),
725+
PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
726+
)
727+
);
728+
}
711729
return false;
712730
}
713731

@@ -742,11 +760,14 @@ private synchronized boolean addSetStopAtCheckpointListener(
742760
return true;
743761
}
744762

745-
synchronized void stopAndSaveState() {
763+
synchronized void stopAndMaybeSaveState() {
746764
onStop();
765+
IndexerState state = stop();
766+
747767
if (indexerThreadShuttingDown) {
748-
stopCalledDuringIndexerThreadShutdown = true;
749-
} else {
768+
saveStateRequestedDuringIndexerThreadShutdown = true;
769+
// if stop() returned STOPPED we need to persist state, otherwise the indexer does it for us
770+
} else if (state == IndexerState.STOPPED) {
750771
doSaveState(IndexerState.STOPPED, getPosition(), () -> {});
751772
}
752773
}
@@ -1134,13 +1155,18 @@ private void configurePageSize(Integer newPageSize) {
11341155

11351156
private synchronized void startIndexerThreadShutdown() {
11361157
indexerThreadShuttingDown = true;
1137-
stopCalledDuringIndexerThreadShutdown = false;
1158+
saveStateRequestedDuringIndexerThreadShutdown = false;
11381159
}
11391160

11401161
private synchronized void finishIndexerThreadShutdown() {
11411162
indexerThreadShuttingDown = false;
1142-
if (stopCalledDuringIndexerThreadShutdown) {
1143-
doSaveState(IndexerState.STOPPED, getPosition(), () -> {});
1163+
if (saveStateRequestedDuringIndexerThreadShutdown) {
1164+
// if stop has been called and set shouldStopAtCheckpoint to true,
1165+
// we should stop if we just finished a checkpoint
1166+
if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
1167+
stop();
1168+
}
1169+
doSaveState(getState(), getPosition(), () -> {});
11441170
}
11451171
}
11461172

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {
346346

347347
// If state was in a failed state, we should stop immediately
348348
if (wasFailed) {
349-
getIndexer().stopAndSaveState();
349+
getIndexer().stopAndMaybeSaveState();
350350
return;
351351
}
352352

@@ -363,10 +363,7 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {
363363
// or has yet to even start one.
364364
// Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time).
365365
(indexerState == IndexerState.STARTED && getIndexer().initialRun())) {
366-
IndexerState state = getIndexer().stop();
367-
if (state == IndexerState.STOPPED) {
368-
getIndexer().stopAndSaveState();
369-
}
366+
getIndexer().stopAndMaybeSaveState();
370367
}
371368
}
372369

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.search.TotalHits;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.LatchedActionListener;
1213
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1314
import org.elasticsearch.action.bulk.BulkItemResponse;
1415
import org.elasticsearch.action.bulk.BulkRequest;
@@ -53,6 +54,7 @@
5354
import java.util.Map;
5455
import java.util.concurrent.CountDownLatch;
5556
import java.util.concurrent.TimeUnit;
57+
import java.util.concurrent.atomic.AtomicBoolean;
5658
import java.util.concurrent.atomic.AtomicReference;
5759
import java.util.function.Consumer;
5860

@@ -98,6 +100,9 @@ class MockedTransformIndexer extends TransformIndexer {
98100
// used for synchronizing with the test
99101
private CountDownLatch searchLatch;
100102
private CountDownLatch doProcessLatch;
103+
private CountDownLatch doSaveStateLatch;
104+
105+
private AtomicBoolean saveStateInProgress = new AtomicBoolean(false);
101106

102107
// how many loops to execute until reporting done
103108
private int numberOfLoops;
@@ -146,6 +151,10 @@ public CountDownLatch createCountDownOnResponseLatch(int count) {
146151
return doProcessLatch = new CountDownLatch(count);
147152
}
148153

154+
public CountDownLatch createAwaitForDoSaveStateLatch(int count) {
155+
return doSaveStateLatch = new CountDownLatch(count);
156+
}
157+
149158
@Override
150159
void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
151160
responseListener.onResponse(ONE_HIT_SEARCH_RESPONSE);
@@ -214,7 +223,21 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
214223

215224
@Override
216225
protected void doSaveState(IndexerState state, TransformIndexerPosition position, Runnable next) {
226+
// assert that the indexer does not call doSaveState again, while it is still saving state
227+
// this is only useful together with the doSaveStateLatch
228+
assertTrue("doSaveState called again while still in progress", saveStateInProgress.compareAndSet(false, true));
229+
if (doSaveStateLatch != null) {
230+
try {
231+
doSaveStateLatch.await();
232+
233+
} catch (InterruptedException e) {
234+
throw new IllegalStateException(e);
235+
}
236+
}
237+
217238
assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
239+
240+
assertTrue(saveStateInProgress.compareAndSet(true, false));
218241
next.run();
219242
}
220243

@@ -288,7 +311,7 @@ public void testRetentionPolicyExecution() throws Exception {
288311
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
289312
assertThat(indexer.getState(), oneOf(IndexerState.INDEXING, IndexerState.STARTED));
290313

291-
assertBusy(() -> assertEquals(1L, indexer.getLastCheckpoint().getCheckpoint()), 5, TimeUnit.HOURS);
314+
assertBusy(() -> assertEquals(1L, indexer.getLastCheckpoint().getCheckpoint()), 5, TimeUnit.SECONDS);
292315

293316
// delete by query has been executed
294317
assertEquals(1, indexer.getDeleteByQueryCallCount());
@@ -340,6 +363,63 @@ public void testRetentionPolicyExecution() throws Exception {
340363
}
341364
}
342365

366+
/**
367+
* This test ensures correct handling of async behavior during indexer shutdown
368+
*
369+
* Indexer shutdown is not atomic: 1st the state is set back to e.g. STARTED, afterwards state is stored.
370+
* State is stored async and is IO based, therefore it can take time until this is done.
371+
*
372+
* Between setting the state and storing it, some race condition occurred, this test acts
373+
* as regression test.
374+
*/
375+
public void testInterActionWhileIndexerShutsdown() throws Exception {
376+
TransformConfig config = new TransformConfig(
377+
randomAlphaOfLength(10),
378+
randomSourceConfig(),
379+
randomDestConfig(),
380+
null,
381+
new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
382+
null,
383+
randomPivotConfig(),
384+
null,
385+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
386+
null,
387+
null,
388+
null,
389+
null
390+
);
391+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);
392+
393+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
394+
final MockedTransformIndexer indexer = createMockIndexer(
395+
5,
396+
config,
397+
state,
398+
null,
399+
threadPool,
400+
auditor,
401+
new TransformIndexerStats(),
402+
context
403+
);
404+
405+
// add a latch at doSaveState
406+
CountDownLatch saveStateLatch = indexer.createAwaitForDoSaveStateLatch(1);
407+
408+
indexer.start();
409+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
410+
assertEquals(indexer.getState(), IndexerState.INDEXING);
411+
412+
assertBusy(() -> assertEquals(IndexerState.STARTED, indexer.getState()), 5, TimeUnit.SECONDS);
413+
414+
// the indexer thread is shutting down, the trigger should be ignored
415+
assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
416+
this.<Void>assertAsync(listener -> setStopAtCheckpoint(indexer, true, listener), v -> {});
417+
saveStateLatch.countDown();
418+
419+
// after the indexer has shutdown, it should check for stop at checkpoint and shutdown
420+
assertBusy(() -> assertEquals(IndexerState.STOPPED, indexer.getState()), 5, TimeUnit.SECONDS);
421+
}
422+
343423
private MockedTransformIndexer createMockIndexer(
344424
int numberOfLoops,
345425
TransformConfig config,
@@ -370,4 +450,36 @@ private MockedTransformIndexer createMockIndexer(
370450
indexer.initialize();
371451
return indexer;
372452
}
453+
454+
private void setStopAtCheckpoint(
455+
TransformIndexer indexer,
456+
boolean shouldStopAtCheckpoint,
457+
ActionListener<Void> shouldStopAtCheckpointListener
458+
) {
459+
// we need to simulate that this is called from the task, which offloads it to the generic threadpool
460+
CountDownLatch latch = new CountDownLatch(1);
461+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
462+
indexer.setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
463+
latch.countDown();
464+
});
465+
try {
466+
assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
467+
} catch (InterruptedException e) {
468+
fail("timed out after 5s");
469+
}
470+
}
471+
472+
private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
473+
CountDownLatch latch = new CountDownLatch(1);
474+
AtomicBoolean listenerCalled = new AtomicBoolean(false);
475+
476+
LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
477+
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
478+
furtherTests.accept(r);
479+
}, e -> { fail("got unexpected exception: " + e); }), latch);
480+
481+
function.accept(listener);
482+
assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
483+
}
484+
373485
}

0 commit comments

Comments
 (0)