From 8b669dfc4a805e16cb637bb72220aabaa582fa03 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Thu, 29 Sep 2022 12:06:54 +0200 Subject: [PATCH] include column lineage in dataset resource Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 1 + .../java/marquez/api/DatasetResource.java | 5 +- .../java/marquez/db/ColumnLineageDao.java | 43 ++++++++++++++ .../main/java/marquez/db/DatasetFieldDao.java | 27 +++++---- .../marquez/service/ColumnLineageService.java | 51 +++++++++++++++++ .../marquez/service/models/ColumnLineage.java | 24 ++++++++ .../models/ColumnLineageInputField.java | 22 +++++++ .../java/marquez/service/models/Dataset.java | 3 + .../java/marquez/service/models/DbTable.java | 1 + .../java/marquez/service/models/Stream.java | 1 + .../java/marquez/DatasetIntegrationTest.java | 48 ++++++++++++++++ .../marquez/db/ColumnLineageTestUtils.java | 9 +++ .../service/ColumnLineageServiceTest.java | 57 +++++++++++++++++++ .../marquez/client/models/ColumnLineage.java | 24 ++++++++ .../models/ColumnLineageInputField.java | 22 +++++++ .../java/marquez/client/models/Dataset.java | 3 + .../java/marquez/client/models/DbTable.java | 2 + .../java/marquez/client/models/Stream.java | 2 + .../marquez/client/MarquezClientTest.java | 4 ++ .../marquez/client/models/ModelGenerator.java | 2 + 20 files changed, 338 insertions(+), 13 deletions(-) create mode 100644 api/src/main/java/marquez/service/models/ColumnLineage.java create mode 100644 api/src/main/java/marquez/service/models/ColumnLineageInputField.java create mode 100644 clients/java/src/main/java/marquez/client/models/ColumnLineage.java create mode 100644 clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b075dbd89..42147a4449 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) ### Fixed * Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike) diff --git a/api/src/main/java/marquez/api/DatasetResource.java b/api/src/main/java/marquez/api/DatasetResource.java index 841d4770b6..a976139e35 100644 --- a/api/src/main/java/marquez/api/DatasetResource.java +++ b/api/src/main/java/marquez/api/DatasetResource.java @@ -12,6 +12,7 @@ import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Arrays; import java.util.List; import java.util.Locale; import javax.validation.Valid; @@ -85,10 +86,11 @@ public Response getDataset( @PathParam("dataset") DatasetName datasetName) { throwIfNotExists(namespaceName); - final Dataset dataset = + Dataset dataset = datasetService .findWithTags(namespaceName.getValue(), datasetName.getValue()) .orElseThrow(() -> new DatasetNotFoundException(datasetName)); + columnLineageService.enrichWithColumnLineage(Arrays.asList(dataset)); return Response.ok(dataset).build(); } @@ -147,6 +149,7 @@ public Response list( final List datasets = datasetService.findAllWithTags(namespaceName.getValue(), limit, offset); + columnLineageService.enrichWithColumnLineage(datasets); final int totalCount = datasetService.countFor(namespaceName.getValue()); return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build(); } diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index c472937cd1..b26a1213f1 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -150,4 +150,47 @@ Set getLineage( int depth, @BindList(onEmpty = NULL_STRING) List datasetFieldUuids, Instant createdAtUntil); + + @SqlQuery( + """ + WITH selected_column_lineage AS ( + SELECT cl.* + FROM column_lineage cl + JOIN dataset_fields df ON df.uuid = cl.output_dataset_field_uuid + JOIN datasets_view dv ON dv.uuid = df.dataset_uuid + WHERE ARRAY[]::DATASET_NAME[] && dv.dataset_symlinks + ), + dataset_fields_view AS ( + SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid + FROM dataset_fields df + INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid + ) + SELECT + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + c.transformation_description, + c.transformation_type, + c.created_at, + c.updated_at + FROM selected_column_lineage c + INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid + LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid + GROUP BY + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + c.transformation_description, + c.transformation_type, + c.created_at, + c.updated_at + """) + Set getLineageRowsForDatasets( + @BindBeanList( + propertyNames = {"left", "right"}, + value = "values") + List> datasets); } diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java index a54a271604..850d95c19a 100644 --- a/api/src/main/java/marquez/db/DatasetFieldDao.java +++ b/api/src/main/java/marquez/db/DatasetFieldDao.java @@ -34,11 +34,14 @@ @RegisterRowMapper(FieldDataMapper.class) public interface DatasetFieldDao extends BaseDao { @SqlQuery( - "SELECT EXISTS (" - + "SELECT 1 FROM dataset_fields AS df " - + "INNER JOIN datasets_view AS d " - + " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName " - + "WHERE df.name = :name)") + """ + SELECT EXISTS ( + SELECT 1 FROM dataset_fields AS df + INNER JOIN datasets_view AS d ON d.uuid = df.dataset_uuid + WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) + AND df.name = :name + ) + """) boolean exists(String namespaceName, String datasetName, String name); default Dataset updateTags( @@ -97,20 +100,20 @@ default Dataset updateTags( """ SELECT df.uuid FROM dataset_fields df - INNER JOIN datasets_view AS d - ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace + JOIN datasets_view AS d ON d.uuid = df.dataset_uuid + WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) """) - List findDatasetFieldsUuids(String namespace, String datasetName); + List findDatasetFieldsUuids(String namespaceName, String datasetName); @SqlQuery( """ SELECT df.uuid FROM dataset_fields df - INNER JOIN datasets_view AS d - ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace - WHERE df.name = :name + JOIN datasets_view AS d ON d.uuid = df.dataset_uuid + WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) + AND df.name = :name """) - Optional findUuid(String namespace, String datasetName, String name); + Optional findUuid(String namespaceName, String datasetName, String name); @SqlQuery( "SELECT f.*, " diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index baa5fc75c5..3aee7ee33d 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -22,10 +22,14 @@ import marquez.db.ColumnLineageDao; import marquez.db.DatasetFieldDao; import marquez.db.models.ColumnLineageNodeData; +import marquez.service.models.ColumnLineage; +import marquez.service.models.ColumnLineageInputField; +import marquez.service.models.Dataset; import marquez.service.models.Edge; import marquez.service.models.Lineage; import marquez.service.models.Node; import marquez.service.models.NodeId; +import org.apache.commons.lang3.tuple.Pair; @Slf4j public class ColumnLineageService extends DelegatingDaos.DelegatingColumnLineageDao { @@ -125,4 +129,51 @@ List getColumnNodeUuids(NodeId nodeId) { } return columnNodeUuids; } + + public void enrichWithColumnLineage(List datasets) { + if (datasets.isEmpty()) { + return; + } + + Set lineageRowsForDatasets = + getLineageRowsForDatasets( + datasets.stream() + .map(d -> Pair.of(d.getNamespace().getValue(), d.getName().getValue())) + .collect(Collectors.toList())); + + Map> datasetLineage = new HashMap<>(); + lineageRowsForDatasets.stream() + .forEach( + nodeData -> { + Dataset dataset = + datasets.stream() + .filter(d -> d.getNamespace().getValue().equals(nodeData.getNamespace())) + .filter(d -> d.getName().getValue().equals(nodeData.getDataset())) + .findAny() + .get(); + + if (!datasetLineage.containsKey(dataset)) { + datasetLineage.put(dataset, new LinkedList<>()); + } + datasetLineage + .get(dataset) + .add( + ColumnLineage.builder() + .name(nodeData.getField()) + .transformationDescription(nodeData.getTransformationDescription()) + .transformationType(nodeData.getTransformationType()) + .inputFields( + nodeData.getInputFields().stream() + .map( + f -> + new ColumnLineageInputField( + f.getNamespace(), f.getDataset(), f.getField())) + .collect(Collectors.toList())) + .build()); + }); + + datasets.stream() + .filter(dataset -> datasetLineage.containsKey(dataset)) + .forEach(dataset -> dataset.setColumnLineage(datasetLineage.get(dataset))); + } } diff --git a/api/src/main/java/marquez/service/models/ColumnLineage.java b/api/src/main/java/marquez/service/models/ColumnLineage.java new file mode 100644 index 0000000000..f5a8854495 --- /dev/null +++ b/api/src/main/java/marquez/service/models/ColumnLineage.java @@ -0,0 +1,24 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import java.util.List; +import javax.validation.constraints.NotNull; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +@Builder +@Getter +public class ColumnLineage { + @NotNull private String name; + @NotNull private List inputFields; + @NotNull private String transformationDescription; + @NotNull private String transformationType; +} diff --git a/api/src/main/java/marquez/service/models/ColumnLineageInputField.java b/api/src/main/java/marquez/service/models/ColumnLineageInputField.java new file mode 100644 index 0000000000..13bf9f01d5 --- /dev/null +++ b/api/src/main/java/marquez/service/models/ColumnLineageInputField.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +@Getter +@AllArgsConstructor +public class ColumnLineageInputField { + @NotNull private String datasetNamespace; + @NotNull private String datasetName; + @NotNull private String fieldName; +} diff --git a/api/src/main/java/marquez/service/models/Dataset.java b/api/src/main/java/marquez/service/models/Dataset.java index 7c21edc41b..8cdef94fcd 100644 --- a/api/src/main/java/marquez/service/models/Dataset.java +++ b/api/src/main/java/marquez/service/models/Dataset.java @@ -53,6 +53,7 @@ public abstract class Dataset { @Nullable private final String lastLifecycleState; @Nullable private final String description; @Nullable private final UUID currentVersion; + @Getter @Setter @Nullable private List columnLineage; @Getter ImmutableMap facets; @Getter private final boolean isDeleted; @@ -70,6 +71,7 @@ public Dataset( @Nullable final String lastLifecycleState, @Nullable final String description, @Nullable final UUID currentVersion, + @Nullable final ImmutableList columnLineage, @Nullable final ImmutableMap facets, boolean isDeleted) { this.id = id; @@ -86,6 +88,7 @@ public Dataset( this.lastLifecycleState = lastLifecycleState; this.description = description; this.currentVersion = currentVersion; + this.columnLineage = columnLineage; this.facets = (facets == null) ? ImmutableMap.of() : facets; this.isDeleted = isDeleted; } diff --git a/api/src/main/java/marquez/service/models/DbTable.java b/api/src/main/java/marquez/service/models/DbTable.java index bcae8360d2..df0a196394 100644 --- a/api/src/main/java/marquez/service/models/DbTable.java +++ b/api/src/main/java/marquez/service/models/DbTable.java @@ -53,6 +53,7 @@ public DbTable( lastLifecycleState, description, currentVersion, + null, facets, isDeleted); } diff --git a/api/src/main/java/marquez/service/models/Stream.java b/api/src/main/java/marquez/service/models/Stream.java index 4225ca6a24..adb48ab67a 100644 --- a/api/src/main/java/marquez/service/models/Stream.java +++ b/api/src/main/java/marquez/service/models/Stream.java @@ -59,6 +59,7 @@ public Stream( lastLifecycleState, description, currentVersion, + null, facets, isDeleted); this.schemaLocation = schemaLocation; diff --git a/api/src/test/java/marquez/DatasetIntegrationTest.java b/api/src/test/java/marquez/DatasetIntegrationTest.java index e8603c78b3..cb478fb0b6 100644 --- a/api/src/test/java/marquez/DatasetIntegrationTest.java +++ b/api/src/test/java/marquez/DatasetIntegrationTest.java @@ -5,6 +5,8 @@ package marquez; +import static marquez.db.ColumnLineageTestUtils.getDatasetA; +import static marquez.db.ColumnLineageTestUtils.getDatasetB; import static org.assertj.core.api.Assertions.assertThat; import com.fasterxml.jackson.core.type.TypeReference; @@ -22,6 +24,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import marquez.client.models.ColumnLineage; import marquez.client.models.Dataset; import marquez.client.models.DatasetId; import marquez.client.models.DatasetVersion; @@ -440,4 +443,49 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep datasets = client.listDatasets(namespace); assertThat(datasets).hasSize(1); } + + @Test + public void testApp_getDatasetContainsColumnLineage() { + LineageEvent event = + new LineageEvent( + "COMPLETE", + Instant.now().atZone(ZoneId.systemDefault()), + new LineageEvent.Run(UUID.randomUUID().toString(), null), + new LineageEvent.Job("namespace", "job_name", null), + List.of(getDatasetA()), + List.of(getDatasetB()), + "the_producer"); + + CompletableFuture resp = + this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + resp.join(); + + // verify listDatasets contains column lineage + List columnLineage; + + columnLineage = + client.listDatasets("namespace").stream() + .filter(d -> d.getName().equals("dataset_b")) + .findAny() + .get() + .getColumnLineage(); + assertThat(columnLineage).hasSize(1); + assertThat(columnLineage.get(0).getInputFields()).hasSize(2); + + // verify getDataset returns non-empty column lineage + columnLineage = client.getDataset("namespace", "dataset_b").getColumnLineage(); + assertThat(columnLineage).hasSize(1); + assertThat(columnLineage.get(0).getInputFields()).hasSize(2); + + client.deleteJob("namespace", "job_name"); + client.deleteDataset("namespace", "dataset_a"); + client.deleteDataset("namespace", "dataset_b"); + } } diff --git a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java index 41d50c36eb..6776dc80ec 100644 --- a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java +++ b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java @@ -55,6 +55,9 @@ public static LineageEvent.Dataset getDatasetA() { Arrays.asList( new LineageEvent.SchemaField("col_a", "STRING", ""), new LineageEvent.SchemaField("col_b", "STRING", "")))) + .dataSource( + new LineageEvent.DatasourceDatasetFacet( + PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) .build()); } @@ -69,6 +72,9 @@ public static LineageEvent.Dataset getDatasetB() { PRODUCER_URL, SCHEMA_URL, Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", "")))) + .dataSource( + new LineageEvent.DatasourceDatasetFacet( + PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) .columnLineage( new LineageEvent.ColumnLineageFacet( PRODUCER_URL, @@ -109,6 +115,9 @@ public static LineageEvent.Dataset getDatasetC() { "namespace", "dataset_b", "col_c")), "description2", "type2")))) + .dataSource( + new LineageEvent.DatasourceDatasetFacet( + PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) .build()); } } diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 78224080ae..5dbc2cd2c3 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -23,12 +23,15 @@ import marquez.common.models.NamespaceName; import marquez.db.ColumnLineageDao; import marquez.db.ColumnLineageTestUtils; +import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.LineageTestUtils; import marquez.db.OpenLineageDao; import marquez.db.models.ColumnLineageNodeData; import marquez.db.models.InputFieldNodeData; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.ColumnLineageInputField; +import marquez.service.models.Dataset; import marquez.service.models.Lineage; import marquez.service.models.LineageEvent; import marquez.service.models.Node; @@ -45,6 +48,7 @@ public class ColumnLineageServiceTest { private static ColumnLineageDao dao; private static OpenLineageDao openLineageDao; private static DatasetFieldDao fieldDao; + private static DatasetDao datasetDao; private static ColumnLineageService lineageService; private static LineageEvent.JobFacet jobFacet; @@ -53,6 +57,7 @@ public static void setUpOnce(Jdbi jdbi) { dao = jdbi.onDemand(ColumnLineageDao.class); openLineageDao = jdbi.onDemand(OpenLineageDao.class); fieldDao = jdbi.onDemand(DatasetFieldDao.class); + datasetDao = jdbi.onDemand(DatasetDao.class); lineageService = new ColumnLineageService(dao, fieldDao); jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); } @@ -211,6 +216,58 @@ public void testLineageWhenLineageEmpty() { .hasSize(0); } + @Test + public void testEnrichDatasets() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Dataset dataset_b = datasetDao.findDatasetByName("namespace", "dataset_b").get(); + Dataset dataset_c = datasetDao.findDatasetByName("namespace", "dataset_c").get(); + lineageService.enrichWithColumnLineage(Arrays.asList(dataset_b, dataset_c)); + + assertThat(dataset_b.getColumnLineage()).hasSize(1); + assertThat(dataset_b.getColumnLineage().get(0).getName()).isEqualTo("col_c"); + assertThat(dataset_b.getColumnLineage().get(0).getTransformationType()).isEqualTo("type1"); + assertThat(dataset_b.getColumnLineage().get(0).getTransformationDescription()) + .isEqualTo("description1"); + + List inputFields_b = + dataset_b.getColumnLineage().get(0).getInputFields(); + assertThat(inputFields_b) + .hasSize(2) + .contains(new ColumnLineageInputField("namespace", "dataset_a", "col_a")) + .contains(new ColumnLineageInputField("namespace", "dataset_a", "col_b")); + + assertThat(dataset_c.getColumnLineage()).hasSize(1); + assertThat(dataset_c.getColumnLineage().get(0).getName()).isEqualTo("col_d"); + assertThat(dataset_c.getColumnLineage().get(0).getTransformationType()).isEqualTo("type2"); + assertThat(dataset_c.getColumnLineage().get(0).getTransformationDescription()) + .isEqualTo("description2"); + + List inputFields_c = + dataset_c.getColumnLineage().get(0).getInputFields(); + assertThat(inputFields_c) + .hasSize(1) + .contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c")); + } + private Optional getNode(Lineage lineage, String datasetName, String fieldName) { return lineage.getGraph().stream() .filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName)) diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineage.java b/clients/java/src/main/java/marquez/client/models/ColumnLineage.java new file mode 100644 index 0000000000..4b23b65b2a --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineage.java @@ -0,0 +1,24 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.util.List; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +@Builder +@Getter +public class ColumnLineage { + @NonNull private String name; + @NonNull private List inputFields; + @NonNull private String transformationDescription; + @NonNull private String transformationType; +} diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java b/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java new file mode 100644 index 0000000000..b7c6193a8f --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +@Getter +@AllArgsConstructor +public class ColumnLineageInputField { + @NonNull private String datasetNamespace; + @NonNull private String datasetName; + @NonNull private String fieldName; +} diff --git a/clients/java/src/main/java/marquez/client/models/Dataset.java b/clients/java/src/main/java/marquez/client/models/Dataset.java index 39c954589a..ba4432a6e9 100644 --- a/clients/java/src/main/java/marquez/client/models/Dataset.java +++ b/clients/java/src/main/java/marquez/client/models/Dataset.java @@ -47,6 +47,7 @@ public abstract class Dataset { @Getter @NonNull private final Set tags; @Nullable private final Instant lastModifiedAt; @Nullable private final String description; + @Getter @Nullable private List columnLineage; @Getter private final Map facets; @Nullable private final UUID currentVersion; @@ -63,6 +64,7 @@ public Dataset( @Nullable final Set tags, @Nullable final Instant lastModifiedAt, @Nullable final String description, + @Nullable final List columnLineage, @Nullable final Map facets, @Nullable final UUID currentVersion) { this.id = id; @@ -77,6 +79,7 @@ public Dataset( this.tags = (tags == null) ? ImmutableSet.of() : ImmutableSet.copyOf(tags); this.lastModifiedAt = lastModifiedAt; this.description = description; + this.columnLineage = columnLineage; this.facets = (facets == null) ? ImmutableMap.of() : ImmutableMap.copyOf(facets); this.currentVersion = currentVersion; } diff --git a/clients/java/src/main/java/marquez/client/models/DbTable.java b/clients/java/src/main/java/marquez/client/models/DbTable.java index 7a43bde269..be5014248c 100644 --- a/clients/java/src/main/java/marquez/client/models/DbTable.java +++ b/clients/java/src/main/java/marquez/client/models/DbTable.java @@ -31,6 +31,7 @@ public DbTable( @Nullable final Set tags, @Nullable final Instant lastModifiedAt, @Nullable final String description, + @Nullable final List columnLineage, @Nullable final Map facets, @Nullable final UUID currentVersion) { super( @@ -46,6 +47,7 @@ public DbTable( tags, lastModifiedAt, description, + columnLineage, facets, currentVersion); } diff --git a/clients/java/src/main/java/marquez/client/models/Stream.java b/clients/java/src/main/java/marquez/client/models/Stream.java index db3b9a6e78..3d058f9ee9 100644 --- a/clients/java/src/main/java/marquez/client/models/Stream.java +++ b/clients/java/src/main/java/marquez/client/models/Stream.java @@ -36,6 +36,7 @@ public Stream( @Nullable final Instant lastModifiedAt, @Nullable final URL schemaLocation, @Nullable final String description, + @Nullable final List columnLineage, @Nullable final Map facets, @Nullable final UUID currentVersion) { super( @@ -51,6 +52,7 @@ public Stream( tags, lastModifiedAt, description, + columnLineage, facets, currentVersion); this.schemaLocation = schemaLocation; diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index f60690ba50..5898689261 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -146,6 +146,7 @@ public class MarquezClientTest { TAGS, null, DB_TABLE_DESCRIPTION, + null, DB_FACETS, CURRENT_VERSION); private static final DbTable DB_TABLE_MODIFIED = @@ -161,6 +162,7 @@ public class MarquezClientTest { TAGS, LAST_MODIFIED_AT, DB_TABLE_DESCRIPTION, + null, DB_FACETS, CURRENT_VERSION); @@ -197,6 +199,7 @@ public class MarquezClientTest { null, STREAM_SCHEMA_LOCATION, STREAM_DESCRIPTION, + null, DB_FACETS, CURRENT_VERSION); private static final Stream STREAM_MODIFIED = @@ -213,6 +216,7 @@ public class MarquezClientTest { LAST_MODIFIED_AT, STREAM_SCHEMA_LOCATION, STREAM_DESCRIPTION, + null, DB_FACETS, CURRENT_VERSION); diff --git a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java index 1c39864089..bc28f4be08 100644 --- a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java @@ -86,6 +86,7 @@ public static DbTable newDbTableWith(UUID currentVersion) { newTagNames(2), null, newDescription(), + null, newDatasetFacets(2), currentVersion); } @@ -134,6 +135,7 @@ public static Stream newStreamWith(UUID currentVersion) { null, newSchemaLocation(), newDescription(), + null, newDatasetFacets(2), currentVersion); }