Skip to content

Commit 8e7785b

Browse files
author
Naireen
committed
refactor to remove per worker gauge method
1 parent 1c582f5 commit 8e7785b

File tree

10 files changed

+83
-133
lines changed

10 files changed

+83
-133
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java

-6
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,6 @@ public Gauge getGauge(MetricName metricName) {
7575
return getCurrentContainer().getGauge(metricName);
7676
}
7777

78-
@Override
79-
public Gauge getPerWorkerGauge(MetricName metricName) {
80-
Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName);
81-
return gauge;
82-
}
83-
8478
@Override
8579
public StringSet getStringSet(MetricName metricName) {
8680
return getCurrentContainer().getStringSet(metricName);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

+12-22
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
7575

7676
private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
7777

78-
private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
79-
new ConcurrentHashMap<>();
80-
8178
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);
8279

8380
private MetricsMap<MetricName, DeltaDistributionCell> distributions =
@@ -178,19 +175,6 @@ public Gauge getGauge(MetricName metricName) {
178175
return gauges.get(metricName);
179176
}
180177

181-
@Override
182-
public Gauge getPerWorkerGauge(MetricName metricName) {
183-
if (!enablePerWorkerMetrics) {
184-
return MetricsContainer.super.getPerWorkerGauge(metricName);
185-
}
186-
Gauge val = perWorkerGauges.get(metricName);
187-
if (val != null) {
188-
return val;
189-
}
190-
191-
return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
192-
}
193-
194178
@Override
195179
public StringSet getStringSet(MetricName metricName) {
196180
return stringSets.get(metricName);
@@ -388,7 +372,8 @@ private void deleteStaleCounters(
388372
@VisibleForTesting
389373
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
390374
ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
391-
ConcurrentHashMap<MetricName, Long> gauges = new ConcurrentHashMap<MetricName, Long>();
375+
ConcurrentHashMap<MetricName, Long> per_worker_gauges =
376+
new ConcurrentHashMap<MetricName, Long>();
392377
ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
393378
new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
394379
HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
@@ -402,12 +387,17 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
402387
counters.put(k, val);
403388
});
404389

