Skip to content

Commit

Permalink
Merge branch 'main' into bug/job-facets-foreign-key-constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
wslulciuc authored Aug 23, 2024
2 parents a0795bf + ead480b commit b6dec32
Show file tree
Hide file tree
Showing 46 changed files with 1,591 additions and 288 deletions.
2 changes: 2 additions & 0 deletions .circleci/db-migration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ if ! ./docker/up.sh \
--args "--exit-code-from seed_marquez" \
--tag "${MARQUEZ_VERSION}" \
--no-web \
--no-search \
--seed > /dev/null; then
error "failed to start db using backup!"
exit_with_cause
Expand All @@ -77,6 +78,7 @@ log "start db using backup (marquez=${MARQUEZ_BUILD_VERSION}):"
if ! ./docker/up.sh \
--args "--exit-code-from seed_marquez" \
--no-web \
--no-search \
--no-volumes \
--build \
--seed > /dev/null; then
Expand Down
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ API_PORT=5000
API_ADMIN_PORT=5001
WEB_PORT=3000
POSTGRES_PORT=5432
SEARCH_PORT=9200
TAG=0.49.0
3 changes: 3 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ dependencies {
implementation 'com.graphql-java:graphql-java:20.9'
implementation 'com.graphql-java-kickstart:graphql-java-servlet:12.0.0'

implementation 'org.opensearch.client:opensearch-rest-client:2.15.0'
implementation 'org.opensearch.client:opensearch-java:2.6.0'

testImplementation "io.dropwizard:dropwizard-testing:${dropwizardVersion}"
testImplementation "org.jdbi:jdbi3-testing:${jdbi3Version}"
testImplementation "org.jdbi:jdbi3-testcontainers:${jdbi3Version}"
Expand Down
6 changes: 5 additions & 1 deletion api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {

final Jdbi jdbi = newJdbi(config, env, source);
final MarquezContext marquezContext =
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
MarquezContext.builder()
.jdbi(jdbi)
.searchConfig(config.getSearchConfig())
.tags(config.getTags())
.build();

registerResources(config, env, marquezContext);
registerServlets(env);
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/MarquezConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import marquez.db.FlywayFactory;
import marquez.graphql.GraphqlConfig;
import marquez.jobs.DbRetentionConfig;
import marquez.search.SearchConfig;
import marquez.service.models.Tag;
import marquez.tracing.SentryConfig;

Expand Down Expand Up @@ -44,6 +45,10 @@ public class MarquezConfig extends Configuration {
@JsonProperty("sentry")
private final SentryConfig sentry = new SentryConfig();

@Getter
@JsonProperty("search")
private final SearchConfig searchConfig = new SearchConfig();

@Getter
@Setter
@JsonProperty("dbRetention")
Expand Down
21 changes: 19 additions & 2 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
import marquez.search.SearchConfig;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
Expand All @@ -53,6 +54,7 @@
import marquez.service.OpenLineageService;
import marquez.service.RunService;
import marquez.service.RunTransitionListener;
import marquez.service.SearchService;
import marquez.service.ServiceFactory;
import marquez.service.SourceService;
import marquez.service.TagService;
Expand Down Expand Up @@ -89,26 +91,31 @@ public final class MarquezContext {
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final ColumnLineageService columnLineageService;
@Getter private final SearchService searchService;
@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@Getter private final ColumnLineageResource columnLineageResource;
@Getter private final JobResource jobResource;
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
@Getter private final marquez.api.v2beta.SearchResource v2BetasearchResource;
@Getter private final SearchResource searchResource;
@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
@Getter private final JsonProcessingExceptionMapper jsonException;
@Getter private final GraphQLHttpServlet graphqlServlet;
@Getter private final SearchConfig searchConfig;

private MarquezContext(
@NonNull final Jdbi jdbi,
@NonNull final SearchConfig searchConfig,
@NonNull final ImmutableSet<Tag> tags,
List<RunTransitionListener> runTransitionListeners) {
if (runTransitionListeners == null) {
runTransitionListeners = new ArrayList<>();
}
this.searchConfig = searchConfig;

final BaseDao baseDao = jdbi.onDemand(NamespaceDao.class);
this.namespaceDao = jdbi.onDemand(NamespaceDao.class);
Expand Down Expand Up @@ -141,6 +148,7 @@ private MarquezContext(
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
this.searchService = new SearchService(searchConfig);
this.jdbiException = new JdbiExceptionExceptionMapper();
this.jsonException = new JsonProcessingExceptionMapper();
final ServiceFactory serviceFactory =
Expand All @@ -151,6 +159,7 @@ private MarquezContext(
.namespaceService(namespaceService)
.tagService(tagService)
.openLineageService(openLineageService)
.searchService(searchService)
.sourceService(sourceService)
.lineageService(lineageService)
.columnLineageService(columnLineageService)
Expand All @@ -165,6 +174,7 @@ private MarquezContext(
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
this.searchResource = new SearchResource(searchDao);
this.v2BetasearchResource = new marquez.api.v2beta.SearchResource(serviceFactory);

this.resources =
ImmutableList.of(
Expand All @@ -177,7 +187,8 @@ private MarquezContext(
jdbiException,
jsonException,
openLineageResource,
searchResource);
searchResource,
v2BetasearchResource);

final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
Expand All @@ -190,6 +201,7 @@ public static Builder builder() {
public static class Builder {

private Jdbi jdbi;
private SearchConfig searchConfig;
private ImmutableSet<Tag> tags;
private List<RunTransitionListener> runTransitionListeners;

Expand All @@ -203,6 +215,11 @@ public Builder jdbi(@NonNull Jdbi jdbi) {
return this;
}

public Builder searchConfig(@NonNull SearchConfig searchConfig) {
this.searchConfig = searchConfig;
return this;
}

public Builder tags(@NonNull ImmutableSet<Tag> tags) {
this.tags = tags;
return this;
Expand All @@ -219,7 +236,7 @@ public Builder runTransitionListeners(
}

public MarquezContext build() {
return new MarquezContext(jdbi, tags, runTransitionListeners);
return new MarquezContext(jdbi, searchConfig, tags, runTransitionListeners);
}
}
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public OpenLineageResource(
public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
if (event instanceof LineageEvent) {
if (serviceFactory.getSearchService().isEnabled()) {
serviceFactory.getSearchService().indexEvent((LineageEvent) event);
}
openLineageService
.createAsync((LineageEvent) event)
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
Expand Down
91 changes: 91 additions & 0 deletions api/src/main/java/marquez/api/v2beta/SearchResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api.v2beta;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.validation.constraints.NotBlank;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import marquez.service.SearchService;
import marquez.service.ServiceFactory;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.core.search.Hit;

@Slf4j
@Path("/api/v2beta/search")
public class SearchResource {

private final SearchService searchService;

public SearchResource(@NonNull final ServiceFactory serviceFactory) {
this.searchService = serviceFactory.getSearchService();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
@Path("jobs")
public Response searchJobs(@QueryParam("q") @NotBlank String query) throws IOException {
if (searchService.isEnabled()) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
return formatOpenSearchResponse(this.searchService.searchJobs(query));
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
@Path("datasets")
public Response searchDatasets(@QueryParam("q") @NotBlank String query) throws IOException {
if (searchService.isEnabled()) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
return formatOpenSearchResponse(this.searchService.searchDatasets(query));
}

private Response formatOpenSearchResponse(SearchResponse<ObjectNode> response) {
List<ObjectNode> hits =
response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
List<Map<String, List<String>>> highlights =
response.hits().hits().stream().map(Hit::highlight).collect(Collectors.toList());

return Response.ok(new OpenSearchResult(hits, highlights)).build();
}

@ToString
public static final class OpenSearchResult {
@Getter private final List<ObjectNode> hits;
@Getter private final List<Map<String, List<String>>> highlights;

@JsonCreator
public OpenSearchResult(
@NonNull List<ObjectNode> hits, @NonNull List<Map<String, List<String>>> highlights) {
this.hits = hits;
this.highlights = highlights;
}
}
}
30 changes: 30 additions & 0 deletions api/src/main/java/marquez/search/SearchConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.search;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;

public class SearchConfig {
public static final boolean ENABLED = false;
public static final String SCHEME = "http";
public static final String HOST = "opensearch";
public static final int PORT = 9200;
public static final String USERNAME = "admin";
public static final String PASSWORD = "admin";

@Getter @JsonProperty private boolean enabled = ENABLED;

@Getter @JsonProperty private String scheme = SCHEME;

@Getter @JsonProperty private String host = HOST;

@Getter @JsonProperty private int port = PORT;

@Getter @JsonProperty private String username = USERNAME;

@Getter @JsonProperty private String password = PASSWORD;
}
Loading

0 comments on commit b6dec32

Please sign in to comment.