Skip to content

Commit

Permalink
Support OpenLineage in spark-3.x-bigquery connectors (#1212)
Browse files Browse the repository at this point in the history
Signed-off-by: Pahulpreet Singh <pahulpreets@google.com>
  • Loading branch information
codelixir authored and isha97 committed May 29, 2024
1 parent 617067e commit 051334c
Show file tree
Hide file tree
Showing 19 changed files with 374 additions and 7 deletions.
4 changes: 4 additions & 0 deletions spark-bigquery-connector-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.cloud.bigquery.connector.common.BigQueryConfigurationUtil.DEFAULT_FALLBACK;
import static com.google.cloud.bigquery.connector.common.BigQueryConfigurationUtil.getOptionFromMultipleParams;
import static scala.collection.JavaConverters.mapAsJavaMap;

import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableId;
Expand All @@ -31,6 +32,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -43,11 +45,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructType;
import scala.Option;

/** Spark related utilities */
public class SparkBigQueryUtil {
Expand Down Expand Up @@ -289,4 +294,29 @@ public static ImmutableMap<String, String> extractJobLabels(SparkConf sparkConf)
BigQueryUtil.sanitizeLabelValue(tag.substring(tag.lastIndexOf('_') + 1))));
return labels.build();
}

/**
 * Builds a {@link SparkBigQueryConfig} from per-call data source options and the session that
 * backs the given {@code SQLContext}.
 *
 * <p>The Scala {@code options} map is copied into a mutable Java map so that {@code
 * dataSourceVersion} can adjust it through {@code updateOptionsMap} before the configuration is
 * created. The session's runtime configuration is passed along as the global options.
 *
 * @param sqlContext context whose Spark session supplies the runtime conf, Hadoop configuration,
 *     SQL conf, default parallelism and Spark version
 * @param options options supplied for this read/write; not modified by this method
 * @param schema optional schema; an empty {@code Option} results in an empty {@code Optional}
 * @param dataSourceVersion data source flavor, given a chance to update the copied options
 * @return the configuration produced by {@code SparkBigQueryConfig.from}
 */
