Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fix thread leak when waiting for job flush (#32196) #32541

Merged
merged 1 commit into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void persistJob(BiConsumer<Void, Exception> handler) {
}

@Nullable
FlushAcknowledgement waitFlushToCompletion(String flushId) {
FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException {
LOGGER.debug("[{}] waiting for flush", job.getId());

FlushAcknowledgement flushAcknowledgement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void awaitCompletion() throws TimeoutException {
* @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
*/
@Nullable
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException {
return failed ? null : flushListener.waitForFlush(flushId, timeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,14 @@ class FlushListener {
final AtomicBoolean cleared = new AtomicBoolean(false);

@Nullable
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) {
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
if (cleared.get()) {
return null;
}

FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
try {
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return holder.flushAcknowledgement;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return holder.flushAcknowledgement;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testWriteUpdateProcessMessage() throws IOException {
verifyNoMoreInteractions(process);
}

public void testFlushJob() throws IOException {
public void testFlushJob() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
Expand All @@ -123,7 +123,7 @@ public void testFlushJob() throws IOException {
}
}

public void testWaitForFlushReturnsIfParserFails() throws IOException {
public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
Expand All @@ -144,7 +144,7 @@ public void testFlushJob_throwsIfProcessIsDead() throws IOException {
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
}

public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException {
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void testPersisterThrowingDoesntBlockProcessing() {
verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
}

public void testParsingErrorSetsFailed() {
public void testParsingErrorSetsFailed() throws InterruptedException {
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ public void testAcknowledgeFlush() throws Exception {
FlushListener listener = new FlushListener();
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
new Thread(() -> {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
try {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
} catch (InterruptedException _ex) {
Thread.currentThread().interrupt();
}
}).start();
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
assertNull(flushAcknowledgementHolder.get());
Expand All @@ -46,8 +50,12 @@ public void testClear() throws Exception {
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
flushAcknowledgementHolders.add(flushAcknowledgementHolder);
new Thread(() -> {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
try {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
} catch (InterruptedException _ex) {
Thread.currentThread().interrupt();
}
}).start();
}
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
Expand Down