Commit

merge
tito12 committed Dec 21, 2022
2 parents a14dfe5 + 1d28adf commit 461fd19
Showing 60 changed files with 765 additions and 379 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -83,6 +83,7 @@ jobs:
- v1-web-{{ .Branch }}
- run: npm install
- run: npm run test
- run: npm run eslint-fix
- run: npm run build
- save_cache:
paths:
2 changes: 1 addition & 1 deletion .env.example
@@ -1,4 +1,4 @@
API_PORT=5000
API_ADMIN_PORT=5001
WEB_PORT=3000
TAG=0.28.0
TAG=0.29.0
51 changes: 44 additions & 7 deletions CHANGELOG.md
@@ -1,31 +1,68 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.28.0...HEAD)
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.29.0...HEAD)

## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19

### Added

* Column-lineage endpoints supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*
* Present column lineage of a dataset [`#2293`](https://github.com/MarquezProject/marquez/pull/2293) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Column lineage of a dataset with a single level of depth can
be displayed in the dataset details tab.*

* Add point-in-time requests support to column-lineage endpoints [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Enables requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*
* Add column lineage point-in-time Java client methods [`#2269`](https://github.com/MarquezProject/marquez/pull/2269) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Java client methods to retrieve point-in-time `column-lineage`. Please note that the existing methods `getColumnLineageByDataset`, `getColumnLineageByDataset` and `getColumnLineageByDatasetField` are replaced by a single `getColumnLineage` method taking `NodeId` as a parameter.*
* Add raw event viewer to UI [`#2249`](https://github.com/MarquezProject/marquez/pull/2249) [@tito12](https://github.com/tito12)
*A new events page enables filtering events by date and expanding the payload by clicking on each event.*
* Update events page with styling synchronization [`#2324`](https://github.com/MarquezProject/marquez/pull/2324) [@phixMe](https://github.com/phixMe)
*Updates the new events page to conform better to the overall design system.*
* Update helm Ingress template to be cross-compatible with recent k8s versions [`#2275`](https://github.com/MarquezProject/marquez/pull/2275) [@jlukenoff](https://github.com/jlukenoff)
*Certain components of the Ingress schema have changed in recent versions of Kubernetes. This change updates the Ingress helm template to render based on the semantic Kubernetes version.*
* Add delete namespace endpoint doc to OpenAPI docs [`#2295`](https://github.com/MarquezProject/marquez/pull/2295) [@mobuchowski](https://github.com/mobuchowski)
*Adds a doc about the delete namespace endpoint.*
* Add i18next and language switcher for i18n of UI [`#2254`](https://github.com/MarquezProject/marquez/pull/2254) [@merobi-hub](https://github.com/merobi-hub) [@phixMe](https://github.com/phixMe)
*Adds i18next framework, language switcher, and translations for i18n of UI.*
* Add indexed `created_at` column to lineage events table [`#2299`](https://github.com/MarquezProject/marquez/pull/2299) [@prachim-collab](https://github.com/prachim-collab)
*A new timestamp column in the database supports analytics use cases by allowing for identification of incrementally created events (backwards-compatible).*
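
As context for the column-lineage entries above (#2265, #2269), the sketch below issues the underlying HTTP request with the JDK's `HttpClient`. This is a rough illustration only: the `/api/v1/column-lineage` path, the `dataset:<namespace>:<name>` nodeId encoding, and the example namespace and dataset names are assumptions, not taken from this diff. The new Java client method `getColumnLineage(NodeId)` would wrap an equivalent request.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class ColumnLineageRequestSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical nodeId for a dataset; the string form and endpoint path are assumptions.
    String nodeId = URLEncoder.encode(
        "dataset:food_delivery:public.delivery_7_days", StandardCharsets.UTF_8);
    URI uri = URI.create(
        "http://localhost:5000/api/v1/column-lineage?nodeId=" + nodeId + "&depth=1");

    HttpClient http = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
    HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());

    // The response body is the column-lineage graph as JSON.
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}
```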

### Fixed

* Allow null column type in column-lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Allow null column type in column lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*The column-lineage endpoint was throwing an exception when a field's data type was not provided. Includes a test.*
* Include error message for JSON processing exception [`#2271`](https://github.com/MarquezProject/marquez/pull/2271) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*In case of JSON processing exceptions Marquez API should return exception message to a client.*
* Fix column lineage when multiple jobs write to same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*The fix deprecates the way fields `transformationDescription` and `transformationType` are returned. The depracated way of returning those fields will be removed in 0.30.0.*
*In case of JSON processing exceptions, the Marquez API now returns an exception message to a client.*
* Fix column lineage when multiple jobs write to same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*The fix deprecates the way the fields `transformationDescription` and `transformationType` are returned. The deprecated way of returning those fields will be removed in 0.30.0.*
* Use raw link for `iconSearchArrow.svg` [`#2280`](https://github.com/MarquezProject/marquez/pull/2280) [@wslulciuc](https://github.com/wslulciuc)
*Using a direct link to the events viewer icon fixes a loading issue.*
* Fill run state of parent run when created by child run [`#2296`](https://github.com/MarquezProject/marquez/pull/2296) [@fm100](https://github.com/fm100)
*Adds a run state to the parent at creation time to address a missing run state issue in Airflow integration.*
* Update migration query to make it work with existing view [`#2308`](https://github.com/MarquezProject/marquez/pull/2308) [@fm100](https://github.com/fm100)
*Changes the V52 migration query to drop the view before `ALTER`. Because repeatable migration runs only when its checksum changes, it was necessary to get the view definition first then drop and recreate it.*
* Fix lineage for orphaned datasets [`#2314`](https://github.com/MarquezProject/marquez/pull/2314) [@collado-mike](https://github.com/collado-mike)
*Fixes lineage for datasets generated by jobs whose current versions no longer write to the datasets in question.*
* Ensure job data in lineage query is not null or empty [`#2253`](https://github.com/MarquezProject/marquez/pull/2253) [@wslulciuc](https://github.com/wslulciuc)
*Changes the API to return an empty graph in the edge case of a job UUID that has no lineage when calling `LineageDao.getLineage()` yet is associated with a dataset. This case formerly resulted in an empty set and backend exception. Also includes logging and an API check for a `nodeID`.*
* Make `name` and `type` required for datasets [`#2305`](https://github.com/MarquezProject/marquez/pull/2305) [@wslulciuc](https://github.com/wslulciuc)
*When generating Typescript from the OpenAPI spec, `name` and `type` were not required but should have been.*
* Remove unused filter on `RunDao.updateStartState()` [`#2319`](https://github.com/MarquezProject/marquez/pull/2319) [@wslulciuc](https://github.com/wslulciuc)
*Removes the conditions `updated_at < transitionedAt` and `start_run_state_uuid != null` to allow for updating the run state.*
* Update linter [`#2322`](https://github.com/MarquezProject/marquez/pull/2322) [@phixMe](https://github.com/phixMe)
*Adds `npm run eslint-fix` to the CI config so that the build fails if the command does not return an exit code of 0.*
* Fix asset loading for web [`#2323`](https://github.com/MarquezProject/marquez/pull/2323) [@phixMe](https://github.com/phixMe)
*Fixes the webpack config and allows files to be imported in a modern way that enforces that the referenced assets exist.*

## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21

### Added

* Optimize current runs query for lineage API [`#2211`](https://github.com/MarquezProject/marquez/pull/2211) [@prachim-collab](https://github.com/prachim-collab)
*Adds a simpler, alternate `getCurrentRuns` query that gets only the runs themselves from the database, without the additional data from tables such as `run_args`, `job_context`, `facets`, etc., which required extra table joins.*
* Add Code Quality, DCO and Governance docs to project [`#2237`](https://github.com/MarquezProject/marquez/pull/2237) [`#2241`](https://github.com/MarquezProject/marquez/pull/2241) [@merobi-hub](https://github.com/MarquezProject/marquez/commits?author=merobi-hub)
* Add Code Quality, DCO and Governance docs to project [`#2237`](https://github.com/MarquezProject/marquez/pull/2237) [`#2241`](https://github.com/MarquezProject/marquez/pull/2241) [@merobi-hub](https://github.com/merobi-hub)
*Adds a number of standard governance and procedure docs to the project.*
* Add possibility to soft-delete namespaces [`#2244`](https://github.com/MarquezProject/marquez/pull/2244) [@mobuchowski](https://github.com/mobuchowski)
*Adds the ability to "hide" inactive namespaces. The namespaces are undeleted when a relevant OL event is received.*
32 changes: 32 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
@@ -18,9 +18,11 @@
import marquez.api.exceptions.RunAlreadyExistsException;
import marquez.api.exceptions.RunNotFoundException;
import marquez.api.exceptions.SourceNotFoundException;
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.FieldName;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
@@ -37,6 +39,7 @@
import marquez.service.ServiceFactory;
import marquez.service.SourceService;
import marquez.service.TagService;
import marquez.service.models.NodeId;
import marquez.service.models.Run;

public class BaseResource {
@@ -74,6 +77,10 @@ void throwIfNotExists(@NonNull NamespaceName namespaceName) {
}
}

void throwIfNotExists(@NonNull DatasetId datasetId) {
throwIfNotExists(datasetId.getNamespace(), datasetId.getName());
}

void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull DatasetName datasetName) {
if (!datasetService.exists(namespaceName.getValue(), datasetName.getValue())) {
throw new DatasetNotFoundException(datasetName);
@@ -86,6 +93,13 @@ void throwIfSourceNotExists(SourceName sourceName) {
}
}

void throwIfNotExists(@NonNull DatasetFieldId datasetFieldId) {
throwIfNotExists(
datasetFieldId.getDatasetId().getNamespace(),
datasetFieldId.getDatasetId().getName(),
datasetFieldId.getFieldName());
}

void throwIfNotExists(
@NonNull NamespaceName namespaceName,
@NonNull DatasetName datasetName,
@@ -96,6 +110,10 @@ void throwIfNotExists(
}
}

void throwIfNotExists(@NonNull JobId jobId) {
throwIfNotExists(jobId.getNamespace(), jobId.getName());
}

void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull JobName jobName) {
if (!jobService.exists(namespaceName.getValue(), jobName.getValue())) {
throw new JobNotFoundException(jobName);
@@ -137,6 +155,20 @@ void throwIfDatasetsNotExist(ImmutableSet<DatasetId> datasets) {
}
}

void throwIfNotExists(@NonNull NodeId nodeId) {
if (!nodeId.hasVersion()) {
if (nodeId.isDatasetType()) {
throwIfNotExists(nodeId.asDatasetId());
} else if (nodeId.isDatasetFieldType()) {
throwIfNotExists(nodeId.asDatasetFieldId());
} else if (nodeId.isJobType()) {
throwIfNotExists(nodeId.asJobId());
} else if (nodeId.isRunType()) {
throwIfNotExists(nodeId.asRunId());
}
}
}

URI locationFor(@NonNull UriInfo uriInfo, @NonNull Run run) {
return uriInfo
.getBaseUriBuilder()
1 change: 1 addition & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
@@ -96,6 +96,7 @@ private int determineStatusCode(Throwable e) {
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
throwIfNotExists(nodeId);
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

8 changes: 8 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
@@ -80,6 +80,14 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids,
WHERE ds.uuid IN (<dsUuids>)""")
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
DatasetData getDatasetData(String namespaceName, String datasetName);

@SqlQuery(
"""
SELECT j.uuid FROM jobs j
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/RunDao.java
@@ -62,7 +62,7 @@ public interface RunDao extends BaseDao {
+ "SET updated_at = :transitionedAt, "
+ " start_run_state_uuid = :startRunStateUuid,"
+ " started_at = :transitionedAt "
+ "WHERE uuid = :rowUuid AND (updated_at < :transitionedAt or start_run_state_uuid is null)")
+ "WHERE uuid = :rowUuid")
void updateStartState(UUID rowUuid, Instant transitionedAt, UUID startRunStateUuid);

@SqlUpdate(
41 changes: 38 additions & 3 deletions api/src/main/java/marquez/service/LineageService.java
@@ -7,6 +7,7 @@

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
@@ -20,6 +21,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
@@ -48,14 +50,30 @@ public LineageService(LineageDao delegate, JobDao jobDao) {

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
log.warn(
"Failed to get job associated with node '{}', returning orphan graph...",
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}
UUID job = optionalUUID.get();

log.debug("Attempting to get lineage for job '{}'", job);
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);

// Ensure job data is not empty, an empty set cannot be passed to LineageDao.getCurrentRuns() or
// LineageDao.getCurrentRunsWithFacets().
if (jobData.isEmpty()) {
// Log warning, then return an orphan lineage graph; a graph should contain at most one
// job->dataset relationship.
log.warn(
"Failed to get lineage for job '{}' associated with node '{}', returning orphan graph...",
job,
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

List<Run> runs =
withRunFacets
? getCurrentRunsWithFacets(
@@ -81,10 +99,26 @@
if (!datasetIds.isEmpty()) {
datasets.addAll(this.getDatasetData(datasetIds));
}
if (nodeId.isDatasetType()
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
log.warn(
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

return toLineage(jobData, datasets);
}

private Lineage toLineageWithOrphanDataset(@NonNull DatasetId datasetId) {
final DatasetData datasetData =
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
return new Lineage(
ImmutableSortedSet.of(
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
}

private Lineage toLineage(Set<JobData> jobData, Set<DatasetData> datasets) {
Set<Node> nodes = new LinkedHashSet<>();
// build mapping for later
@@ -214,7 +248,8 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
return getJobFromInputOrOutput(
datasetId.getName().getValue(), datasetId.getNamespace().getValue());
} else {
throw new NodeIdNotFoundException("Node must be a dataset node or job node");
throw new NodeIdNotFoundException(
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
}
}
}
@@ -1,2 +1,28 @@
/* SPDX-License-Identifier: Apache-2.0 */
ALTER TABLE dataset_symlinks ALTER COLUMN name TYPE VARCHAR;

DO
$$
DECLARE
datasets_view_exists boolean;
datasets_view_definition text;
BEGIN
SELECT EXISTS (
SELECT * FROM information_schema.views
WHERE table_schema='public' AND table_name='datasets_view'
) INTO datasets_view_exists;

IF datasets_view_exists THEN
-- Altering is not allowed when the column is being used from views. So here,
-- we temporarily drop the view before altering and recreate it.
SELECT view_definition FROM information_schema.views
WHERE table_schema='public' AND table_name='datasets_view'
INTO datasets_view_definition;

DROP VIEW datasets_view;
ALTER TABLE dataset_symlinks ALTER COLUMN name TYPE VARCHAR;
EXECUTE format('CREATE VIEW datasets_view AS %s', datasets_view_definition);
ELSE
ALTER TABLE dataset_symlinks ALTER COLUMN name TYPE VARCHAR;
END IF;
END
$$ LANGUAGE plpgsql;
@@ -0,0 +1,7 @@
-- new column will be created with 'null' filled for existing rows
ALTER TABLE lineage_events ADD created_at TIMESTAMP WITH TIME ZONE;

create index lineage_events_created_at_index on lineage_events (created_at desc NULLS LAST);

-- The new default set to UTC now() will only apply in subsequent INSERT or UPDATE commands; it does not cause rows already in the table to change.
ALTER TABLE lineage_events ALTER COLUMN created_at SET DEFAULT (now() AT TIME ZONE 'UTC')::timestamptz;
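
This migration adds the `created_at` column referenced in the changelog entry for #2299. As a hedged illustration of the incremental-read use case it mentions, the sketch below polls `lineage_events` for rows newer than a remembered watermark using plain JDBC. Only the `lineage_events` table and its new `created_at` column come from this migration; the connection settings, credentials, batch size, and polling strategy are placeholders and assumptions.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.OffsetDateTime;

public class IncrementalLineageEventsReader {
  public static void main(String[] args) throws Exception {
    // Placeholder connection settings; adjust to your Marquez database.
    String url = "jdbc:postgresql://localhost:5432/marquez";
    try (Connection conn = DriverManager.getConnection(url, "marquez", "marquez")) {
      // Watermark from the previous poll. Rows inserted before this migration have
      // created_at = NULL and are naturally skipped by the predicate below.
      OffsetDateTime lastSeen = OffsetDateTime.parse("2022-12-01T00:00:00Z");

      String sql = "SELECT created_at FROM lineage_events "
          + "WHERE created_at > ? ORDER BY created_at ASC LIMIT 100";
      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        stmt.setObject(1, lastSeen);
        try (ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            OffsetDateTime createdAt = rs.getObject("created_at", OffsetDateTime.class);
            // Process the newly created event here, then advance the watermark.
            lastSeen = createdAt;
          }
        }
      }
      System.out.println("Watermark now at: " + lastSeen);
    }
  }
}
```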