Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get column lineage by job #2204

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Endpoint to get column lineage by a job [`#2204`](https://github.com/MarquezProject/marquez/pull/2204) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)


### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
16 changes: 16 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
""")
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);

@SqlQuery(
"""
WITH latest_run AS (
SELECT DISTINCT r.uuid as uuid, r.created_at
FROM runs_view r
WHERE r.namespace_name = :namespaceName AND r.job_name = :jobName
ORDER BY r.created_at DESC
LIMIT 1
)
SELECT dataset_fields.uuid
FROM dataset_fields
JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid
JOIN latest_run ON dataset_versions.run_uuid = latest_run.uuid
""")
List<UUID> findFieldsUuidsByJob(String namespaceName, String jobName);

@SqlQuery(
"""
SELECT df.uuid
Expand Down
6 changes: 6 additions & 0 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetFieldDao;
import marquez.db.models.ColumnLineageNodeData;
Expand Down Expand Up @@ -124,6 +125,11 @@ List<UUID> getColumnNodeUuids(NodeId nodeId) {
datasetFieldId.getDatasetId().getName().getValue(),
datasetFieldId.getFieldName().getValue())
.ifPresent(uuid -> columnNodeUuids.add(uuid));
} else if (nodeId.isJobType()) {
JobId jobId = nodeId.asJobId();
columnNodeUuids.addAll(
datasetFieldDao.findFieldsUuidsByJob(
jobId.getNamespace().getValue(), jobId.getName().getValue()));
}
return columnNodeUuids;
}
Expand Down
19 changes: 15 additions & 4 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void tearDown(Jdbi jdbi) {

@Test
public void testColumnLineageEndpointByDataset() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b");
MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand All @@ -74,7 +74,8 @@ public void testColumnLineageEndpointByDataset() {

@Test
public void testColumnLineageEndpointByDatasetField() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b", "col_c");
MarquezClient.Lineage lineage =
client.getColumnLineageByDataset("namespace", "dataset_b", "col_c");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand All @@ -85,7 +86,7 @@ public void testColumnLineageEndpointByDatasetField() {
@Test
public void testColumnLineageEndpointWithDepthLimit() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_c", "col_d", 1, false);
client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false);

assertThat(lineage.getGraph()).hasSize(2);
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
Expand All @@ -95,12 +96,22 @@ public void testColumnLineageEndpointWithDepthLimit() {
@Test
public void testColumnLineageEndpointWithDownstream() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_b", "col_c", 10, true);
client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true);

assertThat(lineage.getGraph()).hasSize(4);
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

@Test
public void testColumnLineageEndpointByJob() {
MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_b")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
}

private Optional<Node> getNodeByFieldName(MarquezClient.Lineage lineage, String field) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getField().equals(field))
Expand Down
40 changes: 40 additions & 0 deletions api/src/test/java/marquez/service/ColumnLineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.db.ColumnLineageDao;
import marquez.db.ColumnLineageTestUtils;
Expand Down Expand Up @@ -355,6 +357,44 @@ public void testEnrichDatasetsHasNoDuplicates() {
assertThat(dataset_b.getColumnLineage()).hasSize(1);
}

@Test
public void testGetLineageByJob() {
LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
LineageEvent.Dataset dataset_C = getDatasetC();

LineageTestUtils.createLineageRow(
openLineageDao,
"job1",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_A),
Arrays.asList(dataset_B));

LineageTestUtils.createLineageRow(
openLineageDao,
"job2",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_B),
Arrays.asList(dataset_C));

// getting lineage by job_1 should be the same as getting it by dataset_B
assertThat(
lineageService.lineage(
NodeId.of(JobId.of(NamespaceName.of("namespace"), JobName.of("job1"))),
20,
true,
Instant.now()))
.isEqualTo(
lineageService.lineage(
NodeId.of(
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
20,
true,
Instant.now()));
}