405-
perWorkerGauges.forEach(
390+
gauges.forEach(
406391
(k, v) -> {
407-
Long val = v.getCumulative().value();
408-
gauges.put(k, val);
409-
v.reset();
392+
// Check if metric name has the per worker label set
393+
if (k.getLabels().containsKey("PER_WORKER_METRIC")
394+
&& k.getLabels().get("PER_WORKER_METRIC").equals("true")) {
395+
Long val = v.getCumulative().value();
396+
per_worker_gauges.put(k, val);
397+
v.reset();
398+
}
410399
});
400+
411401
perWorkerHistograms.forEach(
412402
(k, v) -> {
413403
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
@@ -416,7 +406,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
416406
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
417407

418408
return MetricsToPerStepNamespaceMetricsConverter.convert(
419-
stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache);
409+
stepName, counters, per_worker_gauges, histograms, parsedPerWorkerMetricsCache);
420410
}
421411

422412
/**

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import com.google.api.services.dataflow.model.CounterStructuredName;
3232
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
3333
import com.google.api.services.dataflow.model.CounterUpdate;
34+
import com.google.api.services.dataflow.model.DataflowGaugeValue;
3435
import com.google.api.services.dataflow.model.DataflowHistogramValue;
3536
import com.google.api.services.dataflow.model.DistributionUpdate;
3637
import com.google.api.services.dataflow.model.IntegerGauge;
3738
import com.google.api.services.dataflow.model.Linear;
3839
import com.google.api.services.dataflow.model.MetricValue;
3940
import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
4041
import com.google.api.services.dataflow.model.StringList;
42+
import com.google.common.collect.ImmutableMap;
4143
import java.time.Clock;
4244
import java.time.Duration;
4345
import java.time.Instant;
@@ -471,7 +473,7 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() {
471473
}
472474

473475
@Test
474-
public void testExtractPerWorkerMetricUpdatesKafka_populatedMetrics() {
476+
public void testExtractPerWorkerMetricUpdatesKafka_populatedHistogramMetrics() {
475477
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
476478

477479
MetricName histogramMetricName = MetricName.named("KafkaSink", "histogram");
@@ -508,6 +510,48 @@ public void testExtractPerWorkerMetricUpdatesKafka_populatedMetrics() {
508510
assertThat(updates, containsInAnyOrder(histograms));
509511
}
510512

513+
@Test
514+
public void testExtractPerWorkerMetricUpdatesKafka_populateGaugeMetrics() {
515+
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
516+
517+
MetricName gaugeMetricName =
518+
MetricName.named("KafkaSink", "gauge", ImmutableMap.of("PER_WORKER_METRIC", "true"));
519+
c2.getGauge(gaugeMetricName).set(5L);
520+
521+
Iterable<PerStepNamespaceMetrics> updates =
522+
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
523+
524+
DataflowGaugeValue gaugeValue = new DataflowGaugeValue();
525+
gaugeValue.setValue(5L);
526+
527+
MetricValue expectedGauge =
528+
new MetricValue()
529+
.setMetric("gauge")
530+
.setMetricLabels(new HashMap<>())
531+
.setValueGauge64(gaugeValue);
532+
533+
PerStepNamespaceMetrics gauge =
534+
new PerStepNamespaceMetrics()
535+
.setOriginalStep("s2")
536+
.setMetricsNamespace("KafkaSink")
537+
.setMetricValues(Collections.singletonList(expectedGauge));
538+
539+
assertThat(updates, containsInAnyOrder(gauge));
540+
}
541+
542+
@Test
543+
public void testExtractPerWorkerMetricUpdatesKafka_gaugeMetricsDropped() {
544+
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
545+
546+
MetricName gaugeMetricName =
547+
MetricName.named("KafkaSink", "gauge", ImmutableMap.of("PER_WORKER_METRIC", "false"));
548+
c2.getGauge(gaugeMetricName).set(5L);
549+
550+
Iterable<PerStepNamespaceMetrics> updates =
551+
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
552+
assertThat(updates, IsEmptyIterable.emptyIterable());
553+
}
554+
511555
@Test
512556
public void testExtractPerWorkerMetricUpdates_emptyMetrics() {
513557
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java

+4-21
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
/** Implementation of {@link Gauge} that delegates to the instance for the current context. */
2424
@Internal
2525
public class DelegatingGauge implements Metric, Gauge, Serializable {
26+
2627
private final MetricName name;
2728
private final boolean processWideContainer;
28-
private final boolean perWorkerGauge;
2929

3030
/**
3131
* Create a {@code DelegatingGauge} with {@code perWorkerGauge} and {@code processWideContainer}
@@ -34,30 +34,17 @@ public class DelegatingGauge implements Metric, Gauge, Serializable {
3434
* @param name Metric name for this metric.
3535
*/
3636
public DelegatingGauge(MetricName name) {
37-
this(name, false, false);
38-
}
39-
40-
/**
41-
* Create a {@code DelegatingGauge} with {@code perWorkerGauge} set to false.
42-
*
43-
* @param name Metric name for this metric.
44-
* @param processWideContainer Whether this Gauge is stored in the ProcessWide container or the
45-
* current thread's container.
46-
*/
47-
public DelegatingGauge(MetricName name, boolean processWideContainer) {
48-
this(name, processWideContainer, false);
37+
this(name, false);
4938
}
5039

5140
/**
5241
* @param name Metric name for this metric.
5342
* @param processWideContainer Whether this gauge is stored in the ProcessWide container or the
5443
* current thread's container.
55-
* @param perWorkerGauge Whether this gauge refers to a perWorker metric or not.
5644
*/
57-
public DelegatingGauge(MetricName name, boolean processWideContainer, boolean perWorkerGauge) {
45+
public DelegatingGauge(MetricName name, boolean processWideContainer) {
5846
this.name = name;
5947
this.processWideContainer = processWideContainer;
60-
this.perWorkerGauge = perWorkerGauge;
6148
}
6249

6350
/** Set the gauge. */
@@ -70,11 +57,7 @@ public void set(long n) {
7057
if (container == null) {
7158
return;
7259
}
73-
if (perWorkerGauge) {
74-
container.getPerWorkerGauge(name).set(n);
75-
} else {
76-
container.getGauge(name).set(n);
77-
}
60+
container.getGauge(name).set(n);
7861
}
7962

8063
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java

-8
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,6 @@ default Histogram getPerWorkerHistogram(
8181
return NoOpHistogram.getInstance();
8282
}
8383

84-
/**
85-
* Return the {@link Gauge} that should be used for implementing the given per-worker {@code
86-
* metricName} in this container.
87-
*/
88-
default Gauge getPerWorkerGauge(MetricName metricName) {
89-
return NoOpGauge.getInstance();
90-
}
91-
9284
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
9385
default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
9486
throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer.");

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java

-31
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,13 @@
3333
/** Stores and exports metrics for a batch of Kafka Client RPCs. */
3434
public interface KafkaMetrics {
3535

36-
/*Used to update latency metrics, which is later used to update metrics container in another thread*/
3736
void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime);
3837

39-
/*Used to update backlog metrics in current thread*/
4038
void updateBacklogBytes(String topic, int partitionId, long backlog);
4139

4240
/*Used to update all metrics in container*/
4341
void updateKafkaMetrics();
4442

45-
/*Used to update backlog metrics, which is later used to update metrics container in another thread*/
46-
// void recordBacklogBytes(String topic, int partitionId, long backlog);
47-
4843
/** No-op implementation of {@code KafkaResults}. */
4944
class NoOpKafkaMetrics implements KafkaMetrics {
5045
private NoOpKafkaMetrics() {}
@@ -58,9 +53,6 @@ public void updateBacklogBytes(String topic, int partitionId, long backlog) {}
5853
@Override
5954
public void updateKafkaMetrics() {}
6055

61-
// @Override
62-
// public void recordBacklogBytes(String topic, int partitionId, long backlog) {};
63-
6456
private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();
6557

6658
static NoOpKafkaMetrics getInstance() {
@@ -132,7 +124,6 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
132124
@Override
133125
public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
134126
if (isWritable().get()) {
135-
// Key by MetricName so info isn't lost
136127
MetricName metricName = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId);
137128
perTopicPartitionBacklogs().put(metricName, backlog);
138129
}
@@ -162,33 +153,11 @@ private void recordRpcLatencyMetrics() {
162153
/** This is for creating gauges from backlog bytes recorded previously. */
163154
private void recordBacklogBytesInternal() {
164155
for (Map.Entry<MetricName, Long> backlog : perTopicPartitionBacklogs().entrySet()) {
165-
// MetricName perPartionBacklogName = KafkaSinkMetrics.getMetricGaugeName(topicName,
166-
// partitionId);
167-
// Gauge perPartition =
168-
// Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));
169-
// Use lambda for more readability?
170-
// map.forEach((key, value) -> System.out.println(key + ": " + value));
171156
Gauge gauge = KafkaSinkMetrics.createBacklogGauge(backlog.getKey());
172157
gauge.set(backlog.getValue());
173158
}
174159
}
175160

176-
/**
177-
* This is for recording backlog bytes on the current thread.
178-
*
179-
* @param topicName topicName
180-
* @param partitionId partitionId for the topic Only included in the metric key if
181-
* 'supportsMetricsDeletion' is enabled.
182-
* @param backlogBytes backlog for the topic Only included in the metric key if
183-
* 'supportsMetricsDeletion' is enabled.
184-
*/
185-
// @Override
186-
// public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
187-
// Gauge perPartition =
188-
// Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));
189-
// perPartition.set(backlogBytes);
190-
// }
191-
192161
/**
193162
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
194163
* containers. This function will only report metrics once per instance. Subsequent calls to

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java

+6-28
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.beam.sdk.metrics.Histogram;
2525
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
2626
import org.apache.beam.sdk.metrics.MetricName;
27+
import org.apache.beam.sdk.metrics.NoOpGauge;
2728
import org.apache.beam.sdk.util.HistogramData;
2829

2930
/**
@@ -76,19 +77,6 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
7677
return new DelegatingHistogram(metricName, buckets, false, true);
7778
}
7879

79-
/**
80-
* Creates a {@link Gauge} metric to record per partition backlog with the name
81-
*
82-
* <p>'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};'.
83-
*
84-
* @param topic Kafka topic associated with this metric.
85-
* @param partitionId partition id associated with this metric.
86-
* @return Counter.
87-
*/
88-
public static Gauge createBacklogGauge(String topic, int partitionId) {
89-
return new DelegatingGauge(getMetricGaugeName(topic, partitionId), false, true);
90-
}
91-
9280
/**
9381
* Creates a {@link Gauge} metric to record per partition backlog with the name
9482
*
@@ -98,24 +86,14 @@ public static Gauge createBacklogGauge(String topic, int partitionId) {
9886
* @return Counter.
9987
*/
10088
public static Gauge createBacklogGauge(MetricName name) {
101-
// use label to differenciate between the type of gauge metric is created
102-
// TODO(bug to clean this to make more consistent between the two runners)
103-
// test if set to false, this doesn't occur
104-
// && name.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
89+
// Use label to differenciate between the type of gauge metric is created
90+
// TODO(34195):
10591
if (name.getLabels().containsKey(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
10692
&& name.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC).equals("true")) {
107-
// return Metrics.gauge(name); // can this be a delgating gauge of false type?
108-
// metric name always exists, so need a way to handle them differently
109-
// second bollean should be false for u2 path, how to not add label just yet?
110-
// for legacy
111-
// return new DelegatingGauge(name, false, true);
112-
113-
// for runner v2
114-
// investigate why it gest ton UW container, but not to DFE
115-
// return Metrics.gauge(name);
116-
return new DelegatingGauge(name, false, false);
93+
return new DelegatingGauge(name, false);
11794
} else {
118-
return new DelegatingGauge(name, false, true);
95+
// Currently KafkaSink metrics only supports aggregated per worker metrics.
96+
return NoOpGauge.getInstance();
11997
}
12098
}
12199

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

-5
Original file line numberDiff line numberDiff line change
@@ -559,8 +559,6 @@ public ProcessContinuation processElement(
559559
.doubleValue()
560560
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
561561
KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
562-
// public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
563-
564562
kafkaResults.updateBacklogBytes(
565563
kafkaSourceDescriptor.getTopic(),
566564
kafkaSourceDescriptor.getPartition(),
@@ -571,9 +569,6 @@ public ProcessContinuation processElement(
571569
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
572570
.doubleValue()
573571
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
574-
// flush imediately afterwards, can it be moved to finish bundle?
575-
// create the same kind of metric? This creates a per Worker metric, we dont want that to be
576-
// reusable between the two ios
577572
kafkaResults.updateKafkaMetrics();
578573
}
579574
}

0 commit comments

Comments
 (0)