Introduce pipeline options to disable user counter and user stringset #33059

Merged
merged 1 commit into from
Nov 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -567,14 +568,17 @@ static FileSystem getFileSystemInternal(String scheme) {
*
* <p>Outside of workers where Beam FileSystem API is used (e.g. test methods, user code executed
* during pipeline submission), consider use {@link #registerFileSystemsOnce} if initialize
* FIleSystem of supported schema is the main goal.
* FileSystem of supported schema is the main goal.
*/
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options cannot be null");
long id = options.getOptionsId();
int nextRevision = options.revision();

// entry to set other PipelineOption determined flags
Metrics.setDefaultPipelineOptions(options);
Contributor:

This makes this method a little overloaded, no? It seems odd to set metrics state in FileSystems, though I see you want to do this in a centralized place.
Would it be better to do it separately in DataflowBatchWorkerHarness.java, StreamingDataflowWorker.java and FnHarness.java, from where all of this is called?

(I do not have a preference, just a thought. Feel free to ignore this comment if you want to do it this way.)

Contributor Author:

setDefaultPipelineOptions is called once per worker, on SDK harness startup, so the overhead is minimal.

Separate calls would produce a much larger diff. Multiple runners have their own main() function; searching for FileSystems.setDefaultPipelineOptions gives a hint.


while (true) {
KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
// only update file systems if the pipeline changed or the options revision increased
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;

/** Implementation of {@link Counter} that delegates to the instance for the current context. */
@Internal
Expand Down Expand Up @@ -70,6 +71,9 @@ public void inc() {
/** Increment the counter by the given amount. */
@Override
public void inc(long n) {
if (MetricsFlag.counterDisabled()) {
return;
}
MetricsContainer container =
this.processWideContainer
? MetricsEnvironment.getProcessWideContainer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
package org.apache.beam.sdk.metrics;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The <code>Metrics</code> is a utility class for producing various kinds of metrics for reporting
Expand Down Expand Up @@ -50,9 +57,59 @@
* example off how to query metrics.
*/
public class Metrics {
private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);

private Metrics() {}

static class MetricsFlag {
private static final AtomicReference<@Nullable MetricsFlag> INSTANCE = new AtomicReference<>();
final boolean counterDisabled;
final boolean stringSetDisabled;

private MetricsFlag(boolean counterDisabled, boolean stringSetDisabled) {
this.counterDisabled = counterDisabled;
this.stringSetDisabled = stringSetDisabled;
}

static boolean counterDisabled() {
MetricsFlag flag = INSTANCE.get();
return flag != null && flag.counterDisabled;
}

static boolean stringSetDisabled() {
MetricsFlag flag = INSTANCE.get();
return flag != null && flag.stringSetDisabled;
}
}

/**
* Initialize metrics flags if not already done so.
*
* <p>Should be called by worker at worker harness initialization. Should not be called by user
* code (and it does not have an effect as the initialization completed before).
*/
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
MetricsFlag flag = MetricsFlag.INSTANCE.get();
if (flag == null) {
ExperimentalOptions exp = options.as(ExperimentalOptions.class);
boolean counterDisabled = ExperimentalOptions.hasExperiment(exp, "disableCounterMetrics");
if (counterDisabled) {
LOG.info("Counter metrics are disabled.");
}
boolean stringSetDisabled = ExperimentalOptions.hasExperiment(exp, "disableStringSetMetrics");
Contributor:

With this, if a streaming customer wants to disable StringSet or Lineage in their job, they will need to restart the job and pass this experiment.

On the service side we have an experiment to enable lineage (disabled by default). It is already present in the pipeline options (https://screenshot.googleplex.com/7WaPSvq85nHyCEt). I was thinking we should keep lineage metric reporting disabled by default unless a job enables lineage.

We have identified some cases where, for long-running streaming jobs, the lineage information can get very large over time, so it would be ideal to capture it only for jobs that opt in to lineage.

While you are doing the same thing in this PR, could I ask you to add a

Lineage.setDefaultPipelineOptions(options);

in the file above, with a similar function in the Lineage class that sets its flag based on the lineage experiment mentioned above (enable_lineage), and to make add in the Lineage class return as a no-op if lineage is not enabled?

Thank you in advance.

Contributor Author:

Thanks, closed this one and opened an alternative PR, #33085, for this suggestion.

if (stringSetDisabled) {
LOG.info("StringSet metrics are disabled");
}
MetricsFlag.INSTANCE.compareAndSet(null, new MetricsFlag(counterDisabled, stringSetDisabled));
}
}
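
The review thread in this hunk asks for lineage reporting to be opt-in rather than opt-out. A minimal sketch of that idea follows, using only the JDK. `LineageGateSketch` and its methods are hypothetical stand-ins, not the Beam `Lineage` class, and the only name taken from the thread is the `enable_lineage` experiment string. Unlike the `disable...` flags in this PR, the gate defaults to off and `add` does nothing until the experiment is present.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical opt-in gate for lineage reporting; this is not the Beam Lineage class. */
final class LineageGateSketch {
  // Opt-in: reporting stays off unless the experiment is explicitly present.
  private static final AtomicBoolean ENABLED = new AtomicBoolean(false);

  private final List<String> recorded = new ArrayList<>();

  /** Assumed hook, called once at worker startup with the job's experiment names. */
  static void setDefaultOptions(List<String> experiments) {
    ENABLED.set(experiments != null && experiments.contains("enable_lineage"));
  }

  /** Records a value only when lineage was enabled; otherwise a no-op. */
  void add(String value) {
    if (!ENABLED.get()) {
      return;
    }
    recorded.add(value);
  }

  List<String> recorded() {
    return recorded;
  }
}
```

With an opt-in default, a long-running streaming job that never asked for lineage accumulates nothing, which is the concern raised in the comment.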

@Internal
static void resetDefaultPipelineOptions() {
MetricsFlag.INSTANCE.set(null);
}
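
The flag holder in this hunk relies on a set-once pattern: the first initialization wins, later calls are ignored, and an uninitialized holder reads as "nothing disabled". A standalone sketch of that pattern is shown here, assuming only the JDK. `SetOnceFlagsSketch` is a hypothetical name and carries a single flag for brevity; it is an illustration of the technique, not the PR's code.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a process-wide, set-once flag holder; not Beam code. */
final class SetOnceFlagsSketch {
  // null means "never initialized"; readers treat that as "nothing disabled".
  private static final AtomicReference<SetOnceFlagsSketch> INSTANCE = new AtomicReference<>();

  private final boolean counterDisabled;

  private SetOnceFlagsSketch(boolean counterDisabled) {
    this.counterDisabled = counterDisabled;
  }

  /** Installs the flags only if none are installed yet; returns whether this call won. */
  static boolean initialize(boolean counterDisabled) {
    return INSTANCE.compareAndSet(null, new SetOnceFlagsSketch(counterDisabled));
  }

  /** Safe to call before initialization: an unset holder reads as "enabled". */
  static boolean counterDisabled() {
    SetOnceFlagsSketch flags = INSTANCE.get();
    return flags != null && flags.counterDisabled;
  }

  /** Escape hatch for tests, in the spirit of the reset method in the diff. */
  static void reset() {
    INSTANCE.set(null);
  }

  public static void main(String[] args) {
    System.out.println(counterDisabled());  // false: not initialized yet
    System.out.println(initialize(true));   // true: first call wins
    System.out.println(initialize(false));  // false: later call is ignored
    System.out.println(counterDisabled());  // true: value from the first call
  }
}
```

Because `compareAndSet(null, ...)` only succeeds once, two threads racing through initialization cannot leave readers observing a value that later flips.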

/**
* Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
*/
Expand Down Expand Up @@ -174,6 +231,9 @@ private DelegatingStringSet(MetricName name) {

@Override
public void add(String value) {
if (MetricsFlag.stringSetDisabled()) {
Contributor:

Should this part have a unit test?

return;
}
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
container.getStringSet(name).add(value);
Expand Down
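
On the reviewer's question about a unit test for the guarded add(): one possible shape for such a test, sketched with the JDK only, is to inject the flag and a recording sink and assert that nothing reaches the sink while the flag is set. `GuardedStringSetSketch`, the `BooleanSupplier` injection, and the `Set` used as a sink are all assumptions made for illustration; the PR's class reads a static flag and writes to a metrics container instead.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for a delegating string-set metric; not Beam code. */
final class GuardedStringSetSketch {
  private final BooleanSupplier disabled; // injected so a test can flip it freely
  private final Set<String> sink;         // plays the role of the metrics container

  GuardedStringSetSketch(BooleanSupplier disabled, Set<String> sink) {
    this.disabled = disabled;
    this.sink = sink;
  }

  void add(String value) {
    if (disabled.getAsBoolean()) {
      return; // early return, as in the guarded add() of the diff
    }
    sink.add(value);
  }

  public static void main(String[] args) {
    Set<String> sink = new LinkedHashSet<>();
    boolean[] off = {true};
    GuardedStringSetSketch metric = new GuardedStringSetSketch(() -> off[0], sink);

    metric.add("dropped");
    if (!sink.isEmpty()) {
      throw new AssertionError("a disabled add must not reach the sink");
    }

    off[0] = false;
    metric.add("kept");
    if (sink.size() != 1 || !sink.contains("kept")) {
      throw new AssertionError("an enabled add must reach the sink exactly once");
    }
  }
}
```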
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -39,6 +41,7 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
Expand Down Expand Up @@ -245,6 +248,24 @@ public void testCounterToCell() {
counter.dec(5L);
verify(mockCounter).inc(-5);
}

@Test
public void testMetricsFlag() {
Metrics.resetDefaultPipelineOptions();
assertFalse(Metrics.MetricsFlag.counterDisabled());
assertFalse(Metrics.MetricsFlag.stringSetDisabled());
PipelineOptions options =
PipelineOptionsFactory.fromArgs("--experiments=disableCounterMetrics").create();
Metrics.setDefaultPipelineOptions(options);
assertTrue(Metrics.MetricsFlag.counterDisabled());
assertFalse(Metrics.MetricsFlag.stringSetDisabled());
Metrics.resetDefaultPipelineOptions();
options = PipelineOptionsFactory.fromArgs("--experiments=disableStringSetMetrics").create();
Metrics.setDefaultPipelineOptions(options);
assertFalse(Metrics.MetricsFlag.counterDisabled());
assertTrue(Metrics.MetricsFlag.stringSetDisabled());
Metrics.resetDefaultPipelineOptions();
}
}
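
The test above enables each flag by passing `--experiments=<name>` on the command line. As a rough illustration of what that amounts to, the sketch below parses such arguments into a list and performs an exact-match membership check. `ExperimentArgsSketch` and both of its methods are hypothetical and use only the JDK; the comma splitting and case-sensitive matching are assumptions about how list-valued options behave, not a copy of Beam's parser.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for --experiments arguments; not Beam's PipelineOptionsFactory. */
final class ExperimentArgsSketch {
  private static final String PREFIX = "--experiments=";

  /** Collects experiment names from every --experiments=a,b argument, in order. */
  static List<String> parseExperiments(String... args) {
    List<String> experiments = new ArrayList<>();
    for (String arg : args) {
      if (!arg.startsWith(PREFIX)) {
        continue;
      }
      for (String name : arg.substring(PREFIX.length()).split(",")) {
        if (!name.isEmpty()) {
          experiments.add(name);
        }
      }
    }
    return experiments;
  }

  /** Exact, case-sensitive membership check; a null list means no experiments. */
  static boolean hasExperiment(List<String> experiments, String name) {
    return experiments != null && experiments.contains(name);
  }
}
```

Under these assumptions a misspelled or differently cased experiment name is silently ignored, so the "disabled" log lines in the diff are the practical way to confirm a flag took effect.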

/** Tests for committed metrics. */
Expand Down