Skip to content

Commit 53baae8

Browse files
NaireenNaireen
and
Naireen
authored
Kafka add counters v1 uw2 (apache#33503)
Co-authored-by: Naireen <naireenhussain@google.com>
1 parent caf80e8 commit 53baae8

File tree

23 files changed

+314
-116
lines changed

23 files changed

+314
-116
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto

+3
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,9 @@ message MonitoringInfo {
457457
SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
458458
SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
459459
SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
460+
// Label which, when set to "true", indicates that the metric is intended
461+
// to be aggregated per-worker.
462+
PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
460463
}
461464

462465
// A set of key and value labels which define the scope of the metric. For

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java

+7
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ public MetricUpdates getUpdates() {
318318
.setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
319319
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
320320
}
321+
322+
// Copy any labels attached to the metric name onto the MonitoringInfo builder.
323+
if (!metricName.getLabels().isEmpty()) {
324+
for (Map.Entry<String, String> entry : metricName.getLabels().entrySet()) {
325+
builder.setLabel(entry.getKey(), entry.getValue());
326+
}
327+
}
321328
return builder;
322329
}
323330

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java

+13
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
2727
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
2828
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoTypeUrns;
29+
import org.apache.beam.sdk.metrics.MetricName;
2930
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
31+
import org.checkerframework.checker.nullness.qual.Nullable;
3032

3133
/** This static class fetches MonitoringInfo related values from metrics.proto. */
3234
public final class MonitoringInfoConstants {
@@ -110,6 +112,7 @@ public static final class Labels {
110112
public static final String SPANNER_DATABASE_ID = "SPANNER_DATABASE_ID";
111113
public static final String SPANNER_INSTANCE_ID = "SPANNER_INSTANCE_ID";
112114
public static final String SPANNER_QUERY_NAME = "SPANNER_QUERY_NAME";
115+
public static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";
113116

114117
static {
115118
// Validate that compile time constants match the values stored in the protos.
@@ -151,6 +154,7 @@ public static final class Labels {
151154
SPANNER_INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.SPANNER_INSTANCE_ID)));
152155
checkArgument(
153156
SPANNER_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.SPANNER_QUERY_NAME)));
157+
checkArgument(PER_WORKER_METRIC.equals(extractLabel(MonitoringInfoLabels.PER_WORKER_METRIC)));
154158
}
155159
}
156160

@@ -210,4 +214,13 @@ static String extractUrn(MonitoringInfoSpecs.Enum value) {
210214
private static String extractLabel(MonitoringInfo.MonitoringInfoLabels value) {
211215
return value.getValueDescriptor().getOptions().getExtension(labelProps).getName();
212216
}
217+
218+
public static boolean isPerWorkerMetric(MetricName metricName) {
219+
@Nullable
220+
String value = metricName.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC);
221+
if (value != null && value.equals("true")) {
222+
return true;
223+
}
224+
return false;
225+
}
213226
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public String getUrn() {
8080
}
8181

8282
/** @return The labels associated with this MonitoringInfo. */
83+
@Override
8384
public Map<String, String> getLabels() {
8485
return this.labels;
8586
}

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java

+38
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.beam.sdk.metrics.MetricName;
4343
import org.apache.beam.sdk.util.HistogramData;
4444
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
45+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
4546
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
4647
import org.junit.Assert;
4748
import org.junit.Test;
@@ -241,6 +242,43 @@ public void testMonitoringInfosArePopulatedForUserCounters() {
241242
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
242243
}
243244

245+
@Test
246+
public void testMonitoringInfosLabelsArePopulatedForMetricNamesWithLabels() {
247+
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
248+
249+
CounterCell c1 =
250+
testObject.getCounter(
251+
MetricName.named("KafkaSink", "name1", ImmutableMap.of("PER_WORKER_METRIC", "true")));
252+
CounterCell c2 = testObject.getCounter(MetricName.named("BigQuerySink", "name2"));
253+
254+
c1.inc(2L);
255+
c2.inc(4L);
256+
257+
SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
258+
builder1
259+
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
260+
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "KafkaSink")
261+
.setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
262+
.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
263+
.setInt64SumValue(2)
264+
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
265+
266+
SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
267+
builder2
268+
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
269+
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink")
270+
.setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
271+
.setInt64SumValue(4)
272+
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
273+
274+
ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
275+
for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
276+
actualMonitoringInfos.add(mi);
277+
}
278+
279+
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
280+
}
281+
244282
@Test
245283
public void testMonitoringInfosArePopulatedForUserDistributions() {
246284
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstantsTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
package org.apache.beam.runners.core.metrics;
1919

2020
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.extractUrn;
21+
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.isPerWorkerMetric;
2122
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertTrue;
2225

2326
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
27+
import org.apache.beam.sdk.metrics.MetricName;
2428
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
29+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
2530
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
2631
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
2732
import org.hamcrest.Matchers;
@@ -47,4 +52,17 @@ public void testUniqueUrnsDefinedForAllSpecs() {
4752
}
4853
assertThat(urnToEnum.entries(), Matchers.empty());
4954
}
55+
56+
@Test
57+
public void testIsPerWorkerMetric() {
58+
MetricName metricName =
59+
MetricName.named("IO", "name1", ImmutableMap.of("PER_WORKER_METRIC", "true"));
60+
assertTrue(isPerWorkerMetric(metricName));
61+
62+
metricName = MetricName.named("IO", "name1", ImmutableMap.of("PER_WORKER_METRIC", "false"));
63+
assertFalse(isPerWorkerMetric(metricName));
64+
65+
metricName = MetricName.named("IO", "name1", ImmutableMap.of());
66+
assertFalse(isPerWorkerMetric(metricName));
67+
}
5068
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java