private Optional<Node> getNode(Lineage lineage, String datasetName, String fieldName) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName))
Expand Down
33 changes: 25 additions & 8 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,50 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineageByDataset(
namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
return getColumnLineageByDatasetField(
namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
public Lineage getColumnLineageByDataset(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream));
http.get(
url.toColumnLineageUrlByDataset(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineage(
public Lineage getColumnLineageByDatasetField(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream));
http.get(
url.toColumnLineageUrlByDatasetField(
namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineageByJob(@NonNull String namespaceName, @NonNull String jobName) {
return getColumnLineageByJob(namespaceName, jobName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByJob(
@NonNull String namespaceName, @NonNull String jobName, int depth, boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrlByJob(namespaceName, jobName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

Expand Down
14 changes: 12 additions & 2 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.JobId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
Expand Down Expand Up @@ -210,7 +211,7 @@ URL toSearchUrl(
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrl(
URL toColumnLineageUrlByDatasetField(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
Expand All @@ -219,11 +220,20 @@ URL toColumnLineageUrl(
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) {
URL toColumnLineageUrlByDataset(
String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrlByJob(String namespace, String job, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new JobId(namespace, job)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}
}
15 changes: 15 additions & 0 deletions clients/java/src/main/java/marquez/client/models/NodeId.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public static NodeId of(@NonNull DatasetFieldId datasetFieldId) {
datasetFieldId.getField()));
}

public static NodeId of(@NonNull JobId jobId) {
return of(ID_JOINER.join(ID_PREFX_JOB, jobId.getNamespace(), jobId.getName()));
}

@JsonIgnore
public boolean isDatasetFieldType() {
return value.startsWith(ID_PREFX_DATASET_FIELD);
Expand All @@ -80,6 +84,11 @@ public boolean isDatasetType() {
return value.startsWith(ID_PREFX_DATASET + ID_DELIM);
}

@JsonIgnore
public boolean isJobType() {
return value.startsWith(ID_PREFX_JOB);
}

@JsonIgnore
private String[] parts(int expectedParts, String expectedType) {

Expand Down Expand Up @@ -124,6 +133,12 @@ public DatasetFieldId asDatasetFieldId() {
return new DatasetFieldId(parts[1], parts[2], parts[3]);
}

@JsonIgnore
public JobId asJobId() {
String[] parts = parts(3, ID_PREFX_JOB);
return new JobId(parts[1], parts[2]);
}

public static class FromValue extends StdConverter<String, NodeId> {
@Override
public NodeId convert(@NonNull String value) {
Expand Down
55 changes: 41 additions & 14 deletions clients/java/src/test/java/marquez/client/MarquezClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -982,13 +982,10 @@ public void testGetColumnLineage() throws Exception {
.thenReturn(lineageJson);

Node retrievedNode =
client.getColumnLineage("namespace", "dataset").getGraph().stream().findAny().get();
assertThat(retrievedNode.getId()).isEqualTo(node.getId());
assertThat(retrievedNode.getData()).isEqualTo(node.getData());
assertThat(retrievedNode.getInEdges().stream().findFirst())
.isEqualTo(node.getInEdges().stream().findFirst());
assertThat(retrievedNode.getOutEdges().stream().findFirst())
.isEqualTo(node.getOutEdges().stream().findFirst());
client.getColumnLineageByDataset("namespace", "dataset").getGraph().stream()
.findAny()
.get();
assertThat(retrievedNode).isEqualTo(node);
}

@Test
Expand Down Expand Up @@ -1022,15 +1019,45 @@ public void testGetColumnLineageByField() throws Exception {
.thenReturn(lineageJson);

Node retrievedNode =
client.getColumnLineage("namespace", "dataset", "some-col1").getGraph().stream()
client.getColumnLineageByDataset("namespace", "dataset", "some-col1").getGraph().stream()
.findAny()
.get();
assertThat(retrievedNode.getId()).isEqualTo(node.getId());
assertThat(retrievedNode.getData()).isEqualTo(node.getData());
assertThat(retrievedNode.getInEdges().stream().findFirst())
.isEqualTo(node.getInEdges().stream().findFirst());
assertThat(retrievedNode.getOutEdges().stream().findFirst())
.isEqualTo(node.getOutEdges().stream().findFirst());
assertThat(retrievedNode).isEqualTo(node);
}

@Test
public void testGetColumnLineageByJob() throws Exception {
Node node =
new Node(
NodeId.of(DATASET_FIELD_ID),
NodeType.DATASET_FIELD,
new ColumnLineageNodeData(
NAMESPACE_NAME,
DB_TABLE_NAME,
FIELD_NAME,
"String",
"transformationDescription",
"transformationType",
Collections.singletonList(
new DatasetFieldId("namespace", "inDataset", "some-col1"))),
ImmutableSet.of(
Edge.of(
NodeId.of(DATASET_FIELD_ID),
NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))),
ImmutableSet.of(
Edge.of(
NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")),
NodeId.of(DATASET_FIELD_ID))));
MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node));
String lineageJson = lineage.toJson();
when(http.get(
buildUrlFor(
"/column-lineage?nodeId=job%3Anamespace%3Ajob&depth=20&withDownstream=false")))
.thenReturn(lineageJson);

Node retrievedNode =
client.getColumnLineageByJob("namespace", "job").getGraph().stream().findAny().get();
assertThat(retrievedNode).isEqualTo(node);
}

private URL buildUrlFor(String pathTemplate) throws Exception {
Expand Down
6 changes: 4 additions & 2 deletions clients/java/src/test/java/marquez/client/MarquezUrlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ void testEncodedMarquezUrl() {
void testToColumnLineageUrl() {
Assertions.assertEquals(
"http://marquez:5000/api/v1/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=true",
marquezUrl.toColumnLineageUrl("namespace", "dataset", 20, true).toString());
marquezUrl.toColumnLineageUrlByDataset("namespace", "dataset", 20, true).toString());

Assertions.assertEquals(
"http://marquez:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20&withDownstream=true",
marquezUrl.toColumnLineageUrl("namespace", "dataset", "field", 20, true).toString());
marquezUrl
.toColumnLineageUrlByDatasetField("namespace", "dataset", "field", 20, true)
.toString());
}
}
15 changes: 15 additions & 0 deletions clients/java/src/test/java/marquez/client/models/NodeIdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,19 @@ public void testDatasetField(String namespace, String dataset, String field) {
assertEquals(dataset, nodeId.asDatasetFieldId().getDataset());
assertEquals(field, nodeId.asDatasetFieldId().getField());
}

@ParameterizedTest(name = "testJob-{index} {argumentsWithNames}")
@CsvSource(
value = {"my-namespace$my-job", "org://team$my-job"},
delimiter = '$')
public void testJob(String namespace, String job) {
JobId jobId = new JobId(namespace, job);
NodeId nodeId = NodeId.of(jobId);
assertTrue(nodeId.isJobType());
assertFalse(nodeId.isDatasetType());
assertEquals(jobId, nodeId.asJobId());
assertEquals(nodeId, NodeId.of(nodeId.getValue()));
assertEquals(namespace, nodeId.asJobId().getNamespace());
assertEquals(job, nodeId.asJobId().getName());
}
}