Skip to content

Commit

Permalink
Avoid publishing string set metrics on the Dataflow legacy runner. (#…
Browse files Browse the repository at this point in the history
…31825)

This can be reverted once Dataflow supports these.

The publishing logic is conditional so that the tests still run.
  • Loading branch information
robertwb authored Jul 10, 2024
1 parent 018bcdf commit a226094
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
Expand Down Expand Up @@ -80,14 +81,18 @@ public class BatchModeExecutionContext
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";

// TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
private final boolean populateStringSetMetrics;

private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry) {
DataflowExecutionStateRegistry executionStateRegistry,
boolean populateStringSetMetrics) {
super(
counterFactory,
createMetricsContainerRegistry(),
Expand All @@ -100,6 +105,7 @@ private BatchModeExecutionContext(
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
this.populateStringSetMetrics = populateStringSetMetrics;
}

private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
Expand Down Expand Up @@ -135,7 +141,8 @@ public static BatchModeExecutionContext forTesting(
counterFactory,
options,
"test-work-item-id"),
stateRegistry);
stateRegistry,
true);
}

public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
Expand Down Expand Up @@ -248,7 +255,8 @@ public static BatchModeExecutionContext create(
counterFactory,
options,
workItemId),
executionStateRegistry);
executionStateRegistry,
false);
}

/** Create a new {@link StepContext}. */
Expand Down Expand Up @@ -518,7 +526,10 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
update ->
MetricsToCounterUpdateConverter.fromDistribution(
update.getKey(), true, update.getUpdate())),
FluentIterable.from(updates.stringSetUpdates())
FluentIterable.from(
populateStringSetMetrics
? updates.stringSetUpdates()
: Collections.emptyList())
.transform(
update ->
MetricsToCounterUpdateConverter.fromStringSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -88,6 +89,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private final Clock clock;

// TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
@VisibleForTesting boolean populateStringSetUpdates = false;

private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -187,7 +191,8 @@ public Histogram getPerWorkerHistogram(
public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates()
.append(distributionUpdates())
.append(gaugeUpdates().append(stringSetUpdates()));
.append(gaugeUpdates())
.append(populateStringSetUpdates ? stringSetUpdates() : Collections.emptyList());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public void testStringSetUpdateExtraction() {
.setCumulative(false)
.setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh")));

((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true;
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));

Expand All @@ -314,6 +315,7 @@ public void testStringSetUpdateExtraction() {
.setCumulative(false)
.setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));

((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));

Expand Down

0 comments on commit a226094

Please sign in to comment.