-6
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,6 @@ public Gauge getGauge(MetricName metricName) {
7575
return getCurrentContainer().getGauge(metricName);
7676
}
7777

78-
@Override
79-
public Gauge getPerWorkerGauge(MetricName metricName) {
80-
Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName);
81-
return gauge;
82-
}
83-
8478
@Override
8579
public StringSet getStringSet(MetricName metricName) {
8680
return getCurrentContainer().getStringSet(metricName);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

+23-26
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.beam.runners.core.metrics.DistributionData;
3737
import org.apache.beam.runners.core.metrics.GaugeCell;
3838
import org.apache.beam.runners.core.metrics.MetricsMap;
39+
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
3940
import org.apache.beam.runners.core.metrics.StringSetCell;
4041
import org.apache.beam.runners.core.metrics.StringSetData;
4142
import org.apache.beam.sdk.metrics.BoundedTrie;
@@ -75,9 +76,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
7576

7677
private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
7778

78-
private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
79-
new ConcurrentHashMap<>();
80-
8179
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);
8280

8381
private MetricsMap<MetricName, DeltaDistributionCell> distributions =
@@ -178,19 +176,6 @@ public Gauge getGauge(MetricName metricName) {
178176
return gauges.get(metricName);
179177
}
180178

181-
@Override
182-
public Gauge getPerWorkerGauge(MetricName metricName) {
183-
if (!enablePerWorkerMetrics) {
184-
return MetricsContainer.super.getPerWorkerGauge(metricName);
185-
}
186-
Gauge val = perWorkerGauges.get(metricName);
187-
if (val != null) {
188-
return val;
189-
}
190-
191-
return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
192-
}
193-
194179
@Override
195180
public StringSet getStringSet(MetricName metricName) {
196181
return stringSets.get(metricName);
@@ -256,10 +241,15 @@ private FluentIterable<CounterUpdate> gaugeUpdates() {
256241
@Override
257242
public @Nullable CounterUpdate apply(
258243
@Nonnull Map.Entry<MetricName, GaugeCell> entry) {
259-
long value = entry.getValue().getCumulative().value();
260-
org.joda.time.Instant timestamp = entry.getValue().getCumulative().timestamp();
261-
return MetricsToCounterUpdateConverter.fromGauge(
262-
MetricKey.create(stepName, entry.getKey()), value, timestamp);
244+
if (!MonitoringInfoConstants.isPerWorkerMetric(entry.getKey())) {
245+
long value = entry.getValue().getCumulative().value();
246+
org.joda.time.Instant timestamp = entry.getValue().getCumulative().timestamp();
247+
return MetricsToCounterUpdateConverter.fromGauge(
248+
MetricKey.create(stepName, entry.getKey()), value, timestamp);
249+
} else {
250+
// Per-worker gauges are skipped here; extractPerWorkerMetricUpdates() reports them.
251+
return null;
252+
}
263253
}
264254
})
265255
.filter(Predicates.notNull());
@@ -388,7 +378,8 @@ private void deleteStaleCounters(
388378
@VisibleForTesting
389379
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
390380
ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
391-
ConcurrentHashMap<MetricName, Long> gauges = new ConcurrentHashMap<MetricName, Long>();
381+
ConcurrentHashMap<MetricName, Long> per_worker_gauges =
382+
new ConcurrentHashMap<MetricName, Long>();
392383
ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
393384
new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
394385
HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
@@ -402,12 +393,18 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
402393
counters.put(k, val);
403394
});
404395

