Skip to content

Commit 8e89e97

Browse files
author
Naireen
committed
Address nits
1 parent 8e7785b commit 8e89e97

File tree

9 files changed

+70
-27
lines changed

9 files changed

+70
-27
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java

+11
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
2727
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
2828
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoTypeUrns;
29+
import org.apache.beam.sdk.metrics.MetricName;
2930
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
31+
import org.checkerframework.checker.nullness.qual.Nullable;
3032

3133
/** This static class fetches MonitoringInfo related values from metrics.proto. */
3234
public final class MonitoringInfoConstants {
@@ -212,4 +214,13 @@ static String extractUrn(MonitoringInfoSpecs.Enum value) {
212214
private static String extractLabel(MonitoringInfo.MonitoringInfoLabels value) {
213215
return value.getValueDescriptor().getOptions().getExtension(labelProps).getName();
214216
}
217+
218+
public static boolean isPerWorkerMetric(MetricName metricName) {
219+
@Nullable
220+
String value = metricName.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC);
221+
if (value != null && value.equals("true")) {
222+
return true;
223+
}
224+
return false;
225+
}
215226
}

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstantsTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
package org.apache.beam.runners.core.metrics;
1919

2020
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.extractUrn;
21+
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.isPerWorkerMetric;
2122
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertTrue;
2225

2326
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
27+
import org.apache.beam.sdk.metrics.MetricName;
2428
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
29+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
2530
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
2631
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
2732
import org.hamcrest.Matchers;
@@ -47,4 +52,17 @@ public void testUniqueUrnsDefinedForAllSpecs() {
4752
}
4853
assertThat(urnToEnum.entries(), Matchers.empty());
4954
}
55+
56+
@Test
57+
public void testIsPerWorkerMetric() {
58+
MetricName metricName =
59+
MetricName.named("IO", "name1", ImmutableMap.of("PER_WORKER_METRIC", "true"));
60+
assertTrue(isPerWorkerMetric(metricName));
61+
62+
metricName = MetricName.named("IO", "name1", ImmutableMap.of("PER_WORKER_METRIC", "false"));
63+
assertFalse(isPerWorkerMetric(metricName));
64+
65+
metricName = MetricName.named("IO", "name1", ImmutableMap.of());
66+
assertFalse(isPerWorkerMetric(metricName));
67+
}
5068
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.beam.runners.core.metrics.DistributionData;
3737
import org.apache.beam.runners.core.metrics.GaugeCell;
3838
import org.apache.beam.runners.core.metrics.MetricsMap;
39+
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
3940
import org.apache.beam.runners.core.metrics.StringSetCell;
4041
import org.apache.beam.runners.core.metrics.StringSetData;
4142
import org.apache.beam.sdk.metrics.BoundedTrie;
@@ -240,10 +241,15 @@ private FluentIterable<CounterUpdate> gaugeUpdates() {
240241
@Override
241242
public @Nullable CounterUpdate apply(
242243
@Nonnull Map.Entry<MetricName, GaugeCell> entry) {
243-
long value = entry.getValue().getCumulative().value();
244-
org.joda.time.Instant timestamp = entry.getValue().getCumulative().timestamp();
245-
return MetricsToCounterUpdateConverter.fromGauge(
246-
MetricKey.create(stepName, entry.getKey()), value, timestamp);
244+
if (!MonitoringInfoConstants.isPerWorkerMetric(entry.getKey())) {
245+
long value = entry.getValue().getCumulative().value();
246+
org.joda.time.Instant timestamp = entry.getValue().getCumulative().timestamp();
247+
return MetricsToCounterUpdateConverter.fromGauge(
248+
MetricKey.create(stepName, entry.getKey()), value, timestamp);
249+
} else {
250+
// add a test for this.
251+
return null;
252+
}
247253
}
248254
})
249255
.filter(Predicates.notNull());
@@ -389,9 +395,10 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
389395

