Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column lineage java client #2163

Merged
merged 1 commit into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
)
AND node.depth < :depth
AND node.depth < :depth - 1 -- fetching single row means fetching single edge which is size 1
AND NOT is_cycle
)
SELECT
Expand Down
109 changes: 109 additions & 0 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez;

import static marquez.db.ColumnLineageTestUtils.getDatasetA;
import static marquez.db.ColumnLineageTestUtils.getDatasetB;
import static marquez.db.ColumnLineageTestUtils.getDatasetC;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.Optional;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.models.Node;
import marquez.db.LineageTestUtils;
import marquez.db.OpenLineageDao;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@org.junit.jupiter.api.Tag("IntegrationTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class ColumnLineageIntegrationTest extends BaseIntegrationTest {

@BeforeEach
public void setup(Jdbi jdbi) {
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class);

LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);

LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
LineageEvent.Dataset dataset_C = getDatasetC();

LineageTestUtils.createLineageRow(
openLineageDao,
"job1",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_A),
Arrays.asList(dataset_B));

LineageTestUtils.createLineageRow(
openLineageDao,
"job2",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_B),
Arrays.asList(dataset_C));
}

@AfterEach
public void tearDown(Jdbi jdbi) {
JdbiUtils.cleanDatabase(jdbi);
}

@Test
public void testColumnLineageEndpointByDataset() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_b")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
}

@Test
public void testColumnLineageEndpointByDatasetField() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b", "col_c");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_b")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
}

@Test
public void testColumnLineageEndpointWithDepthLimit() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_c", "col_d", 1, false);

assertThat(lineage.getGraph()).hasSize(2);
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

@Test
public void testColumnLineageEndpointWithDownstream() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_b", "col_c", 10, true);

assertThat(lineage.getGraph()).hasSize(4);
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

private Optional<Node> getNodeByFieldName(MarquezClient.Lineage lineage, String field) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getField().equals(field))
.findAny();
}
}
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/db/ColumnLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ void testGetLineageWithLimitedDepth() {
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(3);

// verify graph size is 2 when max depth is 1
// depth 1 corresponds to single ColumnLineageData with other nodes as node inputFields
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(2);
.hasSize(1);
}

@Test
Expand Down
50 changes: 50 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import marquez.client.models.LineageEvent;
import marquez.client.models.Namespace;
import marquez.client.models.NamespaceMeta;
import marquez.client.models.Node;
import marquez.client.models.Run;
import marquez.client.models.RunMeta;
import marquez.client.models.RunState;
Expand All @@ -57,6 +58,7 @@
public class MarquezClient {

static final URL DEFAULT_BASE_URL = Utils.toUrl("http://localhost:8080");
static final int DEFAULT_LINEAGE_GRAPH_DEPTH = 20;

@VisibleForTesting static final int DEFAULT_LIMIT = 100;
@VisibleForTesting static final int DEFAULT_OFFSET = 0;
Expand Down Expand Up @@ -113,6 +115,36 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineage(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Namespace createNamespace(
@NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) {
final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson());
Expand Down Expand Up @@ -660,4 +692,22 @@ String toJson() {
return Utils.toJson(this);
}
}

@Value
public static class Lineage {
@Getter Set<Node> graph;

@JsonCreator
Lineage(@JsonProperty("graph") final Set<Node> value) {
this.graph = ImmutableSet.copyOf(value);
}

static Lineage fromJson(final String json) {
return Utils.fromJson(json, new TypeReference<Lineage>() {});
}

String toJson() {
return Utils.toJson(this);
}
}
}
4 changes: 4 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezPathV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,8 @@ static String createTagPath(String name) {
static String searchPath() {
return path("/search");
}

static String columnLineagePath() {
return path("/column-lineage/");
}
}
21 changes: 21 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
import static marquez.client.MarquezPathV1.columnLineagePath;
import static marquez.client.MarquezPathV1.createRunPath;
import static marquez.client.MarquezPathV1.createTagPath;
import static marquez.client.MarquezPathV1.datasetPath;
Expand Down Expand Up @@ -41,6 +42,9 @@
import java.util.Map;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
import marquez.client.models.SearchSort;
Expand Down Expand Up @@ -205,4 +209,21 @@ URL toSearchUrl(
queryParams.put("limit", limit);
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrl(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}
}
11 changes: 11 additions & 0 deletions clients/java/src/main/java/marquez/client/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ public static void addAuthTo(
@NonNull final HttpRequestBase request, @NonNull final String apiKey) {
request.addHeader(AUTHORIZATION, "Bearer " + apiKey);
}

public static String checkNotBlank(@NonNull final String arg) {
if (emptyOrBlank(arg)) {
throw new IllegalArgumentException();
}
return arg;
}

private static boolean emptyOrBlank(final String arg) {
return arg.trim().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@Getter
public class ColumnLineage {
@NonNull private String name;
@NonNull private List<ColumnLineageInputField> inputFields;
@NonNull private List<DatasetFieldId> inputFields;
@NonNull private String transformationDescription;
@NonNull private String transformationType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import marquez.client.Utils;

@Getter
@AllArgsConstructor
@EqualsAndHashCode
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@NonNull String field;
@NonNull String fieldType;
@NonNull String transformationDescription;
@NonNull String transformationType;
@NonNull List<DatasetFieldId> inputFields;

public static ColumnLineageNodeData fromJson(@NonNull final String json) {
return Utils.fromJson(json, new TypeReference<ColumnLineageNodeData>() {});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import lombok.NonNull;
import lombok.Value;

@Value
public class DatasetFieldId {
@NonNull String namespace;
@NonNull String dataset;
@NonNull String field;
}
27 changes: 27 additions & 0 deletions clients/java/src/main/java/marquez/client/models/Edge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.util.Comparator;
import lombok.NonNull;
import lombok.Value;

@Value
public class Edge implements Comparable<Edge> {
@NonNull NodeId origin;
@NonNull NodeId destination;

public static Edge of(@NonNull final NodeId origin, @NonNull final NodeId destination) {
return new Edge(origin, destination);
}

@Override
public int compareTo(Edge o) {
return Comparator.comparing(Edge::getOrigin)
.thenComparing(Edge::getDestination)
.compare(this, o);
}
}
Loading