Skip to content

Commit

Permalink
Clean up Feast configuration (#611)
Browse files Browse the repository at this point in the history
* Add validation to Core configuration and fix version loading

    Refactor, document, and validate Feast Core Properties

    Refactor FeastProperties to support nested store configuration

    Localize all store configuration in Serving in Spring configuration

    Various configuration updates
    * Allow Feast Serving to use typed properties instead of maps
    * Reuse Feast Core Store model in serving
    * Remove redundant config classes for Redis
    * Update Serving Beans and Config classes to use new configuration getters
    * Remove hot-loading from store configuration. This reduces a bit of
      flexibility, but simplifies the code and configuration

* Set default build version in Feast Core "version" field in Feast Properties

* Ensure FeatureSink creation is consistent for both Redis and BigQuery

* Move BigQueryHistoricalRetriever configuration into Retriever from ServingServiceConfig

* Allow a list of stores to be configured for forward compatibility

* Remove Lombok from Serving configuration

* Update Store configuration loading in serving to use a store model

* Update RedisBackedJobService to instantiate its own Redis Client

* Update comments in FeastProperties

* Fix broken default application.yml and add comments in Serving

* Refactored and cleaned up Feast Core configuration for job runners.

* Remove commented out DataflowRunnerConfig setters

* Clean up getJobManager and simplify field mapping in DataflowRunnerConfig

* Add static factory methods to retrievers

* Remove runner specific comment typo

* Add oneOfStrings validator annotation for configuration validation

* Fix broken Dataflow unit test that depends on GOOGLE_APPLICATION_CREDENTIALS
  • Loading branch information
woop authored Apr 16, 2020
1 parent e461cde commit 3fd6c7a
Show file tree
Hide file tree
Showing 49 changed files with 1,432 additions and 898 deletions.
24 changes: 24 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
<configuration>
<skip>false</skip>
</configuration>
<executions>
<execution>
<id>build-info</id>
<goals>
<goal>build-info</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Expand Down Expand Up @@ -207,5 +215,21 @@
<artifactId>jaxb-api</artifactId>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.0.Final</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.1.2.Final</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator-annotation-processor</artifactId>
<version>6.1.2.Final</version>
</dependency>

</dependencies>
</project>
188 changes: 173 additions & 15 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,211 @@
*/
package feast.core.config;

import java.util.Map;
import feast.core.config.FeastProperties.StreamProperties.FeatureStreamOptions;
import feast.core.validators.OneOfStrings;
import java.util.*;
import javax.annotation.PostConstruct;
import javax.validation.*;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import lombok.Getter;
import lombok.Setter;
import org.hibernate.validator.constraints.URL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.info.BuildProperties;

@Getter
@Setter
@ConfigurationProperties(prefix = "feast", ignoreInvalidFields = true)
public class FeastProperties {

private String version;
private JobProperties jobs;
/**
 * Instantiates a new Feast properties, seeding the version from the build metadata.
 *
 * @param buildProperties Feast build properties; its version is copied into {@code version}
 */
@Autowired
public FeastProperties(BuildProperties buildProperties) {
  // Assign the field directly instead of going through the setter: invoking an overridable
  // method from a constructor would let a subclass observe a partially constructed instance.
  this.version = buildProperties.getVersion();
}

/** Instantiates a new Feast properties, leaving every field at its declared default. */
public FeastProperties() {}

/* Feast Core Build Version */
@NotBlank private String version = "unknown";

/* Population job properties */
@NotNull private JobProperties jobs;

@NotNull
/* Feast Kafka stream properties */
private StreamProperties stream;

/** Feast job properties. These properties are used for ingestion jobs. */
@Getter
@Setter
public static class JobProperties {

private String runner;
private Map<String, String> options;
@NotBlank
/* The active Apache Beam runner name. This name references one instance of the Runner class */
private String activeRunner;

/** List of configured job runners. */
private List<Runner> runners = new ArrayList<>();

/**
 * Gets the {@link Runner} instance whose name matches the configured active runner name.
 *
 * @return the active runner
 * @throws RuntimeException if the active runner name is unset or no configured runner has that
 *     name
 */
public Runner getActiveRunner() {
  // Guard against an unset name so that a missing setting is reported as a misconfiguration
  // (below) rather than surfacing as a bare NullPointerException.
  if (activeRunner != null) {
    for (Runner runner : getRunners()) {
      if (activeRunner.equals(runner.getName())) {
        return runner;
      }
    }
  }
  throw new RuntimeException(
      String.format(
          "Active runner is misconfigured. Could not find runner: %s.", activeRunner));
}

/**
* A single configured job runner. Entries are looked up by {@code name}; the runner type is
* stored as a string and exposed as an enum through {@link #getType()}.
*/
@Getter
@Setter
public static class Runner {
/** Job runner name. This must be unique, since the active runner is selected by name. */
String name;

/** Job runner type. DirectRunner and DataflowRunner are currently supported. */
String type;

/**
* Job runner configuration options. Defaults to an empty map. See the following for options
* https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner
*/
Map<String, String> options = new HashMap<>();

/**
* Gets the job runner type as an enum by parsing the configured type string.
*
* @return the job runner type as {@link feast.core.job.Runner}
*/
public feast.core.job.Runner getType() {
return feast.core.job.Runner.fromString(type);
}
}

@NotNull
/* Population job metric properties */
private MetricsProperties metrics;
private JobUpdatesProperties updates;
}

@Getter
@Setter
public static class JobUpdatesProperties {
/* Timeout in seconds for each attempt to update or submit a new job to the runner */
@Positive private long jobUpdateTimeoutSeconds;

private long timeoutSeconds;
private long pollingIntervalMillis;
/* Job update polling interval in milliseconds. How frequently Feast will update running jobs. */
@Positive private long pollingIntervalMilliseconds;
}

/** Properties used to configure Feast's managed Kafka feature stream. */
@Getter
@Setter
public static class StreamProperties {

/* Feature stream type. Only "kafka" is supported. */
@OneOfStrings({"kafka"})
@NotBlank
private String type;
private Map<String, String> options;

/* Feature stream options */
@NotNull private FeatureStreamOptions options;

/**
* Feature stream options. Every field has a default value and carries a validation constraint
* ({@code @NotBlank} for strings, {@code @Positive} for numbers).
*/
@Getter
@Setter
public static class FeatureStreamOptions {

/* Kafka topic to use for feature sets without source topics. Defaults to "feast-features". */
@NotBlank private String topic = "feast-features";

/**
* Comma separated list of Kafka bootstrap servers. Used for feature sets without a defined
* source. Defaults to "localhost:9092".
*/
@NotBlank private String bootstrapServers = "localhost:9092";

/* Replication factor (number of copies) for the managed feature stream Kafka topic. */
@Positive private short replicationFactor = 1;

/* Number of Kafka partitions to use for the managed feature stream. */
@Positive private int partitions = 1;
}
}

/** Feast population job metrics */
@Getter
@Setter
public static class MetricsProperties {

/* Population job metrics enabled */
private boolean enabled;

/* Metric type. Possible options: statsd */
@OneOfStrings({"statsd"})
@NotBlank
private String type;
private String host;
private int port;

/* Host of metric sink */
@URL private String host;

/* Port of metric sink */
@Positive private int port;
}

/**
 * Validates all FeastProperties. This method runs after properties have been initialized and
 * individually and conditionally validates each class: the root fields first, then the stream
 * properties and their options, then the job properties, and finally the metrics properties
 * (only when metrics are enabled). Validation stops at the first object that has violations.
 *
 * @throws ConstraintViolationException if any validated object violates its constraints
 */
@PostConstruct
public void validate() {
  // ValidatorFactory is AutoCloseable; close it once validation has finished (or failed) so the
  // resources it holds are released instead of leaking.
  try (ValidatorFactory factory = Validation.buildDefaultValidatorFactory()) {
    Validator validator = factory.getValidator();

    // Validate root fields in FeastProperties. This also guarantees that the stream and jobs
    // sections dereferenced below are present.
    throwIfInvalid(validator, this);

    // Validate Stream properties
    throwIfInvalid(validator, getStream());

    // Validate Stream Options
    FeatureStreamOptions streamOptions = getStream().getOptions();
    throwIfInvalid(validator, streamOptions);

    // Validate JobProperties
    throwIfInvalid(validator, getJobs());

    // Validate MetricsProperties, but only when metrics are actually enabled
    if (getJobs().getMetrics().isEnabled()) {
      throwIfInvalid(validator, getJobs().getMetrics());
    }
  }
}

/** Validates a single object and throws if it has any constraint violations. */
private static <T> void throwIfInvalid(Validator validator, T target) {
  Set<ConstraintViolation<T>> violations = validator.validate(target);
  if (!violations.isEmpty()) {
    throw new ConstraintViolationException(violations);
  }
}
}
9 changes: 4 additions & 5 deletions core/src/main/java/feast/core/config/FeatureStreamConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
SourceType featureStreamType = SourceType.valueOf(streamProperties.getType().toUpperCase());
switch (featureStreamType) {
case KAFKA:
String bootstrapServers = streamProperties.getOptions().get("bootstrapServers");
String topicName = streamProperties.getOptions().get("topic");
String bootstrapServers = streamProperties.getOptions().getBootstrapServers();
String topicName = streamProperties.getOptions().getTopic();
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
map.put(
Expand All @@ -59,9 +59,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
NewTopic newTopic =
new NewTopic(
topicName,
Integer.valueOf(streamProperties.getOptions().getOrDefault("numPartitions", "1")),
Short.valueOf(
streamProperties.getOptions().getOrDefault("replicationFactor", "1")));
streamProperties.getOptions().getPartitions(),
streamProperties.getOptions().getReplicationFactor());
CreateTopicsResult createTopicsResult =
client.createTopics(Collections.singleton(newTopic));
try {
Expand Down
70 changes: 10 additions & 60 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,11 @@
*/
package feast.core.config;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.common.base.Strings;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.config.FeastProperties.JobUpdatesProperties;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -44,65 +33,26 @@
public class JobConfig {

/**
* Get a JobManager according to the runner type and dataflow configuration.
* Get a JobManager according to the runner type and Dataflow configuration.
*
* @param feastProperties feast config properties
*/
@Bean
@Autowired
public JobManager getJobManager(
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) {
public JobManager getJobManager(FeastProperties feastProperties) {

JobProperties jobProperties = feastProperties.getJobs();
Runner runner = Runner.fromString(jobProperties.getRunner());
if (jobProperties.getOptions() == null) {
jobProperties.setOptions(new HashMap<>());
}
Map<String, String> jobOptions = jobProperties.getOptions();
switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(jobOptions.getOrDefault("region", null))
|| Strings.isNullOrEmpty(jobOptions.getOrDefault("project", null))) {
log.error("Project and location of the Dataflow runner is not configured");
throw new IllegalStateException(
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
Map<String, String> runnerConfigOptions = runner.getOptions();
FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();

return new DataflowJobManager(
dataflow, jobProperties.getOptions(), jobProperties.getMetrics());
} catch (IOException e) {
throw new IllegalStateException(
"Unable to find credential required for Dataflow monitoring API", e);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}
switch (runner.getType()) {
case DATAFLOW:
return new DataflowJobManager(runnerConfigOptions, metrics);
case DIRECT:
return new DirectRunnerJobManager(
jobProperties.getOptions(), directJobRegistry, jobProperties.getMetrics());
return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics);
default:
throw new IllegalArgumentException("Unsupported runner: " + jobProperties.getRunner());
throw new IllegalArgumentException("Unsupported runner: " + runner);
}
}

/** Get a direct job registry */
@Bean
public DirectJobRegistry directJobRegistry() {
return new DirectJobRegistry();
}

/** Extracts job update options from feast core options. */
@Bean
public JobUpdatesProperties jobUpdatesProperties(FeastProperties feastProperties) {
return feastProperties.getJobs().getUpdates();
}
}
Loading

0 comments on commit 3fd6c7a

Please sign in to comment.