diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index 1a961a947ddd..95023d117299 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -130,17 +130,15 @@ public void shutdown() { isRunning.compareAndSet(true, false), "Multiple calls to {}.shutdown() are not allowed.", getClass()); - workProviderExecutor.shutdown(); - boolean isTerminated = false; + // Interrupt the dispatch loop to start shutting it down. + workProviderExecutor.shutdownNow(); try { - isTerminated = workProviderExecutor.awaitTermination(10, TimeUnit.SECONDS); + while (!workProviderExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Still waiting for the dispatch loop to terminate."); + } } catch (InterruptedException e) { LOG.warn("Unable to shutdown {}", getClass()); } - - if (!isTerminated) { - workProviderExecutor.shutdownNow(); - } workCommitter.stop(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 033d7718d1c9..bead1ffd5b24 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1226,6 +1226,8 @@ public void testKeyTokenInvalidException() throws Exception { .build(), removeDynamicFields(result.get(1L))); assertEquals(1, result.size()); + + worker.stop(); } @Test @@ -1301,6 +1303,7 @@ public void testKeyCommitTooLargeException() throws Exception { } } assertTrue(foundErrors); + worker.stop(); } @Test @@ -1338,6 +1341,7 @@ public void testOutputKeyTooLargeException() throws Exception { assertEquals( makeExpectedOutput(1, 0, bigKey, DEFAULT_SHARDING_KEY, "smaller_key").build(), removeDynamicFields(result.get(1L))); + worker.stop(); } @Test @@ -1375,6 +1379,7 @@ public void testOutputValueTooLargeException() throws Exception { assertEquals( makeExpectedOutput(1, 0, "key", DEFAULT_SHARDING_KEY, "smaller_key").build(), removeDynamicFields(result.get(1L))); + worker.stop(); } @Test @@ -1429,6 +1434,7 @@ public void testKeyChange() throws Exception { .build(), removeDynamicFields(result.get((long) i + 1000))); } + worker.stop(); } @Test(timeout = 30000) @@ -1530,6 +1536,7 @@ public void testExceptions() throws Exception { assertEquals(keyString, stats.getKey().toStringUtf8()); assertEquals(0, stats.getWorkToken()); assertEquals(1, stats.getShardingKey()); + worker.stop(); } @Test @@ -1605,6 +1612,7 @@ public void testAssignWindows() throws Exception { intervalWindowBytes(WINDOW_AT_ONE_SECOND), makeExpectedOutput(timestamp2, timestamp2)) .build())); + worker.stop(); } private void verifyTimers(WorkItemCommitRequest commit, Timer... timers) { @@ -1929,6 +1937,7 @@ public void testMergeWindows() throws Exception { splitIntToLong(getCounter(counters, "WindmillStateBytesWritten").getInteger())); // No input messages assertEquals(0L, splitIntToLong(getCounter(counters, "WindmillShuffleBytesRead").getInteger())); + worker.stop(); } @Test @@ -2223,6 +2232,7 @@ public void testMergeWindowsCaching() throws Exception { LOG.info("cache stats {}", stats); assertEquals(1, stats.hitCount()); assertEquals(4, stats.missCount()); + worker.stop(); } // Helper for running tests for merging sessions based upon Actions consisting of GetWorkResponse @@ -2304,6 +2314,7 @@ private void runMergeSessionsActions(List actions) throws Exception { verifyTimers(actualOutput, action.expectedTimers); verifyHolds(actualOutput, action.expectedHolds); } + worker.stop(); } @Test @@ -2590,6 +2601,7 @@ public void testUnboundedSources() throws Exception { .build())); assertNull(getCounter(counters, "dataflow_input_size-computation")); + worker.stop(); } @Test @@ -2702,6 +2714,7 @@ public void testUnboundedSourcesDrain() throws Exception { .build())); assertThat(finalizeTracker, contains(0)); + worker.stop(); } // Regression test to ensure that a reader is not used from the cache @@ -2835,6 +2848,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { .build())); assertThat(finalizeTracker, contains(0)); + worker.stop(); } @Test @@ -3412,6 +3426,7 @@ public void testExceptionInvalidatesCache() throws Exception { parseCommitRequest(sb.toString())) .build())); } + worker.stop(); } @Test @@ -3909,6 +3924,7 @@ numMessagesInCustomSourceShard, new InflateDoFn(inflatedSizePerMessage))) commit = result.get(2L); assertThat(commit.getSerializedSize(), isWithinBundleSizeLimits); + worker.stop(); } @Test @@ -3990,6 +4006,7 @@ public void testLimitOnOutputBundleSizeWithMultipleSinks() throws Exception { commit = result.get(2L); assertThat(commit.getSerializedSize(), isWithinBundleSizeLimits); + worker.stop(); } @Test