Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Lineage metrics for BigtableIO #32068

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
*/
package org.apache.beam.sdk.metrics;

import java.util.HashSet;
import java.util.Set;

/**
* Standard collection of metrics used to record sources and sinks information for lineage tracking.
*/
public class Lineage {

public static final String LINEAGE_NAMESPACE = "lineage";
public static final String SOURCE_METRIC_NAME = "sources";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change, but Lineage was only just introduced one release ago and is only used internally. I would suggest changing the constant Strings to an enum so we can have a query method that guarantees the metric name is among the defined ones.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG. Thanks.

public static final String SINK_METRIC_NAME = "sinks";

private static final StringSet SOURCES = Metrics.stringSet(LINEAGE_NAMESPACE, SOURCE_METRIC_NAME);
private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, SINK_METRIC_NAME);
private static final StringSet SOURCES =
Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString());

/** {@link StringSet} representing sources and optionally side inputs. */
public static StringSet getSources() {
Expand All @@ -38,4 +38,35 @@ public static StringSet getSources() {
/** {@link StringSet} metric recording lineage sinks (see {@link Type#SINK}). */
public static StringSet getSinks() {
return SINKS;
}

/** Query {@link StringSet} metrics from {@link MetricResults}. */
public static Set<String> query(MetricResults results, Type type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

MetricsFilter filter =
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString()))
.build();
Set<String> result = new HashSet<>();
for (MetricResult<StringSetResult> metrics : results.queryMetrics(filter).getStringSets()) {
result.addAll(metrics.getCommitted().getStringSet());
result.addAll(metrics.getAttempted().getStringSet());
}
return result;
}

/**
 * Lineage metrics resource types. Each value doubles as the metric name used to record it
 * (via {@link #toString()}): {@code "source"} or {@code "sink"}.
 */
public enum Type {
  SOURCE("source"),
  SINK("sink");

  /** Metric name under {@code LINEAGE_NAMESPACE} for this resource type. */
  private final String metricName;

  Type(String metricName) {
    this.metricName = metricName;
  }

  @Override
  public String toString() {
    return metricName;
  }
}
}
4 changes: 4 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ task integrationTest(type: Test, dependsOn: processTestResources) {

useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
filter {
// https://github.com/apache/beam/issues/32071
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting.....
Thanks for catching this.

excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableReadIT.testE2EBigtableSegmentRead'
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ private static class BigtableWriterFn
private transient Set<KV<BigtableWriteException, BoundedWindow>> badRecords = null;
// Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
private transient long pendingThrottlingMsecs;
private transient boolean reportedLineage;

// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;
Expand Down Expand Up @@ -1480,6 +1481,10 @@ public void finishBundle(FinishBundleContext c) throws Exception {
throttlingMsecs.inc(excessTime);
}
}
if (!reportedLineage) {
bigtableWriter.reportLineage();
reportedLineage = true;
}
bigtableWriter = null;
}

Expand Down Expand Up @@ -1612,6 +1617,7 @@ public String toString() {
private final BigtableConfig config;
private final BigtableReadOptions readOptions;
private @Nullable Long estimatedSizeBytes;
private transient boolean reportedLineage;

private final BigtableServiceFactory.ConfigId configId;

Expand Down Expand Up @@ -1989,6 +1995,13 @@ public List<ByteKeyRange> getRanges() {
public ValueProvider<String> getTableId() {
return readOptions.getTableId();
}

/**
 * Delegates to {@link BigtableService.Reader#reportLineage()} at most once for this source
 * instance, guarded by the transient {@code reportedLineage} flag.
 *
 * @param reader the service reader that knows the fully-resolved project/instance/table and
 *     actually emits the lineage metric
 */
void reportLineageOnce(BigtableService.Reader reader) {
if (!reportedLineage) {
reader.reportLineage();
// Set the flag so subsequent records read from this source do not re-report.
reportedLineage = true;
}
}
}

private static class BigtableReader extends BoundedReader<Row> {
Expand Down Expand Up @@ -2017,8 +2030,11 @@ public boolean start() throws IOException {
&& rangeTracker.tryReturnRecordAt(
true, makeByteKey(reader.getCurrentRow().getKey())))
|| rangeTracker.markDone();
LOG.warn("called start");
Abacn marked this conversation as resolved.
Show resolved Hide resolved
if (hasRecord) {
LOG.warn("called has record");
++recordsReturned;
source.reportLineageOnce(reader);
}
return hasRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>
* @throws IOException if there is an error closing the writer
*/
void close() throws IOException;

/** Report Lineage metrics to runner. */
default void reportLineage() {}
}

/** The interface of a class that reads from Cloud Bigtable. */
Expand All @@ -77,6 +80,9 @@ interface Reader {
Row getCurrentRow() throws NoSuchElementException;

void close();

/** Report Lineage metrics to runner. */
default void reportLineage() {}
}

/** Returns a {@link Reader} that will read from the specified source. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Lineage;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the challenges is that BigtableIO has a complicated fallback logic to resolve projectId / instanceId. From transform setting / Bigtable setting / pipeline options, and there is BigtableConfig/BigtableWriteOptions/BigtableDataSettings. This was partly due to client migration that supported both old and new config class.

For this reason this PR chose to emit metrics at the low level (BigtableServiceImpl) where the worker actually talking to the server, and where projectId/instanceId has been resolved definitely

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -212,6 +213,11 @@ public void close() {
exhausted = true;
}
}

/** Reports this reader's fully-resolved Bigtable table as a lineage source. */
@Override
public void reportLineage() {
  String resourceFqn = String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId);
  Lineage.getSources().add(resourceFqn);
}
}