public static SparkBigQueryConfig createSparkBigQueryConfig(
    SQLContext sqlContext,
    scala.collection.immutable.Map<String, String> options,
    Option<StructType> schema,
    DataSourceVersion dataSourceVersion) {
  java.util.Map<String, String> optionsMap = new HashMap<>(mapAsJavaMap(options));
  dataSourceVersion.updateOptionsMap(optionsMap);
  SparkSession spark = sqlContext.sparkSession();
  ImmutableMap<String, String> globalOptions =
      ImmutableMap.copyOf(mapAsJavaMap(spark.conf().getAll()));
  // A stopped context cannot be queried for its parallelism, so fall back to 1.
  int defaultParallelism =
      spark.sparkContext().isStopped() ? 1 : spark.sparkContext().defaultParallelism();
  // Option.getOrElse takes a by-name argument, which Java sees as a Function0. Passing a literal
  // null therefore hands it a null function that is dereferenced when the Option is empty.
  // Unwrap explicitly instead.
  StructType userSchema = schema.isDefined() ? schema.get() : null;

  return SparkBigQueryConfig.from(
      ImmutableMap.copyOf(optionsMap),
      globalOptions,
      spark.sparkContext().hadoopConfiguration(),
      ImmutableMap.of(),
      defaultParallelism,
      spark.sqlContext().conf(),
      spark.version(),
      Optional.ofNullable(userSchema),
      true); // NOTE(review): presumably the "table is mandatory" flag - confirm against from()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.spark.bigquery.integration.SparkBigQueryIntegrationTestBase.TestDataset;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

/**
 * Shared integration test that runs a BigQuery read followed by a BigQuery write with the
 * OpenLineage Spark listener attached, then inspects the lineage events the listener wrote to a
 * local file.
 *
 * <p>The class has no abstract members; subclasses only need to extend it to inherit the test.
 */
public class OpenLineageIntegrationTestBase {
  @ClassRule
  public static TestDataset testDataset = new SparkBigQueryIntegrationTestBase.TestDataset();

  @ClassRule public static CustomSessionFactory sessionFactory = new CustomSessionFactory();

  protected SparkSession spark;
  protected String testTable;
  protected File lineageFile;

  /** Picks up the session and lineage file prepared by the class-level {@link #sessionFactory}. */
  public OpenLineageIntegrationTestBase() {
    this.spark = sessionFactory.spark;
    this.lineageFile = sessionFactory.lineageFile;
  }

  /** Chooses a fresh table name for each test; the name is derived from {@code nanoTime}. */
  @Before
  public void createTestTable() {
    testTable = "test_" + System.nanoTime();
  }

  /**
   * Class rule that creates a temporary lineage log file and a local Spark session configured to
   * send OpenLineage events to that file.
   */
  protected static class CustomSessionFactory extends ExternalResource {
    SparkSession spark;
    File lineageFile;

    @Override
    protected void before() throws Throwable {
      lineageFile = File.createTempFile("openlineage_test_" + System.nanoTime(), ".log");
      lineageFile.deleteOnExit();
      spark =
          SparkSession.builder()
              .master("local")
              .appName("openlineage_test_bigquery_connector")
              .config("spark.ui.enabled", "false")
              .config("spark.default.parallelism", 20)
              .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
              .config("spark.openlineage.transport.type", "file")
              .config("spark.openlineage.transport.location", lineageFile.getAbsolutePath())
              .getOrCreate();
      spark.sparkContext().setLogLevel("WARN");
    }
  }

  /**
   * Reads the lineage log, treating every non-blank line as one JSON event.
   *
   * @param file lineage log to read
   * @return the events whose {@code inputs} and {@code outputs} arrays are both non-empty
   * @throws Exception if the file cannot be opened or a line is not a JSON object carrying both
   *     arrays
   */
  private List<JSONObject> parseEventLogs(File file) throws Exception {
    List<JSONObject> eventList = new ArrayList<>();
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    try (Scanner scanner = new Scanner(file, "UTF-8")) {
      while (scanner.hasNextLine()) {
        String line = scanner.nextLine();
        if (line.trim().isEmpty()) {
          // A blank or trailing empty line is not an event; parsing it would throw.
          continue;
        }
        JSONObject event = new JSONObject(line);
        if (!event.getJSONArray("inputs").isEmpty() && !event.getJSONArray("outputs").isEmpty()) {
          eventList.add(event);
        }
      }
    }
    return eventList;
  }

  /**
   * Returns the {@code name} of the first element of the given array field of an event.
   *
   * @param event lineage event
   * @param field name of the array field, e.g. {@code "inputs"} or {@code "outputs"}
   */
  private String getFieldName(JSONObject event, String field) {
    return event.getJSONArray(field).getJSONObject(0).getString("name");
  }

  @Test
  public void testLineageEvent() throws Exception {
    String fullTableName = testDataset.toString() + "." + testTable;
    Dataset<Row> readDF =
        spark.read().format("bigquery").option("table", TestConstants.SHAKESPEARE_TABLE).load();
    readDF.createOrReplaceTempView("words");
    Dataset<Row> writeDF =
        spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word");
    writeDF
        .write()
        .format("bigquery")
        .mode(SaveMode.Append)
        .option("table", fullTableName)
        .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
        .option("writeMethod", "direct")
        .save();
    List<JSONObject> eventList = parseEventLogs(lineageFile);
    assertThat(eventList)
        .isNotEmpty(); // check if there is at least one event with both input and output
    eventList.forEach(
        (event) -> { // check if each of these events have the correct input and output
          // isEqualTo rather than matches(): matches() interprets its argument as a regular
          // expression, so the dots in a table name would match any character.
          assertThat(getFieldName(event, "inputs")).isEqualTo(TestConstants.SHAKESPEARE_TABLE);
          assertThat(getFieldName(event, "outputs")).isEqualTo(fullTableName);
        });
  }
}
5 changes: 5 additions & 0 deletions spark-bigquery-dsv1/spark-bigquery-dsv1-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>spark-bigquery-dsv1-spark3-support</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions spark-bigquery-dsv1/spark-bigquery_2.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spark-3.2-bigquery-pushdown_${scala.binary.version}
Expand Down
4 changes: 4 additions & 0 deletions spark-bigquery-dsv1/spark-bigquery_2.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,7 @@ class BigQueryRelationProvider(
def createSparkBigQueryConfig(sqlContext: SQLContext,
parameters: Map[String, String],
schema: Option[StructType] = None): SparkBigQueryConfig = {
SparkBigQueryConfig.from(parameters.asJava,
ImmutableMap.of[String, String](),
DataSourceVersion.V1,
sqlContext.sparkSession,
Optional.ofNullable(schema.orNull),
/* tableIsMandatory */ true)
SparkBigQueryUtil.createSparkBigQueryConfig(sqlContext, parameters, schema, DataSourceVersion.V1)
}

override def shortName: String = "bigquery"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

/**
 * Concrete test class that inherits every test from {@link OpenLineageIntegrationTestBase} and
 * adds nothing of its own. Presumably exists so the shared OpenLineage test runs against the
 * DataSource V1 connector module - confirm from this file's location.
 */
public class DataSourceV1OpenLineageIntegrationTest extends OpenLineageIntegrationTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

/**
 * Concrete test class that inherits every test from {@link OpenLineageIntegrationTestBase} and
 * adds nothing of its own. Presumably exists so the shared OpenLineage test runs against the
 * Spark 2.4 connector module - confirm from this file's location.
 */
public class Spark24OpenLineageIntegrationTest extends OpenLineageIntegrationTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package com.google.cloud.spark.bigquery.v2;

import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.spark.bigquery.DataSourceVersion;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.v2.context.BigQueryDataSourceReaderContext;
import com.google.cloud.spark.bigquery.v2.context.BigQueryDataSourceReaderModule;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -97,4 +99,14 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
// context
return new BigQueryWriteBuilder(injector, info, SaveMode.Append);
}

/**
 * Returns the table properties, which here carry the dataset name, namespace and storage layer
 * under {@code openlineage.dataset.*} keys.
 *
 * @return an immutable map with three entries; the dataset name is the friendly name of the
 *     configured table id
 */
@Override
public Map<String, String> properties() {
  String datasetName =
      BigQueryUtil.friendlyTableName(
          injector.getInstance(SparkBigQueryConfig.class).getTableId());
  return ImmutableMap.of(
      "openlineage.dataset.name", datasetName,
      "openlineage.dataset.namespace", "bigquery",
      "openlineage.dataset.storageDatasetFacet.storageLayer", "bigquery");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

/**
 * Concrete test class that inherits every test from {@link OpenLineageIntegrationTestBase} and
 * adds nothing of its own. Presumably exists so the shared OpenLineage test runs against the
 * Spark 3.1 connector module - confirm from this file's location.
 */
public class Spark31OpenLineageIntegrationTest extends OpenLineageIntegrationTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

/**
 * Concrete test class that inherits every test from {@link OpenLineageIntegrationTestBase} and
 * adds nothing of its own. Presumably exists so the shared OpenLineage test runs against the
 * Spark 3.2 connector module - confirm from this file's location.
 */
public class Spark32OpenLineageIntegrationTest extends OpenLineageIntegrationTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

/**
 * Concrete test class that inherits every test from {@link OpenLineageIntegrationTestBase} and
 * adds nothing of its own. Presumably exists so the shared OpenLineage test runs against the
 * Spark 3.3 connector module - confirm from this file's location.
 */
public class Spark33OpenLineageIntegrationTest extends OpenLineageIntegrationTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration;

/**
 * Concrete test class that inherits every test from {@link OpenLineageIntegrationTestBase} and
 * adds nothing of its own. Presumably exists so the shared OpenLineage test runs against the
 * Spark 3.4 connector module - confirm from this file's location.
 */
public class Spark34OpenLineageIntegrationTest extends OpenLineageIntegrationTestBase {}
Loading

0 comments on commit 051334c

Please sign in to comment.