diff --git a/build.gradle b/build.gradle index c6e14081c6147..36893dfd53ec1 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ buildscript { ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '12.16.1' ext.googleJavaFormatVersion = '1.18.1' - ext.openLineageVersion = '1.14.0' + ext.openLineageVersion = '1.16.0' ext.logbackClassicJava8 = '1.2.12' ext.docker_registry = 'acryldata' diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java index ed4cee060bd69..dd6a7ba98c87d 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java @@ -35,6 +35,7 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; @@ -45,6 +46,7 @@ import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.ssl.SSLContexts; import org.apache.hc.core5.util.TimeValue; @@ -106,6 +108,14 @@ public RestEmitter(RestEmitterConfig config) { config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS) .build()); } + PoolingAsyncClientConnectionManagerBuilder poolingAsyncClientConnectionManagerBuilder = + PoolingAsyncClientConnectionManagerBuilder.create(); + + // Forcing http 1.x as 2.0 is not supported yet + TlsConfig tlsHttp1Config = + TlsConfig.copy(TlsConfig.DEFAULT).setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1).build(); + poolingAsyncClientConnectionManagerBuilder.setDefaultTlsConfig(tlsHttp1Config); + if (config.isDisableSslVerification()) { try { SSLContext sslcontext = @@ -115,15 +125,12 @@ public RestEmitter(RestEmitterConfig config) { .setSslContext(sslcontext) .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) .build(); - - httpClientBuilder.setConnectionManager( - PoolingAsyncClientConnectionManagerBuilder.create() - .setTlsStrategy(tlsStrategy) - .build()); + poolingAsyncClientConnectionManagerBuilder.setTlsStrategy(tlsStrategy); } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) { throw new RuntimeException("Error while creating insecure http client", e); } } + httpClientBuilder.setConnectionManager(poolingAsyncClientConnectionManagerBuilder.build()); httpClientBuilder.setRetryStrategy( new DatahubHttpRequestRetryStrategy( diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java index 5e4b791fa8d82..6883999b48e53 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java @@ -28,6 
+28,8 @@ public class DatahubOpenlineageConfig { @Builder.Default private final DataJobUrn parentJobUrn = null; // This is disabled until column level patch support won't be fixed in GMS @Builder.Default private final boolean usePatch = true; + @Builder.Default private String hivePlatformAlias = "hive"; + @Builder.Default private Map urnAliases = new HashMap<>(); public List getPathSpecsForPlatform(String platform) { if ((pathSpecs == null) || (pathSpecs.isEmpty())) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index 1db09306cbdc2..03132cfdcda72 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -98,9 +98,56 @@ private OpenLineageToDataHub() {} public static Optional convertOpenlineageDatasetToDatasetUrn( OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) { + String namespace = dataset.getNamespace(); String datasetName = dataset.getName(); + Optional datahubUrn; + if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null) { + Optional originalUrn = + getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig); + for (OpenLineage.SymlinksDatasetFacetIdentifiers symlink : + dataset.getFacets().getSymlinks().getIdentifiers()) { + if (symlink.getType().equals("TABLE")) { + if (symlink.getNamespace().startsWith("aws:glue:")) { + namespace = "glue"; + } else { + namespace = mappingConfig.getHivePlatformAlias(); + } + datasetName = symlink.getName(); + } + } + Optional symlinkedUrn = + getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig); + if (symlinkedUrn.isPresent() && originalUrn.isPresent()) { + mappingConfig + .getUrnAliases() + .put(originalUrn.get().toString(), symlinkedUrn.get().toString()); + } + datahubUrn = symlinkedUrn; + } else { + datahubUrn = getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig); + } + log.debug("Dataset URN: {}, alias_list: {}", datahubUrn, mappingConfig.getUrnAliases()); + // If we have the urn in urn aliases then we should use the alias instead of the original urn + if (datahubUrn.isPresent() + && mappingConfig.getUrnAliases().containsKey(datahubUrn.get().toString())) { + try { + datahubUrn = + Optional.of( + DatasetUrn.createFromString( + mappingConfig.getUrnAliases().get(datahubUrn.get().toString()))); + return datahubUrn; + } catch (URISyntaxException e) { + return Optional.empty(); + } + } + + return datahubUrn; + } + + private static Optional getDatasetUrnFromOlDataset( + String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) { String platform; if (namespace.contains(SCHEME_SEPARATOR)) { try { @@ -115,8 +162,8 @@ public static Optional convertOpenlineageDatasetToDatasetUrn( } else { platform = datasetUri.getScheme(); } - datasetName = datasetUri.getPath(); if (HdfsPlatform.isFsPlatformPrefix(platform)) { + datasetName = datasetUri.getPath(); try { HdfsPathDataset hdfsPathDataset = HdfsPathDataset.create(datasetUri, mappingConfig); return Optional.of(hdfsPathDataset.urn()); @@ -125,8 +172,6 @@ public static Optional convertOpenlineageDatasetToDatasetUrn( "Unable to create urn from namespace: {} and dataset {}.", 
namespace, datasetName); return Optional.empty(); } - } else { - datasetName = dataset.getName(); } } catch (URISyntaxException e) { log.warn("Unable to create URI from namespace: {} and dataset {}.", namespace, datasetName); @@ -134,7 +179,6 @@ public static Optional convertOpenlineageDatasetToDatasetUrn( } } else { platform = namespace; - datasetName = dataset.getName(); } if (mappingConfig.getCommonDatasetPlatformInstance() != null) { @@ -328,8 +372,15 @@ private static UpstreamLineage getFineGrainedLineage( + inputField.getField() + ")"); upstreamFields.add(datasetFieldUrn); - upstreams.add( - new Upstream().setDataset(urn.get()).setType(DatasetLineageType.TRANSFORMED)); + if (upstreams.stream() + .noneMatch( + upstream -> + upstream.getDataset().toString().equals(urn.get().toString()))) { + upstreams.add( + new Upstream() + .setDataset(urn.get()) + .setType(DatasetLineageType.TRANSFORMED)); + } } }); @@ -675,7 +726,7 @@ private static void processJobInputs( DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder(); builder.urn(datasetUrn.get()); if (datahubConf.isMaterializeDataset()) { - builder.schemaMetadata(getSchemaMetadata(input)); + builder.schemaMetadata(getSchemaMetadata(input, datahubConf)); } if (datahubConf.isCaptureColumnLevelLineage()) { UpstreamLineage upstreamLineage = getFineGrainedLineage(input, datahubConf); @@ -705,7 +756,7 @@ private static void processJobOutputs( DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder(); builder.urn(datasetUrn.get()); if (datahubConf.isMaterializeDataset()) { - builder.schemaMetadata(getSchemaMetadata(output)); + builder.schemaMetadata(getSchemaMetadata(output, datahubConf)); } if (datahubConf.isCaptureColumnLevelLineage()) { UpstreamLineage upstreamLineage = getFineGrainedLineage(output, datahubConf); @@ -836,7 +887,8 @@ public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType( } } - public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) { + public static SchemaMetadata getSchemaMetadata( + OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) { SchemaFieldArray schemaFieldArray = new SchemaFieldArray(); if ((dataset.getFacets() == null) || (dataset.getFacets().getSchema() == null)) { return null; @@ -865,9 +917,16 @@ public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) { ddl.setTableSchema(OpenLineageClientUtils.toJson(dataset.getFacets().getSchema().getFields())); SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema(); platformSchema.setMySqlDDL(ddl); + Optional datasetUrn = + getDatasetUrnFromOlDataset(dataset.getNamespace(), dataset.getName(), mappingConfig); + + if (!datasetUrn.isPresent()) { + return null; + } + schemaMetadata.setPlatformSchema(platformSchema); - schemaMetadata.setPlatform(new DataPlatformUrn(dataset.getNamespace())); + schemaMetadata.setPlatform(datasetUrn.get().getPlatformEntity()); schemaMetadata.setFields(schemaFieldArray); return schemaMetadata; diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 5f4a9b6a596e7..85eaeb445d7bd 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ 
-189,7 +189,9 @@ private void generateDataJobInputOutputMcp( MetadataChangeProposal dataJobInputOutputMcp = dataJobInputOutputPatchBuilder.build(); log.info( "dataJobInputOutputMcp: {}", - dataJobInputOutputMcp.getAspect().getValue().asString(Charset.defaultCharset())); + Objects.requireNonNull(dataJobInputOutputMcp.getAspect()) + .getValue() + .asString(Charset.defaultCharset())); mcps.add(dataJobInputOutputPatchBuilder.build()); } else { @@ -292,9 +294,9 @@ private Pair processDownstreams( "upstreamLineagePatch: {}", mcp.getAspect().getValue().asString(Charset.defaultCharset())); mcps.add(mcp); - } else { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); } + } else { + addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); } } }); diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/HdfsPathDataset.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/HdfsPathDataset.java index 0d0868afedfd9..b938db24fc626 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/HdfsPathDataset.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/HdfsPathDataset.java @@ -69,8 +69,7 @@ public static HdfsPathDataset create(URI path, DatahubOpenlineageConfig datahubC String rawName = getRawNameFromUri(pathUri, pathSpec.getPathSpecList()); if (rawName != null) { - String platformInstance = - pathSpec.platformInstance.orElseGet(datahubConf::getCommonDatasetPlatformInstance); + String platformInstance = pathSpec.platformInstance.orElse(null); FabricType fabricType = datahubConf.getFabricType(); return new HdfsPathDataset( platform, getDatasetName(rawName), platformInstance, fabricType, rawName); diff --git a/metadata-integration/java/openlineage-converter/src/test/java/io/datahubproject/openlineage/HdfsPathDatasetTest.java b/metadata-integration/java/openlineage-converter/src/test/java/io/datahubproject/openlineage/HdfsPathDatasetTest.java index e8981aeb0be59..3693ddc15e6f0 100644 --- a/metadata-integration/java/openlineage-converter/src/test/java/io/datahubproject/openlineage/HdfsPathDatasetTest.java +++ b/metadata-integration/java/openlineage-converter/src/test/java/io/datahubproject/openlineage/HdfsPathDatasetTest.java @@ -137,7 +137,7 @@ public void testPathSpecListPlatformInstance() SparkDataset dataset = HdfsPathDataset.create(new URI("s3a://my-bucket/foo/tests/bar.avro"), datahubConfig); Assert.assertEquals( - "urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)", dataset.urn().toString()); } diff --git a/metadata-integration/java/spark-lineage-beta/README.md b/metadata-integration/java/spark-lineage-beta/README.md index 7b3598453498f..ca966b08bc19a 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/spark-lineage-beta/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.10 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line 
```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.6 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.10 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.10 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.6") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.10") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.6") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.10") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -167,7 +167,7 @@ information like tokens. | spark.datahub.rest.rest.max_retries | | 0 | Number of times a request retried if failed | | spark.datahub.rest.rest.retry_interval | | 10 | Number of seconds to wait between retries | | spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | -| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance | +| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is useful to set if you also use a platform instance in your Glue ingestion) | | spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | | spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` | | spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions | @@ -181,8 +181,8 @@ information like tokens. | spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run .
| -| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. +| spark.datahub.stage_metadata_coalescing | | | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or on Glue. Enable this on Databricks or Glue if you want coalesced runs. | +| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled. | ## What to Expect: The Metadata Model diff --git a/metadata-integration/java/spark-lineage-beta/build.gradle b/metadata-integration/java/spark-lineage-beta/build.gradle index d83753028d0b4..99b87b9b89bf4 100644 --- a/metadata-integration/java/spark-lineage-beta/build.gradle +++ b/metadata-integration/java/spark-lineage-beta/build.gradle @@ -56,6 +56,7 @@ dependencies { implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion" compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1" compileOnly "org.apache.spark:spark-sql_2.12:3.1.3" + compileOnly "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0-spark_3.5" testCompileOnly externalDependency.lombok testAnnotationProcessor externalDependency.lombok diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java index 1dc086e4af585..5a3f4bd27b415 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -176,7 +176,7 @@ public List generateCoalescedMcps() { AtomicLong maxEndTime = new AtomicLong(); _datahubJobs.forEach( storedDatahubJob -> { - log.info("Merging job stored job {} to {}", storedDatahubJob, datahubJob); + log.info("Merging stored job {} with {}", storedDatahubJob, datahubJob); DataJobUrn jobUrn = jobUrn( storedDatahubJob.getFlowUrn(), storedDatahubJob.getFlowUrn().getFlowIdEntity()); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java index d8da5d95935c9..7834d100912f9 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -40,6 +40,7 @@ public class SparkConfigParser { public static final String STREAMING_HEARTBEAT = "streaming_heartbeat"; public static final String DATAHUB_FLOW_NAME = "flow_name"; public static final String DATASET_ENV_KEY = "metadata.dataset.env"; + public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias"; public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize"; public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance"; public static final String DATASET_INCLUDE_SCHEMA_METADATA = @@ -147,6 +148,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf( } builder.platformInstance(SparkConfigParser.getPlatformInstance(sparkConfig)); builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig)); +
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig)); builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig)); try { String parentJob = SparkConfigParser.getParentJobKey(sparkConfig); @@ -174,6 +176,12 @@ public static FabricType getCommonFabricType(Config datahubConfig) { return fabricType; } + public static String getHivePlatformAlias(Config datahubConfig) { + return datahubConfig.hasPath(DATASET_HIVE_PLATFORM_ALIAS) + ? datahubConfig.getString(DATASET_HIVE_PLATFORM_ALIAS) + : "hive"; + } + public static String getCommonPlatformInstance(Config datahubConfig) { return datahubConfig.hasPath(DATASET_PLATFORM_INSTANCE_KEY) ? datahubConfig.getString(DATASET_PLATFORM_INSTANCE_KEY) @@ -307,7 +315,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) { public static boolean isPatchEnabled(Config datahubConfig) { if (!datahubConfig.hasPath(PATCH_ENABLED)) { - return true; + return false; } return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED); } diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java new file mode 100644 index 0000000000000..1e5bc72967e68 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java @@ -0,0 +1,79 @@ +/* +/* Copyright 2018-2024 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.agent.lifecycle.plan; + +import io.openlineage.client.OpenLineage; +import io.openlineage.client.utils.DatasetIdentifier; +import io.openlineage.spark.agent.util.PathUtils; +import io.openlineage.spark.api.OpenLineageContext; +import io.openlineage.spark.api.QueryPlanVisitor; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand; +import scala.Option; + +/** + * {@link LogicalPlan} visitor that matches an {@link InsertIntoHadoopFsRelationCommand} and + * extracts the output {@link OpenLineage.Dataset} being written. 
+ */ +public class InsertIntoHadoopFsRelationVisitor + extends QueryPlanVisitor { + + public InsertIntoHadoopFsRelationVisitor(OpenLineageContext context) { + super(context); + } + + @Override + public List apply(LogicalPlan x) { + InsertIntoHadoopFsRelationCommand command = (InsertIntoHadoopFsRelationCommand) x; + + Option catalogTable = command.catalogTable(); + OpenLineage.OutputDataset outputDataset; + + if (catalogTable.isEmpty()) { + DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file"); + if (SaveMode.Overwrite == command.mode()) { + outputDataset = + outputDataset() + .getDataset( + di, + command.query().schema(), + OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.OVERWRITE); + } else { + outputDataset = outputDataset().getDataset(di, command.query().schema()); + } + return Collections.singletonList(outputDataset); + } else { + if (SaveMode.Overwrite == command.mode()) { + return Collections.singletonList( + outputDataset() + .getDataset( + PathUtils.fromCatalogTable(catalogTable.get()), + catalogTable.get().schema(), + OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.CREATE)); + } else { + return Collections.singletonList( + outputDataset() + .getDataset( + PathUtils.fromCatalogTable(catalogTable.get()), catalogTable.get().schema())); + } + } + } + + @Override + public Optional jobNameSuffix(InsertIntoHadoopFsRelationCommand command) { + if (command.catalogTable().isEmpty()) { + DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file"); + return Optional.of(trimPath(di.getName())); + } + return Optional.of( + trimPath(PathUtils.fromCatalogTable(command.catalogTable().get()).getName())); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java index c44dacf8ff3be..841298ab0e037 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java @@ -18,6 +18,7 @@ import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -72,7 +73,7 @@ public static List removeOutputsPathPattern( .map( dataset -> { String newName = removePathPattern(dataset.getName()); - if (newName != dataset.getName()) { + if (!Objects.equals(newName, dataset.getName())) { return context .getOpenLineage() .newOutputDatasetBuilder() @@ -95,7 +96,7 @@ public static List removeInputsPathPattern( .map( dataset -> { String newName = removePathPattern(dataset.getName()); - if (newName != dataset.getName()) { + if (!Objects.equals(newName, dataset.getName())) { return context .getOpenLineage() .newInputDatasetBuilder() @@ -112,8 +113,8 @@ public static List removeInputsPathPattern( } private static Optional getPattern(OpenLineageContext context) { - return Optional.ofNullable(context.getSparkContext()) - .map(sparkContext -> sparkContext.conf()) + return Optional.of(context.getSparkContext()) + .map(sparkContext -> sparkContext.get().conf()) .filter(conf -> conf.contains(SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN)) .map(conf -> conf.get(SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN)) .map(pattern -> 
Pattern.compile(pattern)); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java new file mode 100644 index 0000000000000..717525e4a9aaf --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java @@ -0,0 +1,9 @@ +package io.openlineage.spark.agent.vendor.redshift; + +public class Constants { + public static final String REDSHIFT_CLASS_NAME = + "io.github.spark_redshift_community.spark.redshift.RedshiftRelation"; + + public static final String REDSHIFT_PROVIDER_CLASS_NAME = + "io.github.spark_redshift_community.spark.redshift.DefaultSource"; +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java new file mode 100644 index 0000000000000..6f0fceb9c4c4a --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java @@ -0,0 +1,56 @@ +package io.openlineage.spark.agent.vendor.redshift; + +import static io.openlineage.spark.agent.vendor.redshift.Constants.*; + +import io.openlineage.spark.agent.lifecycle.VisitorFactory; +import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftRelationVisitor; +import io.openlineage.spark.agent.vendor.redshift.lifecycle.plan.RedshiftEventHandlerFactory; +import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftVisitorFactory; +import io.openlineage.spark.api.OpenLineageEventHandlerFactory; +import io.openlineage.spark.api.Vendor; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RedshiftVendor implements Vendor { + + public static boolean hasRedshiftClasses() { + /* + Checking the Redshift class with both + RedshiftRelationVisitor.class.getClassLoader.loadClass and + Thread.currentThread().getContextClassLoader().loadClass. The first checks if the class is + present on the classpath, and the second one is a catchall which captures if the class has + been installed. This is relevant for Azure Databricks where jars can be installed and + accessible to the user, even if they are not present on the classpath.
+ */ + try { + RedshiftRelationVisitor.class.getClassLoader().loadClass(REDSHIFT_PROVIDER_CLASS_NAME); + return true; + } catch (Exception e) { + // swallow - we don't care + } + try { + Thread.currentThread().getContextClassLoader().loadClass(REDSHIFT_PROVIDER_CLASS_NAME); + return true; + } catch (Exception e) { + // swallow - we don't care + } + return false; + } + + @Override + public boolean isVendorAvailable() { + log.info("Checking if Redshift classes are available"); + return hasRedshiftClasses(); + } + + @Override + public Optional getVisitorFactory() { + return Optional.of(new RedshiftVisitorFactory()); + } + + @Override + public Optional getEventHandlerFactory() { + return Optional.of(new RedshiftEventHandlerFactory()); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java new file mode 100644 index 0000000000000..0e7fd4ce942bd --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java @@ -0,0 +1,72 @@ +package io.openlineage.spark.agent.vendor.redshift.lifecycle; + +import io.openlineage.client.OpenLineage; +import io.openlineage.spark.agent.util.SqlUtils; +import io.openlineage.spark.api.DatasetFactory; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Slf4j +public class RedshiftDataset { + public static final String REDSHIFT_PREFIX = "redshift://"; + + private static final Logger logger = LoggerFactory.getLogger(RedshiftDataset.class); + public static final String DEFAULT_SCHEMA = "public"; + + public static List getDatasets( + DatasetFactory factory, + String url, + Optional dbtable, + Optional query, + StructType schema) + throws URISyntaxException { + + URI jdbcUrl = + new URI( + REDSHIFT_PREFIX + + url.replace("jdbc:redshift:iam://", "").replace("jdbc:redshift://", "")); + String db = jdbcUrl.getPath().substring(1); // remove leading slash + final String namespace = + jdbcUrl.getScheme() + "://" + jdbcUrl.getHost() + ":" + jdbcUrl.getPort(); + + final String tableName; + // https://github.com/databricks/spark-redshift?tab=readme-ov-file + // > Specify one of the following options for the table data to be read: + // > - `dbtable`: The name of the table to be read. All columns and records are retrieved + // > (i.e. it is equivalent to SELECT * FROM db_table). + // > - `query`: The exact query (SELECT statement) to run. + // If dbtable is null it will be replaced with the string `complex` and it means the query + // option was used.
+ // An improvement could be put the query string in the `DatasetFacets` + if (dbtable.isPresent()) { + tableName = dbtable.get(); + String[] splits = tableName.split("\\."); + String table = tableName; + if (splits.length == 1) { + table = String.format("%s.%s.%s", db, DEFAULT_SCHEMA, tableName); + } else if (splits.length == 2) { + table = String.format("%s.%s", db, tableName); + } else if (splits.length == 3) { + table = tableName; + } else { + logger.warn("Redshift getDataset: tableName: {} is not in the expected format", tableName); + return Collections.emptyList(); + } + + return Collections.singletonList(factory.getDataset(table, namespace, schema)); + } else if (query.isPresent()) { + return SqlUtils.getDatasets(factory, query.get(), "redshift", namespace, db, DEFAULT_SCHEMA); + } else { + logger.warn( + "Unable to discover Redshift table property - neither \"dbtable\" nor \"query\" option present"); + } + return Collections.emptyList(); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java new file mode 100644 index 0000000000000..02eb2d1745558 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java @@ -0,0 +1,66 @@ +package io.openlineage.spark.agent.vendor.redshift.lifecycle; + +import io.github.spark_redshift_community.spark.redshift.Parameters; +import io.github.spark_redshift_community.spark.redshift.RedshiftRelation; +import io.github.spark_redshift_community.spark.redshift.TableName; +import io.openlineage.client.OpenLineage; +import io.openlineage.spark.agent.util.ScalaConversionUtils; +import io.openlineage.spark.api.DatasetFactory; +import io.openlineage.spark.api.OpenLineageContext; +import io.openlineage.spark.api.QueryPlanVisitor; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.LogicalRelation; +import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand; + +/** + * {@link LogicalPlan} visitor that matches {@link SaveIntoDataSourceCommand}s that use a {@link + * RedshiftRelation}. This function extracts a {@link OpenLineage.Dataset} from the Redshift table + * referenced by the relation. 
+ */ +@Slf4j +public class RedshiftRelationVisitor + extends QueryPlanVisitor { + private static final String REDSHIFT_NAMESPACE = "redshift"; + private static final String REDSHIFT_CLASS_NAME = + "io.github.spark_redshift_community.spark.redshift.RedshiftRelation"; + private final DatasetFactory factory; + + public RedshiftRelationVisitor(@NonNull OpenLineageContext context, DatasetFactory factory) { + super(context); + this.factory = factory; + log.info("RedshiftRelationVisitor created"); + } + + @Override + public List apply(LogicalPlan x) { + RedshiftRelation relation = (RedshiftRelation) ((LogicalRelation) x).relation(); + Parameters.MergedParameters params = relation.params(); + Optional dbtable = + (Optional) + ScalaConversionUtils.asJavaOptional(params.table().map(TableName::toString)); + Optional query = ScalaConversionUtils.asJavaOptional(params.query()); + return Collections.singletonList( + factory.getDataset(dbtable.orElse(""), REDSHIFT_NAMESPACE, relation.schema())); + } + + protected boolean isRedshiftClass(LogicalPlan plan) { + try { + Class c = Thread.currentThread().getContextClassLoader().loadClass(REDSHIFT_CLASS_NAME); + return (plan instanceof LogicalRelation + && c.isAssignableFrom(((LogicalRelation) plan).relation().getClass())); + } catch (Exception e) { + // swallow - not a snowflake class + } + return false; + } + + @Override + public boolean isDefinedAt(LogicalPlan plan) { + return isRedshiftClass(plan); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java new file mode 100644 index 0000000000000..1003c863227b8 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java @@ -0,0 +1,26 @@ +package io.openlineage.spark.agent.vendor.redshift.lifecycle; + +import io.openlineage.client.OpenLineage; +import io.openlineage.spark.agent.lifecycle.VisitorFactory; +import io.openlineage.spark.api.DatasetFactory; +import io.openlineage.spark.api.OpenLineageContext; +import java.util.Collections; +import java.util.List; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import scala.PartialFunction; + +public class RedshiftVisitorFactory implements VisitorFactory { + @Override + public List>> getInputVisitors( + OpenLineageContext context) { + DatasetFactory factory = DatasetFactory.input(context); + return Collections.singletonList(new RedshiftRelationVisitor<>(context, factory)); + } + + @Override + public List>> getOutputVisitors( + OpenLineageContext context) { + DatasetFactory factory = DatasetFactory.output(context); + return Collections.singletonList(new RedshiftRelationVisitor<>(context, factory)); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java new file mode 100644 index 0000000000000..4cd1ba996fe88 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java @@ -0,0 +1,20 @@ +package 
io.openlineage.spark.agent.vendor.redshift.lifecycle.plan; + +import io.openlineage.client.OpenLineage; +import io.openlineage.spark.api.OpenLineageContext; +import io.openlineage.spark.api.OpenLineageEventHandlerFactory; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import scala.PartialFunction; + +public class RedshiftEventHandlerFactory implements OpenLineageEventHandlerFactory { + @Override + public Collection>> + createOutputDatasetBuilder(OpenLineageContext context) { + // The right function will be determined at runtime by using type checking based on the correct + // Spark LogicalPlan + return Collections.singleton( + (PartialFunction) new RedshiftSaveIntoDataSourceCommandBuilder(context)); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java new file mode 100644 index 0000000000000..e484458d40aeb --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java @@ -0,0 +1,80 @@ +package io.openlineage.spark.agent.vendor.redshift.lifecycle.plan; + +import static io.openlineage.spark.agent.vendor.redshift.RedshiftVendor.hasRedshiftClasses; + +import io.openlineage.client.OpenLineage; +import io.openlineage.spark.agent.util.PlanUtils; +import io.openlineage.spark.agent.util.ScalaConversionUtils; +import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftDataset; +import io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder; +import io.openlineage.spark.api.OpenLineageContext; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.scheduler.SparkListenerEvent; +import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand; +import org.apache.spark.sql.sources.CreatableRelationProvider; +import org.apache.spark.sql.types.StructType; + +@Slf4j +public class RedshiftSaveIntoDataSourceCommandBuilder + extends AbstractQueryPlanDatasetBuilder< + SparkListenerEvent, SaveIntoDataSourceCommand, OpenLineage.OutputDataset> { + + public RedshiftSaveIntoDataSourceCommandBuilder(OpenLineageContext context) { + super(context, false); + } + + @Override + public List apply(SaveIntoDataSourceCommand command) { + if (isRedshiftSource(command.dataSource())) { + // Called from SaveIntoDataSourceCommandVisitor on Redshift write operations. + Map options = ScalaConversionUtils.fromMap(command.options()); + log.info("Redshift SaveIntoDataSourceCommand options: {}", options); + Optional dbtable = Optional.ofNullable(options.get("dbtable")); + Optional query = Optional.ofNullable(options.get("query")); + String url = options.get("url"); + + try { + return + // Similar to Kafka and Snowflake, Redshift also needs some special handling. So we use + // the method below for extracting the dataset from Redshift write operations. + RedshiftDataset.getDatasets( + outputDataset(), url, dbtable, query, getSchema(command) + // command.schema() doesn't always contain the schema of the written data, + // so we use the helper to extract it from the logical plan.
+ ); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + return Collections.emptyList(); + } + } + + public static boolean isRedshiftSource(CreatableRelationProvider provider) { + return hasRedshiftClasses(); // && provider instanceof DefaultSource; + } + + /** + * Taken from {@link + * io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor#getSchema(org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand)} + * + * @param command + * @return + */ + private StructType getSchema(SaveIntoDataSourceCommand command) { + StructType schema = command.schema(); + if ((schema == null || schema.fields() == null || schema.fields().length == 0) + && command.query() != null + && command.query().output() != null) { + // get schema from logical plan's output + schema = PlanUtils.toStructType(ScalaConversionUtils.fromSeq(command.query().output())); + } + return schema; + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java new file mode 100644 index 0000000000000..967935cb40468 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java @@ -0,0 +1,80 @@ +/* +/* Copyright 2018-2024 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.api; + +import io.openlineage.spark.agent.lifecycle.VisitorFactory; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public interface Vendors { + + @SuppressWarnings("PMD.AvoidFieldNameMatchingTypeName") + List VENDORS = + Arrays.asList( + // Add vendor classes here + "io.openlineage.spark.agent.vendor.snowflake.SnowflakeVendor", + // This is the only chance we have to add the RedshiftVendor to the list of vendors + "io.openlineage.spark.agent.vendor.redshift.RedshiftVendor"); + + static Vendors getVendors() { + return getVendors(Collections.emptyList()); + } + + static Vendors getVendors(List additionalVendors) { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + List vendors = + Stream.concat(VENDORS.stream(), additionalVendors.stream()) + .map( + vendorClassName -> { + try { + Class vendor = cl.loadClass(vendorClassName); + return (Vendor) vendor.newInstance(); + } catch (ClassNotFoundException + | InstantiationException + | IllegalAccessException e) { + return null; + } + }) + .filter(Objects::nonNull) + .filter(Vendor::isVendorAvailable) + .collect(Collectors.toList()); + // The main reason to avoid using the service loader and use static loading with the class name + // is to prevent potential missing loading caused by missing META-INF/services files. 
+ // This can happen if the user packages the OpenLineage dependency in an Uber-jar without proper + // services file configuration + // The implementation with the ClassLoader and the list of vendor class names increase the + // coupling between the vendor + // and the app + // https://github.com/OpenLineage/OpenLineage/issues/1860 + // ServiceLoader serviceLoader = ServiceLoader.load(Vendor.class); + return new VendorsImpl(vendors); + } + + static Vendors empty() { + return new Vendors() { + + @Override + public Collection getVisitorFactories() { + return Collections.emptyList(); + } + + @Override + public Collection getEventHandlerFactories() { + return Collections.emptyList(); + } + }; + } + + Collection getVisitorFactories(); + + Collection getEventHandlerFactories(); +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java new file mode 100644 index 0000000000000..66db4cf4f4e43 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java @@ -0,0 +1,42 @@ +/* +/* Copyright 2018-2024 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.api; + +import io.openlineage.spark.agent.lifecycle.VisitorFactory; +import io.openlineage.spark.agent.vendor.redshift.RedshiftVendor; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class VendorsImpl implements Vendors { + private final List vendors; + + public VendorsImpl(List vendors) { + this.vendors = vendors; + } + + @Override + public Collection getVisitorFactories() { + vendors.add(new RedshiftVendor()); + return vendors.stream() + .map(Vendor::getVisitorFactory) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + @Override + public Collection getEventHandlerFactories() { + return vendors.stream() + .map(Vendor::getEventHandlerFactory) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/HdfsPathDatasetTest.java b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/HdfsPathDatasetTest.java index bed4c197f9691..9d7637c6742b8 100644 --- a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/HdfsPathDatasetTest.java +++ b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/HdfsPathDatasetTest.java @@ -205,7 +205,7 @@ public void testPathSpecListPlatformInstance() HdfsPathDataset.create( new URI("s3a://my-bucket/foo/tests/bar.avro"), sparkLineageConf.getOpenLineageConf()); Assert.assertEquals( - "urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)", dataset.urn().toString()); } diff --git a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/SparkStreamingEventToDatahubTest.java b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java similarity index 85% rename from metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/SparkStreamingEventToDatahubTest.java rename to 
metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java index 71c43cea408e6..1f6f1f2f28a43 100644 --- a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/SparkStreamingEventToDatahubTest.java +++ b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java @@ -26,7 +26,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.junit.Assert; -public class SparkStreamingEventToDatahubTest extends TestCase { +public class OpenLineageEventToDatahubTest extends TestCase { public void testGenerateUrnFromStreamingDescriptionFile() throws URISyntaxException { Config datahubConfig = ConfigFactory.parseMap( @@ -483,4 +483,91 @@ public void testProcessOlEventWithSetDatasetFabricType() throws URISyntaxExcepti dataset.getUrn().toString()); } } + + public void testProcessGlueOlEvent() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/sample_glue.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_glue_table,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:hive,my_glue_database.my_output_glue_table,DEV)", + dataset.getUrn().toString()); + } + } + + public void testProcessGlueOlEventWithHiveAlias() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.hivePlatformAlias("glue"); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/sample_glue.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_glue_table,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_output_glue_table,DEV)", + dataset.getUrn().toString()); + } + } + + public void testProcessRedshiftOutput() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.hivePlatformAlias("glue"); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + 
assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,DEV)", + dataset.getUrn().toString()); + assertEquals( + dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); + } + } } diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json new file mode 100644 index 0000000000000..3b11b28207636 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json @@ -0,0 +1,147 @@ +{ + "eventTime": "2024-06-18T06:52:21.64Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "COMPLETE", + "run": { + "runId": "01902a1e-371a-7dbf-8098-2337d441e8dc", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "01902a1e-0b05-750e-b38d-439998f7a853" + }, + "job": { + "namespace": "default", + "name": "jdbc_test_demo" + } + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.3.4", + "name": "spark" + }, + "environment-properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "environment-properties": {} + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "local[*]", + "spark.app.name": "JdbcTest-Demo" + } + } + } + }, + "job": { + "namespace": "default", + "name": "jdbc_test_demo.execute_save_into_data_source_command.spark_redshift_load_test", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "SQL_JOB" + } + } + }, + "inputs": [ + { + "namespace": "mysql://localhost:3306", + "name": "datahub.metadata_aspect_v2", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "mysql://localhost:3306", + "uri": "mysql://localhost:3306" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "urn", + 
"type": "string" + }, + { + "name": "aspect", + "type": "string" + }, + { + "name": "version", + "type": "long" + }, + { + "name": "metadata", + "type": "string" + }, + { + "name": "systemmetadata", + "type": "string" + }, + { + "name": "createdon", + "type": "timestamp" + }, + { + "name": "createdby", + "type": "string" + }, + { + "name": "createdfor", + "type": "string" + } + ] + } + }, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439", + "name": "dev.public.spark_redshift_load_test", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439", + "uri": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "urn", + "type": "string" + } + ] + }, + "columnLineage": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", + "fields": { + "urn": { + "inputFields": [ + { + "namespace": "mysql://localhost:3306", + "name": "datahub.metadata_aspect_v2", + "field": "urn" + } + ] + } + } + } + }, + "outputFacets": {} + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_glue.json b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_glue.json new file mode 100644 index 0000000000000..a1959ba37c0f3 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_glue.json @@ -0,0 +1,168 @@ +{ + "eventTime": "2024-05-31T17:01:26.465Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "START", + "run": { + "runId": "3ad2a5ec-1c8b-4bda-84f4-1492758af65c", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "f03077f2-c077-4472-987e-b89b1c741c86" + }, + "job": { + "namespace": "default", + "name": "simple_app_parquet_with_persist_without_coalesce_s3_demo" + } + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.3.0-amzn-1", + "name": "spark" + }, + "environment-properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "environment-properties": {} + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": 
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "jes", + "spark.app.name": "SimpleAppParquetWithPersistWithoutCoalesceS3-Demo" + } + }, + "spark_version": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "spark-version": "3.3.0-amzn-1" + } + } + }, + "job": { + "namespace": "default", + "name": "simple_app_parquet_with_persist_without_coalesce_s3_demo.execute_insert_into_hadoop_fs_relation_command.sample_data_output", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "JOB" + } + } + }, + "inputs": [ + { + "namespace": "s3://my-bucket-test", + "name": "/sample_data/input_data.parquet", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "s3://my-bucket-test", + "uri": "s3://my-bucket-test" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "field_1", + "type": "integer" + }, + { + "name": "field_2", + "type": "string" + } + ] + }, + "symlinks": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", + "identifiers": [ + { + "namespace": "aws:glue:us-west-2:123456789012", + "name": "my_glue_database.my_glue_table", + "type": "TABLE" + } + ] + } + }, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "s3://my-bucket-test", + "name": "mydata _output", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "s3://my-bucket-test", + "uri": "s3://my-bucket-test" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "field1", + "type": "long" + }, + { + "name": "field2", + "type": "string" + } + ] + }, + "columnLineage": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", + "fields": { + "field1": { + "inputFields": [ + { + "namespace": "s3://my-bucket-test", + "name": "/output/field1.parquet", + "field": "field1" + } + ] + }, + "field2": { + "inputFields": [ + { + "namespace": "s3://my-bucket-test", + "name": "/output/field2.parquet", + "field": "field2" + } + ] + } + } + }, + "symlinks": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": 
"https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", + "identifiers": [ + { + "namespace": "s3://my-bucket-test/my-warehouse/", + "name": "my_glue_database.my_output_glue_table", + "type": "TABLE" + } + ] + }, + "lifecycleStateChange": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", + "lifecycleStateChange": "OVERWRITE" + } + }, + "outputFacets": {} + } + ] +} \ No newline at end of file