Skip to content

Commit

Permalink
remove namespace endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Sep 15, 2022
1 parent b31503c commit 48454a6
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 137 deletions.
34 changes: 15 additions & 19 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.SortDirection;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.LineageEvent;
Expand Down Expand Up @@ -107,28 +110,21 @@ public Response getLineage(
public Response getLineageEvents(
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("sortDirection") @DefaultValue("desc") SortDirection sortDirection,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
final List<LineageEvent> events =
openLineageDao.getAllLineageEvents(before.get(), after.get(), limit);
final List<LineageEvent> events;
if (sortDirection.getValue().equalsIgnoreCase("desc")) {
events = openLineageDao.getAllLineageEventsDesc(before.get(), after.get(), limit);
} else if (sortDirection.getValue().equalsIgnoreCase("asc")) {
events = openLineageDao.getAllLineageEventsAsc(before.get(), after.get(), limit);
} else {
return Response.status(BAD_REQUEST)
.entity(String.format("%s should be either 'asc' or 'desc", sortDirection.getValue()))
.build();
}
return Response.ok(new Events(events)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/namespace/{namespace}/events/lineage")
@Produces(APPLICATION_JSON)
public Response getLineageEventsByNamespace(
@PathParam("namespace") String namespace,
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
final List<LineageEvent> event =
openLineageDao.getLineageEventsByNamespace(namespace, before.get(), after.get(), limit);
return Response.ok(new Events(event)).build();
}

@Value
static class Events {
@NonNull
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/marquez/api/models/SortDirection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package marquez.api.models;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
public enum SortDirection {
DESC("desc"),
ASC("asc");

@Getter
public final String value;
}
46 changes: 8 additions & 38 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.postgresql.util.PGobject;
Expand Down Expand Up @@ -98,46 +97,17 @@ void createLineageEvent(
AND le.event_time >= :after)
ORDER BY le.event_time DESC
LIMIT :limit""")
List<LineageEvent> getAllLineageEvents(ZonedDateTime before, ZonedDateTime after, int limit);
List<LineageEvent> getAllLineageEventsDesc(ZonedDateTime before, ZonedDateTime after, int limit);

/**
* This is a "hack" to get inputs/outputs namespace from jsonb column: <a
* href="https://github.com/jdbi/jdbi/issues/1510#issuecomment-485423083">explanation</a>
*/
@SqlQuery(
"""
WITH job_events AS (
SELECT le.event
FROM lineage_events le
WHERE le.job_namespace = :namespace
AND (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time DESC
), dataset_events AS (
SELECT le.event, le.event_time
FROM lineage_events le
JOIN dataset_versions dv on le.run_uuid = dv.run_uuid
JOIN datasets ds on dv.dataset_uuid = ds.uuid
JOIN namespaces n on ds.namespace_uuid = n.uuid
WHERE n.name = :namespace
AND (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time DESC
)
SELECT le.event
FROM (
SELECT * FROM dataset_events
UNION ALL
SELECT * FROM job_events
) le
ORDER BY le.event_time
LIMIT :limit
""")
List<LineageEvent> getLineageEventsByNamespace(
@Bind("namespace") String namespace,
@Bind("before") ZonedDateTime before,
@Bind("after") ZonedDateTime after,
@Bind("limit") int limit);
SELECT event
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time ASC
LIMIT :limit""")
List<LineageEvent> getAllLineageEventsAsc(ZonedDateTime before, ZonedDateTime after, int limit);

default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
CREATE INDEX CONCURRENTLY lineage_events_event_time
on lineage_events(event_time DESC);

CREATE INDEX CONCURRENTLY lineage_events_namespace_event_time
on lineage_events(job_namespace, event_time DESC);
59 changes: 27 additions & 32 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import marquez.client.MarquezClient;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetVersion;
import marquez.client.models.Job;
Expand Down Expand Up @@ -414,7 +415,7 @@ public void testSendEventAndGetItBack() {
}

@Test
public void testFindEventByDatasetNamespace() {
public void testFindEventIsSortedByTime() {
marquez.service.models.LineageEvent.Run run =
new marquez.service.models.LineageEvent.Run(
UUID.randomUUID().toString(),
Expand All @@ -426,50 +427,44 @@ public void testFindEventByDatasetNamespace() {
.build();

ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC"));
marquez.service.models.LineageEvent.Dataset dataset =
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(NAMESPACE_NAME)
.name(DB_TABLE_NAME)
.build();

marquez.service.models.LineageEvent.LineageEventBuilder builder =
marquez.service.models.LineageEvent.builder()
.producer("testFindEventByDatasetNamespace")
.eventType("COMPLETE")
.producer("testFindEventIsSortedByTime")
.run(run)
.job(job)
.eventTime(time)
.inputs(Collections.emptyList());
.inputs(Collections.emptyList())
.outputs(Collections.singletonList(dataset));

for (int i = 0; i < 10; i++) {
marquez.service.models.LineageEvent.Dataset dataset =
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(String.format("namespace%d", i))
.name(DB_TABLE_NAME)
.build();
marquez.service.models.LineageEvent firstEvent =
builder.eventTime(time).eventType("START").build();

marquez.service.models.LineageEvent event =
builder.outputs(Collections.singletonList(dataset)).build();
CompletableFuture<Integer> resp = sendEvent(firstEvent);
assertThat(resp.join()).isEqualTo(201);

final CompletableFuture<Integer> resp = sendEvent(event);
assertThat(resp.join()).isEqualTo(201);
}
marquez.service.models.LineageEvent secondEvent =
builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build();

List<LineageEvent> rawEvents = client.listLineageEvents("namespace3");
resp = sendEvent(secondEvent);
assertThat(resp.join()).isEqualTo(201);

marquez.service.models.LineageEvent thirdEvent =
builder
.outputs(
Collections.singletonList(
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(String.format("namespace3"))
.name(DB_TABLE_NAME)
.build()))
.build();
List<LineageEvent> rawEvents = client.listLineageEvents();

assertThat(rawEvents.size()).isEqualTo(1);
assertThat(rawEvents.size()).isEqualTo(2);
ObjectMapper mapper = Utils.getMapper();
assertThat((JsonNode) mapper.valueToTree(thirdEvent))
assertThat((JsonNode) mapper.valueToTree(firstEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(1)));
assertThat((JsonNode) mapper.valueToTree(secondEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(0)));
}

@Test
public void testFindEventIsSortedByTime() {
public void testFindEventIsSortedByTimeAsc() {
marquez.service.models.LineageEvent.Run run =
new marquez.service.models.LineageEvent.Run(
UUID.randomUUID().toString(),
Expand Down Expand Up @@ -507,14 +502,14 @@ public void testFindEventIsSortedByTime() {
resp = sendEvent(secondEvent);
assertThat(resp.join()).isEqualTo(201);

List<LineageEvent> rawEvents = client.listLineageEvents(NAMESPACE_NAME);
List<LineageEvent> rawEvents = client.listLineageEvents(MarquezClient.SortDirection.ASC, 10);

assertThat(rawEvents.size()).isEqualTo(2);
ObjectMapper mapper = Utils.getMapper();
assertThat((JsonNode) mapper.valueToTree(firstEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(1)));
assertThat((JsonNode) mapper.valueToTree(secondEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(0)));
assertThat((JsonNode) mapper.valueToTree(secondEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(1)));
}

private CompletableFuture<Integer> sendEvent(marquez.service.models.LineageEvent event) {
Expand Down
9 changes: 9 additions & 0 deletions api/src/test/java/marquez/api/OpenLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
import io.dropwizard.testing.junit5.ResourceExtension;
import java.util.Map;
import javax.ws.rs.core.Response;
import marquez.common.Utils;
import marquez.db.OpenLineageDao;
import marquez.service.LineageService;
Expand Down Expand Up @@ -63,4 +64,12 @@ public void testGetLineage() {

assertEquals(lineage, LINEAGE);
}

@Test
public void testGetLineageEventsBadSort() {
final Response response =
UNDER_TEST.target("/api/v1/events/lineage").queryParam("sort", "asdf").request().get();

assertEquals(response.getStatus(), 400);
}
}
18 changes: 9 additions & 9 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -89,21 +90,20 @@ public MarquezClient(final URL baseUrl, @Nullable final String apiKey) {
}

public List<LineageEvent> listLineageEvents() {
return listLineageEvents(DEFAULT_LIMIT, DEFAULT_OFFSET);
return listLineageEvents(SortDirection.DESC, DEFAULT_LIMIT);
}

public List<LineageEvent> listLineageEvents(int limit, int offset) {
final String bodyAsJson = http.get(url.toEventUrl(limit, offset));
public List<LineageEvent> listLineageEvents(MarquezClient.SortDirection sort, int limit) {
final String bodyAsJson = http.get(url.toEventUrl(sort, limit));
return Events.fromJson(bodyAsJson).getValue();
}

public List<LineageEvent> listLineageEvents(String namespaceName) {
return listLineageEvents(namespaceName, DEFAULT_LIMIT, DEFAULT_OFFSET);
}
@AllArgsConstructor
public enum SortDirection {
DESC("desc"),
ASC("asc");

public List<LineageEvent> listLineageEvents(String namespaceName, int limit, int offset) {
final String bodyAsJson = http.get(url.toEventUrl(namespaceName, limit, offset));
return Events.fromJson(bodyAsJson).getValue();
@Getter public final String value;
}

public Namespace createNamespace(
Expand Down
14 changes: 7 additions & 7 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import static marquez.client.MarquezPathV1.fieldTagPath;
import static marquez.client.MarquezPathV1.jobPath;
import static marquez.client.MarquezPathV1.jobVersionPath;
import static marquez.client.MarquezPathV1.lineageEventPath;
import static marquez.client.MarquezPathV1.listDatasetVersionsPath;
import static marquez.client.MarquezPathV1.listDatasetsPath;
import static marquez.client.MarquezPathV1.listJobVersionsPath;
Expand Down Expand Up @@ -76,6 +75,11 @@ URL from(String path, @Nullable Map<String, Object> queryParams) {
}
}

private Map<String, Object> newQueryParamsWith(MarquezClient.SortDirection sort, int limit) {
checkArgument(limit >= 0, "limit must be >= 0");
return ImmutableMap.of("sort", sort.getValue(), "limit", limit);
}

private Map<String, Object> newQueryParamsWith(int limit, int offset) {
checkArgument(limit >= 0, "limit must be >= 0");
checkArgument(offset >= 0, "offset must be >= 0");
Expand All @@ -90,12 +94,8 @@ URL toNamespaceUrl(String namespaceName) {
return from(namespacePath(namespaceName));
}

URL toEventUrl(int limit, int offset) {
return from(MarquezPathV1.lineageEventPath(), newQueryParamsWith(limit, offset));
}

URL toEventUrl(String namespaceName, int limit, int offset) {
return from(lineageEventPath(namespaceName), newQueryParamsWith(limit, offset));
URL toEventUrl(MarquezClient.SortDirection sort, int limit) {
return from(MarquezPathV1.lineageEventPath(), newQueryParamsWith(sort, limit));
}

URL toSourceUrl(String sourceName) {
Expand Down
11 changes: 6 additions & 5 deletions clients/java/src/test/java/marquez/client/MarquezClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -697,20 +697,21 @@ public void testListDatasetVersions() throws Exception {
@Test
public void testListEvents() throws Exception {
Events events = new Events(Collections.singletonList(RAW_LINEAGE_EVENT));
when(http.get(buildUrlFor("/events/lineage?limit=10&offset=0")))
when(http.get(buildUrlFor("/events/lineage?sort=desc&limit=100")))
.thenReturn(
Utils.toJson(new ResultsPage<>("events", events.getValue(), events.getValue().size())));
final List<LineageEvent> listEvents = client.listLineageEvents(10, 0);
final List<LineageEvent> listEvents = client.listLineageEvents();
assertThat(listEvents).asList().containsExactly(RAW_LINEAGE_EVENT);
}

@Test
public void testListEventsWithNamespace() throws Exception {
public void testListEventsWithSortDirection() throws Exception {
Events events = new Events(Collections.singletonList(RAW_LINEAGE_EVENT));
when(http.get(buildUrlFor("/namespace/%s/events/lineage?limit=10&offset=0", NAMESPACE_NAME)))
when(http.get(buildUrlFor("/events/lineage?sort=desc&limit=10", NAMESPACE_NAME)))
.thenReturn(
Utils.toJson(new ResultsPage<>("events", events.getValue(), events.getValue().size())));
final List<LineageEvent> listEvents = client.listLineageEvents(NAMESPACE_NAME, 10, 0);
final List<LineageEvent> listEvents =
client.listLineageEvents(MarquezClient.SortDirection.DESC, 10);
assertThat(listEvents).asList().containsExactly(RAW_LINEAGE_EVENT);
}

Expand Down
Loading

0 comments on commit 48454a6

Please sign in to comment.