feat(spark): Adding OpenLineage symlink support to Spark lineage #10637

Merged · 4 commits · Jun 21, 2024

build.gradle (2 changes: 1 addition & 1 deletion)
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.14.0'
ext.openLineageVersion = '1.16.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
@@ -35,6 +35,7 @@
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
@@ -45,6 +46,7 @@
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.TimeValue;

@@ -106,6 +108,14 @@ public RestEmitter(RestEmitterConfig config) {
config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
.build());
}
PoolingAsyncClientConnectionManagerBuilder poolingAsyncClientConnectionManagerBuilder =
PoolingAsyncClientConnectionManagerBuilder.create();

// Forcing http 1.x as 2.0 is not supported yet
TlsConfig tlsHttp1Config =
TlsConfig.copy(TlsConfig.DEFAULT).setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1).build();
poolingAsyncClientConnectionManagerBuilder.setDefaultTlsConfig(tlsHttp1Config);

if (config.isDisableSslVerification()) {
try {
SSLContext sslcontext =
@@ -115,15 +125,12 @@ public RestEmitter(RestEmitterConfig config) {
.setSslContext(sslcontext)
.setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.build();

httpClientBuilder.setConnectionManager(
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(tlsStrategy)
.build());
poolingAsyncClientConnectionManagerBuilder.setTlsStrategy(tlsStrategy);
} catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
throw new RuntimeException("Error while creating insecure http client", e);
}
}
httpClientBuilder.setConnectionManager(poolingAsyncClientConnectionManagerBuilder.build());

httpClientBuilder.setRetryStrategy(
new DatahubHttpRequestRetryStrategy(
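
For context, a minimal standalone sketch of the pattern this hunk applies inside RestEmitter: build the pooling connection manager once, pin its TLS layer to HTTP/1.x so ALPN never negotiates HTTP/2, and reuse it for both the default and the insecure-SSL code paths. It assumes Apache HttpClient 5.2+; the class and method names are illustrative, not part of the PR.

```java
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http2.HttpVersionPolicy;

/** Illustrative sketch only; not the PR code. */
public class Http1OnlyClientSketch {

  static CloseableHttpAsyncClient buildHttp1OnlyClient() {
    // Pin the TLS layer to HTTP/1.x so the server cannot negotiate HTTP/2 via ALPN.
    TlsConfig tlsHttp1Config =
        TlsConfig.copy(TlsConfig.DEFAULT)
            .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
            .build();

    // One connection manager carries the TLS policy for every request the client makes.
    PoolingAsyncClientConnectionManager connectionManager =
        PoolingAsyncClientConnectionManagerBuilder.create()
            .setDefaultTlsConfig(tlsHttp1Config)
            .build();

    return HttpAsyncClients.custom().setConnectionManager(connectionManager).build();
  }
}
```
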
@@ -28,6 +28,8 @@ public class DatahubOpenlineageConfig {
@Builder.Default private final DataJobUrn parentJobUrn = null;
// This is disabled until column level patch support is fixed in GMS
@Builder.Default private final boolean usePatch = true;
@Builder.Default private String hivePlatformAlias = "hive";
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
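
A hedged sketch of how the two new fields might be populated. It assumes the Lombok builder implied by the `@Builder.Default` annotations, and omits the `DatahubOpenlineageConfig` import because its package is not visible in this diff; the wrapper class is illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the builder calls mirror the new fields above.
class OpenlineageConfigSketch {
  static DatahubOpenlineageConfig glueFriendlyConfig() {
    Map<String, String> urnAliases = new HashMap<>(); // filled at runtime as symlinks are resolved
    return DatahubOpenlineageConfig.builder()
        .hivePlatformAlias("glue") // treat Hive-style table symlinks as Glue tables
        .urnAliases(urnAliases)
        .build();
  }
}
```
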
@@ -98,9 +98,56 @@ private OpenLineageToDataHub() {}

public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {

String namespace = dataset.getNamespace();
String datasetName = dataset.getName();
Optional<DatasetUrn> datahubUrn;
if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null) {
Optional<DatasetUrn> originalUrn =
getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
for (OpenLineage.SymlinksDatasetFacetIdentifiers symlink :
dataset.getFacets().getSymlinks().getIdentifiers()) {
if (symlink.getType().equals("TABLE")) {
if (symlink.getNamespace().startsWith("aws:glue:")) {
namespace = "glue";
} else {
namespace = mappingConfig.getHivePlatformAlias();
}
datasetName = symlink.getName();
}
}
Optional<DatasetUrn> symlinkedUrn =
getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
if (symlinkedUrn.isPresent() && originalUrn.isPresent()) {
mappingConfig
.getUrnAliases()
.put(originalUrn.get().toString(), symlinkedUrn.get().toString());
}
datahubUrn = symlinkedUrn;
} else {
datahubUrn = getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
}

log.debug("Dataset URN: {}, alias_list: {}", datahubUrn, mappingConfig.getUrnAliases());
// If we have the urn in urn aliases then we should use the alias instead of the original urn
if (datahubUrn.isPresent()
&& mappingConfig.getUrnAliases().containsKey(datahubUrn.get().toString())) {
try {
datahubUrn =
Optional.of(
DatasetUrn.createFromString(
mappingConfig.getUrnAliases().get(datahubUrn.get().toString())));
return datahubUrn;
} catch (URISyntaxException e) {
return Optional.empty();
}
}

return datahubUrn;
}

private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) {
String platform;
if (namespace.contains(SCHEME_SEPARATOR)) {
try {
@@ -115,8 +162,8 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
} else {
platform = datasetUri.getScheme();
}
datasetName = datasetUri.getPath();
if (HdfsPlatform.isFsPlatformPrefix(platform)) {
datasetName = datasetUri.getPath();
try {
HdfsPathDataset hdfsPathDataset = HdfsPathDataset.create(datasetUri, mappingConfig);
return Optional.of(hdfsPathDataset.urn());
@@ -125,16 +172,13 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
"Unable to create urn from namespace: {} and dataset {}.", namespace, datasetName);
return Optional.empty();
}
} else {
datasetName = dataset.getName();
}
} catch (URISyntaxException e) {
log.warn("Unable to create URI from namespace: {} and dataset {}.", namespace, datasetName);
return Optional.empty();
}
} else {
platform = namespace;
datasetName = dataset.getName();
}

if (mappingConfig.getCommonDatasetPlatformInstance() != null) {
@@ -328,8 +372,15 @@ private static UpstreamLineage getFineGrainedLineage(
+ inputField.getField()
+ ")");
upstreamFields.add(datasetFieldUrn);
upstreams.add(
new Upstream().setDataset(urn.get()).setType(DatasetLineageType.TRANSFORMED));
if (upstreams.stream()
.noneMatch(
upstream ->
upstream.getDataset().toString().equals(urn.get().toString()))) {
upstreams.add(
new Upstream()
.setDataset(urn.get())
.setType(DatasetLineageType.TRANSFORMED));
}
}
});

@@ -675,7 +726,7 @@ private static void processJobInputs(
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
builder.urn(datasetUrn.get());
if (datahubConf.isMaterializeDataset()) {
builder.schemaMetadata(getSchemaMetadata(input));
builder.schemaMetadata(getSchemaMetadata(input, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(input, datahubConf);
@@ -705,7 +756,7 @@ private static void processJobOutputs(
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
builder.urn(datasetUrn.get());
if (datahubConf.isMaterializeDataset()) {
builder.schemaMetadata(getSchemaMetadata(output));
builder.schemaMetadata(getSchemaMetadata(output, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(output, datahubConf);
@@ -836,7 +887,8 @@ public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType(
}
}

public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
public static SchemaMetadata getSchemaMetadata(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
SchemaFieldArray schemaFieldArray = new SchemaFieldArray();
if ((dataset.getFacets() == null) || (dataset.getFacets().getSchema() == null)) {
return null;
@@ -865,9 +917,16 @@ public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
ddl.setTableSchema(OpenLineageClientUtils.toJson(dataset.getFacets().getSchema().getFields()));
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setMySqlDDL(ddl);
Optional<DatasetUrn> datasetUrn =
getDatasetUrnFromOlDataset(dataset.getNamespace(), dataset.getName(), mappingConfig);

if (!datasetUrn.isPresent()) {
return null;
}

schemaMetadata.setPlatformSchema(platformSchema);

schemaMetadata.setPlatform(new DataPlatformUrn(dataset.getNamespace()));
schemaMetadata.setPlatform(datasetUrn.get().getPlatformEntity());

schemaMetadata.setFields(schemaFieldArray);
return schemaMetadata;
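
To make the new symlink handling concrete, here is a simplified, self-contained sketch of the resolution rule added in `convertOpenlineageDatasetToDatasetUrn`: a `TABLE` symlink whose namespace starts with `aws:glue:` maps to the `glue` platform, anything else falls back to the configured Hive alias, and the original-to-symlinked URN mapping is remembered in `urnAliases`. The class name, example namespace, and URN strings below are illustrative, not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified illustration of the symlink handling above; not the PR code. */
public class SymlinkResolutionSketch {

  /** Mirrors the new branch logic: Glue symlinks map to "glue", everything else to the Hive alias. */
  static String resolveSymlinkPlatform(String symlinkNamespace, String hivePlatformAlias) {
    return symlinkNamespace.startsWith("aws:glue:") ? "glue" : hivePlatformAlias;
  }

  public static void main(String[] args) {
    Map<String, String> urnAliases = new HashMap<>();

    // A file-based dataset reported by OpenLineage, plus a TABLE symlink pointing at its table.
    String originalUrn =
        "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/warehouse/orders,PROD)";
    String platform = resolveSymlinkPlatform("aws:glue:eu-west-1:123456789012", "hive");
    String symlinkedUrn =
        "urn:li:dataset:(urn:li:dataPlatform:" + platform + ",mydb.orders,PROD)";

    // The converter records the mapping so later lookups of the original URN
    // resolve to the symlinked (table) URN instead.
    urnAliases.put(originalUrn, symlinkedUrn);
    System.out.println(urnAliases);
  }
}
```
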
@@ -189,7 +189,9 @@ private void generateDataJobInputOutputMcp(
MetadataChangeProposal dataJobInputOutputMcp = dataJobInputOutputPatchBuilder.build();
log.info(
"dataJobInputOutputMcp: {}",
dataJobInputOutputMcp.getAspect().getValue().asString(Charset.defaultCharset()));
Objects.requireNonNull(dataJobInputOutputMcp.getAspect())
.getValue()
.asString(Charset.defaultCharset()));
mcps.add(dataJobInputOutputPatchBuilder.build());

} else {
@@ -292,9 +294,9 @@ private Pair<UrnArray, EdgeArray> processDownstreams(
"upstreamLineagePatch: {}",
mcp.getAspect().getValue().asString(Charset.defaultCharset()));
mcps.add(mcp);
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
}
});
@@ -69,8 +69,7 @@ public static HdfsPathDataset create(URI path, DatahubOpenlineageConfig datahubC

String rawName = getRawNameFromUri(pathUri, pathSpec.getPathSpecList());
if (rawName != null) {
String platformInstance =
pathSpec.platformInstance.orElseGet(datahubConf::getCommonDatasetPlatformInstance);
String platformInstance = pathSpec.platformInstance.orElse(null);
FabricType fabricType = datahubConf.getFabricType();
return new HdfsPathDataset(
platform, getDatasetName(rawName), platformInstance, fabricType, rawName);
@@ -137,7 +137,7 @@ public void testPathSpecListPlatformInstance()
SparkDataset dataset =
HdfsPathDataset.create(new URI("s3a://my-bucket/foo/tests/bar.avro"), datahubConfig);
Assert.assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)",
dataset.urn().toString());
}

metadata-integration/java/spark-lineage-beta/README.md (16 changes: 8 additions & 8 deletions)
@@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.10
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```

## spark-submit command line

```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.6 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.10 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```

### Configuration Instructions: Amazon EMR
@@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)

```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.10
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
@@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.6")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.10")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
@@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.

config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.6")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.10")
.

config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@@ -167,7 +167,7 @@ information like tokens.
| spark.datahub.rest.rest.max_retries | | 0 | Number of times a request retried if failed |
| spark.datahub.rest.rest.retry_interval | | 10 | Number of seconds to wait between retries |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (useful to set if you also have it set in your Glue ingestion) |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
@@ -181,8 +181,8 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled.
| spark.datahub.stage_metadata_coalescing | | | Normally, metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or on Glue. Enable this on Databricks or Glue if you want coalesced runs. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled. |
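
As an example, a spark-defaults snippet that combines several of the options above for a Glue-backed metastore; the values are illustrative and should be adapted to your environment.

```text
spark.jars.packages                               io.acryl:acryl-spark-lineage:0.2.10
spark.extraListeners                              datahub.spark.DatahubSparkListener
spark.datahub.rest.server                         https://your_datahub_host/gms
spark.datahub.metadata.table.hive_platform_alias  glue
spark.datahub.metadata.dataset.platformInstance   my_instance
spark.datahub.stage_metadata_coalescing           true
spark.datahub.patch.enabled                       true
```
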

## What to Expect: The Metadata Model
metadata-integration/java/spark-lineage-beta/build.gradle (1 change: 1 addition & 0 deletions)
@@ -56,6 +56,7 @@ dependencies {
implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1"
compileOnly "org.apache.spark:spark-sql_2.12:3.1.3"
compileOnly "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0-spark_3.5"

testCompileOnly externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
@@ -176,7 +176,7 @@ public List<MetadataChangeProposal> generateCoalescedMcps() {
AtomicLong maxEndTime = new AtomicLong();
_datahubJobs.forEach(
storedDatahubJob -> {
log.info("Merging job stored job {} to {}", storedDatahubJob, datahubJob);
log.info("Merging job stored job {} with {}", storedDatahubJob, datahubJob);
DataJobUrn jobUrn =
jobUrn(
storedDatahubJob.getFlowUrn(), storedDatahubJob.getFlowUrn().getFlowIdEntity());