From b25cbe22100d2b41596ac4813f050d76e029e6ee Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 26 Jul 2024 14:05:27 -0400 Subject: [PATCH] =?UTF-8?q?Revert=20"Avoid=20publishing=20string=20set=20m?= =?UTF-8?q?etrics=20on=20the=20Dataflow=20legacy=20runner.=20(#=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit a22609494318e3125255105b6d2568bbba30a54f. --- .../worker/BatchModeExecutionContext.java | 19 ++++--------------- .../worker/StreamingStepMetricsContainer.java | 7 +------ .../StreamingStepMetricsContainerTest.java | 2 -- 3 files changed, 5 insertions(+), 23 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 41bbae7cfdb36..aeef7784c2c33 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -19,7 +19,6 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.SideInputInfo; -import java.util.Collections; import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -78,9 +77,6 @@ public class BatchModeExecutionContext protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl"; - // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this. - private final boolean populateStringSetMetrics; - private BatchModeExecutionContext( CounterFactory counterFactory, Cache> dataCache, @@ -88,8 +84,7 @@ private BatchModeExecutionContext( ReaderFactory readerFactory, PipelineOptions options, DataflowExecutionStateTracker executionStateTracker, - DataflowExecutionStateRegistry executionStateRegistry, - boolean populateStringSetMetrics) { + DataflowExecutionStateRegistry executionStateRegistry) { super( counterFactory, createMetricsContainerRegistry(), @@ -102,7 +97,6 @@ private BatchModeExecutionContext( this.dataCache = dataCache; this.containerRegistry = (MetricsContainerRegistry) getMetricsContainerRegistry(); - this.populateStringSetMetrics = populateStringSetMetrics; } private static MetricsContainerRegistry createMetricsContainerRegistry() { @@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting( counterFactory, options, "test-work-item-id"), - stateRegistry, - true); + stateRegistry); } public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) { @@ -252,8 +245,7 @@ public static BatchModeExecutionContext create( counterFactory, options, workItemId), - executionStateRegistry, - false); + executionStateRegistry); } /** Create a new {@link StepContext}. */ @@ -523,10 +515,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { update -> MetricsToCounterUpdateConverter.fromDistribution( update.getKey(), true, update.getUpdate())), - FluentIterable.from( - populateStringSetMetrics - ? updates.stringSetUpdates() - : Collections.emptyList()) + FluentIterable.from(updates.stringSetUpdates()) .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 04f983c4fa7f6..7cc0dc68f7e7c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -23,7 +23,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -89,9 +88,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final Clock clock; - // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this. - @VisibleForTesting boolean populateStringSetUpdates = false; - private StreamingStepMetricsContainer(String stepName) { this.stepName = stepName; this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>(); @@ -191,8 +187,7 @@ public Histogram getPerWorkerHistogram( public Iterable extractUpdates() { return counterUpdates() .append(distributionUpdates()) - .append(gaugeUpdates()) - .append(populateStringSetUpdates ? stringSetUpdates() : Collections.emptyList()); + .append(gaugeUpdates().append(stringSetUpdates())); } private FluentIterable counterUpdates() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index d128255cd237d..2d5a8d8266ae3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -292,7 +292,6 @@ public void testStringSetUpdateExtraction() { .setCumulative(false) .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh"))); - ((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true; Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -315,7 +314,6 @@ public void testStringSetUpdateExtraction() { .setCumulative(false) .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); - ((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update, name2Update));