From bd9cea9826709090c002ca3806689a6dc7c8dd14 Mon Sep 17 00:00:00 2001 From: Julien Le Dem <julien@apache.org> Date: Thu, 18 Aug 2022 18:41:58 -0700 Subject: [PATCH 1/3] Create column lineage endpoint proposal --- proposals/2045-column-lineage-endpoint.md | 138 ++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 proposals/2045-column-lineage-endpoint.md diff --git a/proposals/2045-column-lineage-endpoint.md b/proposals/2045-column-lineage-endpoint.md new file mode 100644 index 0000000000..e1c0c2108f --- /dev/null +++ b/proposals/2045-column-lineage-endpoint.md @@ -0,0 +1,138 @@ +# Proposal: Column lineage endpoint proposal + +Author(s): @julienledem + +Created: 20022-08-18 + +Dicussion: [column lineage endpoint issue #2045](https://github.com/MarquezProject/marquez/issues/2045) + +## Overview + +OpenLineage defines a [column-level lineage facet](https://github.com/OpenLineage/OpenLineage/blob/ff0d87d30ed6c9fe39472788948266a6d3190585/spec/facets/ColumnLineageDatasetFacet.md). +We propose to add a Marquez endpoint leveraging this facet to filter down lineage for given column. + +## Proposal + +### add column lineage to existing endpoint +In the GET /lineage api, add column lineage to DATASET nodes' data +``` + { + "id": "dataset:food_delivery:public.categories", + "type": "DATASET", + "data": { + "type": "DATASET", + "id": { + "namespace": "food_delivery", + "name": "public.categories" + }, + "type": "DB_TABLE", + "name": "public.categories", + "physicalName": "public.categories", + "createdAt": "2021-03-09T02:33:18.468719Z", + "updatedAt": "2022-08-04T05:08:09.190723Z", + "namespace": "food_delivery", + "sourceName": "analytics_db", + "fields": [{ + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the category." + }, { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the category." + }, { + "name": "menu_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the menu related to the category." + }, { + "name": "description", + "type": "TEXT", + "tags": [], + "description": "The description of the category." + }], +> columnLineage: { +> "a": { +> inputFields: [ +> {namespace: "ns", name: "name", "field": "a"}, +> ... other inputs +> ], +> transformationDescription: "identical", +> transformationType: "IDENTITY" +> }, +> "b": ... other output fields +> } + "tags": [], + "lastModifiedAt": "2022-08-04T05:03:09.190723Z", + "description": null, + "lastlifecycleState": null + }, + "inEdges": [{ + "origin": "job:food_delivery:etl_orders.etl_categories", + "destination": "dataset:food_delivery:public.categories" + }], + "outEdges": [{ + "origin": "dataset:food_delivery:public.categories", + "destination": "job:food_delivery:etl_orders.etl_orders_7_days" + }] + } +``` + +### add a column-level-lineage endpoint: + +``` +GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days&column=a +``` +that would be layered on the existing lineage endpoint but filtered down to the datasets that contribute to that column. +It also only returns dataset nodes + +``` +{ + graph: [ + { + "id": "dataset:db1:table2", + "type": "DATASET", + data: { + namespace: "DB1", + name: "table2", + columnLineage: { + "a": { + inputFields: [ + {namespace: "DB1", name: "table1, "field": "a"} + ], + transformationDescription: "identical", + transformationType: "IDENTITY" + }, + "b": ... other output fields + } + }, + ... + } + ] +``` + +### Point in time upstream lineage +return historical upstream lineage from a given Dataset version. +``` +GET /lineage?nodeId=dataset:food_delivery:public.delivery_7_days:{version} +GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days:{version}&column=a +``` +This returns only upstream lineage in this current proposal. +The upstream lineage is well defined to a specific version while downstream lineage is not +The data payload would also add a version field. + +## Implementation + +### columne lineage facet in lineage +Adding the columnLineage facet requires a formatting of existing facet data. +### column lineage endpoint +The `/column-lineage` endpoint leverages the `/lineage` endpoint and then filters down the payload to return the expected result. +### point-in-time upstream lineage +The point-in-time upstream lineage leverages the run to dataset version relation to track back the lineage of a given dataset of job version. +Dataset version -> run that produced it -> consumed Dataset Versions. + +## Next Steps + +Review of this proposal and production of detailed design for the implementation, in particular for the point in time lineage which might affect the dabtabase schema. From 258f38c42dc460cd9c86b72396a8827ccdc458a9 Mon Sep 17 00:00:00 2001 From: Julien Le Dem <julien@apache.org> Date: Fri, 19 Aug 2022 11:31:14 -0700 Subject: [PATCH 2/3] add more details Signed-off-by: Julien Le Dem <julien@apache.org> --- proposals/2045-column-lineage-endpoint.md | 167 +++++++++++----------- 1 file changed, 86 insertions(+), 81 deletions(-) diff --git a/proposals/2045-column-lineage-endpoint.md b/proposals/2045-column-lineage-endpoint.md index e1c0c2108f..87719a3169 100644 --- a/proposals/2045-column-lineage-endpoint.md +++ b/proposals/2045-column-lineage-endpoint.md @@ -8,76 +8,62 @@ Dicussion: [column lineage endpoint issue #2045](https://github.com/MarquezProje ## Overview -OpenLineage defines a [column-level lineage facet](https://github.com/OpenLineage/OpenLineage/blob/ff0d87d30ed6c9fe39472788948266a6d3190585/spec/facets/ColumnLineageDatasetFacet.md). -We propose to add a Marquez endpoint leveraging this facet to filter down lineage for given column. +### Use cases + - Find the current upstream dependencies of a column. A column in a dataset is derived from columns in upstream datasets. + - See column-level lineage in the dataset level lineage when available. + - Retrieve point-in-time upstream lineage for a dataset or a column. What did the lineage look like yesterday compared to today? + +### Existing elements + +- OpenLineage defines a [column-level lineage facet]- (https://github.com/OpenLineage/OpenLineage/blob/ff0d87d30ed6c9fe39472788948266a6d3190585/spec/facets/ColumnLineageDatasetFacet.md). +- Marquez has a lineage endpoint `GET /api/v1/lineage` that returns the current lineage graph connected to a job or a dataset + +### New Elements +We propose to add the following: +- Add column lineage to the lineage endpoint +- A new column-lineage endpoint leveraging the column lineage facet to retrieve lineage for a given column. +- Point-in-time upstream (dataset or column level) lineage given a version of a dataset. ## Proposal ### add column lineage to existing endpoint In the GET /lineage api, add column lineage to DATASET nodes' data -``` - { - "id": "dataset:food_delivery:public.categories", - "type": "DATASET", - "data": { - "type": "DATASET", - "id": { - "namespace": "food_delivery", - "name": "public.categories" - }, - "type": "DB_TABLE", - "name": "public.categories", - "physicalName": "public.categories", - "createdAt": "2021-03-09T02:33:18.468719Z", - "updatedAt": "2022-08-04T05:08:09.190723Z", - "namespace": "food_delivery", - "sourceName": "analytics_db", - "fields": [{ - "name": "id", - "type": "INTEGER", - "tags": [], - "description": "The unique ID of the category." - }, { - "name": "name", - "type": "VARCHAR", - "tags": [], - "description": "The name of the category." - }, { - "name": "menu_id", - "type": "INTEGER", - "tags": [], - "description": "The ID of the menu related to the category." - }, { - "name": "description", - "type": "TEXT", - "tags": [], - "description": "The description of the category." - }], -> columnLineage: { -> "a": { -> inputFields: [ -> {namespace: "ns", name: "name", "field": "a"}, -> ... other inputs -> ], -> transformationDescription: "identical", -> transformationType: "IDENTITY" -> }, -> "b": ... other output fields -> } - "tags": [], - "lastModifiedAt": "2022-08-04T05:03:09.190723Z", - "description": null, - "lastlifecycleState": null - }, - "inEdges": [{ - "origin": "job:food_delivery:etl_orders.etl_categories", - "destination": "dataset:food_delivery:public.categories" - }], - "outEdges": [{ - "origin": "dataset:food_delivery:public.categories", - "destination": "job:food_delivery:etl_orders.etl_orders_7_days" - }] - } +```diff +{ + "id": "dataset:food_delivery:public.categories", + "type": "DATASET", + "data": { + "type": "DATASET", + "id": { + "namespace": "food_delivery", + "name": "public.categories" + }, + "type": "DB_TABLE", + ... + "fields": [{ + ... + }], +> columnLineage: { +> "a": { +> inputFields: [ +> {namespace: "ns", name: "name", "field": "a"}, +> ... other inputs +> ], +> transformationDescription: "identical", +> transformationType: "IDENTITY" +> }, +> "b": ... other output fields +> } + }, + "inEdges": [{ + "origin": "job:food_delivery:etl_orders.etl_categories", + "destination": "dataset:food_delivery:public.categories" + }], + "outEdges": [{ + "origin": "dataset:food_delivery:public.categories", + "destination": "job:food_delivery:etl_orders.etl_orders_7_days" + }] +} ``` ### add a column-level-lineage endpoint: @@ -85,10 +71,12 @@ In the GET /lineage api, add column lineage to DATASET nodes' data ``` GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days&column=a ``` -that would be layered on the existing lineage endpoint but filtered down to the datasets that contribute to that column. -It also only returns dataset nodes +`column` is a ne parameter that must be a column in the schema of the provided dataset `nodeId`. -``` +The logic is layered on the existing lineage endpoint, filtering down to the datasets that contribute to that column. +It only returns dataset nodes. + +```diff { graph: [ { @@ -97,31 +85,48 @@ It also only returns dataset nodes data: { namespace: "DB1", name: "table2", - columnLineage: { - "a": { - inputFields: [ - {namespace: "DB1", name: "table1, "field": "a"} - ], - transformationDescription: "identical", - transformationType: "IDENTITY" - }, - "b": ... other output fields - } +> columnLineage: { +> "a": { +> inputFields: [ +> {namespace: "DB1", name: "table1, "field": "a"} +> ], +> transformationDescription: "identical", +> transformationType: "IDENTITY" +> }, +> "b": ... other output fields +> } }, ... } ] +} ``` ### Point in time upstream lineage return historical upstream lineage from a given Dataset version. +This adds the version element to the nodeId in both the existing `/api/v1/lineage` and newly proposed `/api/v1/column-lineage` endpoint ``` GET /lineage?nodeId=dataset:food_delivery:public.delivery_7_days:{version} GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days:{version}&column=a ``` -This returns only upstream lineage in this current proposal. -The upstream lineage is well defined to a specific version while downstream lineage is not -The data payload would also add a version field. +This returns only upstream lineage in this current proposal. This is because upstream lineage is well defined to a specific version while downstream lineage is not. The data payload would add a version field. +```diff +{ + graph: [ + { +< "id": "dataset:db1:table2", +> "id": "dataset:db1:table2#{VERSION UUID}", + "type": "DATASET", + data: { + namespace: "DB1", + name: "table2", +> version: "{VERSION UUID}" + ... + } + } + ] +} +``` ## Implementation From c4fdaeac7e588480d996c8080d1f68f44cfe9b39 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski <leszczynski.pawel@gmail.com> Date: Tue, 30 Aug 2022 10:31:25 +0200 Subject: [PATCH 3/3] extend existing proposal with comment suggestion Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> --- proposals/2045-column-lineage-endpoint.md | 177 +++++++++++----------- 1 file changed, 92 insertions(+), 85 deletions(-) diff --git a/proposals/2045-column-lineage-endpoint.md b/proposals/2045-column-lineage-endpoint.md index 87719a3169..13559b348f 100644 --- a/proposals/2045-column-lineage-endpoint.md +++ b/proposals/2045-column-lineage-endpoint.md @@ -1,6 +1,6 @@ # Proposal: Column lineage endpoint proposal -Author(s): @julienledem +Author(s): @julienledem, @pawel-big-lebowski Created: 20022-08-18 @@ -15,129 +15,136 @@ Dicussion: [column lineage endpoint issue #2045](https://github.com/MarquezProje ### Existing elements -- OpenLineage defines a [column-level lineage facet]- (https://github.com/OpenLineage/OpenLineage/blob/ff0d87d30ed6c9fe39472788948266a6d3190585/spec/facets/ColumnLineageDatasetFacet.md). +- OpenLineage defines a [column-level lineage facet]- (https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ColumnLineageDatasetFacet.json). - Marquez has a lineage endpoint `GET /api/v1/lineage` that returns the current lineage graph connected to a job or a dataset +### Column lineage characteristics and general assumptions + +Column level lineage is a different lineage graph due to a different node granularity - kind of zoomed-in view of existing lineage. Instead of datasets and jobs being lineage graph nodes, each dataset field becomes a node. Additionally, there are edges between dataset fields, instead of datasets itself. Thus, enriching existing lineage with column lineage information would not be sufficient. That’s why we propose another API endpoint with column lineage graph. + +Upstream and downstream edges do have different characteristics. An output dataset is always produced by a single version of input dataset (one upstream), while a single input datset version can have multiple output dataset versions. Lineage graph can be then easily flooded by downstream subgraph which blurs the overall view. That's why we consider an upstream column lineage as a default one. Downstream lineage will be returned only when requested explicitly. + ### New Elements -We propose to add the following: -- Add column lineage to the lineage endpoint -- A new column-lineage endpoint leveraging the column lineage facet to retrieve lineage for a given column. -- Point-in-time upstream (dataset or column level) lineage given a version of a dataset. + +We propose the following changes: + + - Add column lineage to the dataset resource endpoint. Column lineage will NOT be added to existing `/lineage` endpoint as it may be a heavy database operation run on each lineage graph's node which we want to avoid. Based on that, column level lineage get be requested per dataset in separate requests when required. + - A new column-lineage endpoint leveraging the column lineage facet to retrieve lineage for a given column. + - Point-in-time upstream (dataset or column level) lineage given a version of a dataset. ## Proposal -### add column lineage to existing endpoint -In the GET /lineage api, add column lineage to DATASET nodes' data -```diff -{ - "id": "dataset:food_delivery:public.categories", - "type": "DATASET", - "data": { - "type": "DATASET", - "id": { - "namespace": "food_delivery", - "name": "public.categories" - }, - "type": "DB_TABLE", - ... - "fields": [{ - ... - }], -> columnLineage: { -> "a": { -> inputFields: [ -> {namespace: "ns", name: "name", "field": "a"}, -> ... other inputs -> ], -> transformationDescription: "identical", -> transformationType: "IDENTITY" -> }, -> "b": ... other output fields -> } - }, - "inEdges": [{ - "origin": "job:food_delivery:etl_orders.etl_categories", - "destination": "dataset:food_delivery:public.categories" - }], - "outEdges": [{ - "origin": "dataset:food_delivery:public.categories", - "destination": "job:food_delivery:etl_orders.etl_orders_7_days" - }] -} -``` +### Add column lineage to existing datasets endpoint + +In the `GET /api/v1/namespaces/{namespace}/datasets` api, add column lineage facet to returned dataset resource. -### add a column-level-lineage endpoint: +### Add a column-level-lineage endpoint: +New endpoints to retrieve a column lineage of a single field or a whole dataset will be added: +``` +GET /column-lineage?nodeId=dataset:{namespace}:{dataset} +GET /column-lineage?nodeId=datasetField:{namespace}:{dataset}:{field} +``` +For example: ``` -GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days&column=a +GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days +GET /column-lineage?nodeId=datasetField:food_delivery:public.delivery_7_days:a ``` -`column` is a ne parameter that must be a column in the schema of the provided dataset `nodeId`. -The logic is layered on the existing lineage endpoint, filtering down to the datasets that contribute to that column. -It only returns dataset nodes. +Although creating a new endpoint, we would like to reuse existing data structures with a new `NodeType.FIELD` introduced. -```diff +The logic returns dataset field node: + +``` +GET /column-lineage?nodeId=datasetField:db1:table1:a +... { graph: [ { - "id": "dataset:db1:table2", - "type": "DATASET", - data: { - namespace: "DB1", - name: "table2", -> columnLineage: { -> "a": { -> inputFields: [ -> {namespace: "DB1", name: "table1, "field": "a"} -> ], -> transformationDescription: "identical", -> transformationType: "IDENTITY" -> }, -> "b": ... other output fields -> } + "id": "datasetField:db1:table1:a", + "type": "DATASET_FIELD", + "data": { + "namespace": "DB1", + "name": "table2", + "field": "a", + "type": "integer", + "transformationDescription": "identical", + "transformationType": "IDENTITY", + "inputFields": [ + { "namespace": "DBA", "name": "tableA", "field": "columnA"}, + { "namespace": "DBB", "name": "tableB", "field": "columnB"}, + { "namespace": "DBC", "name": "tableC", "field": "columnC"} + ] + "inEdges": [ + { + "origin": "datasetField:db1:table1:a", + "destination": "datasetField:DBA:tableA:columnA" + }, + { + "origin": "datasetField:db1:table1:a", + "destination": "datasetField:DBB:tableB:columnB" + }, + { + "origin": "datasetField:db1:table1:a", + "destination": "datasetField:DBB:tableB:columnC" + } + ], }, ... + # Input fields, present within "inEdges", can be also returned within a graph due to a `depth` parameter greate than 0. } ] } ``` +The `depth` parameter controls how many edges, from a given dataset field, shall be returned. The default is set to `0`. In case of default equal `1`, each `inputField` will be returned as a separate node within a response graph with `inputFields` used to produce it. Please note that extending depth may increase the graph size and affect request performance. + +The endpoints above fetches upstream column-lineage for given dataset field or all fields within a dataset. Downstream column lineage is turned off by default. However, this can be turned on with an extra `withDownstream` parameter like: + +``` +GET /column-lineage?nodeId=datasetField:food_delivery:public.delivery_7_days:a&withDownstream=true + +``` +This will include `outEdges` within the returned node of the graph. + + ### Point in time upstream lineage -return historical upstream lineage from a given Dataset version. -This adds the version element to the nodeId in both the existing `/api/v1/lineage` and newly proposed `/api/v1/column-lineage` endpoint + +Point in time lineage for newly proposed `/api/v1/column-lineage` endpoint: ``` -GET /lineage?nodeId=dataset:food_delivery:public.delivery_7_days:{version} -GET /column-lineage?nodeId=dataset:food_delivery:public.delivery_7_days:{version}&column=a +GET /column-lineage?nodeId=dataset_field:food_delivery:public.delivery_7_days:a&datasetVersion=123e4567-e89b-12d3-a456-426614174000 +GET /column-lineage?nodeId=dataset_field:food_delivery:public.delivery_7_days:a&lineageAt=1661846242 ``` -This returns only upstream lineage in this current proposal. This is because upstream lineage is well defined to a specific version while downstream lineage is not. The data payload would add a version field. + +Point in time can be controlled by: + * **datasetVersion** - uuid of a specific dataset version, + * **lineageAt** - which contains a unix timestamp. + +When **lineageAt** specified, the latest dataset version before timestamp will be found. Regardles **datasetVersion** or **lineageAt** parameters applied, responses will be the same as below: + ```diff { graph: [ { -< "id": "dataset:db1:table2", -> "id": "dataset:db1:table2#{VERSION UUID}", - "type": "DATASET", - data: { - namespace: "DB1", - name: "table2", -> version: "{VERSION UUID}" - ... - } - } - ] +< "id": "datasetField:db1:table1:a", +> "id": "datasetField:db1:table1:a#{VERSION UUID}", + "type": "DATASET_FIELD", + "data": { + .... } ``` ## Implementation -### columne lineage facet in lineage -Adding the columnLineage facet requires a formatting of existing facet data. +### columne lineage facet in dataset resource endpoint +Adding the columnLineage facet requires a formatting of existing facet data (work in progress). ### column lineage endpoint The `/column-lineage` endpoint leverages the `/lineage` endpoint and then filters down the payload to return the expected result. ### point-in-time upstream lineage + The point-in-time upstream lineage leverages the run to dataset version relation to track back the lineage of a given dataset of job version. Dataset version -> run that produced it -> consumed Dataset Versions. ## Next Steps -Review of this proposal and production of detailed design for the implementation, in particular for the point in time lineage which might affect the dabtabase schema. +Review of this proposal and production of detailed design for the implementation.: