From 3dc8b6c78d1cc013e879f02841e0c431392361ee Mon Sep 17 00:00:00 2001
From: Pawel Leszczynski
Date: Mon, 24 Oct 2022 10:33:23 +0200
Subject: [PATCH] python client for column lineage API

Signed-off-by: Pawel Leszczynski
---
Review notes (ignored by `git am`):
 * Line breaks of the patch were restored; Python indentation is reconstructed.
 * _get_column_lineage falls back to the defaults only when an argument is
   None, so an explicit depth=0 is no longer replaced by DEFAULT_DEPTH.
 * The post-image blob id on the client.py index line predates this revision.

 CHANGELOG.md                                |   1 +
 clients/python/marquez_client/client.py     |  37 ++++++
 clients/python/marquez_client/constants.py  |   2 +
 clients/python/tests/test_marquez_client.py | 120 +++++++++++++++++++-
 4 files changed, 159 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 78830c1711..2ca54eb0cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 * Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
 * Column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
 * Endpoint to get column lineage by a job [`#2204`](https://github.com/MarquezProject/marquez/pull/2204) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
+* Python client for column lineage [`#2209`](https://github.com/MarquezProject/marquez/pull/2209) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
 
 ### Fixed
 
diff --git a/clients/python/marquez_client/client.py b/clients/python/marquez_client/client.py
index a5ee3077e1..b5674755f7 100644
--- a/clients/python/marquez_client/client.py
+++ b/clients/python/marquez_client/client.py
@@ -17,6 +17,8 @@
     DEFAULT_TIMEOUT_MS,
     DEFAULT_LIMIT,
     DEFAULT_OFFSET,
+    DEFAULT_DEPTH,
+    DEFAULT_WITH_DOWNSTREAM,
     API_PATH_V1
 )
 from marquez_client.models import (
@@ -341,6 +343,41 @@ def get_job_run(self, run_id):
         Utils.is_valid_uuid(run_id, 'run_id')
         return self._get(self._url('/jobs/runs/{0}', run_id))
 
+    def get_column_lineage_by_dataset(self, namespace, dataset, depth=None, with_downstream=None):
+        node_id = "dataset:{0}:{1}".format(namespace, dataset)
+        return self._get_column_lineage(node_id, depth, with_downstream)
+
+    def get_column_lineage_by_dataset_field(
+            self,
+            namespace,
+            dataset,
+            field,
+            depth=None,
+            with_downstream=None
+    ):
+        node_id = "datasetField:{0}:{1}:{2}".format(namespace, dataset, field)
+        return self._get_column_lineage(node_id, depth, with_downstream)
+
+    def get_column_lineage_by_job(self, namespace, job, depth=None, with_downstream=None):
+        node_id = "job:{0}:{1}".format(namespace, job)
+        return self._get_column_lineage(node_id, depth, with_downstream)
+
+    def _get_column_lineage(self, node_id, depth, with_downstream):
+        # Fall back to the defaults only for None; a plain `or` would also
+        # replace an explicit depth=0 with DEFAULT_DEPTH.
+        if depth is None:
+            depth = DEFAULT_DEPTH
+        if with_downstream is None:
+            with_downstream = DEFAULT_WITH_DOWNSTREAM
+        return self._get(
+            self._url('/column-lineage'),
+            params={
+                'nodeId': node_id,
+                'depth': depth,
+                'withDownstream': with_downstream
+            }
+        )
+
     @deprecated(deprecated_in='0.20.0', removed_in='0.25.0',
                 details='Use OpenLineage instead, see `https://openlineage.io`')
     def mark_job_run_as_started(self, run_id, at=None):
diff --git a/clients/python/marquez_client/constants.py b/clients/python/marquez_client/constants.py
index 71423c00a2..9698a52dc7 100644
--- a/clients/python/marquez_client/constants.py
+++ b/clients/python/marquez_client/constants.py
@@ -6,5 +6,7 @@
 DEFAULT_MARQUEZ_URL = 'http://localhost:8080'
 DEFAULT_LIMIT = 100
 DEFAULT_OFFSET = 0
+DEFAULT_DEPTH = 20
+DEFAULT_WITH_DOWNSTREAM = False
 
 API_PATH_V1 = '/api/v1'
diff --git a/clients/python/tests/test_marquez_client.py b/clients/python/tests/test_marquez_client.py
index 31c89300ad..3fe2b55fd9 100644
--- a/clients/python/tests/test_marquez_client.py
+++ b/clients/python/tests/test_marquez_client.py
@@ -11,7 +11,9 @@
 from marquez_client import MarquezClient
 from marquez_client.constants import (
     DEFAULT_LIMIT,
-    DEFAULT_OFFSET
+    DEFAULT_OFFSET,
+    DEFAULT_DEPTH,
+    DEFAULT_WITH_DOWNSTREAM
 )
 from marquez_client.models import (
     DatasetType,
@@ -237,6 +239,37 @@
     'producer': PRODUCER
 }
 
+COLUMN_LINEAGE = [
+    {
+        'id': 'datasetField:namespace:commonDataset:columnA',
+        'type': 'DATASET_FIELD',
+        'data': {
+            'type': 'DATASET_FIELD',
+            'namespace': 'namespace',
+            'dataset': 'otherDataset',
+            'field': 'columnA',
+            'fieldType': 'integer',
+            'transformationDescription': 'identical',
+            'transformationType': 'IDENTITY',
+            'inputFields': [
+                {'namespace': 'namespace', 'dataset': 'otherDataset', 'field': 'columnB'}
+            ]
+        },
+        'inEdges': [
+            {
+                'origin': 'datasetField:namespace:otherDataset:columnB',
+                'destination': 'datasetField:namespace:commonDataset:columnA'
+            }
+        ],
+        'outEdges': [
+            {
+                'origin': 'datasetField:namespace:commonDataset:columnA',
+                'destination': 'datasetField:namespace:otherDataset:columnC'
+            }
+        ]
+    }
+]
+
 
 @pytest.fixture
 def client():
@@ -938,3 +971,88 @@ def test_list_tags(mock_get, client):
         },
         timeout=mock.ANY
     )
