diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java index e9766fe387e2..ee2a04af9982 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java @@ -40,6 +40,8 @@ public enum StreamingSystemCounterNames { JAVA_HARNESS_MAX_MEMORY("dataflow_java_harness_max_memory"), JAVA_HARNESS_RESTARTS("dataflow_java_harness_restarts"), TIME_AT_MAX_ACTIVE_THREADS("dataflow_time_at_max_active_threads"), + ACTIVE_THREADS("dataflow_active_threads"), + TOTAL_ALLOCATED_THREADS("dataflow_total_allocated_threads"), WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"), MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing"); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 7110fee29362..5d4c0288c838 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -246,6 +246,8 @@ public class StreamingDataflowWorker { private final Counter javaHarnessUsedMemory; private final Counter javaHarnessMaxMemory; private final Counter timeAtMaxActiveThreads; + private final Counter activeThreads; + private final Counter totalAllocatedThreads; private final Counter windmillMaxObservedWorkItemCommitBytes; private final Counter memoryThrashing; private final boolean publishCounters; @@ -330,6 +332,11 @@ public class StreamingDataflowWorker { this.timeAtMaxActiveThreads = pendingCumulativeCounters.longSum( StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName()); + this.activeThreads = + pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName()); + this.totalAllocatedThreads = + pendingCumulativeCounters.intSum( + StreamingSystemCounterNames.TOTAL_ALLOCATED_THREADS.counterName()); this.windmillMaxObservedWorkItemCommitBytes = pendingCumulativeCounters.intMax( StreamingSystemCounterNames.WINDMILL_MAX_WORK_ITEM_COMMIT_BYTES.counterName()); @@ -1702,6 +1709,10 @@ private void updateVMMetrics() { private void updateThreadMetrics() { timeAtMaxActiveThreads.getAndReset(); timeAtMaxActiveThreads.addValue(workUnitExecutor.allThreadsActiveTime()); + activeThreads.getAndReset(); + activeThreads.addValue(workUnitExecutor.activeCount()); + totalAllocatedThreads.getAndReset(); + totalAllocatedThreads.addValue(chooseMaximumNumberOfThreads()); } @VisibleForTesting diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 05b752f91c0c..a160b0e6ad03 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -119,6 +119,10 @@ public long allThreadsActiveTime() { return totalTimeMaxActiveThreadsUsed; } + public int activeCount() { + return activeCount.intValue(); + } + public String summaryHtml() { monitor.enter(); try { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 82fc38055a88..24e6e2795c68 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -291,7 +291,7 @@ static Work createMockWork(long workToken, Consumer processWorkFn) { Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(), Instant::now, Collections.emptyList(), - work -> {}); + processWorkFn); } private byte[] intervalWindowBytes(IntervalWindow window) throws Exception { @@ -2793,6 +2793,79 @@ public void testMaxThreadMetric() throws Exception { executor.shutdown(); } + volatile boolean stop = false; + + @Test + public void testActiveThreadMetric() throws Exception { + int maxThreads = 5; + int threadExpirationSec = 60; + // setting up actual implementation of executor instead of mocking to keep track of + // active thread count. + BoundedQueueExecutor executor = + new BoundedQueueExecutor( + maxThreads, + threadExpirationSec, + TimeUnit.SECONDS, + maxThreads, + 10000000, + new ThreadFactoryBuilder() + .setNameFormat("DataflowWorkUnits-%d") + .setDaemon(true) + .build()); + + ComputationState computationState = + new ComputationState( + "computation", + defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + executor, + ImmutableMap.of(), + null); + + ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); + + Consumer sleepProcessWorkFn = + unused -> { + synchronized (this) { + this.notify(); + } + int count = 0; + while (!stop) { + count += 1; + } + }; + + Work m2 = createMockWork(2, sleepProcessWorkFn); + + Work m3 = createMockWork(3, sleepProcessWorkFn); + + Work m4 = createMockWork(4, sleepProcessWorkFn); + assertEquals(0, executor.activeCount()); + + assertTrue(computationState.activateWork(key1Shard1, m2)); + synchronized (this) { + executor.execute(m2, m2.getWorkItem().getSerializedSize()); + this.wait(); + // Seems current executor executes the initial work item twice + this.wait(); + } + assertEquals(2, executor.activeCount()); + + assertTrue(computationState.activateWork(key1Shard1, m3)); + assertTrue(computationState.activateWork(key1Shard1, m4)); + synchronized (this) { + executor.execute(m3, m3.getWorkItem().getSerializedSize()); + this.wait(); + } + assertEquals(3, executor.activeCount()); + synchronized (this) { + executor.execute(m4, m4.getWorkItem().getSerializedSize()); + this.wait(); + } + assertEquals(4, executor.activeCount()); + stop = true; + executor.shutdown(); + } + @Test public void testExceptionInvalidatesCache() throws Exception { // We'll need to force the system to limit bundles to one message at a time.