Skip to content

Commit

Permalink
Introduce pipeline options to disable user counter and user stringset
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Nov 8, 2024
1 parent 159c1a4 commit 9388f40
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -567,14 +568,17 @@ static FileSystem getFileSystemInternal(String scheme) {
*
* <p>Outside of workers where Beam FileSystem API is used (e.g. test methods, user code executed
* during pipeline submission), consider use {@link #registerFileSystemsOnce} if initialize
* FIleSystem of supported schema is the main goal.
* FileSystem of supported schema is the main goal.
*/
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options cannot be null");
long id = options.getOptionsId();
int nextRevision = options.revision();

// entry to set other PipelineOption determined flags
Metrics.setDefaultPipelineOptions(options);

while (true) {
KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
// only update file systems if the pipeline changed or the options revision increased
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;

/** Implementation of {@link Counter} that delegates to the instance for the current context. */
@Internal
Expand Down Expand Up @@ -70,6 +71,9 @@ public void inc() {
/** Increment the counter by the given amount. */
@Override
public void inc(long n) {
if (MetricsFlag.counterDisabled()) {
return;
}
MetricsContainer container =
this.processWideContainer
? MetricsEnvironment.getProcessWideContainer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
package org.apache.beam.sdk.metrics;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The <code>Metrics</code> is a utility class for producing various kinds of metrics for reporting
Expand Down Expand Up @@ -50,9 +57,59 @@
* example off how to query metrics.
*/
public class Metrics {
private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);

private Metrics() {}

static class MetricsFlag {
private static final AtomicReference<@Nullable MetricsFlag> INSTANCE = new AtomicReference<>();
final boolean counterDisabled;
final boolean stringSetDisabled;

private MetricsFlag(boolean counterDisabled, boolean stringSetDisabled) {
this.counterDisabled = counterDisabled;
this.stringSetDisabled = stringSetDisabled;
}

static boolean counterDisabled() {
MetricsFlag flag = INSTANCE.get();
return flag != null && flag.counterDisabled;
}

static boolean stringSetDisabled() {
MetricsFlag flag = INSTANCE.get();
return flag != null && flag.stringSetDisabled;
}
}

/**
* Initialize metrics flags if not already done so.
*
* <p>Should be called by worker at worker harness initialization. Should not be called by user
* code (and it does not have an effect as the initialization completed before).
*/
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
MetricsFlag flag = MetricsFlag.INSTANCE.get();
if (flag == null) {
ExperimentalOptions exp = options.as(ExperimentalOptions.class);
boolean counterDisabled = ExperimentalOptions.hasExperiment(exp, "disableCounterMetrics");
if (counterDisabled) {
LOG.info("Counter metrics are disabled.");
}
boolean stringSetDisabled = ExperimentalOptions.hasExperiment(exp, "disableStringSetMetrics");
if (stringSetDisabled) {
LOG.info("StringSet metrics are disabled");
}
MetricsFlag.INSTANCE.compareAndSet(null, new MetricsFlag(counterDisabled, stringSetDisabled));
}
}

@Internal
static void resetDefaultPipelineOptions() {
MetricsFlag.INSTANCE.set(null);
}

/**
* Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
*/
Expand Down Expand Up @@ -174,6 +231,9 @@ private DelegatingStringSet(MetricName name) {

@Override
public void add(String value) {
if (MetricsFlag.stringSetDisabled()) {
return;
}
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
container.getStringSet(name).add(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -39,6 +41,7 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
Expand Down Expand Up @@ -245,6 +248,24 @@ public void testCounterToCell() {
counter.dec(5L);
verify(mockCounter).inc(-5);
}

@Test
public void testMetricsFlag() {
Metrics.resetDefaultPipelineOptions();
assertFalse(Metrics.MetricsFlag.counterDisabled());
assertFalse(Metrics.MetricsFlag.stringSetDisabled());
PipelineOptions options =
PipelineOptionsFactory.fromArgs("--experiments=disableCounterMetrics").create();
Metrics.setDefaultPipelineOptions(options);
assertTrue(Metrics.MetricsFlag.counterDisabled());
assertFalse(Metrics.MetricsFlag.stringSetDisabled());
Metrics.resetDefaultPipelineOptions();
options = PipelineOptionsFactory.fromArgs("--experiments=disableStringSetMetrics").create();
Metrics.setDefaultPipelineOptions(options);
assertFalse(Metrics.MetricsFlag.counterDisabled());
assertTrue(Metrics.MetricsFlag.stringSetDisabled());
Metrics.resetDefaultPipelineOptions();
}
}

/** Tests for committed metrics. */
Expand Down

0 comments on commit 9388f40

Please sign in to comment.