feat(spark): Adding OpenLineage symlink support to Spark lineage #10637

Merged: 4 commits, Jun 21, 2024. Showing changes from 3 commits.
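For context, a rough sketch of the kind of OpenLineage dataset payload this change targets: a path-based S3 dataset whose facets carry a TABLE symlink pointing at the Glue table backing it. The bucket, database, table, region, and account values below are made up for illustration.

{
  "namespace": "s3://my-bucket",
  "name": "/warehouse/orders",
  "facets": {
    "symlinks": {
      "identifiers": [
        { "namespace": "aws:glue:us-east-1:123456789012", "name": "my_db.orders", "type": "TABLE" }
      ]
    }
  }
}

When such a symlink is present, the converter changed below resolves the dataset to the Glue (or Hive) table URN instead of the raw path URN, and remembers the path-based URN as an alias so later references resolve to the same entity.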
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.14.0'
ext.openLineageVersion = '1.16.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
@@ -28,6 +28,8 @@ public class DatahubOpenlineageConfig {
@Builder.Default private final DataJobUrn parentJobUrn = null;
// This is disabled until column-level patch support is fixed in GMS
@Builder.Default private final boolean usePatch = true;
@Builder.Default private String hivePlatformAlias = "hive";
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
@@ -98,9 +98,56 @@ private OpenLineageToDataHub() {}

public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {

String namespace = dataset.getNamespace();
String datasetName = dataset.getName();
Optional<DatasetUrn> datahubUrn;
if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null) {
Optional<DatasetUrn> originalUrn =
getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
for (OpenLineage.SymlinksDatasetFacetIdentifiers symlink :
dataset.getFacets().getSymlinks().getIdentifiers()) {
if (symlink.getType().equals("TABLE")) {
if (symlink.getNamespace().startsWith("aws:glue:")) {
namespace = "glue";
} else {
namespace = mappingConfig.getHivePlatformAlias();
}
datasetName = symlink.getName();
}
}
Optional<DatasetUrn> symlinkedUrn =
getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
if (symlinkedUrn.isPresent() && originalUrn.isPresent()) {
mappingConfig
.getUrnAliases()
.put(originalUrn.get().toString(), symlinkedUrn.get().toString());
}
datahubUrn = symlinkedUrn;
} else {
datahubUrn = getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
}

log.debug("Dataset URN: {}, alias_list: {}", datahubUrn, mappingConfig.getUrnAliases());
// If we have the urn in urn aliases then we should use the alias instead of the original urn
if (datahubUrn.isPresent()
&& mappingConfig.getUrnAliases().containsKey(datahubUrn.get().toString())) {
try {
datahubUrn =
Optional.of(
DatasetUrn.createFromString(
mappingConfig.getUrnAliases().get(datahubUrn.get().toString())));
return datahubUrn;
} catch (URISyntaxException e) {
return Optional.empty();
}
}

return datahubUrn;
}
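As a rough worked example (names are hypothetical, and the exact path-based name depends on the configured path specs and include_scheme setting), the symlink branch above maps a file-backed dataset to its table URN and records the original URN as an alias:

original URN:  urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/warehouse/orders,PROD)
symlinked URN: urn:li:dataset:(urn:li:dataPlatform:glue,my_db.orders,PROD)

Any later conversion that yields the original URN is rewritten to the symlinked one through the urnAliases lookup at the end of the method.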

private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) {
String platform;
if (namespace.contains(SCHEME_SEPARATOR)) {
try {
@@ -115,8 +162,8 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
} else {
platform = datasetUri.getScheme();
}
datasetName = datasetUri.getPath();
if (HdfsPlatform.isFsPlatformPrefix(platform)) {
datasetName = datasetUri.getPath();
try {
HdfsPathDataset hdfsPathDataset = HdfsPathDataset.create(datasetUri, mappingConfig);
return Optional.of(hdfsPathDataset.urn());
@@ -125,16 +172,13 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
"Unable to create urn from namespace: {} and dataset {}.", namespace, datasetName);
return Optional.empty();
}
} else {
datasetName = dataset.getName();
}
} catch (URISyntaxException e) {
log.warn("Unable to create URI from namespace: {} and dataset {}.", namespace, datasetName);
return Optional.empty();
}
} else {
platform = namespace;
datasetName = dataset.getName();
}

if (mappingConfig.getCommonDatasetPlatformInstance() != null) {
@@ -328,8 +372,15 @@ private static UpstreamLineage getFineGrainedLineage(
+ inputField.getField()
+ ")");
upstreamFields.add(datasetFieldUrn);
upstreams.add(
new Upstream().setDataset(urn.get()).setType(DatasetLineageType.TRANSFORMED));
if (upstreams.stream()
.noneMatch(
upstream ->
upstream.getDataset().toString().equals(urn.get().toString()))) {
upstreams.add(
new Upstream()
.setDataset(urn.get())
.setType(DatasetLineageType.TRANSFORMED));
}
}
});

@@ -675,7 +726,7 @@ private static void processJobInputs(
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
builder.urn(datasetUrn.get());
if (datahubConf.isMaterializeDataset()) {
builder.schemaMetadata(getSchemaMetadata(input));
builder.schemaMetadata(getSchemaMetadata(input, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(input, datahubConf);
@@ -705,7 +756,7 @@ private static void processJobOutputs(
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
builder.urn(datasetUrn.get());
if (datahubConf.isMaterializeDataset()) {
builder.schemaMetadata(getSchemaMetadata(output));
builder.schemaMetadata(getSchemaMetadata(output, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(output, datahubConf);
@@ -836,7 +887,8 @@ public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType(
}
}

public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
public static SchemaMetadata getSchemaMetadata(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
SchemaFieldArray schemaFieldArray = new SchemaFieldArray();
if ((dataset.getFacets() == null) || (dataset.getFacets().getSchema() == null)) {
return null;
@@ -865,9 +917,16 @@ public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
ddl.setTableSchema(OpenLineageClientUtils.toJson(dataset.getFacets().getSchema().getFields()));
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setMySqlDDL(ddl);
Optional<DatasetUrn> datasetUrn =
getDatasetUrnFromOlDataset(dataset.getNamespace(), dataset.getName(), mappingConfig);

if (!datasetUrn.isPresent()) {
return null;
}

schemaMetadata.setPlatformSchema(platformSchema);

schemaMetadata.setPlatform(new DataPlatformUrn(dataset.getNamespace()));
schemaMetadata.setPlatform(datasetUrn.get().getPlatformEntity());

schemaMetadata.setFields(schemaFieldArray);
return schemaMetadata;
@@ -189,7 +189,9 @@ private void generateDataJobInputOutputMcp(
MetadataChangeProposal dataJobInputOutputMcp = dataJobInputOutputPatchBuilder.build();
log.info(
"dataJobInputOutputMcp: {}",
dataJobInputOutputMcp.getAspect().getValue().asString(Charset.defaultCharset()));
Objects.requireNonNull(dataJobInputOutputMcp.getAspect())
.getValue()
.asString(Charset.defaultCharset()));
mcps.add(dataJobInputOutputPatchBuilder.build());

} else {
@@ -292,9 +294,9 @@ private Pair<UrnArray, EdgeArray> processDownstreams(
"upstreamLineagePatch: {}",
mcp.getAspect().getValue().asString(Charset.defaultCharset()));
mcps.add(mcp);
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
}
});
@@ -69,8 +69,7 @@ public static HdfsPathDataset create(URI path, DatahubOpenlineageConfig datahubC

String rawName = getRawNameFromUri(pathUri, pathSpec.getPathSpecList());
if (rawName != null) {
String platformInstance =
pathSpec.platformInstance.orElseGet(datahubConf::getCommonDatasetPlatformInstance);
String platformInstance = pathSpec.platformInstance.orElse(null);
FabricType fabricType = datahubConf.getFabricType();
return new HdfsPathDataset(
platform, getDatasetName(rawName), platformInstance, fabricType, rawName);
@@ -137,7 +137,7 @@ public void testPathSpecListPlatformInstance()
SparkDataset dataset =
HdfsPathDataset.create(new URI("s3a://my-bucket/foo/tests/bar.avro"), datahubConfig);
Assert.assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)",
dataset.urn().toString());
}

6 changes: 3 additions & 3 deletions metadata-integration/java/spark-lineage-beta/README.md
@@ -167,7 +167,7 @@ information like tokens.
| spark.datahub.rest.rest.max_retries | | 0 | Number of times a request is retried if it fails |
| spark.datahub.rest.rest.retry_interval | | 10 | Number of seconds to wait between retries |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (useful to set if you also use a platform instance in your Glue ingestion) |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, it will fall back to PROD |
| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
@@ -181,8 +181,8 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled.
| spark.datahub.stage_metadata_coalescing | | | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or on Glue. Enable this on Databricks or Glue if you want a coalesced run. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled (see the example below the table). |
|
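For reference, a minimal sketch of how the symlink/Glue-related settings above might be supplied to a Spark job, using the keys documented in the table (the values and instance name are illustrative only):

spark.datahub.metadata.dataset.platformInstance   my-glue-instance
spark.datahub.metadata.table.hive_platform_alias  glue
spark.datahub.stage_metadata_coalescing           true
spark.datahub.patch.enabled                       true

These can equally be passed as --conf key=value arguments to spark-submit or set in the cluster's Spark configuration.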

## What to Expect: The Metadata Model
1 change: 1 addition & 0 deletions metadata-integration/java/spark-lineage-beta/build.gradle
@@ -56,6 +56,7 @@ dependencies {
implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1"
compileOnly "org.apache.spark:spark-sql_2.12:3.1.3"
compileOnly "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0-spark_3.5"

testCompileOnly externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
@@ -176,7 +176,7 @@ public List<MetadataChangeProposal> generateCoalescedMcps() {
AtomicLong maxEndTime = new AtomicLong();
_datahubJobs.forEach(
storedDatahubJob -> {
log.info("Merging job stored job {} to {}", storedDatahubJob, datahubJob);
log.info("Merging job stored job {} with {}", storedDatahubJob, datahubJob);
DataJobUrn jobUrn =
jobUrn(
storedDatahubJob.getFlowUrn(), storedDatahubJob.getFlowUrn().getFlowIdEntity());
@@ -40,6 +40,7 @@ public class SparkConfigParser {
public static final String STREAMING_HEARTBEAT = "streaming_heartbeat";
public static final String DATAHUB_FLOW_NAME = "flow_name";
public static final String DATASET_ENV_KEY = "metadata.dataset.env";
public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias";
public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
@@ -147,6 +148,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
}
builder.platformInstance(SparkConfigParser.getPlatformInstance(sparkConfig));
builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig));
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
try {
String parentJob = SparkConfigParser.getParentJobKey(sparkConfig);
@@ -174,6 +176,12 @@ public static FabricType getCommonFabricType(Config datahubConfig) {
return fabricType;
}

public static String getHivePlatformAlias(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_HIVE_PLATFORM_ALIAS)
? datahubConfig.getString(DATASET_HIVE_PLATFORM_ALIAS)
: "hive";
}

public static String getCommonPlatformInstance(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_PLATFORM_INSTANCE_KEY)
? datahubConfig.getString(DATASET_PLATFORM_INSTANCE_KEY)
@@ -307,7 +315,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) {

public static boolean isPatchEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(PATCH_ENABLED)) {
return true;
return false;
}
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}
@@ -0,0 +1,79 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import scala.Option;

/**
* {@link LogicalPlan} visitor that matches an {@link InsertIntoHadoopFsRelationCommand} and
* extracts the output {@link OpenLineage.Dataset} being written.
*/
public class InsertIntoHadoopFsRelationVisitor
extends QueryPlanVisitor<InsertIntoHadoopFsRelationCommand, OpenLineage.OutputDataset> {

public InsertIntoHadoopFsRelationVisitor(OpenLineageContext context) {
super(context);
}

@Override
public List<OpenLineage.OutputDataset> apply(LogicalPlan x) {
InsertIntoHadoopFsRelationCommand command = (InsertIntoHadoopFsRelationCommand) x;

Option<CatalogTable> catalogTable = command.catalogTable();
OpenLineage.OutputDataset outputDataset;

if (catalogTable.isEmpty()) {
DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
if (SaveMode.Overwrite == command.mode()) {
outputDataset =
outputDataset()
.getDataset(
di,
command.query().schema(),
OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.OVERWRITE);
} else {
outputDataset = outputDataset().getDataset(di, command.query().schema());
}
return Collections.singletonList(outputDataset);
} else {
if (SaveMode.Overwrite == command.mode()) {
return Collections.singletonList(
outputDataset()
.getDataset(
PathUtils.fromCatalogTable(catalogTable.get()),
catalogTable.get().schema(),
OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.CREATE));
} else {
return Collections.singletonList(
outputDataset()
.getDataset(
PathUtils.fromCatalogTable(catalogTable.get()), catalogTable.get().schema()));
}
}
}

@Override
public Optional<String> jobNameSuffix(InsertIntoHadoopFsRelationCommand command) {
if (command.catalogTable().isEmpty()) {
DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
return Optional.of(trimPath(di.getName()));
}
return Optional.of(
trimPath(PathUtils.fromCatalogTable(command.catalogTable().get()).getName()));
}
}
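For orientation, the visitor above matches plain path-based writes. Below is a minimal sketch of the kind of job that Spark plans as an InsertIntoHadoopFsRelationCommand (the paths are hypothetical, and an existing SparkSession named spark is assumed); with SaveMode.Overwrite the output dataset is reported with the OVERWRITE lifecycle-state facet:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Read a path-based dataset and write it back out to another path.
// The write targets a plain path with no catalog table, so the first branch
// of apply() above produces the output dataset.
Dataset<Row> df = spark.read().parquet("s3a://my-bucket/input/");
df.write().mode(SaveMode.Overwrite).parquet("s3a://my-bucket/warehouse/orders/");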