diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java index 0e1bb0a0dacd9..e301326fe7088 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java @@ -480,14 +480,18 @@ protected void finishHim(Exception failure) { * @param searchFailures any search failures accumulated during the request * @param timedOut have any of the sub-requests timed out? */ - protected void finishHim(Exception failure, List indexingFailures, List searchFailures, boolean timedOut) { - scrollSource.close(); - if (failure == null) { - listener.onResponse( - buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures, timedOut)); - } else { - listener.onFailure(failure); - } + protected void finishHim(Exception failure, List indexingFailures, + List searchFailures, boolean timedOut) { + scrollSource.close(() -> { + if (failure == null) { + BulkByScrollResponse response = buildResponse( + timeValueNanos(System.nanoTime() - startTime.get()), + indexingFailures, searchFailures, timedOut); + listener.onResponse(response); + } else { + listener.onFailure(failure); + } + }); } /** diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java index 5676dd16ba65b..dbe90080c2891 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java @@ -115,8 +115,8 @@ public void onFailure(Exception e) { } @Override - protected void cleanup() { - // Nothing to do + protected void cleanup(Runnable onCompletion) { + onCompletion.run(); } /** diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java index 2f898345c72a3..f04404f9436b0 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java @@ -47,7 +47,7 @@ /** * A scrollable source of results. */ -public abstract class ScrollableHitSource implements Closeable { +public abstract class ScrollableHitSource { private final AtomicReference scrollId = new AtomicReference<>(); protected final Logger logger; @@ -82,25 +82,31 @@ public final void startNextScroll(TimeValue extraKeepAlive, Consumer o } protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer onResponse); - @Override - public final void close() { + public final void close(Runnable onCompletion) { String scrollId = this.scrollId.get(); if (Strings.hasLength(scrollId)) { - clearScroll(scrollId, this::cleanup); + clearScroll(scrollId, () -> cleanup(onCompletion)); } else { - cleanup(); + cleanup(onCompletion); } } + /** * Called to clear a scroll id. + * * @param scrollId the id to clear - * @param onCompletion implementers must call this after completing the clear whether they are successful or not + * @param onCompletion implementers must call this after completing the clear whether they are + * successful or not */ protected abstract void clearScroll(String scrollId, Runnable onCompletion); /** - * Called after the process has been totally finished to clean up any resources the process needed like remote connections. + * Called after the process has been totally finished to clean up any resources the process + * needed like remote connections. + * + * @param onCompletion implementers must call this after completing the cleanup whether they are + * successful or not */ - protected abstract void cleanup(); + protected abstract void cleanup(Runnable onCompletion); /** * Set the id of the last scroll. Used for debugging. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index 796106c269e50..6781da6497201 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -141,15 +141,18 @@ private void logFailure(Exception e) { } @Override - protected void cleanup() { - /* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to - * close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */ + protected void cleanup(Runnable onCompletion) { + /* This is called on the RestClient's thread pool and attempting to close the client on its + * own threadpool causes it to fail to close. So we always shutdown the RestClient + * asynchronously on a thread in Elasticsearch's generic thread pool. */ threadPool.generic().submit(() -> { try { client.close(); logger.debug("Shut down remote connection"); } catch (IOException e) { logger.error("Failed to shutdown the remote connection", e); + } finally { + onCompletion.run(); } }); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index e51e3b4ee305e..ca0655f0a94ce 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -79,7 +79,9 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteScrollableHitSourceTests extends ESTestCase { @@ -487,6 +489,25 @@ public void testUnexpectedJsonThinksRemoveIsNotES() throws IOException { e.getCause().getCause().getCause().getMessage()); } + public void testCleanupSuccessful() throws Exception { + AtomicBoolean cleanupCallbackCalled = new AtomicBoolean(); + RestClient client = mock(RestClient.class); + TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client); + hitSource.cleanup(() -> cleanupCallbackCalled.set(true)); + verify(client).close(); + assertTrue(cleanupCallbackCalled.get()); + } + + public void testCleanupFailure() throws Exception { + AtomicBoolean cleanupCallbackCalled = new AtomicBoolean(); + RestClient client = mock(RestClient.class); + doThrow(new RuntimeException("test")).when(client).close(); + TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client); + hitSource.cleanup(() -> cleanupCallbackCalled.set(true)); + verify(client).close(); + assertTrue(cleanupCallbackCalled.get()); + } + private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception { return sourceWithMockedRemoteCall(true, ContentType.APPLICATION_JSON, paths); }