Skip to content

Commit

Permalink
python client for column lineage API
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 24, 2022
1 parent 74f062f commit 2c8c161
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Endpoint to get column lineage by a job [`#2204`](https://github.com/MarquezProject/marquez/pull/2204) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Column lineage within Marquez Python client [`#`](https://github.com/MarquezProject/marquez/pull/) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)


### Fixed
Expand Down
25 changes: 25 additions & 0 deletions clients/python/marquez_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
DEFAULT_TIMEOUT_MS,
DEFAULT_LIMIT,
DEFAULT_OFFSET,
DEFAULT_DEPTH,
DEFAULT_WITH_DOWNSTREAM,
API_PATH_V1
)
from marquez_client.models import (
Expand Down Expand Up @@ -341,6 +343,29 @@ def get_job_run(self, run_id):
Utils.is_valid_uuid(run_id, 'run_id')
return self._get(self._url('/jobs/runs/{0}', run_id))

def get_column_lineage_by_dataset(self, namespace, dataset,
                                  depth=None, with_downstream=None):
    """Request column lineage for a dataset node.

    Builds the ``dataset:<namespace>:<dataset>`` node ID and delegates to
    ``_get_column_lineage``.

    :param namespace: namespace part of the node ID.
    :param dataset: dataset name part of the node ID.
    :param depth: forwarded unchanged to ``_get_column_lineage``.
    :param with_downstream: forwarded unchanged to ``_get_column_lineage``.
    :return: whatever ``_get_column_lineage`` returns.
    """
    return self._get_column_lineage(
        "dataset:{}:{}".format(namespace, dataset),
        depth,
        with_downstream,
    )

def get_column_lineage_by_dataset_field(self, namespace, dataset, field,
                                        depth=None, with_downstream=None):
    """Request column lineage for a single dataset field.

    Builds the ``datasetField:<namespace>:<dataset>:<field>`` node ID and
    delegates to ``_get_column_lineage``.

    :param namespace: namespace part of the node ID.
    :param dataset: dataset name part of the node ID.
    :param field: field (column) name part of the node ID.
    :param depth: forwarded unchanged to ``_get_column_lineage``.
    :param with_downstream: forwarded unchanged to ``_get_column_lineage``.
    :return: whatever ``_get_column_lineage`` returns.
    """
    # Dataset-field node IDs are camelCase ("datasetField:..."), which is the
    # form the column-lineage API uses for the IDs it returns; a snake_case
    # "dataset_field:" prefix does not match that format.
    node_id = "datasetField:{0}:{1}:{2}".format(namespace, dataset, field)
    return self._get_column_lineage(node_id, depth, with_downstream)

def get_column_lineage_by_job(self, namespace, job,
                              depth=None, with_downstream=None):
    """Request column lineage for a job node.

    Builds the ``job:<namespace>:<job>`` node ID and delegates to
    ``_get_column_lineage``.

    :param namespace: namespace part of the node ID.
    :param job: job name part of the node ID.
    :param depth: forwarded unchanged to ``_get_column_lineage``.
    :param with_downstream: forwarded unchanged to ``_get_column_lineage``.
    :return: whatever ``_get_column_lineage`` returns.
    """
    return self._get_column_lineage(
        "job:{}:{}".format(namespace, job),
        depth,
        with_downstream,
    )

def _get_column_lineage(self, node_id, depth, with_downstream):
    """Query the ``/column-lineage`` endpoint for the given node.

    :param node_id: node identifier sent as the ``nodeId`` query parameter.
    :param depth: value for the ``depth`` query parameter; ``None`` selects
        ``DEFAULT_DEPTH``.
    :param with_downstream: value for the ``withDownstream`` query
        parameter; ``None`` selects ``DEFAULT_WITH_DOWNSTREAM``.
    :return: the result of ``self._get`` for the request.
    """
    # Only fall back to the defaults when the caller passed nothing. A plain
    # ``value or DEFAULT`` would also discard explicit falsy arguments such
    # as ``depth=0`` or ``with_downstream=False``.
    if depth is None:
        depth = DEFAULT_DEPTH
    if with_downstream is None:
        with_downstream = DEFAULT_WITH_DOWNSTREAM

    return self._get(
        self._url('/column-lineage'),
        params={
            'nodeId': node_id,
            'depth': depth,
            'withDownstream': with_downstream
        }
    )

@deprecated(deprecated_in='0.20.0', removed_in='0.25.0',
details='Use OpenLineage instead, see `https://openlineage.io`')
def mark_job_run_as_started(self, run_id, at=None):
Expand Down
2 changes: 2 additions & 0 deletions clients/python/marquez_client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
DEFAULT_MARQUEZ_URL = 'http://localhost:8080'
DEFAULT_LIMIT = 100
DEFAULT_OFFSET = 0
# Value sent as the `depth` query parameter of column-lineage requests when
# the caller does not supply one.
DEFAULT_DEPTH = 20
# Value sent as the `withDownstream` query parameter of column-lineage
# requests when the caller does not supply one.
DEFAULT_WITH_DOWNSTREAM = False

API_PATH_V1 = '/api/v1'
118 changes: 117 additions & 1 deletion clients/python/tests/test_marquez_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
from marquez_client import MarquezClient
from marquez_client.constants import (
DEFAULT_LIMIT,
DEFAULT_OFFSET
DEFAULT_OFFSET,
DEFAULT_DEPTH,
DEFAULT_WITH_DOWNSTREAM
)
from marquez_client.models import (
DatasetType,
Expand Down Expand Up @@ -237,6 +239,37 @@
'producer': PRODUCER
}

# Canned column-lineage response used by the column-lineage client tests.
# It describes one dataset-field node (commonDataset.columnA) with a single
# upstream edge (from otherDataset.columnB) and a single downstream edge
# (to otherDataset.columnC).
COLUMN_LINEAGE = [
    {
        'id': 'datasetField:namespace:commonDataset:columnA',
        'type': 'DATASET_FIELD',
        'data': {
            'type': 'DATASET_FIELD',
            'namespace': 'namespace',
            # Kept in step with the node 'id' above, which names
            # commonDataset (not otherDataset) as the owning dataset.
            'dataset': 'commonDataset',
            'field': 'columnA',
            'fieldType': 'integer',
            'transformationDescription': 'identical',
            'transformationType': 'IDENTITY',
            'inputFields': [
                {
                    'namespace': 'namespace',
                    'dataset': 'otherDataset',
                    'field': 'columnB'
                }
            ]
        },
        'inEdges': [
            {
                'origin': 'datasetField:namespace:otherDataset:columnB',
                'destination': 'datasetField:namespace:commonDataset:columnA'
            }
        ],
        'outEdges': [
            {
                'origin': 'datasetField:namespace:commonDataset:columnA',
                'destination': 'datasetField:namespace:otherDataset:columnC'
            }
        ]
    }
]


@pytest.fixture
def client():
Expand Down Expand Up @@ -938,3 +971,86 @@ def test_list_tags(mock_get, client):
},
timeout=mock.ANY
)


