Skip to content

Commit

Permalink
marquez java client
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 12, 2022
1 parent 7b6265d commit 804adf6
Show file tree
Hide file tree
Showing 16 changed files with 615 additions and 1 deletion.
50 changes: 50 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import marquez.client.models.LineageEvent;
import marquez.client.models.Namespace;
import marquez.client.models.NamespaceMeta;
import marquez.client.models.Node;
import marquez.client.models.Run;
import marquez.client.models.RunMeta;
import marquez.client.models.RunState;
Expand All @@ -57,6 +58,7 @@
public class MarquezClient {

static final URL DEFAULT_BASE_URL = Utils.toUrl("http://localhost:8080");
static final int DEFAULT_LINEAGE_GRAPH_DEPTH = 20;

@VisibleForTesting static final int DEFAULT_LIMIT = 100;
@VisibleForTesting static final int DEFAULT_OFFSET = 0;
Expand Down Expand Up @@ -113,6 +115,36 @@ public enum SortDirection {
@Getter public final String value;
}

public Lineage getColumnLineage(@NonNull String namespaceName, @NonNull String datasetName) {
return getColumnLineage(namespaceName, datasetName, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
@NonNull String namespaceName, @NonNull String datasetName, @NonNull String field) {
return getColumnLineage(namespaceName, datasetName, field, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}

public Lineage getColumnLineage(
@NonNull String namespaceName,
@NonNull String datasetName,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Lineage getColumnLineage(
@NonNull String namespaceName,
@NonNull String datasetName,
@NonNull String field,
int depth,
boolean withDownstream) {
final String bodyAsJson =
http.get(url.toColumnLineageUrl(namespaceName, datasetName, field, depth, withDownstream));
return Lineage.fromJson(bodyAsJson);
}

public Namespace createNamespace(
@NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) {
final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson());
Expand Down Expand Up @@ -660,4 +692,22 @@ String toJson() {
return Utils.toJson(this);
}
}

@Value
static class Lineage {
@Getter Set<Node> graph;

@JsonCreator
Lineage(@JsonProperty("graph") final Set<Node> value) {
this.graph = ImmutableSet.copyOf(value);
}

static Lineage fromJson(final String json) {
return Utils.fromJson(json, new TypeReference<Lineage>() {});
}

String toJson() {
return Utils.toJson(this);
}
}
}
4 changes: 4 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezPathV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,8 @@ static String createTagPath(String name) {
static String searchPath() {
return path("/search");
}

static String columnLineagePath() {
return path("/column-lineage/");
}
}
21 changes: 21 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
import static marquez.client.MarquezPathV1.columnLineagePath;
import static marquez.client.MarquezPathV1.createRunPath;
import static marquez.client.MarquezPathV1.createTagPath;
import static marquez.client.MarquezPathV1.datasetPath;
Expand Down Expand Up @@ -41,6 +42,9 @@
import java.util.Map;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
import marquez.client.models.SearchSort;
Expand Down Expand Up @@ -205,4 +209,21 @@ URL toSearchUrl(
queryParams.put("limit", limit);
return from(searchPath(), queryParams.build());
}

URL toColumnLineageUrl(
String namespace, String dataset, String field, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetFieldId(namespace, dataset, field)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}

URL toColumnLineageUrl(String namespace, String dataset, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", NodeId.of(new DatasetId(namespace, dataset)).getValue());
queryParams.put("depth", String.valueOf(depth));
queryParams.put("withDownstream", String.valueOf(withDownstream));
return from(columnLineagePath(), queryParams.build());
}
}
23 changes: 23 additions & 0 deletions clients/java/src/main/java/marquez/client/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package marquez.client;

import static com.google.common.base.Strings.lenientFormat;
import static org.apache.http.HttpHeaders.AUTHORIZATION;

import com.fasterxml.jackson.annotation.JsonInclude;
Expand All @@ -17,6 +18,7 @@
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.http.client.methods.HttpRequestBase;

Expand Down Expand Up @@ -70,4 +72,25 @@ public static void addAuthTo(
@NonNull final HttpRequestBase request, @NonNull final String apiKey) {
request.addHeader(AUTHORIZATION, "Bearer " + apiKey);
}

public static String checkNotBlank(@NonNull final String arg) {
if (emptyOrBlank(arg)) {
throw new IllegalArgumentException();
}
return arg;
}

public static String checkNotBlank(
@NonNull final String arg,
@Nullable final String errorMessage,
@Nullable final Object... errorMessageArgs) {
if (emptyOrBlank(arg)) {
throw new IllegalArgumentException(lenientFormat(errorMessage, errorMessageArgs));
}
return arg;
}

private static boolean emptyOrBlank(final String arg) {
return arg.trim().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@Getter
public class ColumnLineage {
@NonNull private String name;
@NonNull private List<ColumnLineageInputField> inputFields;
@NonNull private List<DatasetFieldId> inputFields;
@NonNull private String transformationDescription;
@NonNull private String transformationType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import marquez.client.Utils;

@Getter
@AllArgsConstructor
@EqualsAndHashCode
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@NonNull String field;
@NonNull String fieldType;
@NonNull String transformationDescription;
@NonNull String transformationType;
@NonNull List<DatasetFieldId> inputFields;

public static ColumnLineageNodeData fromJson(@NonNull final String json) {
return Utils.fromJson(json, new TypeReference<ColumnLineageNodeData>() {});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import lombok.NonNull;
import lombok.Value;

@Value
public class DatasetFieldId {
@NonNull String namespace;
@NonNull String dataset;
@NonNull String field;
}
19 changes: 19 additions & 0 deletions clients/java/src/main/java/marquez/client/models/Edge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import lombok.NonNull;
import lombok.Value;

@Value
public class Edge {
@NonNull NodeId origin;
@NonNull NodeId destination;

public static Edge of(@NonNull final NodeId origin, @NonNull final NodeId destination) {
return new Edge(origin, destination);
}
}
120 changes: 120 additions & 0 deletions clients/java/src/main/java/marquez/client/models/Node.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import static marquez.client.Utils.checkNotBlank;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;

@EqualsAndHashCode
@ToString
@JsonPropertyOrder({"id", "type", "data", "inEdges", "outEdges"})
public final class Node {
@Getter private final NodeId id;
@Getter private final NodeType type;
@Getter @Setter @Nullable private NodeData data;
@Getter private final Set<Edge> inEdges;
@Getter private final Set<Edge> outEdges;

public Node(
@NonNull final NodeId id,
@NonNull final NodeType type,
@Nullable final NodeData data,
@Nullable final Set<Edge> inEdges,
@Nullable final Set<Edge> outEdges) {
this.id = id;
this.type = type;
this.data = data;
this.inEdges = (inEdges == null) ? ImmutableSet.of() : ImmutableSortedSet.copyOf(inEdges);
this.outEdges = (outEdges == null) ? ImmutableSet.of() : ImmutableSortedSet.copyOf(outEdges);
}

public static Builder dataset() {
return new Builder(NodeType.DATASET);
}

public static Builder datasetField() {
return new Builder(NodeType.DATASET_FIELD);
}

public static Builder job() {
return new Builder(NodeType.JOB);
}

public static Builder run() {
return new Builder(NodeType.RUN);
}

public boolean hasInEdges() {
return !inEdges.isEmpty();
}

public boolean hasOutEdges() {
return !outEdges.isEmpty();
}

public static final class Builder {
private NodeId id;
private final NodeType type;
private NodeData data;
private Set<Edge> inEdges;
private Set<Edge> outEdges;

private Builder(@NonNull final NodeType type) {
this.type = type;
this.inEdges = ImmutableSet.of();
this.outEdges = ImmutableSet.of();
}

public Builder id(@NonNull String idString) {
return id(NodeId.of(checkNotBlank(idString)));
}

public Builder id(@NonNull NodeId id) {
this.id = id;
return this;
}

public Builder data(@Nullable NodeData data) {
this.data = data;
return this;
}

public Builder inEdges(@NonNull Edge... inEdges) {
this.inEdges = Sets.newHashSet(inEdges);
return this;
}

public Builder inEdges(@Nullable Set<Edge> inEdges) {
this.inEdges = (inEdges == null) ? ImmutableSet.of() : inEdges;
return this;
}

public Builder outEdges(@NonNull Edge... outEdges) {
this.outEdges = Sets.newHashSet(outEdges);
return this;
}

public Builder outEdges(@Nullable Set<Edge> outEdges) {
this.outEdges = (outEdges == null) ? ImmutableSet.of() : outEdges;
return this;
}

public Node build() {
return new Node(id, type, data, inEdges, outEdges);
}
}
}
16 changes: 16 additions & 0 deletions clients/java/src/main/java/marquez/client/models/NodeData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXTERNAL_PROPERTY,
property = "type")
@JsonSubTypes({@JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD")})
public interface NodeData {}
Loading

0 comments on commit 804adf6

Please sign in to comment.