Skip to content

Commit

Permalink
Refactor spark plan traversal to find input/output datasets from Data…
Browse files Browse the repository at this point in the history
…sources (apache#1065)

* Refactor spark plan traversal to find input/output datasets from Datasources, including added BigQuery and JDBC relations

Signed-off-by: Michael Collado <mike@datakin.com>

* Add support for URIs in namespaces (add : and / as valid characters)

Signed-off-by: Michael Collado <mike@datakin.com>

* Refactored open lineage URIs to reference constant values

Signed-off-by: Michael Collado <mike@datakin.com>
  • Loading branch information
collado-mike authored Mar 18, 2021
1 parent b9e6d08 commit 6e5fbec
Show file tree
Hide file tree
Showing 29 changed files with 1,492 additions and 747 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation 'com.github.ok2c.hc5:hc5-async-json:0.2.1'
compileOnly 'org.projectlombok:lombok:1.18.18'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.0.3'
compileOnly 'com.google.cloud.spark:spark-bigquery_2.11:0.19.1'

implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.2'
implementation 'com.fasterxml.jackson.core:jackson-core:2.12.2'
Expand Down
30 changes: 30 additions & 0 deletions facets/spark-2.4/v1/output-statistics-facet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"$schema": "http://json-schema.org/schema#",
"definitions": {
"Run": {
"properties": {
"facets": {
"type": "object",
"properties": {
"spark.output.stats": {
"type": "object",
"required": [
"rowCount",
"size"
],
"properties": {
"rowCount": {
"type": "integer"
},
"size": {
"type": "integer"
}
}
}
},
"additionalProperties": true
}
}
}
}
}
4 changes: 2 additions & 2 deletions integrations/sparkrdd/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"eventType" : "START",
"eventTime" : "2021-01-01T00:00:00Z",
"run" : {
"runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54",
"runId" : "8d99e33e-2a1c-4254-9600-18f23435fc3b",
"facets" : { }
},
"job" : {
Expand All @@ -14,5 +14,5 @@
"name" : "gs://bucket/data.txt"
} ],
"outputs" : [ ],
"producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
"producer" : "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark"
}
4 changes: 2 additions & 2 deletions integrations/sparkrdd/2.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"eventType" : "COMPLETE",
"eventTime" : "2021-01-01T00:00:00Z",
"run" : {
"runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54",
"runId" : "8d99e33e-2a1c-4254-9600-18f23435fc3b",
"facets" : { }
},
"job" : {
Expand All @@ -14,5 +14,5 @@
"name" : "gs://bucket/data.txt"
} ],
"outputs" : [ ],
"producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
"producer" : "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark"
}
217 changes: 164 additions & 53 deletions integrations/sparksql/1.json
Original file line number Diff line number Diff line change
@@ -1,65 +1,176 @@
{
"eventType" : "START",
"eventTime" : "2021-01-01T00:00:00Z",
"run" : {
"runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54",
"facets" : {
"parent" : {
"_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#ParentRunFacet",
"run" : {
"runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54"
"eventType": "START",
"eventTime": "2021-01-01T00:00:00Z",
"run": {
"runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54",
"facets": {
"parent": {
"_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#ParentRunFacet",
"run": {
"runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54"
},
"job" : {
"namespace" : "ns_name",
"name" : "job_name"
"job": {
"namespace": "ns_name",
"name": "job_name"
}
},
"spark.logicalPlan" : {
"_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL" : "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/marquez-spark-agent/facets/spark-2.4/v1/logicalPlanFacet",
"type" : "org.apache.spark.sql.catalyst.plans.logical.Aggregate",
"name" : "Aggregate",
"children" : [ {
"type" : "org.apache.spark.sql.catalyst.plans.logical.TypedFilter",
"name" : "TypedFilter",
"children" : [ {
"type" : "org.apache.spark.sql.catalyst.plans.logical.Project",
"name" : "Project",
"children" : [ {
"org.apache.spark.sql.execution.datasources.LogicalRelation" : {
"relation" : {
"location" : {
"rootPaths" : [ {
"name" : "data.txt",
"uri" : "data.txt"
} ]
"spark.logicalPlan": {
"_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark",
"_schemaURL": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark/facets/spark-2.4/v1/logicalPlanFacet",
"plan": [
{
"class": "org.apache.spark.sql.catalyst.plans.logical.Aggregate",
"num-children": 1,
"groupingExpressions": [],
"aggregateExpressions": [
[
{
"class": "org.apache.spark.sql.catalyst.expressions.Alias",
"num-children": 1,
"child": 0,
"name": "count",
"qualifier": []
},
{
"class": "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression",
"num-children": 1,
"aggregateFunction": 0,
"mode": {
"object": "org.apache.spark.sql.catalyst.expressions.aggregate.Complete$"
},
"schema" : {
"fields" : [ {
"name" : "value",
"type" : "string",
"nullable" : true
} ]
}
"isDistinct": false
},
{
"class": "org.apache.spark.sql.catalyst.expressions.aggregate.Count",
"num-children": 1,
"children": [
0
]
},
{
"class": "org.apache.spark.sql.catalyst.expressions.Literal",
"num-children": 0,
"value": "1",
"dataType": "integer"
}
]
],
"child": 0
},
{
"class": "org.apache.spark.sql.catalyst.plans.logical.TypedFilter",
"num-children": 1,
"func": null,
"argumentClass": "java.lang.String",
"argumentSchema": {
"type": "struct",
"fields": [
{
"name": "value",
"type": "string",
"nullable": true,
"metadata": {}
}
]
},
"deserializer": [
{
"class": "org.apache.spark.sql.catalyst.expressions.objects.Invoke",
"num-children": 1,
"targetObject": 0,
"functionName": "toString",
"dataType": "object",
"arguments": [],
"propagateNull": true,
"returnNullable": false
},
{
"class": "org.apache.spark.sql.catalyst.expressions.Cast",
"num-children": 1,
"child": 0,
"dataType": "string"
},
"type" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
"name" : "LogicalRelation"
} ]
} ]
} ]
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "value",
"dataType": "string",
"nullable": true,
"metadata": {},
"qualifier": []
}
],
"child": 0
},
{
"class": "org.apache.spark.sql.catalyst.plans.logical.Project",
"num-children": 1,
"projectList": [
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "value",
"dataType": "string",
"nullable": true,
"metadata": {},
"qualifier": []
}
]
],
"child": 0
},
{
"class": "org.apache.spark.sql.execution.datasources.LogicalRelation",
"num-children": 0,
"relation": null,
"output": [
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "value",
"dataType": "string",
"nullable": true,
"metadata": {},
"qualifier": []
}
]
],
"isStreaming": false
}
]
}
}
},
"job" : {
"namespace" : "ns_name",
"name" : "job_name"
"job": {
"namespace": "ns_name",
"name": "job_name"
},
"inputs" : [ {
"namespace" : "gs.bucket",
"name" : "gs://bucket/data.txt"
} ],
"outputs" : [ ],
"producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
"inputs": [
{
"namespace": "file",
"name": "/path/to/data",
"facets": {
"schema": {
"_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#SchemaDatasetFacet",
"fields": [
{
"name": "value",
"type": "string"
}
]
},
"dataSource": {
"_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#DatasourceDatasetFacet",
"uri": "file",
"name": "file"
}
}
}
],
"producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark"
}
Loading

0 comments on commit 6e5fbec

Please sign in to comment.