@mock.patch('requests.get')
def test_get_column_lineage_by_dataset(mock_get, client):
    """Dataset lineage lookup sends the expected query and returns the JSON."""
    response = mock_get.return_value
    response.status_code.return_value = HTTPStatus.OK
    response.json.return_value = COLUMN_LINEAGE

    expected_params = {
        'nodeId': 'dataset:namespace_a:dataset_a',
        'depth': DEFAULT_DEPTH,
        'withDownstream': DEFAULT_WITH_DOWNSTREAM
    }

    result = client.get_column_lineage_by_dataset(
        "namespace_a", "dataset_a", DEFAULT_DEPTH, DEFAULT_WITH_DOWNSTREAM
    )

    # The client hands back the decoded response body untouched.
    assert result == COLUMN_LINEAGE

    mock_get.assert_called_once_with(
        url=client._url('/column-lineage'),
        headers=mock.ANY,
        params=expected_params,
        timeout=mock.ANY
    )

@mock.patch('requests.get')
def test_get_column_lineage_by_dataset_field(mock_get, client):
    """Field lineage lookup sends the expected query and returns the JSON."""
    mock_get.return_value.status_code.return_value = HTTPStatus.OK
    mock_get.return_value.json.return_value = COLUMN_LINEAGE

    column_lineage = client.get_column_lineage_by_dataset_field(
        "namespace_a",
        "dataset_a",
        "field_a",
        DEFAULT_DEPTH,
        DEFAULT_WITH_DOWNSTREAM
    )

    assert column_lineage == COLUMN_LINEAGE

    mock_get.assert_called_once_with(
        url=client._url(
            '/column-lineage'
        ),
        headers=mock.ANY,
        params={
            # Dataset-field node IDs carry the camelCase "datasetField:"
            # prefix, the same form used by the IDs in COLUMN_LINEAGE.
            'nodeId': 'datasetField:namespace_a:dataset_a:field_a',
            'depth': DEFAULT_DEPTH,
            'withDownstream': DEFAULT_WITH_DOWNSTREAM
        },
        timeout=mock.ANY
    )

@mock.patch('requests.get')
def test_get_column_lineage_by_job(mock_get, client):
    """Job lineage lookup sends the expected query and returns the JSON."""
    response = mock_get.return_value
    response.status_code.return_value = HTTPStatus.OK
    response.json.return_value = COLUMN_LINEAGE

    expected_params = {
        'nodeId': 'job:namespace_a:job_a',
        'depth': DEFAULT_DEPTH,
        'withDownstream': DEFAULT_WITH_DOWNSTREAM
    }

    result = client.get_column_lineage_by_job(
        "namespace_a", "job_a", DEFAULT_DEPTH, DEFAULT_WITH_DOWNSTREAM
    )

    # The client hands back the decoded response body untouched.
    assert result == COLUMN_LINEAGE

    mock_get.assert_called_once_with(
        url=client._url('/column-lineage'),
        headers=mock.ANY,
        params=expected_params,
        timeout=mock.ANY
    )

0 comments on commit 2c8c161

Please sign in to comment.