Skip to content

Commit

Permalink
get column lineage by job
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 21, 2022
1 parent 3c26f6f commit ef1c494
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 30 deletions.
16 changes: 16 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
""")
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);

@SqlQuery(
"""
WITH latest_run AS (
SELECT DISTINCT r.uuid as uuid, r.created_at
FROM runs_view r
WHERE r.namespace_name = :namespaceName AND r.job_name = :jobName
ORDER BY r.created_at DESC
LIMIT 1
)
SELECT dataset_fields.uuid
FROM dataset_fields
JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid
JOIN latest_run ON dataset_versions.run_uuid = latest_run.uuid
""")
List<UUID> findFieldsUuidsByJob(String namespaceName, String jobName);

@SqlQuery(
"""
SELECT df.uuid
Expand Down
6 changes: 6 additions & 0 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetFieldDao;
import marquez.db.models.ColumnLineageNodeData;
Expand Down Expand Up @@ -124,6 +125,11 @@ List<UUID> getColumnNodeUuids(NodeId nodeId) {
datasetFieldId.getDatasetId().getName().getValue(),
datasetFieldId.getFieldName().getValue())
.ifPresent(uuid -> columnNodeUuids.add(uuid));
} else if (nodeId.isJobType()) {
JobId jobId = nodeId.asJobId();
columnNodeUuids.addAll(
datasetFieldDao.findFieldsUuidsByJob(
jobId.getNamespace().getValue(), jobId.getName().getValue()));
}
return columnNodeUuids;
}
Expand Down
19 changes: 15 additions & 4 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void tearDown(Jdbi jdbi) {

@Test
public void testColumnLineageEndpointByDataset() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b");
MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand All @@ -74,7 +74,8 @@ public void testColumnLineageEndpointByDataset() {

@Test
public void testColumnLineageEndpointByDatasetField() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b", "col_c");
MarquezClient.Lineage lineage =
client.getColumnLineageByDataset("namespace", "dataset_b", "col_c");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
Expand All @@ -85,7 +86,7 @@ public void testColumnLineageEndpointByDatasetField() {
@Test
public void testColumnLineageEndpointWithDepthLimit() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_c", "col_d", 1, false);
client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false);

assertThat(lineage.getGraph()).hasSize(2);
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
Expand All @@ -95,12 +96,22 @@ public void testColumnLineageEndpointWithDepthLimit() {
@Test
public void testColumnLineageEndpointWithDownstream() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_b", "col_c", 10, true);
client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true);

assertThat(lineage.getGraph()).hasSize(4);
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

@Test
public void testColumnLineaapi/src/test/java/marquez/ColumnLineageIntegrationTest.javageEndpointByJob() {
MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_b")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
}

private Optional<Node> getNodeByFieldName(MarquezClient.Lineage lineage, String field) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getField().equals(field))
Expand Down
40 changes: 40 additions & 0 deletions api/src/test/java/marquez/service/ColumnLineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.db.ColumnLineageDao;
import marquez.db.ColumnLineageTestUtils;
Expand Down Expand Up @@ -355,6 +357,44 @@ public void testEnrichDatasetsHasNoDuplicates() {
assertThat(dataset_b.getColumnLineage()).hasSize(1);
}

@Test
public void testGetLineageByJob() {
LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
LineageEvent.Dataset dataset_C = getDatasetC();

LineageTestUtils.createLineageRow(
openLineageDao,
"job1",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_A),
Arrays.asList(dataset_B));

LineageTestUtils.createLineageRow(
openLineageDao,
"job2",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_B),
Arrays.asList(dataset_C));

// getting lineage by job_1 should be the same as getting it by dataset_B
assertThat(
lineageService.lineage(
NodeId.of(JobId.of(NamespaceName.of("namespace"), JobName.of("job1"))),
20,
true,
Instant.now()))
.isEqualTo(
lineageService.lineage(
NodeId.of(
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
20,
true,
Instant.now()));
}