405-
perWorkerGauges.forEach(
396+
gauges.forEach(
406397
(k, v) -> {
407-
Long val = v.getCumulative().value();
408-
gauges.put(k, val);
409-
v.reset();
398+
// Check if metric name has the per worker label set.
399+
// TODO(Naireen): Populate local map with perWorkerMetrics so we don't need to check each
400+
// time we update the metrics.
401+
if (MonitoringInfoConstants.isPerWorkerMetric(k)) {
402+
Long val = v.getCumulative().value();
403+
per_worker_gauges.put(k, val);
404+
v.reset();
405+
}
410406
});
407+
411408
perWorkerHistograms.forEach(
412409
(k, v) -> {
413410
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
@@ -416,7 +413,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
416413
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
417414

418415
return MetricsToPerStepNamespaceMetricsConverter.convert(
419-
stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache);
416+
stepName, counters, per_worker_gauges, histograms, parsedPerWorkerMetricsCache);
420417
}
421418

422419
/**

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java

+57-1
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import com.google.api.services.dataflow.model.CounterStructuredName;
3232
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
3333
import com.google.api.services.dataflow.model.CounterUpdate;
34+
import com.google.api.services.dataflow.model.DataflowGaugeValue;
3435
import com.google.api.services.dataflow.model.DataflowHistogramValue;
3536
import com.google.api.services.dataflow.model.DistributionUpdate;
3637
import com.google.api.services.dataflow.model.IntegerGauge;
3738
import com.google.api.services.dataflow.model.Linear;
3839
import com.google.api.services.dataflow.model.MetricValue;
3940
import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
4041
import com.google.api.services.dataflow.model.StringList;
42+
import com.google.common.collect.ImmutableMap;
4143
import java.time.Clock;
4244
import java.time.Duration;
4345
import java.time.Instant;
@@ -88,6 +90,8 @@ public class StreamingStepMetricsContainerTest {
8890

8991
private MetricName name1 = MetricName.named("ns", "name1");
9092
private MetricName name2 = MetricName.named("ns", "name2");
93+
private MetricName name3 =
94+
MetricName.named("ns", "name3", ImmutableMap.of("PER_WORKER_METRIC", "true"));
9195

9296
@Test
9397
public void testDedupping() {
@@ -273,6 +277,16 @@ public void testGaugeUpdateExtraction() {
273277
DateTimeUtils.setCurrentMillisSystem();
274278
}
275279

280+
@Test
281+
public void testNoPerWorkerGaugeUpdateExtraction() {
282+
Gauge gauge = c1.getGauge(name3);
283+
gauge.set(5);
284+
285+
// There is no update.
286+
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
287+
assertThat(updates, IsEmptyIterable.emptyIterable());
288+
}
289+
276290
@Test
277291
public void testStringSetUpdateExtraction() {
278292
StringSet stringSet = c1.getStringSet(name1);
@@ -471,7 +485,7 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() {
471485
}
472486

473487
@Test
474-
public void testExtractPerWorkerMetricUpdatesKafka_populatedMetrics() {
488+
public void testExtractPerWorkerMetricUpdatesKafka_populatedHistogramMetrics() {
475489
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
476490

477491
MetricName histogramMetricName = MetricName.named("KafkaSink", "histogram");
@@ -508,6 +522,48 @@ public void testExtractPerWorkerMetricUpdatesKafka_populatedMetrics() {
508522
assertThat(updates, containsInAnyOrder(histograms));
509523
}
510524

525+
@Test
526+
public void testExtractPerWorkerMetricUpdatesKafka_populateGaugeMetrics() {
527+
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
528+
529+
MetricName gaugeMetricName =
530+
MetricName.named("KafkaSink", "gauge", ImmutableMap.of("PER_WORKER_METRIC", "true"));
531+
c2.getGauge(gaugeMetricName).set(5L);
532+
533+
Iterable<PerStepNamespaceMetrics> updates =
534+
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
535+
536+
DataflowGaugeValue gaugeValue = new DataflowGaugeValue();
537+
gaugeValue.setValue(5L);
538+
539+
MetricValue expectedGauge =
540+
new MetricValue()
541+
.setMetric("gauge")
542+
.setMetricLabels(new HashMap<>())
543+
.setValueGauge64(gaugeValue);
544+
545+
PerStepNamespaceMetrics gauge =
546+
new PerStepNamespaceMetrics()
547+
.setOriginalStep("s2")
548+
.setMetricsNamespace("KafkaSink")
549+
.setMetricValues(Collections.singletonList(expectedGauge));
550+
551+
assertThat(updates, containsInAnyOrder(gauge));
552+
}
553+
554+
@Test
555+
public void testExtractPerWorkerMetricUpdatesKafka_gaugeMetricsDropped() {
556+
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
557+
558+
MetricName gaugeMetricName =
559+
MetricName.named("KafkaSink", "gauge", ImmutableMap.of("PER_WORKER_METRIC", "false"));
560+
c2.getGauge(gaugeMetricName).set(5L);
561+
562+
Iterable<PerStepNamespaceMetrics> updates =
563+
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
564+
assertThat(updates, IsEmptyIterable.emptyIterable());
565+
}
566+
511567
@Test
512568
public void testExtractPerWorkerMetricUpdates_emptyMetrics() {
513569
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);

0 commit comments

Comments
 (0)