Skip to content

Commit

Permalink
Revert "Avoid publishing string set metrics on the Dataflow legacy runner." (#32002)
Browse files Browse the repository at this point in the history

* Revert "Avoid publishing string set metrics on the Dataflow legacy runner. (#…"

This reverts commit a226094.

* unskip stringset

* Fix ClassCastException
  • Loading branch information
Abacn authored Jul 26, 2024
1 parent 4479281 commit 7930a1f
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 26 deletions.
1 change: 0 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesExternalService',
'org.apache.beam.sdk.testing.UsesDistributionMetrics',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesStringSetMetrics',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
Expand Down Expand Up @@ -191,7 +191,7 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
if (metricUpdate.getSet() == null) {
return StringSetResult.empty();
}
return StringSetResult.create(ImmutableSet.copyOf(((Set) metricUpdate.getSet())));
return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet())));
}

private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
Expand Down Expand Up @@ -78,18 +77,14 @@ public class BatchModeExecutionContext
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";

// TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
private final boolean populateStringSetMetrics;

private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry,
boolean populateStringSetMetrics) {
DataflowExecutionStateRegistry executionStateRegistry) {
super(
counterFactory,
createMetricsContainerRegistry(),
Expand All @@ -102,7 +97,6 @@ private BatchModeExecutionContext(
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
this.populateStringSetMetrics = populateStringSetMetrics;
}

private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
Expand Down Expand Up @@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting(
counterFactory,
options,
"test-work-item-id"),
stateRegistry,
true);
stateRegistry);
}

public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
Expand Down Expand Up @@ -252,8 +245,7 @@ public static BatchModeExecutionContext create(
counterFactory,
options,
workItemId),
executionStateRegistry,
false);
executionStateRegistry);
}

/** Create a new {@link StepContext}. */
Expand Down Expand Up @@ -523,10 +515,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
update ->
MetricsToCounterUpdateConverter.fromDistribution(
update.getKey(), true, update.getUpdate())),
FluentIterable.from(
populateStringSetMetrics
? updates.stringSetUpdates()
: Collections.emptyList())
FluentIterable.from(updates.stringSetUpdates())
.transform(
update ->
MetricsToCounterUpdateConverter.fromStringSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -89,9 +88,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private final Clock clock;

// TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
@VisibleForTesting boolean populateStringSetUpdates = false;

private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -191,8 +187,7 @@ public Histogram getPerWorkerHistogram(
public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates()
.append(distributionUpdates())
.append(gaugeUpdates())
.append(populateStringSetUpdates ? stringSetUpdates() : Collections.emptyList());
.append(gaugeUpdates().append(stringSetUpdates()));
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ public void testStringSetUpdateExtraction() {
.setCumulative(false)
.setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh")));

((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true;
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));

Expand All @@ -315,7 +314,6 @@ public void testStringSetUpdateExtraction() {
.setCumulative(false)
.setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));

((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));

Expand Down

0 comments on commit 7930a1f

Please sign in to comment.