@VisibleForTesting
Expand All @@ -225,6 +231,9 @@ static class BigtableSegmentReaderImpl implements Reader {
private final int refillSegmentWaterMark;
private final long maxSegmentByteSize;
private ServiceCallMetric serviceCallMetric;
private final String projectId;
private final String instanceId;
private final String tableId;

private static class UpstreamResults {
private final List<Row> rows;
Expand Down Expand Up @@ -308,11 +317,19 @@ static BigtableSegmentReaderImpl create(
// Asynchronously refill buffer when there is 10% of the elements are left
this.refillSegmentWaterMark =
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
}

@Override
public void close() {}

/** Reports this segment reader's fully-resolved Bigtable table as a lineage source. */
@Override
public void reportLineage() {
  String resourceFqn = String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId);
  Lineage.getSources().add(resourceFqn);
}

@Override
public boolean start() throws IOException {
future = fetchNextSegment();
Expand Down Expand Up @@ -578,6 +595,11 @@ public void writeSingleRecord(KV<ByteString, Iterable<Mutation>> record) throws
}
}

/** Reports this writer's fully-resolved Bigtable table as a lineage sink. */
@Override
public void reportLineage() {
  String resourceFqn = String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId);
  Lineage.getSinks().add(resourceFqn);
}

private ServiceCallMetric createServiceCallMetric() {
// Populate metrics
HashMap<String, String> baseLabels = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
Expand All @@ -61,9 +62,6 @@
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -351,18 +349,8 @@ private void checkTypedReadQueryObjectWithValidate(
}

private void checkLineageSourceMetric(PipelineResult pipelineResult, String tableName) {
MetricQueryResults lineageMetrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(
Lineage.LINEAGE_NAMESPACE, Lineage.SOURCE_METRIC_NAME))
.build());
assertThat(
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
contains("bigquery:" + tableName.replace(':', '.')));
Set<String> result = Lineage.query(pipelineResult.metrics(), Lineage.Type.SOURCE);
assertThat(result, contains("bigquery:" + tableName.replace(':', '.')));
}

@Before
Expand Down Expand Up @@ -600,10 +588,7 @@ public void processElement(ProcessContext c) throws Exception {
new MyData("b", 2L, bd1, bd2),
new MyData("c", 3L, bd1, bd2)));
PipelineResult result = p.run();
// Skip when direct runner splits outside of a counters context.
if (useTemplateCompatibility) {
checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable");
}
checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
Expand Down Expand Up @@ -285,16 +282,8 @@ public void evaluate() throws Throwable {
.withJobService(fakeJobService);

private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) {
MetricQueryResults lineageMetrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME))
.build());
assertThat(
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK),
hasItem("bigquery:" + tableName.replace(':', '.')));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
Expand All @@ -28,7 +31,9 @@
import java.util.Date;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -110,7 +115,8 @@ public void testE2EBigtableRead() {
p.apply(BigtableIO.read().withBigtableOptions(bigtableOptionsBuilder).withTableId(tableId))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
p.run();
PipelineResult r = p.run();
checkLineageSourceMetric(r, tableId);
}

@Test
Expand Down Expand Up @@ -138,6 +144,17 @@ public void testE2EBigtableSegmentRead() {
.withMaxBufferElementCount(10))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
p.run();
PipelineResult r = p.run();
checkLineageSourceMetric(r, tableId);
}

private void checkLineageSourceMetric(PipelineResult r, String tableId) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One cannot check Lineage metrics in a BigtableIO unit test the way BigQueryIO does, because the metrics are emitted in ServiceImpl, and the unit tests use a fake Reader + ServiceImpl that does not emit Lineage (it would not make sense to add Lineage metrics there either, because it is a completely different code path from the real Reader / ServiceImpl).

// TODO(https://github.com/apache/beam/issues/32071) test malformed,
// when pipeline.run() is non-blocking, the metrics are not available by the time of query
Abacn marked this conversation as resolved.
Show resolved Hide resolved
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SOURCE),
hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId)));
}
}
}
Loading
Loading