Skip to content

Commit

Permalink
marquez java client
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 13, 2022
1 parent 9316120 commit e6bd732
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
)
AND node.depth < :depth
AND node.depth < :depth - 1 -- fetching single row means fetching single edge which is size 1
AND NOT is_cycle
)
SELECT
Expand Down
109 changes: 109 additions & 0 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez;

import static marquez.db.ColumnLineageTestUtils.getDatasetA;
import static marquez.db.ColumnLineageTestUtils.getDatasetB;
import static marquez.db.ColumnLineageTestUtils.getDatasetC;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.Optional;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.models.Node;
import marquez.db.LineageTestUtils;
import marquez.db.OpenLineageDao;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@org.junit.jupiter.api.Tag("IntegrationTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class ColumnLineageIntegrationTest extends BaseIntegrationTest {

@BeforeEach
public void setup(Jdbi jdbi) {
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class);

LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);

LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
LineageEvent.Dataset dataset_C = getDatasetC();

LineageTestUtils.createLineageRow(
openLineageDao,
"job1",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_A),
Arrays.asList(dataset_B));

LineageTestUtils.createLineageRow(
openLineageDao,
"job2",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_B),
Arrays.asList(dataset_C));
}

@AfterEach
public void tearDown(Jdbi jdbi) {
JdbiUtils.cleanDatabase(jdbi);
}

@Test
public void testColumnLineageEndpointByDataset() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_b")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
}

@Test
public void testColumnLineageEndpointByDatasetField() {
MarquezClient.Lineage lineage = client.getColumnLineage("namespace", "dataset_b", "col_c");

assertThat(lineage.getGraph()).hasSize(3);
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_b")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
}

@Test
public void testColumnLineageEndpointWithDepthLimit() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_c", "col_d", 1, false);

assertThat(lineage.getGraph()).hasSize(2);
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

@Test
public void testColumnLineageEndpointWithDownstream() {
MarquezClient.Lineage lineage =
client.getColumnLineage("namespace", "dataset_b", "col_c", 10, true);

assertThat(lineage.getGraph()).hasSize(4);
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
}

private Optional<Node> getNodeByFieldName(MarquezClient.Lineage lineage, String field) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getField().equals(field))
.findAny();
}
}
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/db/ColumnLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ void testGetLineageWithLimitedDepth() {
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(3);

// verify graph size is 2 when max depth is 1
// depth 1 corresponds to single ColumnLineageData with other nodes as node inputFields
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(2);
.hasSize(1);
}

@Test
Expand Down
50 changes: 50 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import marquez.client.models.LineageEvent;
import marquez.client.models.Namespace;
import marquez.client.models.NamespaceMeta;
import marquez.client.models.Node;
import marquez.client.models.Run;
import marquez.client.models.RunMeta;
import marquez.client.models.RunState;
Expand All @@ -57,6 +58,7 @@
public class MarquezClient {

static final URL DEFAULT_BASE_URL = Utils.toUrl("http://localhost:8080");
static final int DEFAULT_LINEAGE_GRAPH_DEPTH = 20;

@VisibleForTesting static final int DEFAULT_LIMIT = 100;
@VisibleForTesting static final int DEFAULT_OFFSET = 0;
Expand Down Expand Up @@ -113,6 +115,36 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineage(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Namespace createNamespace(
@NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) {
final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson());
Expand Down Expand Up @@ -660,4 +692,22 @@ String toJson() {
return Utils.toJson(this);
}
}

@Value
public static class Lineage {
@Getter Set<Node> graph;

@JsonCreator
Lineage(@JsonProperty("graph") final Set<Node> value) {
this.graph = ImmutableSet.copyOf(value);
}

static Lineage fromJson(final String json) {
return Utils.fromJson(json, new TypeReference<Lineage>() {});
}

String toJson() {
return Utils.toJson(this);
}
}
}
4 changes: 4 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezPathV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,8 @@ static String createTagPath(String name) {
static String searchPath() {
return path("/search");
}

static String columnLineagePath() {
return path("/column-lineage/");
}
}
21 changes: 21 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
import static marquez.client.MarquezPathV1.columnLineagePath;
import static marquez.client.MarquezPathV1.createRunPath;
import static marquez.client.MarquezPathV1.createTagPath;
import static marquez.client.MarquezPathV1.datasetPath;
Expand Down Expand Up @@ -41,6 +42,9 @@
import java.util.Map;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
import marquez.client.models.SearchSort;
Expand Down Expand Up @@ -205,4 +209,21 @@ URL toSearchUrl(
queryParams.put("limit", limit);
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrl(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}
}
11 changes: 11 additions & 0 deletions clients/java/src/main/java/marquez/client/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ public static void addAuthTo(
@NonNull final HttpRequestBase request, @NonNull final String apiKey) {
request.addHeader(AUTHORIZATION, "Bearer " + apiKey);
}

public static String checkNotBlank(@NonNull final String arg) {
if (emptyOrBlank(arg)) {
throw new IllegalArgumentException();
}
return arg;
}

private static boolean emptyOrBlank(final String arg) {
return arg.trim().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@Getter
public class ColumnLineage {
@NonNull private String name;
@NonNull private List<ColumnLineageInputField> inputFields;
@NonNull private List<DatasetFieldId> inputFields;
@NonNull private String transformationDescription;
@NonNull private String transformationType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import marquez.client.Utils;

@Getter
@AllArgsConstructor
@EqualsAndHashCode
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@NonNull String field;
@NonNull String fieldType;
@NonNull String transformationDescription;
@NonNull String transformationType;
@NonNull List<DatasetFieldId> inputFields;

public static ColumnLineageNodeData fromJson(@NonNull final String json) {
return Utils.fromJson(json, new TypeReference<ColumnLineageNodeData>() {});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import lombok.NonNull;
import lombok.Value;

@Value
public class DatasetFieldId {
@NonNull String namespace;
@NonNull String dataset;
@NonNull String field;
}
27 changes: 27 additions & 0 deletions clients/java/src/main/java/marquez/client/models/Edge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.util.Comparator;
import lombok.NonNull;
import lombok.Value;

@Value
public class Edge implements Comparable<Edge> {
@NonNull NodeId origin;
@NonNull NodeId destination;

public static Edge of(@NonNull final NodeId origin, @NonNull final NodeId destination) {
return new Edge(origin, destination);
}

@Override
public int compareTo(Edge o) {
return Comparator.comparing(Edge::getOrigin)
.thenComparing(Edge::getDestination)
.compare(this, o);
}
}
Loading

0 comments on commit e6bd732

Please sign in to comment.