diff --git a/build.gradle b/build.gradle index f0c92c46752bb..92e0048824697 100644 --- a/build.gradle +++ b/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation 'com.github.ok2c.hc5:hc5-async-json:0.2.1' compileOnly 'org.projectlombok:lombok:1.18.18' implementation 'org.apache.httpcomponents.client5:httpclient5:5.0.3' + compileOnly 'com.google.cloud.spark:spark-bigquery_2.11:0.19.1' implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.2' implementation 'com.fasterxml.jackson.core:jackson-core:2.12.2' diff --git a/facets/spark-2.4/v1/output-statistics-facet.json b/facets/spark-2.4/v1/output-statistics-facet.json new file mode 100644 index 0000000000000..28dd274548970 --- /dev/null +++ b/facets/spark-2.4/v1/output-statistics-facet.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json-schema.org/schema#", + "definitions": { + "Run": { + "properties": { + "facets": { + "type": "object", + "properties": { + "spark.output.stats": { + "type": "object", + "required": [ + "rowCount", + "size" + ], + "properties": { + "rowCount": { + "type": "integer" + }, + "size": { + "type": "integer" + } + } + } + }, + "additionalProperties": true + } + } + } + } +} \ No newline at end of file diff --git a/integrations/sparkrdd/1.json b/integrations/sparkrdd/1.json index 6e26edddcdcbc..fb15449a0e642 100644 --- a/integrations/sparkrdd/1.json +++ b/integrations/sparkrdd/1.json @@ -2,7 +2,7 @@ "eventType" : "START", "eventTime" : "2021-01-01T00:00:00Z", "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54", + "runId" : "8d99e33e-2a1c-4254-9600-18f23435fc3b", "facets" : { } }, "job" : { @@ -14,5 +14,5 @@ "name" : "gs://bucket/data.txt" } ], "outputs" : [ ], - "producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + "producer" : "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark" } \ No newline at end of file diff --git a/integrations/sparkrdd/2.json b/integrations/sparkrdd/2.json index f78fd7d99eaa3..723e26174df69 100644 --- a/integrations/sparkrdd/2.json +++ b/integrations/sparkrdd/2.json @@ -2,7 +2,7 @@ "eventType" : "COMPLETE", "eventTime" : "2021-01-01T00:00:00Z", "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54", + "runId" : "8d99e33e-2a1c-4254-9600-18f23435fc3b", "facets" : { } }, "job" : { @@ -14,5 +14,5 @@ "name" : "gs://bucket/data.txt" } ], "outputs" : [ ], - "producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + "producer" : "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark" } \ No newline at end of file diff --git a/integrations/sparksql/1.json b/integrations/sparksql/1.json index 396ba23bb9fd3..e7c12dc8815e3 100644 --- a/integrations/sparksql/1.json +++ b/integrations/sparksql/1.json @@ -1,65 +1,176 @@ { - "eventType" : "START", - "eventTime" : "2021-01-01T00:00:00Z", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54", - "facets" : { - "parent" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#ParentRunFacet", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54" + "eventType": "START", + "eventTime": "2021-01-01T00:00:00Z", + "run": { + "runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54", + "facets": { + "parent": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#ParentRunFacet", + "run": { + "runId": 
"ea445b5c-22eb-457a-8007-01c7c52b6e54" }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" } }, - "spark.logicalPlan" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/marquez-spark-agent/facets/spark-2.4/v1/logicalPlanFacet", - "type" : "org.apache.spark.sql.catalyst.plans.logical.Aggregate", - "name" : "Aggregate", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", - "name" : "TypedFilter", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.Project", - "name" : "Project", - "children" : [ { - "org.apache.spark.sql.execution.datasources.LogicalRelation" : { - "relation" : { - "location" : { - "rootPaths" : [ { - "name" : "data.txt", - "uri" : "data.txt" - } ] + "spark.logicalPlan": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark/facets/spark-2.4/v1/logicalPlanFacet", + "plan": [ + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Aggregate", + "num-children": 1, + "groupingExpressions": [], + "aggregateExpressions": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.Alias", + "num-children": 1, + "child": 0, + "name": "count", + "qualifier": [] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression", + "num-children": 1, + "aggregateFunction": 0, + "mode": { + "object": "org.apache.spark.sql.catalyst.expressions.aggregate.Complete$" }, - "schema" : { - "fields" : [ { - "name" : "value", - "type" : "string", - "nullable" : true - } ] - } + "isDistinct": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.Count", + "num-children": 1, + "children": [ + 0 + ] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Literal", + "num-children": 0, + "value": "1", + "dataType": "integer" } + ] + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", + "num-children": 1, + "func": null, + "argumentClass": "java.lang.String", + "argumentSchema": { + "type": "struct", + "fields": [ + { + "name": "value", + "type": "string", + "nullable": true, + "metadata": {} + } + ] + }, + "deserializer": [ + { + "class": "org.apache.spark.sql.catalyst.expressions.objects.Invoke", + "num-children": 1, + "targetObject": 0, + "functionName": "toString", + "dataType": "object", + "arguments": [], + "propagateNull": true, + "returnNullable": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Cast", + "num-children": 1, + "child": 0, + "dataType": "string" }, - "type" : "org.apache.spark.sql.execution.datasources.LogicalRelation", - "name" : "LogicalRelation" - } ] - } ] - } ] + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Project", + "num-children": 1, + "projectList": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "child": 0 + }, + { + "class": 
"org.apache.spark.sql.execution.datasources.LogicalRelation", + "num-children": 0, + "relation": null, + "output": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "isStreaming": false + } + ] } } }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" }, - "inputs" : [ { - "namespace" : "gs.bucket", - "name" : "gs://bucket/data.txt" - } ], - "outputs" : [ ], - "producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + "inputs": [ + { + "namespace": "file", + "name": "/path/to/data", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#SchemaDatasetFacet", + "fields": [ + { + "name": "value", + "type": "string" + } + ] + }, + "dataSource": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#DatasourceDatasetFacet", + "uri": "file", + "name": "file" + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark" } \ No newline at end of file diff --git a/integrations/sparksql/2.json b/integrations/sparksql/2.json index afd7f4409c6aa..23f1f633b732f 100644 --- a/integrations/sparksql/2.json +++ b/integrations/sparksql/2.json @@ -1,65 +1,176 @@ { - "eventType" : "COMPLETE", - "eventTime" : "2021-01-01T00:00:00Z", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54", - "facets" : { - "parent" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#ParentRunFacet", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54" + "eventType": "COMPLETE", + "eventTime": "2021-01-01T00:00:00Z", + "run": { + "runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54", + "facets": { + "parent": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#ParentRunFacet", + "run": { + "runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54" }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" } }, - "spark.logicalPlan" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/marquez-spark-agent/facets/spark-2.4/v1/logicalPlanFacet", - "type" : "org.apache.spark.sql.catalyst.plans.logical.Aggregate", - "name" : "Aggregate", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", - "name" : "TypedFilter", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.Project", - "name" : "Project", - "children" : [ { - "org.apache.spark.sql.execution.datasources.LogicalRelation" : { - "relation" : { - "location" : { - "rootPaths" : [ { - "name" : "data.txt", - "uri" : "data.txt" - } ] + "spark.logicalPlan": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": 
"https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark/facets/spark-2.4/v1/logicalPlanFacet", + "plan": [ + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Aggregate", + "num-children": 1, + "groupingExpressions": [], + "aggregateExpressions": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.Alias", + "num-children": 1, + "child": 0, + "name": "count", + "qualifier": [] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression", + "num-children": 1, + "aggregateFunction": 0, + "mode": { + "object": "org.apache.spark.sql.catalyst.expressions.aggregate.Complete$" }, - "schema" : { - "fields" : [ { - "name" : "value", - "type" : "string", - "nullable" : true - } ] - } + "isDistinct": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.Count", + "num-children": 1, + "children": [ + 0 + ] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Literal", + "num-children": 0, + "value": "1", + "dataType": "integer" } + ] + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", + "num-children": 1, + "func": null, + "argumentClass": "java.lang.String", + "argumentSchema": { + "type": "struct", + "fields": [ + { + "name": "value", + "type": "string", + "nullable": true, + "metadata": {} + } + ] + }, + "deserializer": [ + { + "class": "org.apache.spark.sql.catalyst.expressions.objects.Invoke", + "num-children": 1, + "targetObject": 0, + "functionName": "toString", + "dataType": "object", + "arguments": [], + "propagateNull": true, + "returnNullable": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Cast", + "num-children": 1, + "child": 0, + "dataType": "string" }, - "type" : "org.apache.spark.sql.execution.datasources.LogicalRelation", - "name" : "LogicalRelation" - } ] - } ] - } ] + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Project", + "num-children": 1, + "projectList": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.execution.datasources.LogicalRelation", + "num-children": 0, + "relation": null, + "output": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "isStreaming": false + } + ] } } }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" }, - "inputs" : [ { - "namespace" : "gs.bucket", - "name" : "gs://bucket/data.txt" - } ], - "outputs" : [ ], - "producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + "inputs": [ + { + "namespace": "file", + "name": "/path/to/data", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#SchemaDatasetFacet", + "fields": [ + { + "name": "value", + "type": "string" + } + ] + }, + "dataSource": { + "_producer": 
"https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#DatasourceDatasetFacet", + "uri": "file", + "name": "file" + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark" } \ No newline at end of file diff --git a/integrations/sparksql/3.json b/integrations/sparksql/3.json index 396ba23bb9fd3..e7c12dc8815e3 100644 --- a/integrations/sparksql/3.json +++ b/integrations/sparksql/3.json @@ -1,65 +1,176 @@ { - "eventType" : "START", - "eventTime" : "2021-01-01T00:00:00Z", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54", - "facets" : { - "parent" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#ParentRunFacet", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54" + "eventType": "START", + "eventTime": "2021-01-01T00:00:00Z", + "run": { + "runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54", + "facets": { + "parent": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#ParentRunFacet", + "run": { + "runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54" }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" } }, - "spark.logicalPlan" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/marquez-spark-agent/facets/spark-2.4/v1/logicalPlanFacet", - "type" : "org.apache.spark.sql.catalyst.plans.logical.Aggregate", - "name" : "Aggregate", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", - "name" : "TypedFilter", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.Project", - "name" : "Project", - "children" : [ { - "org.apache.spark.sql.execution.datasources.LogicalRelation" : { - "relation" : { - "location" : { - "rootPaths" : [ { - "name" : "data.txt", - "uri" : "data.txt" - } ] + "spark.logicalPlan": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark/facets/spark-2.4/v1/logicalPlanFacet", + "plan": [ + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Aggregate", + "num-children": 1, + "groupingExpressions": [], + "aggregateExpressions": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.Alias", + "num-children": 1, + "child": 0, + "name": "count", + "qualifier": [] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression", + "num-children": 1, + "aggregateFunction": 0, + "mode": { + "object": "org.apache.spark.sql.catalyst.expressions.aggregate.Complete$" }, - "schema" : { - "fields" : [ { - "name" : "value", - "type" : "string", - "nullable" : true - } ] - } + "isDistinct": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.Count", + "num-children": 1, + "children": [ + 0 + ] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Literal", + "num-children": 0, + "value": "1", + "dataType": "integer" } + ] + ], + "child": 0 + }, + { + "class": 
"org.apache.spark.sql.catalyst.plans.logical.TypedFilter", + "num-children": 1, + "func": null, + "argumentClass": "java.lang.String", + "argumentSchema": { + "type": "struct", + "fields": [ + { + "name": "value", + "type": "string", + "nullable": true, + "metadata": {} + } + ] + }, + "deserializer": [ + { + "class": "org.apache.spark.sql.catalyst.expressions.objects.Invoke", + "num-children": 1, + "targetObject": 0, + "functionName": "toString", + "dataType": "object", + "arguments": [], + "propagateNull": true, + "returnNullable": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Cast", + "num-children": 1, + "child": 0, + "dataType": "string" }, - "type" : "org.apache.spark.sql.execution.datasources.LogicalRelation", - "name" : "LogicalRelation" - } ] - } ] - } ] + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Project", + "num-children": 1, + "projectList": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.execution.datasources.LogicalRelation", + "num-children": 0, + "relation": null, + "output": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "isStreaming": false + } + ] } } }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" }, - "inputs" : [ { - "namespace" : "gs.bucket", - "name" : "gs://bucket/data.txt" - } ], - "outputs" : [ ], - "producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + "inputs": [ + { + "namespace": "file", + "name": "/path/to/data", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#SchemaDatasetFacet", + "fields": [ + { + "name": "value", + "type": "string" + } + ] + }, + "dataSource": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#DatasourceDatasetFacet", + "uri": "file", + "name": "file" + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark" } \ No newline at end of file diff --git a/integrations/sparksql/4.json b/integrations/sparksql/4.json index afd7f4409c6aa..23f1f633b732f 100644 --- a/integrations/sparksql/4.json +++ b/integrations/sparksql/4.json @@ -1,65 +1,176 @@ { - "eventType" : "COMPLETE", - "eventTime" : "2021-01-01T00:00:00Z", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54", - "facets" : { - "parent" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#ParentRunFacet", - "run" : { - "runId" : "ea445b5c-22eb-457a-8007-01c7c52b6e54" + "eventType": "COMPLETE", + "eventTime": "2021-01-01T00:00:00Z", + "run": { + "runId": 
"ea445b5c-22eb-457a-8007-01c7c52b6e54", + "facets": { + "parent": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#ParentRunFacet", + "run": { + "runId": "ea445b5c-22eb-457a-8007-01c7c52b6e54" }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" } }, - "spark.logicalPlan" : { - "_producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", - "_schemaURL" : "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/marquez-spark-agent/facets/spark-2.4/v1/logicalPlanFacet", - "type" : "org.apache.spark.sql.catalyst.plans.logical.Aggregate", - "name" : "Aggregate", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", - "name" : "TypedFilter", - "children" : [ { - "type" : "org.apache.spark.sql.catalyst.plans.logical.Project", - "name" : "Project", - "children" : [ { - "org.apache.spark.sql.execution.datasources.LogicalRelation" : { - "relation" : { - "location" : { - "rootPaths" : [ { - "name" : "data.txt", - "uri" : "data.txt" - } ] + "spark.logicalPlan": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark/facets/spark-2.4/v1/logicalPlanFacet", + "plan": [ + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Aggregate", + "num-children": 1, + "groupingExpressions": [], + "aggregateExpressions": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.Alias", + "num-children": 1, + "child": 0, + "name": "count", + "qualifier": [] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression", + "num-children": 1, + "aggregateFunction": 0, + "mode": { + "object": "org.apache.spark.sql.catalyst.expressions.aggregate.Complete$" }, - "schema" : { - "fields" : [ { - "name" : "value", - "type" : "string", - "nullable" : true - } ] - } + "isDistinct": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.aggregate.Count", + "num-children": 1, + "children": [ + 0 + ] + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Literal", + "num-children": 0, + "value": "1", + "dataType": "integer" } + ] + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.TypedFilter", + "num-children": 1, + "func": null, + "argumentClass": "java.lang.String", + "argumentSchema": { + "type": "struct", + "fields": [ + { + "name": "value", + "type": "string", + "nullable": true, + "metadata": {} + } + ] + }, + "deserializer": [ + { + "class": "org.apache.spark.sql.catalyst.expressions.objects.Invoke", + "num-children": 1, + "targetObject": 0, + "functionName": "toString", + "dataType": "object", + "arguments": [], + "propagateNull": true, + "returnNullable": false + }, + { + "class": "org.apache.spark.sql.catalyst.expressions.Cast", + "num-children": 1, + "child": 0, + "dataType": "string" }, - "type" : "org.apache.spark.sql.execution.datasources.LogicalRelation", - "name" : "LogicalRelation" - } ] - } ] - } ] + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.catalyst.plans.logical.Project", + "num-children": 1, + 
"projectList": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "child": 0 + }, + { + "class": "org.apache.spark.sql.execution.datasources.LogicalRelation", + "num-children": 0, + "relation": null, + "output": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "value", + "dataType": "string", + "nullable": true, + "metadata": {}, + "qualifier": [] + } + ] + ], + "isStreaming": false + } + ] } } }, - "job" : { - "namespace" : "ns_name", - "name" : "job_name" + "job": { + "namespace": "ns_name", + "name": "job_name" }, - "inputs" : [ { - "namespace" : "gs.bucket", - "name" : "gs://bucket/data.txt" - } ], - "outputs" : [ ], - "producer" : "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + "inputs": [ + { + "namespace": "file", + "name": "/path/to/data", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#SchemaDatasetFacet", + "fields": [ + { + "name": "value", + "type": "string" + } + ] + }, + "dataSource": { + "_producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#DatasourceDatasetFacet", + "uri": "file", + "name": "file" + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark" } \ No newline at end of file diff --git a/src/main/java/marquez/spark/agent/SparkListener.java b/src/main/java/marquez/spark/agent/SparkListener.java index d2b530171b555..76d8f26a12aa3 100644 --- a/src/main/java/marquez/spark/agent/SparkListener.java +++ b/src/main/java/marquez/spark/agent/SparkListener.java @@ -12,6 +12,7 @@ import marquez.spark.agent.client.LineageEvent.Job; import marquez.spark.agent.client.LineageEvent.Run; import marquez.spark.agent.client.LineageEvent.RunFacet; +import marquez.spark.agent.client.OpenLineageClient; import marquez.spark.agent.facets.ErrorFacet; import marquez.spark.agent.lifecycle.ContextFactory; import marquez.spark.agent.lifecycle.ExecutionContext; @@ -217,7 +218,7 @@ public static LineageEvent buildErrorLineageEvent(RunFacet runFacet) { .name(contextFactory.marquezContext.getJobName()) .namespace(contextFactory.marquezContext.getJobNamespace()) .build()) - .producer("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client") + .producer(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI) .build(); } diff --git a/src/main/java/marquez/spark/agent/client/OpenLineageClient.java b/src/main/java/marquez/spark/agent/client/OpenLineageClient.java index 92fe3a79675f2..b3c63f9eed372 100644 --- a/src/main/java/marquez/spark/agent/client/OpenLineageClient.java +++ b/src/main/java/marquez/spark/agent/client/OpenLineageClient.java @@ -30,6 +30,16 @@ @Slf4j public class OpenLineageClient { + + public static final String OPEN_LINEAGE_CLIENT_URI = + "https://github.com/MarquezProject/marquez/tree/0.12.0/integrations/spark"; + public static final String OPEN_LINEAGE_PARENT_FACET_URI = + "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#ParentRunFacet"; + public static final String OPEN_LINEAGE_DATASOURCE_FACET = + 
"https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#DatasourceDatasetFacet"; + public static final String OPEN_LINEAGE_SCHEMA_FACET_URI = + "https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#SchemaDatasetFacet"; + private final CloseableHttpAsyncClient http; private final ExecutorService executorService; private final Optional apiKey; diff --git a/src/main/java/marquez/spark/agent/facets/ErrorFacet.java b/src/main/java/marquez/spark/agent/facets/ErrorFacet.java index abec56e24c241..9aca935cd18e2 100644 --- a/src/main/java/marquez/spark/agent/facets/ErrorFacet.java +++ b/src/main/java/marquez/spark/agent/facets/ErrorFacet.java @@ -6,6 +6,7 @@ import lombok.Builder; import lombok.NonNull; import marquez.spark.agent.client.LineageEvent.BaseFacet; +import marquez.spark.agent.client.OpenLineageClient; public class ErrorFacet extends BaseFacet { private final Exception exception; @@ -13,10 +14,8 @@ public class ErrorFacet extends BaseFacet { @Builder public ErrorFacet(@NonNull Exception exception) { super( - URI.create("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"), - URI.create( - "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/" - + "marquez-spark-agent/facets/spark-2.4/v1/error-facet")); + URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI), + URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI + "/facets/spark-2.4/v1/error-facet")); this.exception = exception; } diff --git a/src/main/java/marquez/spark/agent/facets/LogicalPlanFacet.java b/src/main/java/marquez/spark/agent/facets/LogicalPlanFacet.java index c71a5b65049cd..e4aa649fd25c5 100644 --- a/src/main/java/marquez/spark/agent/facets/LogicalPlanFacet.java +++ b/src/main/java/marquez/spark/agent/facets/LogicalPlanFacet.java @@ -1,26 +1,26 @@ package marquez.spark.agent.facets; -import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonRawValue; import java.net.URI; -import java.util.Map; import lombok.Builder; import marquez.spark.agent.client.LineageEvent.BaseFacet; +import marquez.spark.agent.client.OpenLineageClient; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; public class LogicalPlanFacet extends BaseFacet { - private final Map plan; + private final LogicalPlan plan; @Builder - public LogicalPlanFacet(Map plan) { + public LogicalPlanFacet(LogicalPlan plan) { super( - URI.create("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"), + URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI), URI.create( - "https://github.com/MarquezProject/marquez/blob/main/experimental/integrations/" - + "marquez-spark-agent/facets/spark-2.4/v1/logicalPlanFacet")); + OpenLineageClient.OPEN_LINEAGE_CLIENT_URI + "/facets/spark-2.4/v1/logicalPlanFacet")); this.plan = plan; } - @JsonAnyGetter - public Map getPlan() { - return plan; + @JsonRawValue + public String getPlan() { + return plan.toJSON(); } } diff --git a/src/main/java/marquez/spark/agent/facets/OutputStatisticsFacet.java b/src/main/java/marquez/spark/agent/facets/OutputStatisticsFacet.java index 7e4422b21f5b6..a17dde0121923 100644 --- a/src/main/java/marquez/spark/agent/facets/OutputStatisticsFacet.java +++ b/src/main/java/marquez/spark/agent/facets/OutputStatisticsFacet.java @@ -1,8 +1,11 @@ package marquez.spark.agent.facets; +import java.net.URI; +import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Value; import marquez.spark.agent.client.LineageEvent; +import marquez.spark.agent.client.OpenLineageClient; 
/** * Facet with statistics for an output dataset, including the number of rows and the size of the @@ -13,4 +16,15 @@ public class OutputStatisticsFacet extends LineageEvent.BaseFacet { long rowCount; long size; + + @Builder + public OutputStatisticsFacet(long rowCount, long size) { + super( + URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI), + URI.create( + OpenLineageClient.OPEN_LINEAGE_CLIENT_URI + + "/facets/spark-2.4/v1/outputStatisticsFacet")); + this.rowCount = rowCount; + this.size = size; + } } diff --git a/src/main/java/marquez/spark/agent/lifecycle/ContextFactory.java b/src/main/java/marquez/spark/agent/lifecycle/ContextFactory.java index c2c361149764e..90e99787cc60f 100644 --- a/src/main/java/marquez/spark/agent/lifecycle/ContextFactory.java +++ b/src/main/java/marquez/spark/agent/lifecycle/ContextFactory.java @@ -2,6 +2,10 @@ import lombok.AllArgsConstructor; import marquez.spark.agent.MarquezContext; +import marquez.spark.agent.lifecycle.plan.InputDatasetVisitors; +import marquez.spark.agent.lifecycle.plan.OutputDatasetVisitors; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.execution.SQLExecution; @AllArgsConstructor public class ContextFactory { @@ -16,10 +20,11 @@ public RddExecutionContext createRddExecutionContext(int jobId) { } public SparkSQLExecutionContext createSparkSQLExecutionContext(long executionId) { + SQLContext sqlContext = SQLExecution.getQueryExecution(executionId).sparkPlan().sqlContext(); + InputDatasetVisitors inputDatasetVisitors = new InputDatasetVisitors(sqlContext); + OutputDatasetVisitors outputDatasetVisitors = + new OutputDatasetVisitors(sqlContext, inputDatasetVisitors); return new SparkSQLExecutionContext( - executionId, - marquezContext, - new LogicalPlanFacetTraverser(), - new DatasetLogicalPlanTraverser()); + executionId, marquezContext, outputDatasetVisitors.get(), inputDatasetVisitors.get()); } } diff --git a/src/main/java/marquez/spark/agent/lifecycle/DatasetLogicalPlanTraverser.java b/src/main/java/marquez/spark/agent/lifecycle/DatasetLogicalPlanTraverser.java deleted file mode 100644 index a02c9cfcc93f7..0000000000000 --- a/src/main/java/marquez/spark/agent/lifecycle/DatasetLogicalPlanTraverser.java +++ /dev/null @@ -1,217 +0,0 @@ -package marquez.spark.agent.lifecycle; - -import static scala.collection.JavaConversions.asJavaCollection; - -import com.google.common.collect.ImmutableMap; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import lombok.Value; -import marquez.spark.agent.client.DatasetParser; -import marquez.spark.agent.client.DatasetParser.DatasetParseResult; -import marquez.spark.agent.client.LineageEvent.Dataset; -import marquez.spark.agent.client.LineageEvent.DatasetFacet; -import marquez.spark.agent.client.LineageEvent.SchemaDatasetFacet; -import marquez.spark.agent.client.LineageEvent.SchemaField; -import marquez.spark.agent.facets.OutputStatisticsFacet; -import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.catalyst.plans.logical.Statistics; -import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand; -import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand; -import org.apache.spark.sql.execution.datasources.FileIndex; -import org.apache.spark.sql.execution.datasources.HadoopFsRelation; -import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand; -import org.apache.spark.sql.execution.metric.SQLMetric; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import scala.collection.immutable.Map; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; - -public class DatasetLogicalPlanTraverser extends LogicalPlanTraverser { - private Set outputDatasets; - private Set inputDatasets; - private Statistics statistics; - private String jobNamespace; - - public TraverserResult build(LogicalPlan plan, String jobNamespace) { - synchronized (this) { - outputDatasets = new HashSet<>(); - inputDatasets = new HashSet<>(); - this.jobNamespace = jobNamespace; - super.visit(plan); - return new TraverserResult( - new ArrayList<>(outputDatasets), new ArrayList<>(inputDatasets), statistics); - } - } - - @Override - protected Object visit( - CreateDataSourceTableAsSelectCommand createDataSourceTableAsSelectCommand) { - OutputStatisticsFacet outputStats = - getOutputStats(createDataSourceTableAsSelectCommand.metrics()); - apply( - buildDataset( - createDataSourceTableAsSelectCommand.table().qualifiedName(), - DatasetFacet.builder() - .schema(visit(createDataSourceTableAsSelectCommand.table().schema())) - .additional(ImmutableMap.of("stats", outputStats)) - .build())); - return null; - } - - @Override - protected Object visit(InsertIntoDataSourceDirCommand insertIntoDataSourceCommand) { - OutputStatisticsFacet outputStats = getOutputStats(insertIntoDataSourceCommand.metrics()); - DatasetFacet datasetFacet = - DatasetFacet.builder() - .schema(visit(insertIntoDataSourceCommand.schema())) - .additional(ImmutableMap.of("stats", outputStats)) - .build(); - DatasetLogicalPlanTraverser traverser = this; - insertIntoDataSourceCommand - .storage() - .locationUri() - .map( - new AbstractFunction1() { - @Override - public Dataset apply(URI uri) { - return buildDataset(visitPathUri(uri), datasetFacet); - } - }) - .foreach( - new AbstractFunction1() { - @Override - public Void apply(Dataset v1) { - traverser.apply(v1); - return null; - } - }); - return super.visit(insertIntoDataSourceCommand); - } - - private OutputStatisticsFacet getOutputStats(Map metrics) { - long rowCount = - metrics - .getOrElse( - "numOutputRows", - new AbstractFunction0() { - @Override - public SQLMetric apply() { - return new SQLMetric("sum", 0L); - } - }) - .value(); - long outputBytes = - metrics - .getOrElse( - "numOutputBytes", - new AbstractFunction0() { - @Override - public SQLMetric apply() { - return new SQLMetric("sum", 0L); - } - }) - .value(); - return new OutputStatisticsFacet(rowCount, outputBytes); - } - - protected Object visit(InsertIntoHadoopFsRelationCommand insertIntoHadoopFsRelationCommand) { - OutputStatisticsFacet outputStats = getOutputStats(insertIntoHadoopFsRelationCommand.metrics()); - DatasetFacet datasetFacet = - DatasetFacet.builder() - .schema(visit(insertIntoHadoopFsRelationCommand.schema())) - .additional(ImmutableMap.of("stats", outputStats)) - .build(); - outputDatasets.add( - buildDataset( - visitPathUri(insertIntoHadoopFsRelationCommand.outputPath().toUri()), datasetFacet)); - return null; - } - - protected Dataset buildDataset(String uri, DatasetFacet datasetFacet) { - DatasetParseResult result = DatasetParser.parse(uri); - return buildDataset(result, datasetFacet); - } - - protected Dataset buildDataset(URI uri, DatasetFacet datasetFacet) { - DatasetParseResult result = DatasetParser.parse(uri); - return 
buildDataset(result, datasetFacet); - } - - protected Dataset buildDataset(DatasetParseResult result, DatasetFacet datasetFacet) { - return Dataset.builder() - .name(result.getName()) - .namespace(result.getNamespace()) - .facets(datasetFacet) - .build(); - } - - protected SchemaDatasetFacet visit(StructType structType) { - return SchemaDatasetFacet.builder() - ._producer(URI.create("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client")) - ._schemaURL( - URI.create("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/schemaDatasetFacet")) - .fields(visit(structType.fields())) - .build(); - } - - protected List visit(StructField[] fields) { - List list = new ArrayList<>(); - for (StructField field : fields) { - list.add(visit(field)); - } - return list; - } - - protected SchemaField visit(StructField field) { - return SchemaField.builder().name(field.name()).type(field.dataType().typeName()).build(); - } - - protected Object visit(HadoopFsRelation relation) { - inputDatasets.addAll(visit(relation.location())); - return null; - } - - protected List visit(FileIndex fileIndex) { - return visitPaths(asJavaCollection(fileIndex.rootPaths())); - } - - protected List visitPaths(Collection paths) { - List list = new ArrayList<>(); - for (Path path : paths) { - list.add(visit(path)); - } - return list; - } - - protected Dataset visit(Path path) { - return buildDataset(visitPathUri(path.toUri()), null); - } - - protected URI visitPathUri(URI uri) { - return uri; - } - - @Override - protected Object visitStatistics(Statistics stats) { - this.statistics = stats; - return null; - } - - private Boolean apply(Dataset uri) { - return outputDatasets.add(uri); - } - - @Value - public static class TraverserResult { - - List outputDataset; - List inputDataset; - Statistics statistics; - } -} diff --git a/src/main/java/marquez/spark/agent/lifecycle/LogicalPlanFacetTraverser.java b/src/main/java/marquez/spark/agent/lifecycle/LogicalPlanFacetTraverser.java deleted file mode 100644 index 401948dee2f72..0000000000000 --- a/src/main/java/marquez/spark/agent/lifecycle/LogicalPlanFacetTraverser.java +++ /dev/null @@ -1,230 +0,0 @@ -package marquez.spark.agent.lifecycle; - -import static scala.collection.JavaConversions.asJavaCollection; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.catalyst.TableIdentifier; -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat; -import org.apache.spark.sql.catalyst.catalog.CatalogTable; -import org.apache.spark.sql.catalyst.catalog.CatalogTableType; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand; -import org.apache.spark.sql.execution.datasources.FileIndex; -import org.apache.spark.sql.execution.datasources.HadoopFsRelation; -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand; -import org.apache.spark.sql.execution.datasources.LogicalRelation; -import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions; -import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation; -import org.apache.spark.sql.sources.BaseRelation; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -public class 
LogicalPlanFacetTraverser extends LogicalPlanTraverser { - @Override - public Map visit(LogicalPlan plan) { - if (plan == null) return null; - - Map map = new LinkedHashMap<>(); - switch (plan.getClass().getSimpleName()) { - case "LogicalRelation": - if (plan instanceof LogicalRelation) { - map.put(plan.getClass().getName(), visit((LogicalRelation) plan)); - } - break; - case "InsertIntoHadoopFsRelationCommand": - if (plan instanceof InsertIntoHadoopFsRelationCommand) { - map.put(plan.getClass().getName(), visit((InsertIntoHadoopFsRelationCommand) plan)); - } - break; - case "CreateDataSourceTableAsSelectCommand": - if (plan instanceof CreateDataSourceTableAsSelectCommand) { - map.put(plan.getClass().getName(), visit((CreateDataSourceTableAsSelectCommand) plan)); - } - break; - } - map.put("type", plan.getClass().getName()); - map.put("name", plan.nodeName()); - List children = visitChildren(asJavaCollection(plan.children())); - if (children.size() != 0) { - map.put("children", children); - } - return map; - } - - @Override - protected Object visit( - CreateDataSourceTableAsSelectCommand createDataSourceTableAsSelectCommand) { - Map map = new LinkedHashMap<>(); - map.put("table", visit(createDataSourceTableAsSelectCommand.table())); - map.put("query", visit(createDataSourceTableAsSelectCommand.query())); - map.put("mode", visit(createDataSourceTableAsSelectCommand.mode())); - return map; - } - - @Override - protected Object visit(SaveMode mode) { - if (mode != null) { - return mode.name(); - } - return null; - } - - @Override - protected Object visit(CatalogTable table) { - Map map = new LinkedHashMap<>(); - map.put("identifier", visit(table.identifier())); - map.put("tableType", visit(table.tableType())); - map.put("storage", visit(table.storage())); - map.put("schema", visit(table.schema())); - return map; - } - - @Override - protected Object visit(CatalogStorageFormat storage) { - Map map = new LinkedHashMap<>(); - map.put("locationUri", storage.locationUri()); - map.put("inputFormat", storage.inputFormat()); - map.put("outputFormat", storage.outputFormat()); - map.put("serde", storage.serde()); - map.put("compressed", storage.compressed()); - return map; - } - - @Override - protected Object visit(CatalogTableType tableType) { - return tableType.name(); - } - - @Override - protected Object visit(TableIdentifier identifier) { - return identifier.identifier(); - } - - @Override - protected Object visit(InsertIntoHadoopFsRelationCommand insertIntoHadoopFsRelationCommand) { - Map map = new LinkedHashMap<>(); - map.put("outputPath", visit(insertIntoHadoopFsRelationCommand.outputPath())); - map.put("query", visit(insertIntoHadoopFsRelationCommand.query())); - map.put("options", asJavaCollection(insertIntoHadoopFsRelationCommand.options())); - return map; - } - - @Override - protected Object visit(LogicalRelation logicalRelation) { - Map map = new HashMap<>(); - map.put("relation", visitRelation(logicalRelation.relation())); - return map; - } - - @Override - protected Object visitRelation(BaseRelation relation) { - Map map = new LinkedHashMap(); - switch (relation.getClass().getSimpleName()) { - case "JDBCRelation": - map = visit((JDBCRelation) relation); - break; - case "HadoopFsRelation": - map = visit((HadoopFsRelation) relation); - break; - } - map.put("schema", visit(relation.schema())); - return map; - } - - @Override - protected Map visit(JDBCRelation relation) { - Map map = new LinkedHashMap<>(); - map.put("jdbcOptions", visit(relation.jdbcOptions())); - return map; - } - - @Override - 
protected Object visit(JDBCOptions jdbcOptions) { - return jdbcOptions.asProperties(); - } - - @Override - protected Object visit(StructType structType) { - Map map = new LinkedHashMap(); - map.put("fields", visit(structType.fields())); - return map; - } - - @Override - protected Object visit(StructField[] fields) { - List list = new ArrayList(); - for (StructField field : fields) { - list.add(visit(field)); - } - return list; - } - - @Override - protected Object visit(StructField field) { - Map map = new LinkedHashMap(); - map.put("name", field.name()); - map.put("type", visit(field.dataType())); - map.put("nullable", field.nullable()); - return map; - } - - @Override - protected Object visit(DataType dataType) { - if (dataType != null) { - return dataType.simpleString(); - } - return null; - } - - @Override - protected Map visit(HadoopFsRelation relation) { - Map map = new LinkedHashMap<>(); - map.put("location", visit(relation.location())); - return map; - } - - @Override - protected Object visit(FileIndex fileIndex) { - Map map = new LinkedHashMap<>(); - map.put("rootPaths", visitPaths(asJavaCollection(fileIndex.rootPaths()))); - return map; - } - - @Override - protected Object visitPaths(Collection paths) { - List list = new ArrayList(); - for (Path path : paths) { - list.add(visit(path)); - } - return list; - } - - @Override - protected Object visit(Path path) { - Map map = new LinkedHashMap<>(); - map.put("name", path.getName()); - map.put("uri", visitPathUri(path.toUri())); - return map; - } - - protected Object visitPathUri(URI uri) { - return uri.toASCIIString(); - } - - protected List visitChildren(Collection children) { - List list = new ArrayList<>(); - for (LogicalPlan plan : children) { - list.add(visit(plan)); - } - return list; - } -} diff --git a/src/main/java/marquez/spark/agent/lifecycle/RddExecutionContext.java b/src/main/java/marquez/spark/agent/lifecycle/RddExecutionContext.java index 144ec3e4ef951..aacabcf64e459 100644 --- a/src/main/java/marquez/spark/agent/lifecycle/RddExecutionContext.java +++ b/src/main/java/marquez/spark/agent/lifecycle/RddExecutionContext.java @@ -22,6 +22,7 @@ import marquez.spark.agent.client.LineageEvent; import marquez.spark.agent.client.LineageEvent.Dataset; import marquez.spark.agent.client.LineageEvent.RunFacet; +import marquez.spark.agent.client.OpenLineageClient; import marquez.spark.agent.facets.ErrorFacet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -67,7 +68,7 @@ public void start(SparkListenerJobStart jobStart) { .job(buildJob()) .eventTime(toZonedTime(jobStart.time())) .eventType("START") - .producer("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client") + .producer(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI) .build(); marquezContext.emit(event); @@ -83,7 +84,7 @@ public void end(SparkListenerJobEnd jobEnd) { .job(buildJob()) .eventTime(toZonedTime(jobEnd.time())) .eventType(getEventType(jobEnd.jobResult())) - .producer("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client") + .producer(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI) .build(); marquezContext.emit(event); diff --git a/src/main/java/marquez/spark/agent/lifecycle/SparkSQLExecutionContext.java b/src/main/java/marquez/spark/agent/lifecycle/SparkSQLExecutionContext.java index f5be354a213c1..efa22f585acd5 100644 --- a/src/main/java/marquez/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/src/main/java/marquez/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -1,20 +1,23 @@ package 
marquez.spark.agent.lifecycle; -import java.net.URI; +import com.fasterxml.jackson.databind.ObjectMapper; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import marquez.spark.agent.MarquezContext; import marquez.spark.agent.client.LineageEvent; -import marquez.spark.agent.client.LineageEvent.JobLink; +import marquez.spark.agent.client.LineageEvent.Dataset; import marquez.spark.agent.client.LineageEvent.ParentRunFacet; import marquez.spark.agent.client.LineageEvent.RunFacet; -import marquez.spark.agent.client.LineageEvent.RunLink; +import marquez.spark.agent.client.OpenLineageClient; import marquez.spark.agent.facets.ErrorFacet; import marquez.spark.agent.facets.LogicalPlanFacet; +import marquez.spark.agent.lifecycle.plan.PlanUtils; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.JobFailed; import org.apache.spark.scheduler.JobResult; @@ -25,26 +28,31 @@ import org.apache.spark.sql.execution.SQLExecution; import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart; +import scala.PartialFunction; +import scala.collection.JavaConversions; @Slf4j public class SparkSQLExecutionContext implements ExecutionContext { + private final long executionId; private final QueryExecution queryExecution; + private final ObjectMapper objectMapper = OpenLineageClient.createMapper(); + private MarquezContext marquezContext; - private final LogicalPlanFacetTraverser logicalPlanFacetTraverser; - private final DatasetLogicalPlanTraverser datasetLogicalPlanTraverser; + private final List>> outputDatasetSupplier; + private final List>> inputDatasetSupplier; public SparkSQLExecutionContext( long executionId, MarquezContext marquezContext, - LogicalPlanFacetTraverser logicalPlanFacetTraverser, - DatasetLogicalPlanTraverser datasetLogicalPlanTraverser) { + List>> outputDatasetSupplier, + List>> inputDatasetSupplier) { this.executionId = executionId; this.marquezContext = marquezContext; - this.logicalPlanFacetTraverser = logicalPlanFacetTraverser; - this.datasetLogicalPlanTraverser = datasetLogicalPlanTraverser; this.queryExecution = SQLExecution.getQueryExecution(executionId); + this.outputDatasetSupplier = outputDatasetSupplier; + this.inputDatasetSupplier = inputDatasetSupplier; } public void start(SparkListenerSQLExecutionStart startEvent) {} @@ -61,13 +69,18 @@ public void start(SparkListenerJobStart jobStart) { log.info("No execution info {}", queryExecution); return; } - DatasetLogicalPlanTraverser.TraverserResult result = - datasetLogicalPlanTraverser.build( - queryExecution.logical(), marquezContext.getJobNamespace()); + List outputDatasets = + PlanUtils.applyFirst(outputDatasetSupplier, queryExecution.logical()); + List inputDatasets = + JavaConversions.seqAsJavaList( + queryExecution.logical().collect(PlanUtils.merge(inputDatasetSupplier))) + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()); LineageEvent event = LineageEvent.builder() - .inputs(result.getInputDataset()) - .outputs(result.getOutputDataset()) + .inputs(inputDatasets) + .outputs(outputDatasets) .run( buildRun( buildRunFacets( @@ -75,25 +88,17 @@ public void start(SparkListenerJobStart jobStart) { .job(buildJob()) .eventTime(toZonedTime(jobStart.time())) .eventType("START") - 
.producer("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client") + .producer(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI) .build(); marquezContext.emit(event); } private ParentRunFacet buildParentFacet() { - return ParentRunFacet.builder() - ._producer(URI.create("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client")) - ._schemaURL( - URI.create( - "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#ParentRunFacet")) - .job( - JobLink.builder() - .name(marquezContext.getJobName()) - .namespace(marquezContext.getJobNamespace()) - .build()) - .run(RunLink.builder().runId(marquezContext.getParentRunId()).build()) - .build(); + return PlanUtils.parentRunFacet( + marquezContext.getParentRunId(), + marquezContext.getJobName(), + marquezContext.getJobNamespace()); } @Override @@ -105,13 +110,18 @@ public void end(SparkListenerJobEnd jobEnd) { } log.debug("Traversing logical plan {}", queryExecution.logical().toJSON()); log.debug("Physical plan executed {}", queryExecution.executedPlan().toJSON()); - DatasetLogicalPlanTraverser.TraverserResult r = - datasetLogicalPlanTraverser.build( - queryExecution.logical(), marquezContext.getJobNamespace()); + List outputDatasets = + PlanUtils.applyFirst(outputDatasetSupplier, queryExecution.logical()); + List inputDatasets = + JavaConversions.seqAsJavaList( + queryExecution.logical().collect(PlanUtils.merge(inputDatasetSupplier))) + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()); LineageEvent event = LineageEvent.builder() - .inputs(r.getInputDataset()) - .outputs(r.getOutputDataset()) + .inputs(inputDatasets) + .outputs(outputDatasets) .run( buildRun( buildRunFacets( @@ -121,7 +131,7 @@ public void end(SparkListenerJobEnd jobEnd) { .job(buildJob()) .eventTime(toZonedTime(jobEnd.time())) .eventType(getEventType(jobEnd.jobResult())) - .producer("https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client") + .producer(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI) .build(); marquezContext.emit(event); @@ -156,8 +166,7 @@ protected RunFacet buildRunFacets( } protected LogicalPlanFacet buildLogicalPlanFacet(LogicalPlan plan) { - Map planMap = logicalPlanFacetTraverser.visit(plan); - return LogicalPlanFacet.builder().plan(planMap).build(); + return LogicalPlanFacet.builder().plan(plan).build(); } protected ErrorFacet buildJobErrorFacet(JobResult jobResult) { diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/BigQueryNodeVisitor.java b/src/main/java/marquez/spark/agent/lifecycle/plan/BigQueryNodeVisitor.java new file mode 100644 index 0000000000000..8726c37a557dd --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/BigQueryNodeVisitor.java @@ -0,0 +1,89 @@ +package marquez.spark.agent.lifecycle.plan; + +import com.google.cloud.spark.bigquery.BigQueryRelation; +import com.google.cloud.spark.bigquery.BigQueryRelationProvider; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import marquez.spark.agent.client.LineageEvent.Dataset; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.LogicalRelation; +import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand; +import org.apache.spark.sql.sources.CreatableRelationProvider; +import scala.runtime.AbstractPartialFunction; + +/** + * {@link LogicalPlan} visitor that matches {@link BigQueryRelation}s or {@link + * 
SaveIntoDataSourceCommand}s that use a {@link BigQueryRelationProvider}. This function extracts a + * {@link Dataset} from the BigQuery table referenced by the relation. The convention used for + * naming is a URI of bigquery://<projectId>.<.datasetId>.<tableName> + * . The namespace for bigquery tables is always bigquery and the name is the FQN. + */ +public class BigQueryNodeVisitor extends AbstractPartialFunction> { + private final SQLContext sqlContext; + + public BigQueryNodeVisitor(SQLContext sqlContext) { + this.sqlContext = sqlContext; + } + + public static boolean hasBigQueryClasses() { + try { + BigQueryNodeVisitor.class + .getClassLoader() + .loadClass("com.google.cloud.spark.bigquery.BigQueryRelation"); + return true; + } catch (Exception e) { + // swallow- we don't care + } + return false; + } + + @Override + public boolean isDefinedAt(LogicalPlan plan) { + return bigQuerySupplier(plan).isPresent(); + } + + private Optional> bigQuerySupplier(LogicalPlan plan) { + // SaveIntoDataSourceCommand is a special case because it references a CreatableRelationProvider + // Every other write instance references a LogicalRelation(BigQueryRelation, _, _, _) + if (plan instanceof SaveIntoDataSourceCommand) { + SaveIntoDataSourceCommand saveCommand = (SaveIntoDataSourceCommand) plan; + CreatableRelationProvider relationProvider = saveCommand.dataSource(); + if (relationProvider instanceof BigQueryRelationProvider) { + return Optional.of( + () -> + (BigQueryRelation) + ((BigQueryRelationProvider) relationProvider) + .createRelation(sqlContext, saveCommand.options(), saveCommand.schema())); + } + } else { + if (plan instanceof LogicalRelation + && ((LogicalRelation) plan).relation() instanceof BigQueryRelation) { + return Optional.of(() -> (BigQueryRelation) ((LogicalRelation) plan).relation()); + } + } + return Optional.empty(); + } + + @Override + public List apply(LogicalPlan x) { + return bigQuerySupplier(x) + .map( + s -> { + BigQueryRelation relation = s.get(); + String name = + String.format( + "%s.%s.%s", + relation.tableId().getProject(), + relation.tableId().getDataset(), + relation.tableId().getTable()); + return Collections.singletonList( + PlanUtils.getDataset( + URI.create(String.format("bigquery://%s", name)), x.schema())); + }) + .orElse(null); + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/HadoopFsRelationVisitor.java b/src/main/java/marquez/spark/agent/lifecycle/plan/HadoopFsRelationVisitor.java new file mode 100644 index 0000000000000..e0ef87c940f82 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/HadoopFsRelationVisitor.java @@ -0,0 +1,61 @@ +package marquez.spark.agent.lifecycle.plan; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import marquez.spark.agent.client.LineageEvent.Dataset; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.HadoopFsRelation; +import org.apache.spark.sql.execution.datasources.LogicalRelation; +import scala.collection.JavaConversions; +import scala.runtime.AbstractPartialFunction; + +/** + * {@link LogicalPlan} visitor that extracts a {@link Dataset} from a {@link HadoopFsRelation}. It + * is assumed that a single directory maps to a single {@link Dataset}. Any files referenced are + * replaced by their parent directory and all files in a given directory are assumed to belong to + * the same {@link Dataset}. 
Directory partitioning is currently not addressed. + */ +@Slf4j +public class HadoopFsRelationVisitor extends AbstractPartialFunction> { + private final SparkContext context; + + public HadoopFsRelationVisitor(SparkContext context) { + this.context = context; + } + + @Override + public boolean isDefinedAt(LogicalPlan x) { + return x instanceof LogicalRelation + && ((LogicalRelation) x).relation() instanceof HadoopFsRelation; + } + + @Override + public List apply(LogicalPlan x) { + HadoopFsRelation relation = (HadoopFsRelation) ((LogicalRelation) x).relation(); + return JavaConversions.asJavaCollection(relation.location().rootPaths()).stream() + .map( + p -> { + try { + if (p.getFileSystem(context.hadoopConfiguration()).getFileStatus(p).isFile()) { + return p.getParent(); + } else { + return p; + } + } catch (IOException e) { + log.warn("Unable to get file system for path ", e); + return p; + } + }) + .distinct() + .map( + p -> { + // TODO- refactor this to return a single partitioned dataset based on static + // static partitions in the relation + return PlanUtils.getDataset(p.toUri(), relation.schema()); + }) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/InputDatasetVisitors.java b/src/main/java/marquez/spark/agent/lifecycle/plan/InputDatasetVisitors.java new file mode 100644 index 0000000000000..ff47340a2245f --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/InputDatasetVisitors.java @@ -0,0 +1,34 @@ +package marquez.spark.agent.lifecycle.plan; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import marquez.spark.agent.client.LineageEvent.Dataset; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import scala.PartialFunction; + +/** + * Constructs a list of valid {@link LogicalPlan} visitors that can extract an input {@link + * Dataset}. Checks the classpath for classes that are not bundled with Spark to avoid {@link + * ClassNotFoundException}s during plan traversal. + */ +public class InputDatasetVisitors + implements Supplier>>> { + private final SQLContext sqlContext; + + public InputDatasetVisitors(SQLContext sqlContext) { + this.sqlContext = sqlContext; + } + + @Override + public List>> get() { + List>> list = new ArrayList<>(); + list.add(new HadoopFsRelationVisitor(sqlContext.sparkContext())); + list.add(new JDBCRelationVisitor(sqlContext)); + if (BigQueryNodeVisitor.hasBigQueryClasses()) { + list.add(new BigQueryNodeVisitor(sqlContext)); + } + return list; + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoDataSourceDirVisitor.java b/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoDataSourceDirVisitor.java new file mode 100644 index 0000000000000..e35cc7a6dcdb2 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoDataSourceDirVisitor.java @@ -0,0 +1,37 @@ +package marquez.spark.agent.lifecycle.plan; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import marquez.spark.agent.client.LineageEvent.Dataset; +import marquez.spark.agent.facets.OutputStatisticsFacet; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand; +import scala.runtime.AbstractPartialFunction; + +/** + * {@link LogicalPlan} visitor that matches an {@link InsertIntoDataSourceDirCommand} and extracts + * the output {@link Dataset} being written. 
+ */ +public class InsertIntoDataSourceDirVisitor + extends AbstractPartialFunction> { + + @Override + public boolean isDefinedAt(LogicalPlan x) { + return x instanceof InsertIntoDataSourceDirCommand; + } + + @Override + public List apply(LogicalPlan x) { + InsertIntoDataSourceDirCommand command = (InsertIntoDataSourceDirCommand) x; + OutputStatisticsFacet outputStats = PlanUtils.getOutputStats(command.metrics()); + // URI is required by the InsertIntoDataSourceDirCommand + URI outputPath = command.storage().locationUri().get(); + String namespace = PlanUtils.namespaceUri(outputPath); + return Collections.singletonList( + PlanUtils.getDataset( + outputPath, + namespace, + PlanUtils.datasetFacet(command.schema(), namespace, outputStats))); + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoDataSourceVisitor.java b/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoDataSourceVisitor.java new file mode 100644 index 0000000000000..b1eea967b4ba5 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoDataSourceVisitor.java @@ -0,0 +1,51 @@ +package marquez.spark.agent.lifecycle.plan; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import java.util.List; +import java.util.stream.Collectors; +import marquez.spark.agent.client.LineageEvent.Dataset; +import marquez.spark.agent.facets.OutputStatisticsFacet; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.InsertIntoDataSourceCommand; +import scala.PartialFunction; +import scala.runtime.AbstractPartialFunction; + +/** + * {@link LogicalPlan} visitor that matches an {@link InsertIntoDataSourceCommand} and extracts the + * output {@link Dataset} being written. 
+ */ +public class InsertIntoDataSourceVisitor + extends AbstractPartialFunction> { + private final List>> datasetProviders; + + public InsertIntoDataSourceVisitor( + List>> datasetProviders) { + this.datasetProviders = datasetProviders; + } + + @Override + public boolean isDefinedAt(LogicalPlan x) { + return x instanceof InsertIntoDataSourceCommand; + } + + @Override + public List apply(LogicalPlan x) { + OutputStatisticsFacet outputStats = + PlanUtils.getOutputStats(((InsertIntoDataSourceCommand) x).metrics()); + return PlanUtils.applyFirst( + datasetProviders, ((InsertIntoDataSourceCommand) x).logicalRelation()) + .stream() + // constructed datasets don't include the output stats, so add that facet here + .peek( + ds -> { + Builder facetsMap = + ImmutableMap.builder().put("stats", outputStats); + if (ds.getFacets().getAdditionalFacets() != null) { + facetsMap.putAll(ds.getFacets().getAdditionalFacets()); + } + ds.getFacets().setAdditional(facetsMap.build()); + }) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java b/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java new file mode 100644 index 0000000000000..7dff0a2f7de59 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java @@ -0,0 +1,36 @@ +package marquez.spark.agent.lifecycle.plan; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import marquez.spark.agent.client.LineageEvent.Dataset; +import marquez.spark.agent.facets.OutputStatisticsFacet; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand; +import scala.runtime.AbstractPartialFunction; + +/** + * {@link LogicalPlan} visitor that matches an {@link InsertIntoHadoopFsRelationCommand} and + * extracts the output {@link Dataset} being written. 
+ */ +public class InsertIntoHadoopFsRelationVisitor + extends AbstractPartialFunction> { + + @Override + public boolean isDefinedAt(LogicalPlan x) { + return x instanceof InsertIntoHadoopFsRelationCommand; + } + + @Override + public List apply(LogicalPlan x) { + InsertIntoHadoopFsRelationCommand command = (InsertIntoHadoopFsRelationCommand) x; + OutputStatisticsFacet outputStats = PlanUtils.getOutputStats(command.metrics()); + URI outputPath = command.outputPath().toUri(); + String namespace = PlanUtils.namespaceUri(outputPath); + return Collections.singletonList( + PlanUtils.getDataset( + outputPath, + namespace, + PlanUtils.datasetFacet(command.schema(), namespace, outputStats))); + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/JDBCRelationVisitor.java b/src/main/java/marquez/spark/agent/lifecycle/plan/JDBCRelationVisitor.java new file mode 100644 index 0000000000000..b2ea2aba769c5 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/JDBCRelationVisitor.java @@ -0,0 +1,86 @@ +package marquez.spark.agent.lifecycle.plan; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import marquez.spark.agent.client.LineageEvent.Dataset; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.LogicalRelation; +import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand; +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions; +import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation; +import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider; +import scala.runtime.AbstractFunction0; +import scala.runtime.AbstractPartialFunction; + +/** + * {@link LogicalPlan} visitor that extracts a {@link Dataset} from a {@link JDBCRelation} or a + * {@link SaveIntoDataSourceCommand} that writes using a {@link JdbcRelationProvider}. {@link + * Dataset} naming expects the namespace to be the JDBC connection URL (schema and authority only) + * and the table name to be the <database>.<tableName>. + * + *

TODO If a user specifies the {@link JDBCOptions#JDBC_QUERY_STRING()} option, we do not parse + * the sql to determine the specific tables used. Since we return a List of {@link Dataset}s, we can + * parse the sql and determine each table referenced to return a complete list of datasets + * referenced. + */ +public class JDBCRelationVisitor extends AbstractPartialFunction> { + private final SQLContext sqlContext; + + public JDBCRelationVisitor(SQLContext sqlContext) { + this.sqlContext = sqlContext; + } + + @Override + public boolean isDefinedAt(LogicalPlan x) { + return jdbcRelationSupplier(x).isPresent(); + } + + private Optional> jdbcRelationSupplier(LogicalPlan plan) { + if (plan instanceof SaveIntoDataSourceCommand) { + SaveIntoDataSourceCommand command = (SaveIntoDataSourceCommand) plan; + if (command.dataSource() instanceof JdbcRelationProvider) { + return Optional.of( + () -> + (JDBCRelation) + ((JdbcRelationProvider) (command).dataSource()) + .createRelation(sqlContext, command.options())); + } + } else if (plan instanceof LogicalRelation + && ((LogicalRelation) plan).relation() instanceof JDBCRelation) { + return Optional.of(() -> (JDBCRelation) ((LogicalRelation) plan).relation()); + } + return Optional.empty(); + } + + @Override + public List apply(LogicalPlan x) { + return jdbcRelationSupplier(x) + .map( + s -> { + JDBCRelation relation = s.get(); + // TODO- if a relation is composed of a complex sql query, we should attempt to + // extract the + // table names so that we can construct a true lineage + String tableName = + relation + .jdbcOptions() + .parameters() + .get(JDBCOptions.JDBC_TABLE_NAME()) + .getOrElse( + new AbstractFunction0() { + @Override + public String apply() { + return "COMPLEX"; + } + }); + URI connectionUri = URI.create(relation.jdbcOptions().url()); + return Collections.singletonList( + PlanUtils.getDataset(connectionUri, relation.schema())); + }) + .orElse(Collections.emptyList()); + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/OutputDatasetVisitors.java b/src/main/java/marquez/spark/agent/lifecycle/plan/OutputDatasetVisitors.java new file mode 100644 index 0000000000000..be36281a6ea84 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/OutputDatasetVisitors.java @@ -0,0 +1,40 @@ +package marquez.spark.agent.lifecycle.plan; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import marquez.spark.agent.client.LineageEvent.Dataset; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import scala.PartialFunction; + +/** + * Constructs a list of valid {@link LogicalPlan} visitors that can extract an output {@link + * Dataset}. Checks the classpath for classes that are not bundled with Spark to avoid {@link + * ClassNotFoundException}s during plan traversal. 
+ */ +public class OutputDatasetVisitors + implements Supplier>>> { + private final SQLContext sqlContext; + private final Supplier>>> datasetProviders; + + public OutputDatasetVisitors( + SQLContext sqlContext, + Supplier>>> datasetProviders) { + this.sqlContext = sqlContext; + this.datasetProviders = datasetProviders; + } + + @Override + public List>> get() { + List>> list = new ArrayList(); + list.add(new InsertIntoDataSourceDirVisitor()); + list.add(new InsertIntoDataSourceVisitor(datasetProviders.get())); + list.add(new InsertIntoHadoopFsRelationVisitor()); + list.add(new JDBCRelationVisitor(sqlContext)); + if (BigQueryNodeVisitor.hasBigQueryClasses()) { + list.add(new BigQueryNodeVisitor(sqlContext)); + } + return list; + } +} diff --git a/src/main/java/marquez/spark/agent/lifecycle/plan/PlanUtils.java b/src/main/java/marquez/spark/agent/lifecycle/plan/PlanUtils.java new file mode 100644 index 0000000000000..4c38642c86834 --- /dev/null +++ b/src/main/java/marquez/spark/agent/lifecycle/plan/PlanUtils.java @@ -0,0 +1,212 @@ +package marquez.spark.agent.lifecycle.plan; + +import com.google.common.collect.ImmutableMap; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import marquez.spark.agent.client.LineageEvent.Dataset; +import marquez.spark.agent.client.LineageEvent.DatasetFacet; +import marquez.spark.agent.client.LineageEvent.DatasourceDatasetFacet; +import marquez.spark.agent.client.LineageEvent.JobLink; +import marquez.spark.agent.client.LineageEvent.ParentRunFacet; +import marquez.spark.agent.client.LineageEvent.RunLink; +import marquez.spark.agent.client.LineageEvent.SchemaDatasetFacet; +import marquez.spark.agent.client.LineageEvent.SchemaField; +import marquez.spark.agent.client.OpenLineageClient; +import marquez.spark.agent.facets.OutputStatisticsFacet; +import org.apache.spark.sql.execution.metric.SQLMetric; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.PartialFunction; +import scala.PartialFunction$; +import scala.collection.Map; +import scala.runtime.AbstractFunction0; + +/** + * Utility functions for traversing a {@link + * org.apache.spark.sql.catalyst.plans.logical.LogicalPlan}. + */ +public class PlanUtils { + + /** + * Merge a list of {@link PartialFunction}s and return the first value where the function is + * defined or null if no function matches the input. + * + * @param fns + * @param arg + * @param + * @param + * @return + */ + public static R applyFirst(List> fns, T arg) { + PartialFunction fn = merge(fns); + if (fn.isDefinedAt(arg)) { + return fn.apply(arg); + } + return null; + } + + /** + * Given a list of {@link PartialFunction}s merge to produce a single function that will test the + * input against each function one by one until a match is found or {@link + * PartialFunction$#empty()} is returned. + * + * @param fns + * @param + * @param + * @return + */ + public static PartialFunction merge(List> fns) { + return fns.stream().reduce((a, b) -> a.orElse(b)).orElse(PartialFunction$.MODULE$.empty()); + } + + /** + * Given a schema, construct a valid {@link SchemaDatasetFacet}. 
+ * + * @param structType + * @return + */ + public static SchemaDatasetFacet schemaFacet(StructType structType) { + return SchemaDatasetFacet.builder() + ._producer(URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI)) + ._schemaURL(URI.create(OpenLineageClient.OPEN_LINEAGE_SCHEMA_FACET_URI)) + .fields(transformFields(structType.fields())) + .build(); + } + + private static List transformFields(StructField[] fields) { + List list = new ArrayList<>(); + for (StructField field : fields) { + list.add(SchemaField.builder().name(field.name()).type(field.dataType().typeName()).build()); + } + return list; + } + + public static String namespaceUri(URI outputPath) { + return Optional.ofNullable(outputPath.getAuthority()) + .map(a -> String.format("%s://%s", outputPath.getScheme(), a)) + .orElse(outputPath.getScheme()); + } + + /** + * Given a {@link URI}, construct a valid {@link Dataset} following the expected naming + * conventions. + * + * @param outputPath + * @param schema + * @return + */ + public static Dataset getDataset(URI outputPath, StructType schema) { + String namespace = namespaceUri(outputPath); + DatasetFacet datasetFacet = datasetFacet(schema, namespace); + return getDataset(outputPath, namespace, datasetFacet); + } + + /** + * Construct a dataset given a {@link URI}, namespace, and preconstructed {@link DatasetFacet}. + * + * @param outputPath + * @param namespace + * @param datasetFacet + * @return + */ + public static Dataset getDataset(URI outputPath, String namespace, DatasetFacet datasetFacet) { + return Dataset.builder() + .namespace(namespace) + .name(outputPath.getPath()) + .facets(datasetFacet) + .build(); + } + + /** + * Construct a {@link DatasetFacet} given a schema and a namespace. + * + * @param schema + * @param namespaceUri + * @return + */ + public static DatasetFacet datasetFacet(StructType schema, String namespaceUri) { + return DatasetFacet.builder() + .schema(schemaFacet(schema)) + .dataSource(datasourceFacet(namespaceUri)) + .build(); + } + + /** + * Construct a {@link DatasetFacet} given a schema, a namespace, and an {@link + * OutputStatisticsFacet}. + * + * @param schema + * @param namespaceUri + * @param outputStats + * @return + */ + public static DatasetFacet datasetFacet( + StructType schema, String namespaceUri, OutputStatisticsFacet outputStats) { + return DatasetFacet.builder() + .schema(schemaFacet(schema)) + .dataSource(datasourceFacet(namespaceUri)) + .additional(ImmutableMap.of("stats", outputStats)) + .build(); + } + + /** + * Construct a {@link DatasourceDatasetFacet} given a namespace for the datasource. + * + * @param namespaceUri + * @return + */ + public static DatasourceDatasetFacet datasourceFacet(String namespaceUri) { + return DatasourceDatasetFacet.builder() + ._producer(URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI)) + ._schemaURL(URI.create(OpenLineageClient.OPEN_LINEAGE_DATASOURCE_FACET)) + .uri(namespaceUri) + .name(namespaceUri) + .build(); + } + + /** + * Construct a {@link ParentRunFacet} given the parent job's runId, job name, and namespace. 
+ * + * @param runId + * @param parentJob + * @param parentJobNamespace + * @return + */ + public static ParentRunFacet parentRunFacet( + String runId, String parentJob, String parentJobNamespace) { + return ParentRunFacet.builder() + ._producer(URI.create(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI)) + ._schemaURL(URI.create(OpenLineageClient.OPEN_LINEAGE_PARENT_FACET_URI)) + .run(RunLink.builder().runId(runId).build()) + .job(JobLink.builder().name(parentJob).namespace(parentJobNamespace).build()) + .build(); + } + + public static OutputStatisticsFacet getOutputStats(Map metrics) { + long rowCount = + metrics + .getOrElse( + "numOutputRows", + new AbstractFunction0() { + @Override + public SQLMetric apply() { + return new SQLMetric("sum", 0L); + } + }) + .value(); + long outputBytes = + metrics + .getOrElse( + "numOutputBytes", + new AbstractFunction0() { + @Override + public SQLMetric apply() { + return new SQLMetric("sum", 0L); + } + }) + .value(); + return new OutputStatisticsFacet(rowCount, outputBytes); + } +} diff --git a/src/test/java/marquez/spark/agent/lifecycle/LibraryTest.java b/src/test/java/marquez/spark/agent/lifecycle/LibraryTest.java index 7ab2c9215549d..fc1d6bf9871bb 100644 --- a/src/test/java/marquez/spark/agent/lifecycle/LibraryTest.java +++ b/src/test/java/marquez/spark/agent/lifecycle/LibraryTest.java @@ -9,6 +9,8 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Resources; import java.io.FileWriter; import java.io.IOException; @@ -17,6 +19,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import marquez.spark.agent.MarquezAgent; import marquez.spark.agent.MarquezContext; @@ -26,6 +29,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession$; @@ -73,8 +77,8 @@ public void testSparkSql() throws IOException, TimeoutException { URL url = Resources.getResource("data.txt"); final Dataset data = spark.read().textFile(url.getPath()); - final long numAs = data.filter(s -> s.contains("a")).count(); - final long numBs = data.filter(s -> s.contains("b")).count(); + final long numAs = data.filter((FilterFunction) s -> s.contains("a")).count(); + final long numBs = data.filter((FilterFunction) s -> s.contains("b")).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); spark.sparkContext().listenerBus().waitUntilEmpty(1000); @@ -88,27 +92,64 @@ public void testSparkSql() throws IOException, TimeoutException { assertEquals(4, events.size()); + ObjectMapper objectMapper = OpenLineageClient.getObjectMapper(); for (int i = 0; i < events.size(); i++) { LineageEvent event = events.get(i); - String snapshot = - new String( - Files.readAllBytes( - Paths.get(String.format("integrations/%s/%d.json", "sparksql", i + 1)))); + Map snapshot = + objectMapper.readValue( + Paths.get(String.format("integrations/%s/%d.json", "sparksql", i + 1)).toFile(), + new TypeReference>() {}); assertEquals( snapshot, - OpenLineageClient.getObjectMapper() - .writerWithDefaultPrettyPrinter() - 
.writeValueAsString(event)); + cleanSerializedMap( + objectMapper.readValue( + objectMapper.writeValueAsString(event), + new TypeReference>() {}))); } verifySerialization(events); } + private Map cleanSerializedMap(Map map) { + // exprId and jvmId are not deterministic, so remove them from the maps to avoid failing + map.remove("exprId"); + map.remove("resultId"); + + // timezone is different in CI than local + map.remove("timeZoneId"); + if (map.containsKey("namespace") && map.get("namespace").equals("file")) { + map.put("name", "/path/to/data"); + } + if (map.containsKey("uri") && ((String) map.get("uri")).startsWith("file:/")) { + map.put("uri", "file:/path/to/data"); + } + map.forEach( + (k, v) -> { + if (v instanceof Map) { + cleanSerializedMap((Map) v); + } else if (v instanceof List) { + cleanSerializedList((List) v); + } + }); + return map; + } + + private void cleanSerializedList(List l) { + l.forEach( + i -> { + if (i instanceof Map) { + cleanSerializedMap((Map) i); + } else if (i instanceof List) { + cleanSerializedList((List) i); + } + }); + } + @Test public void testRdd() throws IOException { reset(marquezContext); when(marquezContext.getJobNamespace()).thenReturn("ns_name"); when(marquezContext.getJobName()).thenReturn("job_name"); - when(marquezContext.getParentRunId()).thenReturn("ea445b5c-22eb-457a-8007-01c7c52b6e54"); + when(marquezContext.getParentRunId()).thenReturn("8d99e33e-2a1c-4254-9600-18f23435fc3b"); URL url = Resources.getResource("data.txt"); SparkConf conf = new SparkConf().setAppName("Word Count").setMaster("local[*]"); diff --git a/src/test/java/marquez/spark/agent/lifecycle/StaticExecutionContextFactory.java b/src/test/java/marquez/spark/agent/lifecycle/StaticExecutionContextFactory.java index 012bd3df2b725..1739f521ee090 100644 --- a/src/test/java/marquez/spark/agent/lifecycle/StaticExecutionContextFactory.java +++ b/src/test/java/marquez/spark/agent/lifecycle/StaticExecutionContextFactory.java @@ -4,6 +4,10 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import marquez.spark.agent.MarquezContext; +import marquez.spark.agent.lifecycle.plan.InputDatasetVisitors; +import marquez.spark.agent.lifecycle.plan.OutputDatasetVisitors; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.execution.SQLExecution; /** Returns deterministic fields for contexts */ public class StaticExecutionContextFactory extends ContextFactory { @@ -31,12 +35,13 @@ protected URI getDatasetUri(URI pathUri) { @Override public SparkSQLExecutionContext createSparkSQLExecutionContext(long executionId) { + SQLContext sqlContext = SQLExecution.getQueryExecution(executionId).sparkPlan().sqlContext(); + InputDatasetVisitors inputDatasetVisitors = new InputDatasetVisitors(sqlContext); + OutputDatasetVisitors outputDatasetVisitors = + new OutputDatasetVisitors(sqlContext, inputDatasetVisitors); SparkSQLExecutionContext sparksql = new SparkSQLExecutionContext( - executionId, - marquezContext, - new StaticLogicalPlanTraverser(), - new StaticDatasetPlanTraverser()) { + executionId, marquezContext, outputDatasetVisitors.get(), inputDatasetVisitors.get()) { @Override public ZonedDateTime toZonedTime(long time) { return getZonedTime(); @@ -48,18 +53,4 @@ public ZonedDateTime toZonedTime(long time) { private static ZonedDateTime getZonedTime() { return ZonedDateTime.of(2021, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); } - - class StaticLogicalPlanTraverser extends LogicalPlanFacetTraverser { - @Override - protected Object visitPathUri(URI uri) { - return "data.txt"; - } - } - - 
class StaticDatasetPlanTraverser extends DatasetLogicalPlanTraverser {
-    @Override
-    protected URI visitPathUri(URI uri) {
-      return URI.create("gs://bucket/data.txt");
-    }
-  }
 }
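
For reference, a minimal usage sketch of how the new visitor wiring fits together, mirroring what SparkSQLExecutionContext now does with PlanUtils.merge and PlanUtils.applyFirst. The class name DatasetExtractionSketch, the local master, and the data.txt path are invented for illustration and are not part of this change; the sketch assumes the agent classes above are on the classpath with the generic signatures shown in PlanUtils.

package example;

import java.util.List;
import java.util.stream.Collectors;
import marquez.spark.agent.client.LineageEvent.Dataset;
import marquez.spark.agent.lifecycle.plan.InputDatasetVisitors;
import marquez.spark.agent.lifecycle.plan.OutputDatasetVisitors;
import marquez.spark.agent.lifecycle.plan.PlanUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import scala.collection.JavaConversions;

public class DatasetExtractionSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("lineage-sketch").getOrCreate();

    // Any query works; the visitors traverse its logical plan.
    LogicalPlan plan = spark.read().textFile("data.txt").queryExecution().logical();

    InputDatasetVisitors inputVisitors = new InputDatasetVisitors(spark.sqlContext());
    OutputDatasetVisitors outputVisitors =
        new OutputDatasetVisitors(spark.sqlContext(), inputVisitors);

    // Inputs: collect() applies the merged partial function to every matching plan node.
    List<Dataset> inputs =
        JavaConversions.seqAsJavaList(plan.collect(PlanUtils.merge(inputVisitors.get())))
            .stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());

    // Outputs: applyFirst() tests the root node against each visitor until one is defined.
    // For a read-only plan like this one, no output visitor matches and null is returned.
    List<Dataset> outputs = PlanUtils.applyFirst(outputVisitors.get(), plan);

    System.out.println("inputs:  " + inputs);
    System.out.println("outputs: " + outputs);
    spark.stop();
  }
}

The split between collecting over all nodes for inputs and applying only the root for outputs reflects how the diff treats reads (they may appear anywhere in the plan) versus writes (the write command is the root of the executed plan).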
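The same AbstractPartialFunction pattern is what a new visitor would follow if another relation type needed coverage. A minimal sketch under that assumption; ExampleRddVisitor, the LogicalRDD match, and the rdd://in-memory naming are all invented for illustration and are not part of this change.

package example;

import java.net.URI;
import java.util.Collections;
import java.util.List;
import marquez.spark.agent.client.LineageEvent.Dataset;
import marquez.spark.agent.lifecycle.plan.PlanUtils;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.LogicalRDD;
import scala.runtime.AbstractPartialFunction;

public class ExampleRddVisitor extends AbstractPartialFunction<LogicalPlan, List<Dataset>> {

  @Override
  public boolean isDefinedAt(LogicalPlan plan) {
    // Only claim nodes this visitor understands; everything else falls through
    // to the next visitor in the merged partial function.
    return plan instanceof LogicalRDD;
  }

  @Override
  public List<Dataset> apply(LogicalPlan plan) {
    // Delegate naming and facet construction to PlanUtils, as the other visitors do.
    return Collections.singletonList(
        PlanUtils.getDataset(URI.create("rdd://in-memory"), plan.schema()));
  }
}

Registering such a visitor would be a one-line addition to InputDatasetVisitors.get() or OutputDatasetVisitors.get(), guarded by a classpath check when the matched classes are optional, as is done for BigQueryNodeVisitor.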