Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,15 @@ public void shutdown() {
isRunning.compareAndSet(true, false),
"Multiple calls to {}.shutdown() are not allowed.",
getClass());
workProviderExecutor.shutdown();
boolean isTerminated = false;
// Interrupt the dispatch loop to start shutting it down.
workProviderExecutor.shutdownNow();
try {
isTerminated = workProviderExecutor.awaitTermination(10, TimeUnit.SECONDS);
while (!workProviderExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("Still waiting for the dispatch loop to terminate.");
}
} catch (InterruptedException e) {
LOG.warn("Unable to shutdown {}", getClass());
}

if (!isTerminated) {
workProviderExecutor.shutdownNow();
}
workCommitter.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,8 @@ public void testKeyTokenInvalidException() throws Exception {
.build(),
removeDynamicFields(result.get(1L)));
assertEquals(1, result.size());

worker.stop();
}

@Test
Expand Down Expand Up @@ -1301,6 +1303,7 @@ public void testKeyCommitTooLargeException() throws Exception {
}
}
assertTrue(foundErrors);
worker.stop();
}

@Test
Expand Down Expand Up @@ -1338,6 +1341,7 @@ public void testOutputKeyTooLargeException() throws Exception {
assertEquals(
makeExpectedOutput(1, 0, bigKey, DEFAULT_SHARDING_KEY, "smaller_key").build(),
removeDynamicFields(result.get(1L)));
worker.stop();
}

@Test
Expand Down Expand Up @@ -1375,6 +1379,7 @@ public void testOutputValueTooLargeException() throws Exception {
assertEquals(
makeExpectedOutput(1, 0, "key", DEFAULT_SHARDING_KEY, "smaller_key").build(),
removeDynamicFields(result.get(1L)));
worker.stop();
}

@Test
Expand Down Expand Up @@ -1429,6 +1434,7 @@ public void testKeyChange() throws Exception {
.build(),
removeDynamicFields(result.get((long) i + 1000)));
}
worker.stop();
}

@Test(timeout = 30000)
Expand Down Expand Up @@ -1530,6 +1536,7 @@ public void testExceptions() throws Exception {
assertEquals(keyString, stats.getKey().toStringUtf8());
assertEquals(0, stats.getWorkToken());
assertEquals(1, stats.getShardingKey());
worker.stop();
}

@Test
Expand Down Expand Up @@ -1605,6 +1612,7 @@ public void testAssignWindows() throws Exception {
intervalWindowBytes(WINDOW_AT_ONE_SECOND),
makeExpectedOutput(timestamp2, timestamp2))
.build()));
worker.stop();
}

private void verifyTimers(WorkItemCommitRequest commit, Timer... timers) {
Expand Down Expand Up @@ -1929,6 +1937,7 @@ public void testMergeWindows() throws Exception {
splitIntToLong(getCounter(counters, "WindmillStateBytesWritten").getInteger()));
// No input messages
assertEquals(0L, splitIntToLong(getCounter(counters, "WindmillShuffleBytesRead").getInteger()));
worker.stop();
}

@Test
Expand Down Expand Up @@ -2223,6 +2232,7 @@ public void testMergeWindowsCaching() throws Exception {
LOG.info("cache stats {}", stats);
assertEquals(1, stats.hitCount());
assertEquals(4, stats.missCount());
worker.stop();
}

// Helper for running tests for merging sessions based upon Actions consisting of GetWorkResponse
Expand Down Expand Up @@ -2304,6 +2314,7 @@ private void runMergeSessionsActions(List<Action> actions) throws Exception {
verifyTimers(actualOutput, action.expectedTimers);
verifyHolds(actualOutput, action.expectedHolds);
}
worker.stop();
}

@Test
Expand Down Expand Up @@ -2590,6 +2601,7 @@ public void testUnboundedSources() throws Exception {
.build()));

assertNull(getCounter(counters, "dataflow_input_size-computation"));
worker.stop();
}

@Test
Expand Down Expand Up @@ -2702,6 +2714,7 @@ public void testUnboundedSourcesDrain() throws Exception {
.build()));

assertThat(finalizeTracker, contains(0));
worker.stop();
}

// Regression test to ensure that a reader is not used from the cache
Expand Down Expand Up @@ -2835,6 +2848,7 @@ public void testUnboundedSourceWorkRetry() throws Exception {
.build()));

assertThat(finalizeTracker, contains(0));
worker.stop();
}

@Test
Expand Down Expand Up @@ -3412,6 +3426,7 @@ public void testExceptionInvalidatesCache() throws Exception {
parseCommitRequest(sb.toString()))
.build()));
}
worker.stop();
}

@Test
Expand Down Expand Up @@ -3909,6 +3924,7 @@ numMessagesInCustomSourceShard, new InflateDoFn(inflatedSizePerMessage)))
commit = result.get(2L);

assertThat(commit.getSerializedSize(), isWithinBundleSizeLimits);
worker.stop();
}

@Test
Expand Down Expand Up @@ -3990,6 +4006,7 @@ public void testLimitOnOutputBundleSizeWithMultipleSinks() throws Exception {
commit = result.get(2L);

assertThat(commit.getSerializedSize(), isWithinBundleSizeLimits);
worker.stop();
}

@Test
Expand Down
Loading