+
+
+@mock.patch('requests.get')
+def test_get_column_lineage_by_dataset(mock_get, client):
+    mock_get.return_value.status_code.return_value = HTTPStatus.OK
+    mock_get.return_value.json.return_value = COLUMN_LINEAGE
+
+    column_lineage = client.get_column_lineage_by_dataset(
+        "namespace_a",
+        "dataset_a",
+        DEFAULT_DEPTH,
+        DEFAULT_WITH_DOWNSTREAM
+    )
+
+    assert column_lineage == COLUMN_LINEAGE
+
+    mock_get.assert_called_once_with(
+        url=client._url(
+            '/column-lineage'
+        ),
+        headers=mock.ANY,
+        params={
+            'nodeId': 'dataset:namespace_a:dataset_a',
+            'depth': DEFAULT_DEPTH,
+            'withDownstream': DEFAULT_WITH_DOWNSTREAM
+        },
+        timeout=mock.ANY
+    )
+
+
+@mock.patch('requests.get')
+def test_get_column_lineage_by_dataset_field(mock_get, client):
+    mock_get.return_value.status_code.return_value = HTTPStatus.OK
+    mock_get.return_value.json.return_value = COLUMN_LINEAGE
+
+    column_lineage = client.get_column_lineage_by_dataset_field(
+        "namespace_a",
+        "dataset_a",
+        "field_a",
+        DEFAULT_DEPTH,
+        DEFAULT_WITH_DOWNSTREAM
+    )
+
+    assert column_lineage == COLUMN_LINEAGE
+
+    mock_get.assert_called_once_with(
+        url=client._url(
+            '/column-lineage'
+        ),
+        headers=mock.ANY,
+        params={
+            'nodeId': 'datasetField:namespace_a:dataset_a:field_a',
+            'depth': DEFAULT_DEPTH,
+            'withDownstream': DEFAULT_WITH_DOWNSTREAM
+        },
+        timeout=mock.ANY
+    )
+
+
+@mock.patch('requests.get')
+def test_get_column_lineage_by_job(mock_get, client):
+    mock_get.return_value.status_code.return_value = HTTPStatus.OK
+    mock_get.return_value.json.return_value = COLUMN_LINEAGE
+
+    column_lineage = client.get_column_lineage_by_job(
+        "namespace_a",
+        "job_a",
+        DEFAULT_DEPTH,
+        DEFAULT_WITH_DOWNSTREAM
+    )
+
+    assert column_lineage == COLUMN_LINEAGE
+
+    mock_get.assert_called_once_with(
+        url=client._url(
+            '/column-lineage'
+        ),
+        headers=mock.ANY,
+        params={
+            'nodeId': 'job:namespace_a:job_a',
+            'depth': DEFAULT_DEPTH,
+            'withDownstream': DEFAULT_WITH_DOWNSTREAM
+        },
+        timeout=mock.ANY
+    )