390396
gauges.forEach(
391397
(k, v) -> {
392-
// Check if metric name has the per worker label set
393-
if (k.getLabels().containsKey("PER_WORKER_METRIC")
394-
&& k.getLabels().get("PER_WORKER_METRIC").equals("true")) {
398+
// Check if metric name has the per worker label set.
399+
// TODO(Naireen): Populate local map with perWorkerMetrics so we don't need to check each
400+
// time we update the metrics.
401+
if (MonitoringInfoConstants.isPerWorkerMetric(k)) {
395402
Long val = v.getCumulative().value();
396403
per_worker_gauges.put(k, val);
397404
v.reset();

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java

+12
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public class StreamingStepMetricsContainerTest {
9090

9191
private MetricName name1 = MetricName.named("ns", "name1");
9292
private MetricName name2 = MetricName.named("ns", "name2");
93+
private MetricName name3 =
94+
MetricName.named("ns", "name3", ImmutableMap.of("PER_WORKER_METRIC", "true"));
9395

9496
@Test
9597
public void testDedupping() {
@@ -275,6 +277,16 @@ public void testGaugeUpdateExtraction() {
275277
DateTimeUtils.setCurrentMillisSystem();
276278
}
277279

280+
@Test
281+
public void testNoPerWorkerGaugeUpdateExtraction() {
282+
Gauge gauge = c1.getGauge(name3);
283+
gauge.set(5);
284+
285+
// There is no update.
286+
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
287+
assertThat(updates, IsEmptyIterable.emptyIterable());
288+
}
289+
278290
@Test
279291
public void testStringSetUpdateExtraction() {
280292
StringSet stringSet = c1.getStringSet(name1);

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public interface KafkaMetrics {
3737

3838
void updateBacklogBytes(String topic, int partitionId, long backlog);
3939

40-
/*Used to update all metrics in container*/
41-
void updateKafkaMetrics();
40+
/*Flushes the buffered metrics to the current metric container for this thread.*/
41+
void flushBufferedMetrics();
4242

4343
/** No-op implementation of {@code KafkaResults}. */
4444
class NoOpKafkaMetrics implements KafkaMetrics {
@@ -51,7 +51,7 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}
5151
public void updateBacklogBytes(String topic, int partitionId, long backlog) {}
5252

5353
@Override
54-
public void updateKafkaMetrics() {}
54+
public void flushBufferedMetrics() {}
5555

5656
private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();
5757

@@ -164,7 +164,7 @@ private void recordBacklogBytesInternal() {
164164
* this function will no-op.
165165
*/
166166
@Override
167-
public void updateKafkaMetrics() {
167+
public void flushBufferedMetrics() {
168168
if (!isWritable().compareAndSet(true, false)) {
169169
LOG.warn("Updating stale Kafka metrics container");
170170
return;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.beam.sdk.metrics.Histogram;
2525
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
2626
import org.apache.beam.sdk.metrics.MetricName;
27-
import org.apache.beam.sdk.metrics.NoOpGauge;
2827
import org.apache.beam.sdk.util.HistogramData;
28+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
2929

3030
/**
3131
* Helper class to create per worker metrics for Kafka Sink stages.
@@ -86,15 +86,10 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
8686
* @return Counter.
8787
*/
8888
public static Gauge createBacklogGauge(MetricName name) {
89-
// Use label to differenciate between the type of gauge metric is created
90-
// TODO(34195):
91-
if (name.getLabels().containsKey(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
92-
&& name.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC).equals("true")) {
93-
return new DelegatingGauge(name, false);
94-
} else {
95-
// Currently KafkaSink metrics only supports aggregated per worker metrics.
96-
return NoOpGauge.getInstance();
97-
}
89+
// TODO(#34195): Unify metrics collection path.
90+
// Currently KafkaSink metrics only supports aggregated per worker metrics.
91+
Preconditions.checkState(MonitoringInfoConstants.isPerWorkerMetric(name));
92+
return new DelegatingGauge(name, false);
9893
}
9994

10095
/**

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,12 @@ public boolean advance() throws IOException {
228228
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
229229
rawSizes.update(recordSize);
230230

231-
kafkaResults.updateKafkaMetrics();
231+
kafkaResults.flushBufferedMetrics();
232232
return true;
233233
} else { // -- (b)
234234
kafkaResults = KafkaSinkMetrics.kafkaMetrics();
235235
nextBatch();
236-
kafkaResults.updateKafkaMetrics();
236+
kafkaResults.flushBufferedMetrics();
237237
if (!curBatch.hasNext()) {
238238
return false;
239239
}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ public ProcessContinuation processElement(
569569
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
570570
.doubleValue()
571571
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
572-
kafkaResults.updateKafkaMetrics();
572+
kafkaResults.flushBufferedMetrics();
573573
}
574574
}
575575
}

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testNoOpKafkaMetrics() throws Exception {
101101
KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance();
102102
results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
103103
results.updateBacklogBytes("test-topic", 0, 10);
104-
results.updateKafkaMetrics();
104+
results.flushBufferedMetrics();
105105

106106
assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
107107
assertThat(testContainer.gauges.size(), equalTo(0));
@@ -119,7 +119,7 @@ public void testKafkaRPCLatencyMetrics() throws Exception {
119119
results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
120120
results.updateBacklogBytes("test-topic", 0, 10);
121121

122-
results.updateKafkaMetrics();
122+
results.flushBufferedMetrics();
123123
// RpcLatency*rpc_method:POLL;topic_name:test-topic
124124
MetricName histogramName =
125125
MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:test-topic;");
@@ -150,7 +150,7 @@ public void testKafkaRPCLatencyMetricsAreNotRecorded() throws Exception {
150150

151151
results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
152152

153-
results.updateKafkaMetrics();
153+
results.flushBufferedMetrics();
154154
assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
155155
}
156156
}

0 commit comments

Comments
 (0)