private Optional<Node> getNode(Lineage lineage, String datasetName, String fieldName) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName))
Expand Down
33 changes: 25 additions & 8 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,50 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineageByDataset(
namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
public Lineage getColumnLineageByDataset(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
return getColumnLineageByDatasetField(
namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
public Lineage getColumnLineageByDataset(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream));
http.get(
url.toColumnLineageUrlByDataset(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineage(
public Lineage getColumnLineageByDatasetField(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream));
http.get(
url.toColumnLineageUrlByDatasetField(
namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineageByJob(@NonNull String namespaceName, @NonNull String jobName) {
return getColumnLineageByJob(namespaceName, jobName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineageByJob(
@NonNull String namespaceName, @NonNull String jobName, int depth, boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrlByJob(namespaceName, jobName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

Expand Down
14 changes: 12 additions & 2 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.JobId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
Expand Down Expand Up @@ -210,7 +211,7 @@ URL toSearchUrl(
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrl(
URL toColumnLineageUrlByDatasetField(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
Expand All @@ -219,11 +220,20 @@ URL toColumnLineageUrl(
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) {
URL toColumnLineageUrlByDataset(
String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrlByJob(String namespace, String job, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new JobId(namespace, job)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}
}
15 changes: 15 additions & 0 deletions clients/java/src/main/java/marquez/client/models/NodeId.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public static NodeId of(@NonNull DatasetFieldId datasetFieldId) {
datasetFieldId.getField()));
}

public static NodeId of(@NonNull JobId jobId) {
return of(ID_JOINER.join(ID_PREFX_JOB, jobId.getNamespace(), jobId.getName()));
}

@JsonIgnore
public boolean isDatasetFieldType() {
return value.startsWith(ID_PREFX_DATASET_FIELD);
Expand All @@ -80,6 +84,11 @@ public boolean isDatasetType() {
return value.startsWith(ID_PREFX_DATASET + ID_DELIM);
}

@JsonIgnore
public boolean isJobType() {
return value.startsWith(ID_PREFX_JOB);
}

@JsonIgnore
private String[] parts(int expectedParts, String expectedType) {

Expand Down Expand Up @@ -124,6 +133,12 @@ public DatasetFieldId asDatasetFieldId() {
return new DatasetFieldId(parts[1], parts[2], parts[3]);
}

@JsonIgnore
public JobId asJobId() {
String[] parts = parts(3, ID_PREFX_JOB);
return new JobId(parts[1], parts[2]);
}

public static class FromValue extends StdConverter<String, NodeId> {
@Override
public NodeId convert(@NonNull String value) {
Expand Down
55 changes: 41 additions & 14 deletions clients/java/src/test/java/marquez/client/MarquezClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -982,13 +982,10 @@ public void testGetColumnLineage() throws Exception {
.thenReturn(lineageJson);

Node retrievedNode =
client.getColumnLineage("namespace", "dataset").getGraph().stream().findAny().get();
assertThat(retrievedNode.getId()).isEqualTo(node.getId());
assertThat(retrievedNode.getData()).isEqualTo(node.getData());
assertThat(retrievedNode.getInEdges().stream().findFirst())
.isEqualTo(node.getInEdges().stream().findFirst());
assertThat(retrievedNode.getOutEdges().stream().findFirst())
.isEqualTo(node.getOutEdges().stream().findFirst());
client.getColumnLineageByDataset("namespace", "dataset").getGraph().stream()
.findAny()
.get();
assertThat(retrievedNode).isEqualTo(node);
}

@Test
Expand Down Expand Up @@ -1022,15 +1019,45 @@ public void testGetColumnLineageByField() throws Exception {
.thenReturn(lineageJson);

Node retrievedNode =
client.getColumnLineage("namespace", "dataset", "some-col1").getGraph().stream()
client.getColumnLineageByDataset("namespace", "dataset", "some-col1").getGraph().stream()
.findAny()
.get();
assertThat(retrievedNode.getId()).isEqualTo(node.getId());
assertThat(retrievedNode.getData()).isEqualTo(node.getData());
assertThat(retrievedNode.getInEdges().stream().findFirst())
.isEqualTo(node.getInEdges().stream().findFirst());
assertThat(retrievedNode.getOutEdges().stream().findFirst())
.isEqualTo(node.getOutEdges().stream().findFirst());
assertThat(retrievedNode).isEqualTo(node);
}

@Test
public void testGetColumnLineageByJob() throws Exception {
Node node =
new Node(
NodeId.of(DATASET_FIELD_ID),
NodeType.DATASET_FIELD,
new ColumnLineageNodeData(
NAMESPACE_NAME,
DB_TABLE_NAME,
FIELD_NAME,
"String",
"transformationDescription",
"transformationType",
Collections.singletonList(
new DatasetFieldId("namespace", "inDataset", "some-col1"))),
ImmutableSet.of(
Edge.of(
NodeId.of(DATASET_FIELD_ID),
NodeId.of(new DatasetFieldId("namespace", "inDataset", "some-col1")))),
ImmutableSet.of(
Edge.of(
NodeId.of(new DatasetFieldId("namespace", "outDataset", "some-col2")),
NodeId.of(DATASET_FIELD_ID))));
MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(node));
String lineageJson = lineage.toJson();
when(http.get(
buildUrlFor(
"/column-lineage?nodeId=job%3Anamespace%3Ajob&depth=20&withDownstream=false")))
.thenReturn(lineageJson);

Node retrievedNode =
client.getColumnLineageByJob("namespace", "job").getGraph().stream().findAny().get();
assertThat(retrievedNode).isEqualTo(node);
}

private URL buildUrlFor(String pathTemplate) throws Exception {
Expand Down
6 changes: 4 additions & 2 deletions clients/java/src/test/java/marquez/client/MarquezUrlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ void testEncodedMarquezUrl() {
void testToColumnLineageUrl() {
Assertions.assertEquals(
"http://marquez:5000/api/v1/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=true",
marquezUrl.toColumnLineageUrl("namespace", "dataset", 20, true).toString());
marquezUrl.toColumnLineageUrlByDataset("namespace", "dataset", 20, true).toString());

Assertions.assertEquals(
"http://marquez:5000/api/v1/column-lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20&withDownstream=true",
marquezUrl.toColumnLineageUrl("namespace", "dataset", "field", 20, true).toString());
marquezUrl
.toColumnLineageUrlByDatasetField("namespace", "dataset", "field", 20, true)
.toString());
}
}
15 changes: 15 additions & 0 deletions clients/java/src/test/java/marquez/client/models/NodeIdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,19 @@ public void testDatasetField(String namespace, String dataset, String field) {
assertEquals(dataset, nodeId.asDatasetFieldId().getDataset());
assertEquals(field, nodeId.asDatasetFieldId().getField());
}

@ParameterizedTest(name = "testJob-{index} {argumentsWithNames}")
@CsvSource(
value = {"my-namespace$my-job", "org://team$my-job"},
delimiter = '$')
public void testJob(String namespace, String job) {
JobId jobId = new JobId(namespace, job);
NodeId nodeId = NodeId.of(jobId);
assertTrue(nodeId.isJobType());
assertFalse(nodeId.isDatasetType());
assertEquals(jobId, nodeId.asJobId());
assertEquals(nodeId, NodeId.of(nodeId.getValue()));
assertEquals(namespace, nodeId.asJobId().getNamespace());
assertEquals(job, nodeId.asJobId().getName());
}
}

0 comments on commit ef1c494

Please sign in to comment.