diff --git a/CHANGELOG.md b/CHANGELOG.md index af61437547..78830c1711 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,9 @@ * Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) -* column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Endpoint to get column lineage by a job [`#2204`](https://github.com/MarquezProject/marquez/pull/2204) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + ### Fixed * Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike) diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java index 850d95c19a..b305fe604d 100644 --- a/api/src/main/java/marquez/db/DatasetFieldDao.java +++ b/api/src/main/java/marquez/db/DatasetFieldDao.java @@ -105,6 +105,22 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli """) List findDatasetFieldsUuids(String namespaceName, String datasetName); + @SqlQuery( + """ + WITH latest_run AS ( + SELECT DISTINCT r.uuid as uuid, r.created_at + FROM runs_view r + WHERE r.namespace_name = :namespaceName AND r.job_name = :jobName + ORDER BY r.created_at DESC + LIMIT 1 + ) + SELECT dataset_fields.uuid + FROM dataset_fields + JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid + JOIN latest_run ON dataset_versions.run_uuid = latest_run.uuid + """) + List findFieldsUuidsByJob(String namespaceName, String jobName); + @SqlQuery( """ SELECT df.uuid diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index 7a7a85a9be..ccb0b27f66 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; +import marquez.common.models.JobId; import marquez.db.ColumnLineageDao; import marquez.db.DatasetFieldDao; import marquez.db.models.ColumnLineageNodeData; @@ -124,6 +125,11 @@ List getColumnNodeUuids(NodeId nodeId) { datasetFieldId.getDatasetId().getName().getValue(), datasetFieldId.getFieldName().getValue()) .ifPresent(uuid -> columnNodeUuids.add(uuid)); + } else if (nodeId.isJobType()) { + JobId jobId = nodeId.asJobId(); + columnNodeUuids.addAll( + datasetFieldDao.findFieldsUuidsByJob( + jobId.getNamespace().getValue(), jobId.getName().getValue())); } return columnNodeUuids; } diff --git a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java index aaa838c005..91d0c886fd 100644 --- a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java +++ b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java @@ -64,7 +64,7 @@ public void tearDown(Jdbi jdbi) { @Test public void testColumnLineageEndpointByDataset() { - MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b"); + MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b"); assertThat(lineage.getGraph()).hasSize(3); assertThat(getNodeByFieldName(lineage, "col_a")).isPresent(); @@ -74,7 +74,8 @@ public void testColumnLineageEndpointByDataset() { @Test public void testColumnLineageEndpointByDatasetField() { - MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b", "col_c"); + MarquezClient.Lineage lineage = + client.getColumnLineageByDataset("namespace", "dataset_b", "col_c"); assertThat(lineage.getGraph()).hasSize(3); assertThat(getNodeByFieldName(lineage, "col_a")).isPresent(); @@ -85,7 +86,7 @@ public void testColumnLineageEndpointByDatasetField() { @Test public void testColumnLineageEndpointWithDepthLimit() { MarquezClient.Lineage lineage = - client.getColumnLineage("namespace", "dataset_c", "col_d", 1, false); + client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false); assertThat(lineage.getGraph()).hasSize(2); assertThat(getNodeByFieldName(lineage, "col_c")).isPresent(); @@ -95,12 +96,22 @@ public void testColumnLineageEndpointWithDepthLimit() { @Test public void testColumnLineageEndpointWithDownstream() { MarquezClient.Lineage lineage = - client.getColumnLineage("namespace", "dataset_b", "col_c", 10, true); + client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true); assertThat(lineage.getGraph()).hasSize(4); assertThat(getNodeByFieldName(lineage, "col_d")).isPresent(); } + @Test + public void testColumnLineageEndpointByJob() { + MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1"); + + assertThat(lineage.getGraph()).hasSize(3); + assertThat(getNodeByFieldName(lineage, "col_a")).isPresent(); + assertThat(getNodeByFieldName(lineage, "col_b")).isPresent(); + assertThat(getNodeByFieldName(lineage, "col_c")).isPresent(); + } + private Optional getNodeByFieldName(MarquezClient.Lineage lineage, String field) { return lineage.getGraph().stream() .filter(n -> n.getId().asDatasetFieldId().getField().equals(field)) diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 2b818a27ef..540b3e6e08 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -20,6 +20,8 @@ import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; +import marquez.common.models.JobId; +import marquez.common.models.JobName; import marquez.common.models.NamespaceName; import marquez.db.ColumnLineageDao; import marquez.db.ColumnLineageTestUtils; @@ -355,6 +357,44 @@ public void testEnrichDatasetsHasNoDuplicates() { assertThat(dataset_b.getColumnLineage()).hasSize(1); } + @Test + public void testGetLineageByJob() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + // getting lineage by job_1 should be the same as getting it by dataset_B + assertThat( + lineageService.lineage( + NodeId.of(JobId.of(NamespaceName.of("namespace"), JobName.of("job1"))), + 20, + true, + Instant.now())) + .isEqualTo( + lineageService.lineage( + NodeId.of( + new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), + 20, + true, + Instant.now())); + } + private Optional getNode(Lineage lineage, String datasetName, String fieldName) { return lineage.getGraph().stream() .filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName)) diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index 69adf30990..c676c562eb 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -115,33 +115,50 @@ public enum SortDirection { @Getter public final String value; } - public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) { - return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false); + public Lineage getColumnLineageByDataset( + @NonNull String namespaceName, @NonNull String datasetName) { + return getColumnLineageByDataset( + namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false); } - public Lineage getColumnLineage( + public Lineage getColumnLineageByDataset( @NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) { - return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false); + return getColumnLineageByDatasetField( + namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false); } - public Lineage getColumnLineage( + public Lineage getColumnLineageByDataset( @NonNull String namespaceName, @NonNull String datasetName, int depth, boolean withDownstream) { final String bodyAsJson = - http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream)); + http.get( + url.toColumnLineageUrlByDataset(namespaceName, datasetName, depth, withDownstream)); return Lineage.fromJson(bodyAsJson); } - public Lineage getColumnLineage( + public Lineage getColumnLineageByDatasetField( @NonNull String namespaceName, @NonNull String datasetName, @NonNull String field, int depth, boolean withDownstream) { final String bodyAsJson = - http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream)); + http.get( + url.toColumnLineageUrlByDatasetField( + namespaceName, datasetName, field, depth, withDownstream)); + return Lineage.fromJson(bodyAsJson); + } + + public Lineage getColumnLineageByJob(@NonNull String namespaceName, @NonNull String jobName) { + return getColumnLineageByJob(namespaceName, jobName, DEFAULT_LINEAGE_GRAPH_DEPTH, false); + } + + public Lineage getColumnLineageByJob( + @NonNull String namespaceName, @NonNull String jobName, int depth, boolean withDownstream) { + final String bodyAsJson = + http.get(url.toColumnLineageUrlByJob(namespaceName, jobName, depth, withDownstream)); return Lineage.fromJson(bodyAsJson); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index a8b5affab3..242d85e907 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -44,6 +44,7 @@ import lombok.NonNull; import marquez.client.models.DatasetFieldId; import marquez.client.models.DatasetId; +import marquez.client.models.JobId; import marquez.client.models.NodeId; import marquez.client.models.RunState; import marquez.client.models.SearchFilter; @@ -210,7 +211,7 @@ URL toSearchUrl( return from(searchPath(), queryParams.build()); } - URL toColumnLineageUrl( + URL toColumnLineageUrlByDatasetField( String namespace, String dataset, String field, int depth, boolean withDownstream) { final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue()); @@ -219,11 +220,20 @@ URL toColumnLineageUrl( return from(columnLineagePath(), queryParams.build()); } - URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) { + URL toColumnLineageUrlByDataset( + String namespace, String dataset, int depth, boolean withDownstream) { final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue()); queryParams.put("depth", String.valueOf(depth)); queryParams.put("withDownstream", String.valueOf(withDownstream)); return from(columnLineagePath(), queryParams.build()); } + + URL toColumnLineageUrlByJob(String namespace, String job, int depth, boolean withDownstream) { + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + queryParams.put("nodeId", NodeId.of(new JobId(namespace, job)).getValue()); + queryParams.put("depth", String.valueOf(depth)); + queryParams.put("withDownstream", String.valueOf(withDownstream)); + return from(columnLineagePath(), queryParams.build()); + } } diff --git a/clients/java/src/main/java/marquez/client/models/NodeId.java b/clients/java/src/main/java/marquez/client/models/NodeId.java index 3f51f9f1bb..c7235ec667 100644 --- a/clients/java/src/main/java/marquez/client/models/NodeId.java +++ b/clients/java/src/main/java/marquez/client/models/NodeId.java @@ -70,6 +70,10 @@ public static NodeId of(@NonNull DatasetFieldId datasetFieldId) { datasetFieldId.getField())); } + public static NodeId of(@NonNull JobId jobId) { + return of(ID_JOINER.join(ID_PREFX_JOB, jobId.getNamespace(), jobId.getName())); + } + @JsonIgnore public boolean isDatasetFieldType() { return value.startsWith(ID_PREFX_DATASET_FIELD); @@ -80,6 +84,11 @@ public boolean isDatasetType() { return value.startsWith(ID_PREFX_DATASET + ID_DELIM); } + @JsonIgnore + public boolean isJobType() { + return value.startsWith(ID_PREFX_JOB); + } + @JsonIgnore private String[] parts(int expectedParts, String expectedType) { @@ -124,6 +133,12 @@ public DatasetFieldId asDatasetFieldId() { return new DatasetFieldId(parts[1], parts[2], parts[3]); } + @JsonIgnore + public JobId asJobId() { + String[] parts = parts(3, ID_PREFX_JOB); + return new JobId(parts[1], parts[2]); + } + public static class FromValue extends StdConverter { @Override public NodeId convert(@NonNull String value) { diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index 8eef5f6270..4c2a725d4c 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -982,13 +982,10 @@ public void testGetColumnLineage() throws Exception { .thenReturn(lineageJson); Node retrievedNode = - client.getColumnLineage("namespace", "dataset").getGraph().stream().findAny().get(); - assertThat(retrievedNode.getId()).isEqualTo(node.getId()); - assertThat(retrievedNode.getData()).isEqualTo(node.getData()); - assertThat(retrievedNode.getInEdges().stream().findFirst()) - .isEqualTo(node.getInEdges().stream().findFirst()); - assertThat(retrievedNode.getOutEdges().stream().findFirst()) - .isEqualTo(node.getOutEdges().stream().findFirst()); + client.getColumnLineageByDataset("namespace", "dataset").getGraph().stream() + .findAny() + .get(); + assertThat(retrievedNode).isEqualTo(node); } @Test @@ -1022,15 +1019,45 @@ public void testGetColumnLineageByField() throws Exception { .thenReturn(lineageJson); Node retrievedNode = - client.getColumnLineage("namespace", "dataset", "some-col1").getGraph().stream() + client.getColumnLineageByDataset("namespace", "dataset", "some-col1").getGraph().stream() .findAny() .get(); - assertThat(retrievedNode.getId()).isEqualTo(node.getId()); - assertThat(retrievedNode.getData()).isEqualTo(node.getData()); - assertThat(retrievedNode.getInEdges().stream().findFirst()) - .isEqualTo(node.getInEdges().stream().findFirst()); - assertThat(retrievedNode.getOutEdges().stream().findFirst()) - .isEqualTo(node.getOutEdges().stream().findFirst()); + assertThat(retrievedNode).isEqualTo(node); + } + + @Test + public void testGetColumnLineageByJob() throws Exception { + Node node = + new Node( + NodeId.of(DATASET_FIELD_ID), + NodeType.DATASET_FIELD, + new ColumnLineageNodeData( + NAMESPACE_NAME, + DB_TABLE_NAME, + FIELD_NAME, + "String", + "transformationDescription", + "transformationType", + Collections.singletonList( + new DatasetFieldId("namespace", "inDataset", "some-col1"))), + ImmutableSet.of( + Edge.of( + NodeId.of(DATASET_FIELD_ID), + NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))), + ImmutableSet.of( + Edge.of( + NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")), + NodeId.of(DATASET_FIELD_ID)))); + MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node)); + String lineageJson = lineage.toJson(); + when(http.get( + buildUrlFor( + "/column-lineage?nodeId=job%3Anamespace%3Ajob&depth=20&withDownstream=false"))) + .thenReturn(lineageJson); + + Node retrievedNode = + client.getColumnLineageByJob("namespace", "job").getGraph().stream().findAny().get(); + assertThat(retrievedNode).isEqualTo(node); } private URL buildUrlFor(String pathTemplate) throws Exception { diff --git a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java index 20889d1167..195f756981 100644 --- a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java @@ -38,10 +38,12 @@ void testEncodedMarquezUrl() { void testToColumnLineageUrl() { Assertions.assertEquals( "http://marquez:5000/api/v1/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=true", - marquezUrl.toColumnLineageUrl("namespace", "dataset", 20, true).toString()); + marquezUrl.toColumnLineageUrlByDataset("namespace", "dataset", 20, true).toString()); Assertions.assertEquals( "http://marquez:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20&withDownstream=true", - marquezUrl.toColumnLineageUrl("namespace", "dataset", "field", 20, true).toString()); + marquezUrl + .toColumnLineageUrlByDatasetField("namespace", "dataset", "field", 20, true) + .toString()); } } diff --git a/clients/java/src/test/java/marquez/client/models/NodeIdTest.java b/clients/java/src/test/java/marquez/client/models/NodeIdTest.java index 3c67fb644a..359c5e59a5 100644 --- a/clients/java/src/test/java/marquez/client/models/NodeIdTest.java +++ b/clients/java/src/test/java/marquez/client/models/NodeIdTest.java @@ -47,4 +47,19 @@ public void testDatasetField(String namespace, String dataset, String field) { assertEquals(dataset, nodeId.asDatasetFieldId().getDataset()); assertEquals(field, nodeId.asDatasetFieldId().getField()); } + + @ParameterizedTest(name = "testJob-{index} {argumentsWithNames}") + @CsvSource( + value = {"my-namespace$my-job", "org://team$my-job"}, + delimiter = '$') + public void testJob(String namespace, String job) { + JobId jobId = new JobId(namespace, job); + NodeId nodeId = NodeId.of(jobId); + assertTrue(nodeId.isJobType()); + assertFalse(nodeId.isDatasetType()); + assertEquals(jobId, nodeId.asJobId()); + assertEquals(nodeId, NodeId.of(nodeId.getValue())); + assertEquals(namespace, nodeId.asJobId().getNamespace()); + assertEquals(job, nodeId.asJobId().getName()); + } }