Merge pull request #31823: Add lineage information for BigQuery sinks.
robertwb authored Jul 10, 2024
1 parent dd09124 commit 7c0cf39
Showing 9 changed files with 92 additions and 12 deletions.
──────── File 1 of 9 ────────
@@ -54,6 +54,7 @@
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -412,9 +413,23 @@ public static String toTableSpec(TableReference ref) {
     return sb.toString();
   }
 
-  public static String dataCatalogName(TableReference ref) {
-    return String.format(
-        "bigquery:%s.%s.%s", ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+  public static String dataCatalogName(TableReference ref, BigQueryOptions options) {
+    String tableIdBase;
+    int ix = ref.getTableId().indexOf('$');
+    if (ix == -1) {
+      tableIdBase = ref.getTableId();
+    } else {
+      tableIdBase = ref.getTableId().substring(0, ix);
+    }
+    String projectId;
+    if (!Strings.isNullOrEmpty(ref.getProjectId())) {
+      projectId = ref.getProjectId();
+    } else if (!Strings.isNullOrEmpty(options.getBigQueryProject())) {
+      projectId = options.getBigQueryProject();
+    } else {
+      projectId = options.getProject();
+    }
+    return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase);
   }
 
   static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
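The reworked helper makes two behavioral changes worth calling out: a trailing partition decorator is stripped from the table id, and a missing project id now falls back first to options.getBigQueryProject() and then to options.getProject(). Below is a minimal standalone sketch of the name normalization, assuming plain strings in place of the TableReference fields; all names are illustrative, and the project-fallback branch is omitted since it needs live pipeline options.

// Illustrative re-implementation of the normalization in dataCatalogName above.
public class DataCatalogNameSketch {

  static String dataCatalogName(String projectId, String datasetId, String tableId) {
    // Strip a partition decorator such as "$20240710" down to the base table id.
    int ix = tableId.indexOf('$');
    String tableIdBase = (ix == -1) ? tableId : tableId.substring(0, ix);
    return String.format("bigquery:%s.%s.%s", projectId, datasetId, tableIdBase);
  }

  public static void main(String[] args) {
    // A plain table reference maps directly:
    System.out.println(dataCatalogName("my-project", "my_dataset", "events"));
    // -> bigquery:my-project.my_dataset.events

    // A write to a single partition collapses to one lineage entry per table:
    System.out.println(dataCatalogName("my-project", "my_dataset", "events$20240710"));
    // -> bigquery:my-project.my_dataset.events
  }
}

Collapsing partitions onto the base table keeps the lineage string set small and stable: repeated writes to daily partitions of one table report a single sink rather than one entry per day.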
──────── File 2 of 9 ────────
@@ -121,7 +121,7 @@ protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
               BigQueryHelpers.toTableSpec(tableToExtract)));
     }
     // emit this table ID as a lineage source
-    Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract));
+    Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, bqOptions));
 
     TableSchema schema = table.getSchema();
     JobService jobService = bqServices.getJobService(bqOptions);
@@ -158,7 +158,8 @@ public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions
     if (res.extractedFiles.size() > 0) {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       // emit this table ID as a lineage source
-      Lineage.getSources().add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions)));
+      Lineage.getSources()
+          .add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions), bqOptions));
       final String extractDestinationDir =
           resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
       // Match all files in the destination directory to stat them in bulk.
──────── File 3 of 9 ────────
@@ -114,7 +114,7 @@ public List<BigQueryStorageStreamSource<T>> split(
       TableReference tableReference = targetTable.getTableReference();
       readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
       // register the table as lineage source
-      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference));
+      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions));
     } else {
       // If the table does not exist targetTable will be null.
       // Construct the table id if we can generate it. For error recording/logging.
@@ -123,7 +123,7 @@ public List<BigQueryStorageStreamSource<T>> split(
       readSessionBuilder.setTable(tableReferenceId);
       // register the table as lineage source
       TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId);
-      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference));
+      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions));
     }
   }
──────── File 4 of 9 ────────
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -117,8 +118,13 @@ public void processElement(ProcessContext context) {
           Supplier<@Nullable TableConstraints> tableConstraintsSupplier =
               () -> dynamicDestinations.getTableConstraints(dest);
 
+          BigQueryOptions bqOptions = context.getPipelineOptions().as(BigQueryOptions.class);
+          Lineage.getSinks()
+              .add(
+                  BigQueryHelpers.dataCatalogName(
+                      tableDestination1.getTableReference(), bqOptions));
           return CreateTableHelpers.possiblyCreateTable(
-              context.getPipelineOptions().as(BigQueryOptions.class),
+              bqOptions,
               tableDestination1,
               schemaSupplier,
               tableConstraintsSupplier,
──────── File 5 of 9 ────────
@@ -61,6 +61,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -267,6 +268,7 @@ public AppendRowsContext(
   }
 
   class DestinationState {
+    private final TableDestination tableDestination;
     private final String tableUrn;
     private final String shortTableUrn;
     private String streamName = "";
@@ -298,6 +300,7 @@ class DestinationState {
     private final boolean includeCdcColumns;
 
     public DestinationState(
+        TableDestination tableDestination,
         String tableUrn,
         String shortTableUrn,
         MessageConverter<ElementT> messageConverter,
@@ -309,6 +312,7 @@ public DestinationState(
         Callable<Boolean> tryCreateTable,
         boolean includeCdcColumns)
         throws Exception {
+      this.tableDestination = tableDestination;
       this.tableUrn = tableUrn;
       this.shortTableUrn = shortTableUrn;
       this.pendingMessages = Lists.newArrayList();
@@ -327,6 +331,10 @@ public DestinationState(
       }
     }
 
+    public TableDestination getTableDestination() {
+      return tableDestination;
+    }
+
     void teardown() {
       maybeTickleCache();
       if (appendClientInfo != null) {
@@ -1050,6 +1058,7 @@ DestinationState createDestinationState(
     try {
       messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
       return new DestinationState(
+          tableDestination1,
           tableDestination1.getTableUrn(bigQueryOptions),
           tableDestination1.getShortTableUrn(),
           messageConverter,
@@ -1089,6 +1098,11 @@ public void process(
                       initializedDatasetService,
                       initializedWriteStreamService,
                       pipelineOptions.as(BigQueryOptions.class)));
+      Lineage.getSinks()
+          .add(
+              BigQueryHelpers.dataCatalogName(
+                  state.getTableDestination().getTableReference(),
+                  pipelineOptions.as(BigQueryOptions.class)));
 
       OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver = o.get(failedRowsTag);
       @Nullable
──────── File 6 of 9 ────────
@@ -62,6 +62,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -469,6 +470,11 @@ public void process(
       final DatasetService datasetService = getDatasetService(pipelineOptions);
       final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
 
+      Lineage.getSinks()
+          .add(
+              BigQueryHelpers.dataCatalogName(
+                  tableDestination.getTableReference(), bigQueryOptions));
+
       Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
       Callable<Boolean> tryCreateTable =
           () -> {
──────── File 7 of 9 ────────
@@ -34,6 +34,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -207,6 +208,11 @@ public void processElement(
       // Process each destination table.
       // Do not copy if no temp tables are provided.
       if (!entry.getValue().isEmpty()) {
+        Lineage.getSinks()
+            .add(
+                BigQueryHelpers.dataCatalogName(
+                    entry.getKey().getTableReference(),
+                    c.getPipelineOptions().as(BigQueryOptions.class)));
         pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
       }
     }
──────── File 8 of 9 ────────
@@ -49,6 +49,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -259,6 +260,11 @@ public void processElement(
       }
       // This is a temp table. Create a new one for each partition and each pane.
       tableReference.setTableId(jobIdPrefix);
+    } else {
+      Lineage.getSinks()
+          .add(
+              BigQueryHelpers.dataCatalogName(
+                  tableReference, c.getPipelineOptions().as(BigQueryOptions.class)));
     }
 
     WriteDisposition writeDisposition = firstPaneWriteDisposition;
──────── File 9 of 9 ────────
@@ -28,6 +28,7 @@
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -93,6 +94,7 @@
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -115,6 +117,10 @@
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.metrics.Lineage;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -278,6 +284,20 @@ public void evaluate() throws Throwable {
           .withDatasetService(fakeDatasetService)
           .withJobService(fakeJobService);
 
+  private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) {
+    MetricQueryResults lineageMetrics =
+        pipelineResult
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(
+                        MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME))
+                    .build());
+    assertThat(
+        lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
+        hasItem("bigquery:" + tableName.replace(':', '.')));
+  }
+
   @Before
   public void setUp() throws ExecutionException, IOException, InterruptedException {
     FakeDatasetService.setUp();
@@ -488,7 +508,7 @@ private void verifySideInputs() {
           .containsInAnyOrder(expectedTables);
     }
 
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
     for (String anUserList : userList) {
@@ -505,6 +525,7 @@ private void verifySideInputs() {
       assertThat(
          fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()),
          containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
+      checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.userid-" + entry.getKey());
     }
   }
 
@@ -680,7 +701,7 @@ public void runStreamingFileLoads(String tableRef, boolean useTempTables, boolea
     }
 
     p.apply(testStream).apply(writeTransform);
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     final int projectIdSplitter = tableRef.indexOf(':');
     final String projectId =
@@ -689,6 +710,9 @@ public void runStreamingFileLoads(String tableRef, boolean useTempTables, boolea
     assertThat(
         fakeDatasetService.getAllRows(projectId, "dataset-id", "table-id"),
        containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+
+    checkLineageSinkMetric(
+        pipelineResult, tableRef.contains(projectId) ? tableRef : projectId + ":" + tableRef);
   }
 
   public void runStreamingFileLoads(String tableRef) throws Exception {
@@ -828,11 +852,12 @@ public void testBatchFileLoads() throws Exception {
 
     PAssert.that(result.getSuccessfulTableLoads())
         .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
        containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+    checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.table-id");
   }
 
   @Test
@@ -861,11 +886,12 @@ public void testBatchFileLoadsWithTempTables() throws Exception {
 
     PAssert.that(result.getSuccessfulTableLoads())
        .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     assertThat(
        fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
        containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+    checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.table-id");
   }
 
   @Test
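Outside the test suite, the string-set metrics these changes populate can be read back from any PipelineResult. Below is a minimal sketch modeled on the checkLineageSinkMetric helper above; the wrapper class and the printing harness are illustrative, while the Lineage, MetricNameFilter, MetricsFilter, and string-set result calls mirror the ones used in the test file. The read-side sources registered in the earlier files should be queryable the same way, assuming a parallel source metric name constant is exposed.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;

// Hypothetical harness: prints the lineage sinks reported by a finished pipeline.
public class LineageSinkInspection {

  static void printLineageSinks(PipelineResult result) {
    MetricQueryResults sinks =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(
                        MetricNameFilter.named(
                            Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME))
                    .build());
    // Each value is a data-catalog-style name such as
    // "bigquery:my-project.my_dataset.events"; partition decorators were
    // already stripped by BigQueryHelpers.dataCatalogName.
    for (MetricResult<StringSetResult> metric : sinks.getStringSets()) {
      System.out.println(metric.getCommitted().getStringSet());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(); // assume BigQuery read/write transforms are applied here
    PipelineResult result = p.run();
    result.waitUntilFinish();
    printLineageSinks(result);
  }
}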
