diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 7890a9f74b941..8b69b0ef55236 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -17,17 +17,17 @@ */ package org.apache.beam.sdk.metrics; +import java.util.HashSet; +import java.util.Set; + /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { - public static final String LINEAGE_NAMESPACE = "lineage"; - public static final String SOURCE_METRIC_NAME = "sources"; - public static final String SINK_METRIC_NAME = "sinks"; - - private static final StringSet SOURCES = Metrics.stringSet(LINEAGE_NAMESPACE, SOURCE_METRIC_NAME); - private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, SINK_METRIC_NAME); + private static final StringSet SOURCES = + Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); + private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString()); /** {@link StringSet} representing sources and optionally side inputs. */ public static StringSet getSources() { @@ -38,4 +38,35 @@ public static StringSet getSources() { public static StringSet getSinks() { return SINKS; } + + /** Query {@link StringSet} metrics from {@link MetricResults}. */ + public static Set query(MetricResults results, Type type) { + MetricsFilter filter = + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString())) + .build(); + Set result = new HashSet<>(); + for (MetricResult metrics : results.queryMetrics(filter).getStringSets()) { + result.addAll(metrics.getCommitted().getStringSet()); + result.addAll(metrics.getAttempted().getStringSet()); + } + return result; + } + + /** Lineage metrics resource types. */ + public enum Type { + SOURCE("source"), + SINK("sink"); + + private final String name; + + Type(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } } diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index e499bae6fc64f..23c56f13a94c7 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -218,6 +218,10 @@ task integrationTest(type: Test, dependsOn: processTestResources) { useJUnit { excludeCategories "org.apache.beam.sdk.testing.UsesKms" + filter { + // https://github.com/apache/beam/issues/32071 + excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableReadIT.testE2EBigtableSegmentRead' + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index d78ae2cb6c578..6d20109e947ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1337,6 +1337,7 @@ private static class BigtableWriterFn private transient Set> badRecords = null; // Due to callback thread not supporting Beam metrics, Record pending metrics and report later. private transient long pendingThrottlingMsecs; + private transient boolean reportedLineage; // Assign serviceEntry in startBundle and clear it in tearDown. @Nullable private BigtableServiceEntry serviceEntry; @@ -1480,6 +1481,10 @@ public void finishBundle(FinishBundleContext c) throws Exception { throttlingMsecs.inc(excessTime); } } + if (!reportedLineage) { + bigtableWriter.reportLineage(); + reportedLineage = true; + } bigtableWriter = null; } @@ -1612,6 +1617,7 @@ public String toString() { private final BigtableConfig config; private final BigtableReadOptions readOptions; private @Nullable Long estimatedSizeBytes; + private transient boolean reportedLineage; private final BigtableServiceFactory.ConfigId configId; @@ -1989,6 +1995,13 @@ public List getRanges() { public ValueProvider getTableId() { return readOptions.getTableId(); } + + void reportLineageOnce(BigtableService.Reader reader) { + if (!reportedLineage) { + reader.reportLineage(); + reportedLineage = true; + } + } } private static class BigtableReader extends BoundedReader { @@ -2019,6 +2032,7 @@ true, makeByteKey(reader.getCurrentRow().getKey()))) || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; + source.reportLineageOnce(reader); } return hasRecord; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java index 261cc3ac081d8..50d8126999c4b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java @@ -57,6 +57,9 @@ CompletionStage writeRecord(KV * @throws IOException if there is an error closing the writer */ void close() throws IOException; + + /** Report Lineage metrics to runner. */ + default void reportLineage() {} } /** The interface of a class that reads from Cloud Bigtable. */ @@ -77,6 +80,9 @@ interface Reader { Row getCurrentRow() throws NoSuchElementException; void close(); + + /** Report Lineage metrics to runner. */ + default void reportLineage() {} } /** Returns a {@link Reader} that will read from the specified source. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index f06a4a1276864..6fdf67722bac2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -71,6 +71,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -212,6 +213,11 @@ public void close() { exhausted = true; } } + + @Override + public void reportLineage() { + Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + } } @VisibleForTesting @@ -225,6 +231,9 @@ static class BigtableSegmentReaderImpl implements Reader { private final int refillSegmentWaterMark; private final long maxSegmentByteSize; private ServiceCallMetric serviceCallMetric; + private final String projectId; + private final String instanceId; + private final String tableId; private static class UpstreamResults { private final List rows; @@ -308,11 +317,19 @@ static BigtableSegmentReaderImpl create( // Asynchronously refill buffer when there is 10% of the elements are left this.refillSegmentWaterMark = Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE)); + this.projectId = projectId; + this.instanceId = instanceId; + this.tableId = tableId; } @Override public void close() {} + @Override + public void reportLineage() { + Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + } + @Override public boolean start() throws IOException { future = fetchNextSegment(); @@ -578,6 +595,11 @@ public void writeSingleRecord(KV> record) throws } } + @Override + public void reportLineage() { + Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + } + private ServiceCallMetric createServiceCallMetric() { // Populate metrics HashMap baseLabels = new HashMap<>(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index 5c43666e79e5c..a8aca7570b33d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecordBase; @@ -61,9 +62,6 @@ import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -351,18 +349,8 @@ private void checkTypedReadQueryObjectWithValidate( } private void checkLineageSourceMetric(PipelineResult pipelineResult, String tableName) { - MetricQueryResults lineageMetrics = - pipelineResult - .metrics() - .queryMetrics( - MetricsFilter.builder() - .addNameFilter( - MetricNameFilter.named( - Lineage.LINEAGE_NAMESPACE, Lineage.SOURCE_METRIC_NAME)) - .build()); - assertThat( - lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(), - contains("bigquery:" + tableName.replace(':', '.'))); + Set result = Lineage.query(pipelineResult.metrics(), Lineage.Type.SOURCE); + assertThat(result, contains("bigquery:" + tableName.replace(':', '.'))); } @Before @@ -600,10 +588,7 @@ public void processElement(ProcessContext c) throws Exception { new MyData("b", 2L, bd1, bd2), new MyData("c", 3L, bd1, bd2))); PipelineResult result = p.run(); - // Skip when direct runner splits outside of a counters context. - if (useTemplateCompatibility) { - checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable"); - } + checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable"); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index bc90d4c8bae79..c5af8045bfe20 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -118,9 +118,6 @@ import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.JavaFieldSchema; @@ -285,16 +282,8 @@ public void evaluate() throws Throwable { .withJobService(fakeJobService); private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) { - MetricQueryResults lineageMetrics = - pipelineResult - .metrics() - .queryMetrics( - MetricsFilter.builder() - .addNameFilter( - MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME)) - .build()); assertThat( - lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(), + Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK), hasItem("bigquery:" + tableName.replace(':', '.'))); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index bc88858ebc33e..4ce9ad10b2c06 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.io.gcp.bigtable; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; + import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; @@ -28,7 +31,9 @@ import java.util.Date; import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -110,7 +115,8 @@ public void testE2EBigtableRead() { p.apply(BigtableIO.read().withBigtableOptions(bigtableOptionsBuilder).withTableId(tableId)) .apply(Count.globally()); PAssert.thatSingleton(count).isEqualTo(numRows); - p.run(); + PipelineResult r = p.run(); + checkLineageSourceMetric(r, tableId); } @Test @@ -138,6 +144,17 @@ public void testE2EBigtableSegmentRead() { .withMaxBufferElementCount(10)) .apply(Count.globally()); PAssert.thatSingleton(count).isEqualTo(numRows); - p.run(); + PipelineResult r = p.run(); + checkLineageSourceMetric(r, tableId); + } + + private void checkLineageSourceMetric(PipelineResult r, String tableId) { + // TODO(https://github.com/apache/beam/issues/32071) test malformed, + // when pipeline.run() is non-blocking, the metrics are not available by the time of query + if (options.getRunner().getName().contains("DirectRunner")) { + assertThat( + Lineage.query(r.metrics(), Lineage.Type.SOURCE), + hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index bf9f7d991fa24..46bb3df836e56 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigtable; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import com.google.api.gax.rpc.ServerStream; @@ -39,8 +40,10 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.PAssert; @@ -142,7 +145,7 @@ public void processElement(ProcessContext c) { .withProjectId(project) .withInstanceId(options.getInstanceId()) .withTableId(tableId)); - p.run(); + PipelineResult r = p.run(); // Test number of column families and column family name equality Table table = getTable(tableId); @@ -154,6 +157,7 @@ public void processElement(ProcessContext c) { // Test table data equality List> tableData = getTableData(tableId); assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray())); + checkLineageSinkMetric(r, tableId); } @Test @@ -340,7 +344,7 @@ public void failureTest(int numRows, DoFn> tableData = getTableData(tableId); assertEquals(998, tableData.size()); + checkLineageSinkMetric(r, tableId); } @After @@ -412,4 +417,13 @@ private void deleteTable(String tableId) { tableAdminClient.deleteTable(tableId); } } + + private void checkLineageSinkMetric(PipelineResult r, String tableId) { + // Only check lineage metrics on direct runner until Dataflow runner v2 supported report back + if (options.getRunner().getName().contains("DirectRunner")) { + assertThat( + Lineage.query(r.metrics(), Lineage.Type.SINK), + hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + } + } }