diff --git a/core/pom.xml b/core/pom.xml index 7961b45074..f4fb6c659c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -38,6 +38,14 @@ false + + + build-info + + build-info + + + @@ -207,5 +215,21 @@ jaxb-api + + javax.validation + validation-api + 2.0.0.Final + + + org.hibernate.validator + hibernate-validator + 6.1.2.Final + + + org.hibernate.validator + hibernate-validator-annotation-processor + 6.1.2.Final + + diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java index b9c787b6c7..941d51f68c 100644 --- a/core/src/main/java/feast/core/config/FeastProperties.java +++ b/core/src/main/java/feast/core/config/FeastProperties.java @@ -16,53 +16,211 @@ */ package feast.core.config; -import java.util.Map; +import feast.core.config.FeastProperties.StreamProperties.FeatureStreamOptions; +import feast.core.validators.OneOfStrings; +import java.util.*; +import javax.annotation.PostConstruct; +import javax.validation.*; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Positive; import lombok.Getter; import lombok.Setter; +import org.hibernate.validator.constraints.URL; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.info.BuildProperties; @Getter @Setter @ConfigurationProperties(prefix = "feast", ignoreInvalidFields = true) public class FeastProperties { - private String version; - private JobProperties jobs; + /** + * Instantiates a new Feast properties. + * + * @param buildProperties Feast build properties + */ + @Autowired + public FeastProperties(BuildProperties buildProperties) { + setVersion(buildProperties.getVersion()); + } + + /** Instantiates a new Feast properties. */ + public FeastProperties() {} + + /* Feast Core Build Version */ + @NotBlank private String version = "unknown"; + + /* Population job properties */ + @NotNull private JobProperties jobs; + + @NotNull + /* Feast Kafka stream properties */ private StreamProperties stream; + /** Feast job properties. These properties are used for ingestion jobs. */ @Getter @Setter public static class JobProperties { - private String runner; - private Map options; + @NotBlank + /* The active Apache Beam runner name. This name references one instance of the Runner class */ + private String activeRunner; + + /** List of configured job runners. */ + private List runners = new ArrayList<>(); + + /** + * Gets a {@link Runner} instance of the active runner + * + * @return the active runner + */ + public Runner getActiveRunner() { + for (Runner runner : getRunners()) { + if (activeRunner.equals(runner.getName())) { + return runner; + } + } + throw new RuntimeException( + String.format( + "Active runner is misconfigured. Could not find runner: %s.", activeRunner)); + } + + /** Job Runner class. */ + @Getter + @Setter + public static class Runner { + /** Job runner name. This must be unique. */ + String name; + + /** Job runner type DirectRunner, DataflowRunner currently supported */ + String type; + + /** + * Job runner configuration options. See the following for options + * https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner + */ + Map options = new HashMap<>(); + + /** + * Gets the job runner type as an enum. + * + * @return Returns the job runner type as {@link feast.core.job.Runner} + */ + public feast.core.job.Runner getType() { + return feast.core.job.Runner.fromString(type); + } + } + + @NotNull + /* Population job metric properties */ private MetricsProperties metrics; - private JobUpdatesProperties updates; - } - @Getter - @Setter - public static class JobUpdatesProperties { + /* Timeout in seconds for each attempt to update or submit a new job to the runner */ + @Positive private long jobUpdateTimeoutSeconds; - private long timeoutSeconds; - private long pollingIntervalMillis; + /* Job update polling interval in millisecond. How frequently Feast will update running jobs. */ + @Positive private long pollingIntervalMilliseconds; } + /** Properties used to configure Feast's managed Kafka feature stream. */ @Getter @Setter public static class StreamProperties { + /* Feature stream type. Only "kafka" is supported. */ + @OneOfStrings({"kafka"}) + @NotBlank private String type; - private Map options; + + /* Feature stream options */ + @NotNull private FeatureStreamOptions options; + + /** Feature stream options */ + @Getter + @Setter + public static class FeatureStreamOptions { + + /* Kafka topic to use for feature sets without source topics. */ + @NotBlank private String topic = "feast-features"; + + /** + * Comma separated list of Kafka bootstrap servers. Used for feature sets without a defined + * source. + */ + @NotBlank private String bootstrapServers = "localhost:9092"; + + /* Defines the number of copies of managed feature stream Kafka. */ + @Positive private short replicationFactor = 1; + + /* Number of Kafka partitions to to use for managed feature stream. */ + @Positive private int partitions = 1; + } } + /** Feast population job metrics */ @Getter @Setter public static class MetricsProperties { + /* Population job metrics enabled */ private boolean enabled; + + /* Metric type. Possible options: statsd */ + @OneOfStrings({"statsd"}) + @NotBlank private String type; - private String host; - private int port; + + /* Host of metric sink */ + @URL private String host; + + /* Port of metric sink */ + @Positive private int port; + } + + /** + * Validates all FeastProperties. This method runs after properties have been initialized and + * individually and conditionally validates each class. + */ + @PostConstruct + public void validate() { + ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); + Validator validator = factory.getValidator(); + + // Validate root fields in FeastProperties + Set> violations = validator.validate(this); + if (!violations.isEmpty()) { + throw new ConstraintViolationException(violations); + } + + // Validate Stream properties + Set> streamPropertyViolations = + validator.validate(getStream()); + if (!streamPropertyViolations.isEmpty()) { + throw new ConstraintViolationException(streamPropertyViolations); + } + + // Validate Stream Options + Set> featureStreamOptionsViolations = + validator.validate(getStream().getOptions()); + if (!featureStreamOptionsViolations.isEmpty()) { + throw new ConstraintViolationException(featureStreamOptionsViolations); + } + + // Validate JobProperties + Set> jobPropertiesViolations = validator.validate(getJobs()); + if (!jobPropertiesViolations.isEmpty()) { + throw new ConstraintViolationException(jobPropertiesViolations); + } + + // Validate MetricsProperties + if (getJobs().getMetrics().isEnabled()) { + Set> jobMetricViolations = + validator.validate(getJobs().getMetrics()); + if (!jobMetricViolations.isEmpty()) { + throw new ConstraintViolationException(jobMetricViolations); + } + } } } diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 45de359ac7..44f0e0e099 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -48,8 +48,8 @@ public Source getDefaultSource(FeastProperties feastProperties) { SourceType featureStreamType = SourceType.valueOf(streamProperties.getType().toUpperCase()); switch (featureStreamType) { case KAFKA: - String bootstrapServers = streamProperties.getOptions().get("bootstrapServers"); - String topicName = streamProperties.getOptions().get("topic"); + String bootstrapServers = streamProperties.getOptions().getBootstrapServers(); + String topicName = streamProperties.getOptions().getTopic(); Map map = new HashMap<>(); map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); map.put( @@ -59,9 +59,8 @@ public Source getDefaultSource(FeastProperties feastProperties) { NewTopic newTopic = new NewTopic( topicName, - Integer.valueOf(streamProperties.getOptions().getOrDefault("numPartitions", "1")), - Short.valueOf( - streamProperties.getOptions().getOrDefault("replicationFactor", "1"))); + streamProperties.getOptions().getPartitions(), + streamProperties.getOptions().getReplicationFactor()); CreateTopicsResult createTopicsResult = client.createTopics(Collections.singleton(newTopic)); try { diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java index 728fc0545b..69636963be 100644 --- a/core/src/main/java/feast/core/config/JobConfig.java +++ b/core/src/main/java/feast/core/config/JobConfig.java @@ -16,22 +16,11 @@ */ package feast.core.config; -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.DataflowScopes; -import com.google.common.base.Strings; import feast.core.config.FeastProperties.JobProperties; -import feast.core.config.FeastProperties.JobUpdatesProperties; import feast.core.job.JobManager; -import feast.core.job.Runner; import feast.core.job.dataflow.DataflowJobManager; import feast.core.job.direct.DirectJobRegistry; import feast.core.job.direct.DirectRunnerJobManager; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -44,65 +33,26 @@ public class JobConfig { /** - * Get a JobManager according to the runner type and dataflow configuration. + * Get a JobManager according to the runner type and Dataflow configuration. * * @param feastProperties feast config properties */ @Bean @Autowired - public JobManager getJobManager( - FeastProperties feastProperties, DirectJobRegistry directJobRegistry) { + public JobManager getJobManager(FeastProperties feastProperties) { JobProperties jobProperties = feastProperties.getJobs(); - Runner runner = Runner.fromString(jobProperties.getRunner()); - if (jobProperties.getOptions() == null) { - jobProperties.setOptions(new HashMap<>()); - } - Map jobOptions = jobProperties.getOptions(); - switch (runner) { - case DATAFLOW: - if (Strings.isNullOrEmpty(jobOptions.getOrDefault("region", null)) - || Strings.isNullOrEmpty(jobOptions.getOrDefault("project", null))) { - log.error("Project and location of the Dataflow runner is not configured"); - throw new IllegalStateException( - "Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner."); - } - try { - GoogleCredential credential = - GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all()); - Dataflow dataflow = - new Dataflow( - GoogleNetHttpTransport.newTrustedTransport(), - JacksonFactory.getDefaultInstance(), - credential); + FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner(); + Map runnerConfigOptions = runner.getOptions(); + FeastProperties.MetricsProperties metrics = jobProperties.getMetrics(); - return new DataflowJobManager( - dataflow, jobProperties.getOptions(), jobProperties.getMetrics()); - } catch (IOException e) { - throw new IllegalStateException( - "Unable to find credential required for Dataflow monitoring API", e); - } catch (GeneralSecurityException e) { - throw new IllegalStateException("Security exception while connecting to Dataflow API", e); - } catch (Exception e) { - throw new IllegalStateException("Unable to initialize DataflowJobManager", e); - } + switch (runner.getType()) { + case DATAFLOW: + return new DataflowJobManager(runnerConfigOptions, metrics); case DIRECT: - return new DirectRunnerJobManager( - jobProperties.getOptions(), directJobRegistry, jobProperties.getMetrics()); + return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics); default: - throw new IllegalArgumentException("Unsupported runner: " + jobProperties.getRunner()); + throw new IllegalArgumentException("Unsupported runner: " + runner); } } - - /** Get a direct job registry */ - @Bean - public DirectJobRegistry directJobRegistry() { - return new DirectJobRegistry(); - } - - /** Extracts job update options from feast core options. */ - @Bean - public JobUpdatesProperties jobUpdatesProperties(FeastProperties feastProperties) { - return feastProperties.getJobs().getUpdates(); - } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobConfig.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobConfig.java deleted file mode 100644 index a9bbf345d1..0000000000 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobConfig.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.core.job.dataflow; - -import lombok.Value; - -@Value -public class DataflowJobConfig { - private String projectId; - private String location; -} diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index c2313d75ec..6002133e82 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -18,7 +18,12 @@ import static feast.core.util.PipelineUtil.detectClassPathResourcesToStage; +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.DataflowScopes; import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -37,6 +42,7 @@ import feast.ingestion.options.ImportOptions; import feast.ingestion.options.OptionCompressor; import java.io.IOException; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -60,12 +66,46 @@ public class DataflowJobManager implements JobManager { private final MetricsProperties metrics; public DataflowJobManager( - Dataflow dataflow, Map defaultOptions, MetricsProperties metricsProperties) { - this.defaultOptions = defaultOptions; + Map runnerConfigOptions, MetricsProperties metricsProperties) { + this(runnerConfigOptions, metricsProperties, getGoogleCredential()); + } + + public DataflowJobManager( + Map runnerConfigOptions, + MetricsProperties metricsProperties, + Credential credential) { + + DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions); + + Dataflow dataflow = null; + try { + dataflow = + new Dataflow( + GoogleNetHttpTransport.newTrustedTransport(), + JacksonFactory.getDefaultInstance(), + credential); + } catch (GeneralSecurityException e) { + throw new IllegalStateException("Security exception while connecting to Dataflow API", e); + } catch (IOException e) { + throw new IllegalStateException("Unable to initialize DataflowJobManager", e); + } + + this.defaultOptions = runnerConfigOptions; this.dataflow = dataflow; this.metrics = metricsProperties; - this.projectId = defaultOptions.get("project"); - this.location = defaultOptions.get("region"); + this.projectId = config.getProject(); + this.location = config.getRegion(); + } + + private static Credential getGoogleCredential() { + GoogleCredential credential = null; + try { + credential = GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all()); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to find credential required for Dataflow monitoring API", e); + } + return credential; } @Override diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java new file mode 100644 index 0000000000..6fe93ca80c --- /dev/null +++ b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job.dataflow; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Set; +import javax.validation.*; +import javax.validation.constraints.NotBlank; +import lombok.Getter; +import lombok.Setter; + +/** DataflowRunnerConfig contains configuration fields for the Dataflow job runner. */ +@Getter +@Setter +public class DataflowRunnerConfig { + + public DataflowRunnerConfig(Map runnerConfigOptions) { + + // Try to find all fields in DataflowRunnerConfig inside the runnerConfigOptions and map it into + // this object + for (Field field : DataflowRunnerConfig.class.getFields()) { + String fieldName = field.getName(); + try { + if (!runnerConfigOptions.containsKey(fieldName)) { + continue; + } + String value = runnerConfigOptions.get(fieldName); + + if (Boolean.class.equals(field.getType())) { + field.set(this, Boolean.valueOf(value)); + continue; + } + if (field.getType() == Integer.class) { + field.set(this, Integer.valueOf(value)); + continue; + } + field.set(this, value); + } catch (IllegalAccessException e) { + throw new RuntimeException( + String.format( + "Could not successfully convert DataflowRunnerConfig for key: %s", fieldName), + e); + } + } + validate(); + } + + /* Project id to use when launching jobs. */ + @NotBlank public String project; + + /* The Google Compute Engine region for creating Dataflow jobs. */ + @NotBlank public String region; + + /* GCP availability zone for operations. */ + @NotBlank public String zone; + + /* Run the job as a specific service account, instead of the default GCE robot. */ + public String serviceAccount; + + /* GCE network for launching workers. */ + @NotBlank public String network; + + /* GCE subnetwork for launching workers. */ + @NotBlank public String subnetwork; + + /* Machine type to create Dataflow worker VMs as. */ + public String workerMachineType; + + /* The autoscaling algorithm to use for the workerpool. */ + public String autoscalingAlgorithm; + + /* Specifies whether worker pools should be started with public IP addresses. */ + public Boolean usePublicIps; + + /** + * A pipeline level default location for storing temporary files. Support Google Cloud Storage + * locations, e.g. gs://bucket/object + */ + @NotBlank public String tempLocation; + + /* The maximum number of workers to use for the workerpool. */ + public Integer maxNumWorkers; + + /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */ + public String deadLetterTableSpec; + + /** Validates Dataflow runner configuration options */ + public void validate() { + ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); + Validator validator = factory.getValidator(); + + Set> dataflowRunnerConfigViolation = + validator.validate(this); + if (!dataflowRunnerConfigViolation.isEmpty()) { + throw new ConstraintViolationException(dataflowRunnerConfigViolation); + } + } +} diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index b66d181022..b4ed341edc 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -24,7 +24,8 @@ import feast.core.FeatureSetProto.FeatureSetStatus; import feast.core.StoreProto; import feast.core.StoreProto.Store.Subscription; -import feast.core.config.FeastProperties.JobUpdatesProperties; +import feast.core.config.FeastProperties; +import feast.core.config.FeastProperties.JobProperties; import feast.core.dao.FeatureSetRepository; import feast.core.dao.JobRepository; import feast.core.job.JobManager; @@ -58,7 +59,7 @@ public class JobCoordinatorService { private FeatureSetRepository featureSetRepository; private SpecService specService; private JobManager jobManager; - private JobUpdatesProperties jobUpdatesProperties; + private JobProperties jobProperties; @Autowired public JobCoordinatorService( @@ -66,12 +67,12 @@ public JobCoordinatorService( FeatureSetRepository featureSetRepository, SpecService specService, JobManager jobManager, - JobUpdatesProperties jobUpdatesProperties) { + FeastProperties feastProperties) { this.jobRepository = jobRepository; this.featureSetRepository = featureSetRepository; this.specService = specService; this.jobManager = jobManager; - this.jobUpdatesProperties = jobUpdatesProperties; + this.jobProperties = feastProperties.getJobs(); } /** @@ -86,7 +87,7 @@ public JobCoordinatorService( *

4) Updates Feature set statuses */ @Transactional - @Scheduled(fixedDelayString = "${feast.jobs.updates.pollingIntervalMillis}") + @Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}") public void Poll() throws InvalidProtocolBufferException { log.info("Polling for new jobs..."); List jobUpdateTasks = new ArrayList<>(); @@ -121,7 +122,7 @@ public void Poll() throws InvalidProtocolBufferException { store, originalJob, jobManager, - jobUpdatesProperties.getTimeoutSeconds())); + jobProperties.getJobUpdateTimeoutSeconds())); }); } } diff --git a/core/src/main/java/feast/core/validators/OneOfStringValidator.java b/core/src/main/java/feast/core/validators/OneOfStringValidator.java new file mode 100644 index 0000000000..6b84e44b01 --- /dev/null +++ b/core/src/main/java/feast/core/validators/OneOfStringValidator.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.validators; + +import java.util.Arrays; +import javax.validation.ConstraintValidator; +import javax.validation.ConstraintValidatorContext; + +/** Validates whether a string value is found within a collection. */ +public class OneOfStringValidator implements ConstraintValidator { + + /** Values that are permitted for a specific instance of this validator */ + String[] allowedValues; + + /** + * Initialize the OneOfStringValidator with a collection of allowed String values. + * + * @param constraintAnnotation + */ + @Override + public void initialize(OneOfStrings constraintAnnotation) { + allowedValues = constraintAnnotation.value(); + } + + /** + * Validates whether a string value is found within the collection defined in the annotation. + * + * @param value String value that should be validated + * @param context Provides contextual data and operation when applying a given constraint + * validator + * @return Boolean value indicating whether the string is found within the allowed values. + */ + @Override + public boolean isValid(String value, ConstraintValidatorContext context) { + return Arrays.asList(allowedValues).contains(value); + } +} diff --git a/core/src/main/java/feast/core/validators/OneOfStrings.java b/core/src/main/java/feast/core/validators/OneOfStrings.java new file mode 100644 index 0000000000..dba290438c --- /dev/null +++ b/core/src/main/java/feast/core/validators/OneOfStrings.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.validators; + +import java.lang.annotation.*; +import javax.validation.Constraint; +import javax.validation.Payload; + +/** + * Annotation for String "one of" validation. Allows for the definition of a collection through an + * annotation. The collection is used to test values defined in the object. + */ +@Target({ + ElementType.METHOD, + ElementType.FIELD, + ElementType.ANNOTATION_TYPE, + ElementType.CONSTRUCTOR, + ElementType.PARAMETER +}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Constraint(validatedBy = OneOfStringValidator.class) +public @interface OneOfStrings { + /** @return Default error message that is returned if the incorrect value is set */ + String message() default "Field value must be one of the following: {value}"; + + /** Allows for the specification of validation groups to which this constraint belongs. */ + Class[] groups() default {}; + + /** An attribute payload that can be used to assign custom payload objects to a constraint. */ + Class[] payload() default {}; + + /** @return Default value that is returned if no allowed values are configured */ + String[] value() default {}; +} diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index ee060fffc9..51395cf644 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -23,18 +23,39 @@ grpc: enable-reflection: true feast: -# version: @project.version@ jobs: - # Runner type for feature population jobs. Currently supported runner types are - # DirectRunner and DataflowRunner. - runner: DirectRunner - # Key-value dict of job options to be passed to the population jobs. - options: {} - updates: - # Job update polling interval in milliseconds: how often Feast checks if new jobs should be sent to the runner. - pollingIntervalMillis: 60000 - # Timeout in seconds for each attempt to update or submit a new job to the runner. - timeoutSeconds: 240 + # Job update polling interval in milliseconds: how often Feast checks if new jobs should be sent to the runner. + polling_interval_milliseconds: 60000 + + # Timeout in seconds for each attempt to update or submit a new job to the runner. + job_update_timeout_seconds: 240 + + # Name of the active runner in "runners" that should be used. Only a single runner can be active at one time. + active_runner: direct + + # List of runner configurations. Please see protos/feast/core/Runner.proto for more details + # Alternatively see the following for options https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner + runners: + - name: direct + type: DirectRunner + options: {} + + - name: dataflow + type: DataflowRunner + options: + project: my_gcp_project + region: asia-east1 + zone: asia-east1-a + tempLocation: gs://bucket/tempLocation + network: default + subnetwork: regions/asia-east1/subnetworks/mysubnetwork + maxNumWorkers: 1 + autoscalingAlgorithm: THROUGHPUT_BASED + usePublicIps: false + workerMachineType: n1-standard-1 + deadLetterTableSpec: project_id:dataset_id.table_id + + # Configuration options for metric collection for all ingestion jobs metrics: # Enable metrics pushing for all ingestion jobs. enabled: false @@ -49,9 +70,10 @@ feast: # Feature stream type. Only kafka is supported. type: kafka # Feature stream options. + # See the following for options https://api.docs.feast.dev/grpc/feast.core.pb.html#KafkaSourceConfig options: topic: feast-features - bootstrapServers: kafka:9092 + bootstrapServers: localhost:9092 replicationFactor: 1 partitions: 1 diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 2d562d38df..e610f39373 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; import com.google.api.services.dataflow.Dataflow; import com.google.common.collect.Lists; import com.google.protobuf.Duration; @@ -77,9 +79,20 @@ public void setUp() { defaults = new HashMap<>(); defaults.put("project", "project"); defaults.put("region", "region"); + defaults.put("zone", "zone"); + defaults.put("tempLocation", "tempLocation"); + defaults.put("network", "network"); + defaults.put("subnetwork", "subnetwork"); MetricsProperties metricsProperties = new MetricsProperties(); metricsProperties.setEnabled(false); - dfJobManager = new DataflowJobManager(dataflow, defaults, metricsProperties); + Credential credential = null; + try { + credential = MockGoogleCredential.getApplicationDefault(); + } catch (IOException e) { + e.printStackTrace(); + } + + dfJobManager = new DataflowJobManager(defaults, metricsProperties, credential); dfJobManager = spy(dfJobManager); } diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index aa71f201dd..52e838c3d9 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -39,7 +39,8 @@ import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; -import feast.core.config.FeastProperties.JobUpdatesProperties; +import feast.core.config.FeastProperties; +import feast.core.config.FeastProperties.JobProperties; import feast.core.dao.FeatureSetRepository; import feast.core.dao.JobRepository; import feast.core.job.JobManager; @@ -65,13 +66,15 @@ public class JobCoordinatorServiceTest { @Mock SpecService specService; @Mock FeatureSetRepository featureSetRepository; - private JobUpdatesProperties jobUpdatesProperties; + private FeastProperties feastProperties; @Before public void setUp() { initMocks(this); - jobUpdatesProperties = new JobUpdatesProperties(); - jobUpdatesProperties.setTimeoutSeconds(5); + feastProperties = new FeastProperties(); + JobProperties jobProperties = new JobProperties(); + jobProperties.setJobUpdateTimeoutSeconds(5); + feastProperties.setJobs(jobProperties); } @Test @@ -79,7 +82,7 @@ public void shouldDoNothingIfNoStoresFound() throws InvalidProtocolBufferExcepti when(specService.listStores(any())).thenReturn(ListStoresResponse.newBuilder().build()); JobCoordinatorService jcs = new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, jobUpdatesProperties); + jobRepository, featureSetRepository, specService, jobManager, feastProperties); jcs.Poll(); verify(jobRepository, times(0)).saveAndFlush(any()); } @@ -105,7 +108,7 @@ public void shouldDoNothingIfNoMatchingFeatureSetsFound() throws InvalidProtocol .thenReturn(ListFeatureSetsResponse.newBuilder().build()); JobCoordinatorService jcs = new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, jobUpdatesProperties); + jobRepository, featureSetRepository, specService, jobManager, feastProperties); jcs.Poll(); verify(jobRepository, times(0)).saveAndFlush(any()); } @@ -196,7 +199,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep JobCoordinatorService jcs = new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, jobUpdatesProperties); + jobRepository, featureSetRepository, specService, jobManager, feastProperties); jcs.Poll(); verify(jobRepository, times(1)).saveAndFlush(jobArgCaptor.capture()); Job actual = jobArgCaptor.getValue(); @@ -318,7 +321,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { JobCoordinatorService jcs = new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, jobUpdatesProperties); + jobRepository, featureSetRepository, specService, jobManager, feastProperties); jcs.Poll(); verify(jobRepository, times(2)).saveAndFlush(jobArgCaptor.capture()); diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index 1b88433381..b62f83f0f3 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -83,12 +83,9 @@ public static FeatureSink getFeatureSink( StoreType storeType = store.getType(); switch (storeType) { case REDIS: - return RedisFeatureSink.builder() - .setRedisConfig(store.getRedisConfig()) - .setFeatureSetSpecs(featureSetSpecs) - .build(); + return RedisFeatureSink.fromConfig(store.getRedisConfig(), featureSetSpecs); case BIGQUERY: - return BigQueryFeatureSink.fromConfig(store.getBigqueryConfig()); + return BigQueryFeatureSink.fromConfig(store.getBigqueryConfig(), featureSetSpecs); default: throw new RuntimeException(String.format("Store type '{}' is unsupported", storeType)); } diff --git a/pom.xml b/pom.xml index 5a9ab5292a..7b7cd1d0fe 100644 --- a/pom.xml +++ b/pom.xml @@ -486,6 +486,14 @@ true + + + build-info + + build-info + + + diff --git a/protos/feast/core/Runner.proto b/protos/feast/core/Runner.proto new file mode 100644 index 0000000000..779f4d44be --- /dev/null +++ b/protos/feast/core/Runner.proto @@ -0,0 +1,73 @@ +// +// * Copyright 2020 The Feast Authors +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * https://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// + +syntax = "proto3"; +package feast.core; + +option java_package = "feast.core"; +option java_outer_classname = "RunnerProto"; +option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core"; + +message DirectRunnerConfigOptions { + /** + * Controls the amount of target parallelism the DirectRunner will use. + * Defaults to the greater of the number of available processors and 3. Must be a value + * greater than zero. + */ + int32 targetParallelism = 1; + + /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */ + string deadLetterTableSpec = 2; +} + +message DataflowRunnerConfigOptions { + /* Project id to use when launching jobs. */ + string project = 1; + + /* The Google Compute Engine region for creating Dataflow jobs. */ + string region = 2; + + /* GCP availability zone for operations. */ + string zone = 3; + + /* Run the job as a specific service account, instead of the default GCE robot. */ + string serviceAccount = 4; + + /* GCE network for launching workers. */ + string network = 5; + + /* GCE subnetwork for launching workers. e.g. regions/asia-east1/subnetworks/mysubnetwork */ + string subnetwork = 6; + + /* Machine type to create Dataflow worker VMs as. */ + string workerMachineType = 7; + + /* The autoscaling algorithm to use for the workerpool. */ + string autoscalingAlgorithm = 8; + + /* Specifies whether worker pools should be started with public IP addresses. */ + bool usePublicIps = 9; + + // A pipeline level default location for storing temporary files. Support Google Cloud Storage locations, + // e.g. gs://bucket/object + string tempLocation = 10; + + /* The maximum number of workers to use for the workerpool. */ + int32 maxNumWorkers = 11; + + /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */ + string deadLetterTableSpec = 12; +} \ No newline at end of file diff --git a/protos/feast/core/Source.proto b/protos/feast/core/Source.proto index 8a6cbd415a..b9e6227199 100644 --- a/protos/feast/core/Source.proto +++ b/protos/feast/core/Source.proto @@ -39,9 +39,15 @@ enum SourceType { } message KafkaSourceConfig { - // - bootstrapServers: [comma delimited value of host[:port]] + // Comma separated list of Kafka bootstrap servers. Used for feature sets without a defined source host[:port]] string bootstrap_servers = 1; - // - topics: [Kafka topic name. This value is provisioned by core and should not be set by the user.] + // Kafka topic to use for feature sets without user defined topics string topic = 2; + + // Number of Kafka partitions to to use for managed feature stream. + int32 partitions = 3; + + // Defines the number of copies of managed feature stream Kafka. + int32 replicationFactor = 4; } \ No newline at end of file diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index 931a9d46b6..de9af0a99f 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -120,6 +120,9 @@ message Store { message BigQueryConfig { string project_id = 1; string dataset_id = 2; + string staging_location = 3; + int32 initial_retry_delay_seconds = 4; + int32 total_timeout_seconds = 5; } message CassandraConfig { diff --git a/serving/lombok.config b/serving/lombok.config deleted file mode 100644 index 8f7e8aa1ac..0000000000 --- a/serving/lombok.config +++ /dev/null @@ -1 +0,0 @@ -lombok.addLombokGeneratedAnnotation = true \ No newline at end of file diff --git a/serving/pom.xml b/serving/pom.xml index 1390bfdc80..bbb694011a 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -34,7 +34,7 @@ spring-plugins Spring Plugins - http://repo.spring.io/plugins-release + https://repo.spring.io/plugins-release @@ -46,6 +46,14 @@ false + + + build-info + + build-info + + + org.apache.maven.plugins @@ -76,6 +84,7 @@ ${project.version} + dev.feast feast-storage-api diff --git a/serving/sample_redis_config.yml b/serving/sample_redis_config.yml deleted file mode 100644 index b3461649a1..0000000000 --- a/serving/sample_redis_config.yml +++ /dev/null @@ -1,9 +0,0 @@ -name: serving -type: REDIS -redis_config: - host: localhost - port: 6379 -subscriptions: - - name: "*" - project: "*" - version: "*" diff --git a/serving/src/main/java/feast/serving/FeastProperties.java b/serving/src/main/java/feast/serving/FeastProperties.java deleted file mode 100644 index 505d7d0330..0000000000 --- a/serving/src/main/java/feast/serving/FeastProperties.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving; - -// Feast configuration properties that maps Feast configuration from default application.yml file to -// a Java object. -// https://www.baeldung.com/configuration-properties-in-spring-boot -// https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html#boot-features-external-config-typesafe-configuration-properties - -import java.util.Map; -import org.springframework.boot.context.properties.ConfigurationProperties; - -@ConfigurationProperties(prefix = "feast") -public class FeastProperties { - private String version; - private String coreHost; - private int coreGrpcPort; - private StoreProperties store; - private JobProperties jobs; - private TracingProperties tracing; - - public String getVersion() { - return this.version; - } - - public String getCoreHost() { - return this.coreHost; - } - - public int getCoreGrpcPort() { - return this.coreGrpcPort; - } - - public StoreProperties getStore() { - return this.store; - } - - public JobProperties getJobs() { - return this.jobs; - } - - public TracingProperties getTracing() { - return this.tracing; - } - - public void setVersion(String version) { - this.version = version; - } - - public void setCoreHost(String coreHost) { - this.coreHost = coreHost; - } - - public void setCoreGrpcPort(int coreGrpcPort) { - this.coreGrpcPort = coreGrpcPort; - } - - public void setStore(StoreProperties store) { - this.store = store; - } - - public void setJobs(JobProperties jobs) { - this.jobs = jobs; - } - - public void setTracing(TracingProperties tracing) { - this.tracing = tracing; - } - - public static class StoreProperties { - private String configPath; - private int redisPoolMaxSize; - private int redisPoolMaxIdle; - - public String getConfigPath() { - return this.configPath; - } - - public int getRedisPoolMaxSize() { - return this.redisPoolMaxSize; - } - - public int getRedisPoolMaxIdle() { - return this.redisPoolMaxIdle; - } - - public void setConfigPath(String configPath) { - this.configPath = configPath; - } - - public void setRedisPoolMaxSize(int redisPoolMaxSize) { - this.redisPoolMaxSize = redisPoolMaxSize; - } - - public void setRedisPoolMaxIdle(int redisPoolMaxIdle) { - this.redisPoolMaxIdle = redisPoolMaxIdle; - } - } - - public static class JobProperties { - private String stagingLocation; - private int bigqueryInitialRetryDelaySecs; - private int bigqueryTotalTimeoutSecs; - private String storeType; - private Map storeOptions; - - public String getStagingLocation() { - return this.stagingLocation; - } - - public int getBigqueryInitialRetryDelaySecs() { - return bigqueryInitialRetryDelaySecs; - } - - public int getBigqueryTotalTimeoutSecs() { - return bigqueryTotalTimeoutSecs; - } - - public String getStoreType() { - return this.storeType; - } - - public Map getStoreOptions() { - return this.storeOptions; - } - - public void setStagingLocation(String stagingLocation) { - this.stagingLocation = stagingLocation; - } - - public void setBigqueryInitialRetryDelaySecs(int bigqueryInitialRetryDelaySecs) { - this.bigqueryInitialRetryDelaySecs = bigqueryInitialRetryDelaySecs; - } - - public void setBigqueryTotalTimeoutSecs(int bigqueryTotalTimeoutSecs) { - this.bigqueryTotalTimeoutSecs = bigqueryTotalTimeoutSecs; - } - - public void setStoreType(String storeType) { - this.storeType = storeType; - } - - public void setStoreOptions(Map storeOptions) { - this.storeOptions = storeOptions; - } - } - - public static class TracingProperties { - private boolean enabled; - private String tracerName; - private String serviceName; - - public boolean isEnabled() { - return this.enabled; - } - - public String getTracerName() { - return this.tracerName; - } - - public String getServiceName() { - return this.serviceName; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public void setTracerName(String tracerName) { - this.tracerName = tracerName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - } -} diff --git a/serving/src/main/java/feast/serving/ServingApplication.java b/serving/src/main/java/feast/serving/ServingApplication.java index ae9bb87a0b..ab036d04d1 100644 --- a/serving/src/main/java/feast/serving/ServingApplication.java +++ b/serving/src/main/java/feast/serving/ServingApplication.java @@ -16,11 +16,20 @@ */ package feast.serving; +import feast.serving.config.FeastProperties; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration; +import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; -@SpringBootApplication +@SpringBootApplication( + exclude = { + DataSourceAutoConfiguration.class, + DataSourceTransactionManagerAutoConfiguration.class, + HibernateJpaAutoConfiguration.class + }) @EnableConfigurationProperties(FeastProperties.class) public class ServingApplication { public static void main(String[] args) { diff --git a/serving/src/main/java/feast/serving/configuration/ContextClosedHandler.java b/serving/src/main/java/feast/serving/config/ContextClosedHandler.java similarity index 96% rename from serving/src/main/java/feast/serving/configuration/ContextClosedHandler.java rename to serving/src/main/java/feast/serving/config/ContextClosedHandler.java index a4f6d64d84..2bc97439f3 100644 --- a/serving/src/main/java/feast/serving/configuration/ContextClosedHandler.java +++ b/serving/src/main/java/feast/serving/config/ContextClosedHandler.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.serving.configuration; +package feast.serving.config; import java.util.concurrent.ScheduledExecutorService; import org.springframework.beans.factory.annotation.Autowired; diff --git a/serving/src/main/java/feast/serving/config/FeastProperties.java b/serving/src/main/java/feast/serving/config/FeastProperties.java new file mode 100644 index 0000000000..bf3387728a --- /dev/null +++ b/serving/src/main/java/feast/serving/config/FeastProperties.java @@ -0,0 +1,541 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.config; + +// Feast configuration properties that maps Feast configuration from default application.yml file to +// a Java object. +// https://www.baeldung.com/configuration-properties-in-spring-boot +// https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html#boot-features-external-config-typesafe-configuration-properties + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import feast.core.StoreProto; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Positive; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.ValidHost; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.info.BuildProperties; + +/** Feast Serving properties. */ +@ConfigurationProperties(prefix = "feast", ignoreInvalidFields = true) +public class FeastProperties { + + /** + * Instantiates a new Feast Serving properties. + * + * @param buildProperties the build properties + */ + @Autowired + public FeastProperties(BuildProperties buildProperties) { + setVersion(buildProperties.getVersion()); + } + + /** Instantiates a new Feast class. */ + public FeastProperties() {} + + /* Feast Serving build version */ + @NotBlank private String version = "unknown"; + + /* Feast Core host to connect to. */ + @ValidHost @NotBlank private String coreHost; + + /* Feast Core port to connect to. */ + @Positive private int coreGrpcPort; + + /** + * Finds and returns the active store + * + * @return Returns the {@link Store} model object + */ + public Store getActiveStore() { + for (Store store : getStores()) { + if (activeStore.equals(store.getName())) { + return store; + } + } + throw new RuntimeException( + String.format("Active store is misconfigured. Could not find store: %s.", activeStore)); + } + + /** + * Set the name of the active store found in the "stores" configuration list + * + * @param activeStore String name to active store + */ + public void setActiveStore(String activeStore) { + this.activeStore = activeStore; + } + + /** Name of the active store configuration (only one store can be active at a time). */ + @NotBlank private String activeStore; + + /** + * Collection of store configurations. The active store is selected by the "activeStore" field. + */ + private List stores = new ArrayList<>(); + + /* Job Store properties to retain state of async jobs. */ + private JobStoreProperties jobStore; + + /* Metric tracing properties. */ + private TracingProperties tracing; + + /** + * Gets Serving store configuration as a list of {@link Store}. + * + * @return List of stores objects + */ + public List getStores() { + return stores; + } + + /** + * Gets Feast Serving build version. + * + * @return the build version + */ + public String getVersion() { + return version; + } + + /** + * Sets build version + * + * @param version the build version + */ + public void setVersion(String version) { + this.version = version; + } + + /** + * Gets Feast Core host. + * + * @return Feast Core host + */ + public String getCoreHost() { + return coreHost; + } + + /** + * Sets Feast Core host to connect to. + * + * @param coreHost Feast Core host + */ + public void setCoreHost(String coreHost) { + this.coreHost = coreHost; + } + + /** + * Gets Feast Core gRPC port. + * + * @return Feast Core gRPC port + */ + public int getCoreGrpcPort() { + return coreGrpcPort; + } + + /** + * Sets Feast Core gRPC port. + * + * @param coreGrpcPort gRPC port of Feast Core + */ + public void setCoreGrpcPort(int coreGrpcPort) { + this.coreGrpcPort = coreGrpcPort; + } + + /** + * Sets the collection of configured stores. + * + * @param stores List of {@link Store} + */ + public void setStores(List stores) { + this.stores = stores; + } + + /** Store configuration class for database that this Feast Serving uses. */ + public static class Store { + + private String name; + + private String type; + + private Map config = new HashMap<>(); + + private List subscriptions = new ArrayList<>(); + + /** + * Gets name of this store. This is unique to this specific instance. + * + * @return the name of the store + */ + public String getName() { + return name; + } + + /** + * Sets the name of this store. + * + * @param name the name of the store + */ + public void setName(String name) { + this.name = name; + } + + /** + * Gets the store type. Example are REDIS or BIGQUERY + * + * @return the store type as a String. + */ + public String getType() { + return type; + } + + /** + * Sets the store type + * + * @param type the type + */ + public void setType(String type) { + this.type = type; + } + + /** + * Converts this {@link Store} to a {@StoreProto.Store} + * + * @return {@StoreProto.Store} with configuration set + * @throws InvalidProtocolBufferException the invalid protocol buffer exception + * @throws JsonProcessingException the json processing exception + */ + public StoreProto.Store toProto() + throws InvalidProtocolBufferException, JsonProcessingException { + List subscriptions = getSubscriptions(); + List subscriptionProtos = + subscriptions.stream().map(Subscription::toProto).collect(Collectors.toList()); + + StoreProto.Store.Builder storeProtoBuilder = + StoreProto.Store.newBuilder() + .setName(name) + .setType(StoreProto.Store.StoreType.valueOf(type)) + .addAllSubscriptions(subscriptionProtos); + + ObjectMapper jsonWriter = new ObjectMapper(); + + // TODO: All of this logic should be moved to the store layer. Only a Map + // should be sent to a store and it should do its own validation. + switch (StoreProto.Store.StoreType.valueOf(type)) { + case REDIS: + StoreProto.Store.RedisConfig.Builder redisConfig = + StoreProto.Store.RedisConfig.newBuilder(); + JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), redisConfig); + return storeProtoBuilder.setRedisConfig(redisConfig.build()).build(); + case BIGQUERY: + StoreProto.Store.BigQueryConfig.Builder bqConfig = + StoreProto.Store.BigQueryConfig.newBuilder(); + JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), bqConfig); + return storeProtoBuilder.setBigqueryConfig(bqConfig.build()).build(); + case CASSANDRA: + StoreProto.Store.CassandraConfig.Builder cassandraConfig = + StoreProto.Store.CassandraConfig.newBuilder(); + JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), cassandraConfig); + return storeProtoBuilder.setCassandraConfig(cassandraConfig.build()).build(); + default: + throw new InvalidProtocolBufferException("Invalid store set"); + } + } + + /** + * Get the subscriptions to this specific store. The subscriptions indicate which feature sets a + * store subscribes to. + * + * @return List of subscriptions. + */ + public List getSubscriptions() { + return subscriptions; + } + + /** + * Sets the store specific configuration. See getSubscriptions() for more details. + * + * @param subscriptions the subscriptions list + */ + public void setSubscriptions(List subscriptions) { + this.subscriptions = subscriptions; + } + + /** + * Gets the configuration to this specific store. This is a map of strings. These options are + * unique to the store. Please see protos/feast/core/Store.proto for the store specific + * configuration options + * + * @return Returns the store specific configuration + */ + public Map getConfig() { + return config; + } + + /** + * Sets the store config. Please protos/feast/core/Store.proto for the specific options for each + * store. + * + * @param config the config map + */ + public void setConfig(Map config) { + this.config = config; + } + + /** + * The Subscription type. + * + *

Note: Please see protos/feast/core/CoreService.proto for details on how to subscribe to + * feature sets. + */ + public class Subscription { + /** Feast project to subscribe to. */ + String project; + + /** Feature set to subscribe to. */ + String name; + + /** Feature set versions to subscribe to. */ + String version; + + /** + * Gets Feast project subscribed to. + * + * @return the project string + */ + public String getProject() { + return project; + } + + /** + * Sets Feast project to subscribe to for this store. + * + * @param project the project + */ + public void setProject(String project) { + this.project = project; + } + + /** + * Gets the feature set name to subscribe to. + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * Sets the feature set name to subscribe to. + * + * @param name the name + */ + public void setName(String name) { + this.name = name; + } + + /** + * Gets the feature set version that is being subscribed to by this store. + * + * @return the version + */ + public String getVersion() { + return version; + } + + /** + * Sets the feature set version that is being subscribed to by this store. + * + * @param version the version + */ + public void setVersion(String version) { + this.version = version; + } + + /** + * Convert this {@link Subscription} to a {@link StoreProto.Store.Subscription}. + * + * @return the store proto . store . subscription + */ + public StoreProto.Store.Subscription toProto() { + return StoreProto.Store.Subscription.newBuilder() + .setName(getName()) + .setProject(getProject()) + .setVersion(getVersion()) + .build(); + } + } + } + + /** + * Gets job store properties + * + * @return the job store properties + */ + public JobStoreProperties getJobStore() { + return jobStore; + } + + /** + * Set job store properties + * + * @param jobStore Job store properties to set + */ + public void setJobStore(JobStoreProperties jobStore) { + this.jobStore = jobStore; + } + + /** + * Gets tracing properties + * + * @return tracing properties + */ + public TracingProperties getTracing() { + return tracing; + } + + /** + * Sets the tracing configuration. + * + * @param tracing the tracing + */ + public void setTracing(TracingProperties tracing) { + this.tracing = tracing; + } + + /** The type Job store properties. */ + public static class JobStoreProperties { + + /** Job Store Redis Host */ + private String redisHost; + + /** Job Store Redis Host */ + private int redisPort; + + /** + * Gets redis host. + * + * @return the redis host + */ + public String getRedisHost() { + return redisHost; + } + + /** + * Sets redis host. + * + * @param redisHost the redis host + */ + public void setRedisHost(String redisHost) { + this.redisHost = redisHost; + } + + /** + * Gets redis port. + * + * @return the redis port + */ + public int getRedisPort() { + return redisPort; + } + + /** + * Sets redis port. + * + * @param redisPort the redis port + */ + public void setRedisPort(int redisPort) { + this.redisPort = redisPort; + } + } + + /** Trace metric collection properties */ + public static class TracingProperties { + + /** Tracing enabled/disabled */ + private boolean enabled; + + /** Name of tracer to use (only "jaeger") */ + private String tracerName; + + /** Service name uniquely identifies this Feast Serving deployment */ + private String serviceName; + + /** + * Is tracing enabled + * + * @return boolean flag + */ + public boolean isEnabled() { + return enabled; + } + + /** + * Sets tracing enabled or disabled. + * + * @param enabled flag + */ + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + /** + * Gets tracer name ('jaeger') + * + * @return the tracer name + */ + public String getTracerName() { + return tracerName; + } + + /** + * Sets tracer name. + * + * @param tracerName the tracer name + */ + public void setTracerName(String tracerName) { + this.tracerName = tracerName; + } + + /** + * Gets the service name. The service name uniquely identifies this Feast serving instance. + * + * @return the service name + */ + public String getServiceName() { + return serviceName; + } + + /** + * Sets service name. + * + * @param serviceName the service name + */ + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + } +} diff --git a/serving/src/main/java/feast/serving/configuration/InstrumentationConfig.java b/serving/src/main/java/feast/serving/config/InstrumentationConfig.java similarity index 96% rename from serving/src/main/java/feast/serving/configuration/InstrumentationConfig.java rename to serving/src/main/java/feast/serving/config/InstrumentationConfig.java index 2cd284829c..30269c5d0e 100644 --- a/serving/src/main/java/feast/serving/configuration/InstrumentationConfig.java +++ b/serving/src/main/java/feast/serving/config/InstrumentationConfig.java @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.serving.configuration; +package feast.serving.config; -import feast.serving.FeastProperties; import io.opentracing.Tracer; import io.opentracing.noop.NoopTracerFactory; import io.prometheus.client.exporter.MetricsServlet; diff --git a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java b/serving/src/main/java/feast/serving/config/JobServiceConfig.java similarity index 56% rename from serving/src/main/java/feast/serving/configuration/JobServiceConfig.java rename to serving/src/main/java/feast/serving/config/JobServiceConfig.java index fa94dab832..fa2272e5cd 100644 --- a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/JobServiceConfig.java @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.serving.configuration; +package feast.serving.config; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.InvalidProtocolBufferException; import feast.core.StoreProto.Store.StoreType; -import feast.serving.FeastProperties; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; import feast.serving.service.RedisBackedJobService; -import feast.serving.specs.CachedSpecService; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -29,24 +29,11 @@ public class JobServiceConfig { @Bean - public JobService jobService( - FeastProperties feastProperties, - CachedSpecService specService, - StoreConfiguration storeConfiguration) { - if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) { + public JobService jobService(FeastProperties feastProperties) + throws InvalidProtocolBufferException, JsonProcessingException { + if (!feastProperties.getActiveStore().toProto().getType().equals(StoreType.BIGQUERY)) { return new NoopJobService(); } - StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType()); - switch (storeType) { - case REDIS: - return new RedisBackedJobService(storeConfiguration.getJobStoreRedisConnection()); - case INVALID: - case BIGQUERY: - case CASSANDRA: - case UNRECOGNIZED: - default: - throw new IllegalArgumentException( - String.format("Unsupported store type '%s' for job store", storeType)); - } + return new RedisBackedJobService(feastProperties.getJobStore()); } } diff --git a/serving/src/main/java/feast/serving/configuration/ServingApiConfiguration.java b/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java similarity index 97% rename from serving/src/main/java/feast/serving/configuration/ServingApiConfiguration.java rename to serving/src/main/java/feast/serving/config/ServingApiConfiguration.java index 539b25a0fc..ce4fe13437 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingApiConfiguration.java +++ b/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.serving.configuration; +package feast.serving.config; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; diff --git a/serving/src/main/java/feast/serving/config/ServingServiceConfig.java b/serving/src/main/java/feast/serving/config/ServingServiceConfig.java new file mode 100644 index 0000000000..ec84e6c4fe --- /dev/null +++ b/serving/src/main/java/feast/serving/config/ServingServiceConfig.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.StoreProto; +import feast.serving.service.HistoricalServingService; +import feast.serving.service.JobService; +import feast.serving.service.NoopJobService; +import feast.serving.service.OnlineServingService; +import feast.serving.service.ServingService; +import feast.serving.specs.CachedSpecService; +import feast.storage.api.retriever.HistoricalRetriever; +import feast.storage.api.retriever.OnlineRetriever; +import feast.storage.connectors.bigquery.retriever.BigQueryHistoricalRetriever; +import feast.storage.connectors.redis.retriever.RedisOnlineRetriever; +import io.opentracing.Tracer; +import java.util.Map; +import org.slf4j.Logger; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ServingServiceConfig { + + private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class); + + @Bean + public ServingService servingService( + FeastProperties feastProperties, + CachedSpecService specService, + JobService jobService, + Tracer tracer) + throws InvalidProtocolBufferException, JsonProcessingException { + ServingService servingService = null; + FeastProperties.Store store = feastProperties.getActiveStore(); + StoreProto.Store.StoreType storeType = store.toProto().getType(); + Map config = store.getConfig(); + + switch (storeType) { + case REDIS: + OnlineRetriever redisRetriever = RedisOnlineRetriever.create(config); + servingService = new OnlineServingService(redisRetriever, specService, tracer); + break; + case BIGQUERY: + if (jobService.getClass() == NoopJobService.class) { + throw new IllegalArgumentException( + "Unable to instantiate JobService which is required by BigQueryHistoricalRetriever."); + } + HistoricalRetriever bqRetriever = BigQueryHistoricalRetriever.create(config); + servingService = new HistoricalServingService(bqRetriever, specService, jobService); + break; + case CASSANDRA: + case UNRECOGNIZED: + case INVALID: + throw new IllegalArgumentException( + String.format( + "Unsupported store type '%s' for store name '%s'", + store.getType(), store.getName())); + } + + return servingService; + } +} diff --git a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java similarity index 87% rename from serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java rename to serving/src/main/java/feast/serving/config/SpecServiceConfig.java index 26ebfa956c..dbe2de665e 100644 --- a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.serving.configuration; +package feast.serving.config; -import feast.serving.FeastProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.StoreProto; import feast.serving.specs.CachedSpecService; import feast.serving.specs.CoreSpecService; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -58,10 +58,11 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( } @Bean - public CachedSpecService specService(FeastProperties feastProperties) { + public CachedSpecService specService(FeastProperties feastProperties) + throws InvalidProtocolBufferException, JsonProcessingException { CoreSpecService coreService = new CoreSpecService(feastCoreHost, feastCorePort); - Path path = Paths.get(feastProperties.getStore().getConfigPath()); - CachedSpecService cachedSpecStorage = new CachedSpecService(coreService, path); + StoreProto.Store storeProto = feastProperties.getActiveStore().toProto(); + CachedSpecService cachedSpecStorage = new CachedSpecService(coreService, storeProto); try { cachedSpecStorage.populateCache(); } catch (Exception e) { diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java deleted file mode 100644 index 28df853e22..0000000000 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.configuration; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; -import feast.core.StoreProto.Store; -import feast.core.StoreProto.Store.BigQueryConfig; -import feast.core.StoreProto.Store.RedisConfig; -import feast.core.StoreProto.Store.Subscription; -import feast.serving.FeastProperties; -import feast.serving.service.*; -import feast.serving.specs.CachedSpecService; -import feast.storage.api.retriever.HistoricalRetriever; -import feast.storage.api.retriever.OnlineRetriever; -import feast.storage.connectors.bigquery.retriever.BigQueryHistoricalRetriever; -import feast.storage.connectors.redis.retriever.RedisOnlineRetriever; -import io.opentracing.Tracer; -import java.util.Map; -import org.slf4j.Logger; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class ServingServiceConfig { - - private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class); - - private Store setStoreConfig(Store.Builder builder, Map options) { - switch (builder.getType()) { - case REDIS: - RedisConfig redisConfig = - RedisConfig.newBuilder() - .setHost(options.get("host")) - .setPort(Integer.parseInt(options.get("port"))) - .build(); - return builder.setRedisConfig(redisConfig).build(); - case BIGQUERY: - BigQueryConfig bqConfig = - BigQueryConfig.newBuilder() - .setProjectId(options.get("projectId")) - .setDatasetId(options.get("datasetId")) - .build(); - return builder.setBigqueryConfig(bqConfig).build(); - case CASSANDRA: - default: - throw new IllegalArgumentException( - String.format( - "Unsupported store %s provided, only REDIS or BIGQUERY are currently supported.", - builder.getType())); - } - } - - @Bean - public ServingService servingService( - FeastProperties feastProperties, - CachedSpecService specService, - JobService jobService, - Tracer tracer, - StoreConfiguration storeConfiguration) { - ServingService servingService = null; - Store store = specService.getStore(); - - switch (store.getType()) { - case REDIS: - OnlineRetriever redisRetriever = - new RedisOnlineRetriever(storeConfiguration.getServingRedisConnection()); - servingService = new OnlineServingService(redisRetriever, specService, tracer); - break; - case BIGQUERY: - BigQueryConfig bqConfig = store.getBigqueryConfig(); - BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); - Storage storage = StorageOptions.getDefaultInstance().getService(); - String jobStagingLocation = feastProperties.getJobs().getStagingLocation(); - if (!jobStagingLocation.contains("://")) { - throw new IllegalArgumentException( - String.format("jobStagingLocation is not a valid URI: %s", jobStagingLocation)); - } - if (jobStagingLocation.endsWith("/")) { - jobStagingLocation = jobStagingLocation.substring(0, jobStagingLocation.length() - 1); - } - if (!jobStagingLocation.startsWith("gs://")) { - throw new IllegalArgumentException( - "Store type BIGQUERY requires job staging location to be a valid and existing Google Cloud Storage URI. Invalid staging location: " - + jobStagingLocation); - } - if (jobService.getClass() == NoopJobService.class) { - throw new IllegalArgumentException( - "Unable to instantiate jobService for BigQuery store."); - } - - HistoricalRetriever bqRetriever = - BigQueryHistoricalRetriever.builder() - .setBigquery(bigquery) - .setDatasetId(bqConfig.getDatasetId()) - .setProjectId(bqConfig.getProjectId()) - .setJobStagingLocation(jobStagingLocation) - .setInitialRetryDelaySecs( - feastProperties.getJobs().getBigqueryInitialRetryDelaySecs()) - .setTotalTimeoutSecs(feastProperties.getJobs().getBigqueryTotalTimeoutSecs()) - .setStorage(storage) - .build(); - - servingService = new HistoricalServingService(bqRetriever, specService, jobService); - break; - case CASSANDRA: - case UNRECOGNIZED: - case INVALID: - throw new IllegalArgumentException( - String.format( - "Unsupported store type '%s' for store name '%s'", - store.getType(), store.getName())); - } - - return servingService; - } - - private Subscription parseSubscription(String subscription) { - String[] split = subscription.split(":"); - return Subscription.newBuilder().setName(split[0]).setVersion(split[1]).build(); - } -} diff --git a/serving/src/main/java/feast/serving/configuration/StoreConfiguration.java b/serving/src/main/java/feast/serving/configuration/StoreConfiguration.java deleted file mode 100644 index 84dc7b7f8d..0000000000 --- a/serving/src/main/java/feast/serving/configuration/StoreConfiguration.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.configuration; - -import io.lettuce.core.api.StatefulRedisConnection; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class StoreConfiguration { - - // We can define other store specific beans here - // These beans can be autowired or can be created in this class. - private final StatefulRedisConnection servingRedisConnection; - private final StatefulRedisConnection jobStoreRedisConnection; - - @Autowired - public StoreConfiguration( - ObjectProvider> servingRedisConnection, - ObjectProvider> jobStoreRedisConnection) { - this.servingRedisConnection = servingRedisConnection.getIfAvailable(); - this.jobStoreRedisConnection = jobStoreRedisConnection.getIfAvailable(); - } - - public StatefulRedisConnection getServingRedisConnection() { - return servingRedisConnection; - } - - public StatefulRedisConnection getJobStoreRedisConnection() { - return jobStoreRedisConnection; - } -} diff --git a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java b/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java deleted file mode 100644 index 77d9262bcb..0000000000 --- a/serving/src/main/java/feast/serving/configuration/redis/JobStoreRedisConfig.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.configuration.redis; - -import com.google.common.base.Enums; -import feast.core.StoreProto; -import feast.serving.FeastProperties; -import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisURI; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.codec.ByteArrayCodec; -import io.lettuce.core.resource.ClientResources; -import io.lettuce.core.resource.DefaultClientResources; -import java.util.Map; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class JobStoreRedisConfig { - - @Bean(destroyMethod = "shutdown") - ClientResources jobStoreClientResources() { - return DefaultClientResources.create(); - } - - @Bean(destroyMethod = "shutdown") - RedisClient jobStoreRedisClient( - ClientResources jobStoreClientResources, FeastProperties feastProperties) { - StoreProto.Store.StoreType storeType = - Enums.getIfPresent( - StoreProto.Store.StoreType.class, feastProperties.getJobs().getStoreType()) - .orNull(); - if (storeType != StoreProto.Store.StoreType.REDIS) return null; - Map jobStoreConf = feastProperties.getJobs().getStoreOptions(); - // If job conf is empty throw StoreException - if (jobStoreConf == null - || jobStoreConf.get("host") == null - || jobStoreConf.get("host").isEmpty() - || jobStoreConf.get("port") == null - || jobStoreConf.get("port").isEmpty()) - throw new IllegalArgumentException("Store Configuration is not set"); - RedisURI uri = - RedisURI.create(jobStoreConf.get("host"), Integer.parseInt(jobStoreConf.get("port"))); - return RedisClient.create(jobStoreClientResources, uri); - } - - @Bean(destroyMethod = "close") - StatefulRedisConnection jobStoreRedisConnection( - ObjectProvider jobStoreRedisClient) { - if (jobStoreRedisClient.getIfAvailable() == null) return null; - return jobStoreRedisClient.getIfAvailable().connect(new ByteArrayCodec()); - } -} diff --git a/serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java b/serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java deleted file mode 100644 index 17a50eef6d..0000000000 --- a/serving/src/main/java/feast/serving/configuration/redis/ServingStoreRedisConfig.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.configuration.redis; - -import feast.core.StoreProto; -import feast.serving.specs.CachedSpecService; -import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisURI; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.codec.ByteArrayCodec; -import io.lettuce.core.resource.ClientResources; -import io.lettuce.core.resource.DefaultClientResources; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.context.annotation.*; - -@Configuration -public class ServingStoreRedisConfig { - - @Bean - StoreProto.Store.RedisConfig servingStoreRedisConf(CachedSpecService specService) { - if (specService.getStore().getType() != StoreProto.Store.StoreType.REDIS) return null; - return specService.getStore().getRedisConfig(); - } - - @Bean(destroyMethod = "shutdown") - ClientResources servingClientResources() { - return DefaultClientResources.create(); - } - - @Bean(destroyMethod = "shutdown") - RedisClient servingRedisClient( - ClientResources servingClientResources, - ObjectProvider servingStoreRedisConf) { - if (servingStoreRedisConf.getIfAvailable() == null) return null; - RedisURI redisURI = - RedisURI.create( - servingStoreRedisConf.getIfAvailable().getHost(), - servingStoreRedisConf.getIfAvailable().getPort()); - return RedisClient.create(servingClientResources, redisURI); - } - - @Bean(destroyMethod = "close") - StatefulRedisConnection servingRedisConnection( - ObjectProvider servingRedisClient) { - if (servingRedisClient.getIfAvailable() == null) return null; - return servingRedisClient.getIfAvailable().connect(new ByteArrayCodec()); - } -} diff --git a/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java b/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java index 0eba67d4b4..91f38e2bd4 100644 --- a/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java +++ b/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java @@ -16,7 +16,6 @@ */ package feast.serving.controller; -import feast.serving.FeastProperties; import feast.serving.ServingAPIProto.GetBatchFeaturesRequest; import feast.serving.ServingAPIProto.GetBatchFeaturesResponse; import feast.serving.ServingAPIProto.GetFeastServingInfoRequest; @@ -26,6 +25,7 @@ import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; import feast.serving.ServingServiceGrpc.ServingServiceImplBase; +import feast.serving.config.FeastProperties; import feast.serving.exception.SpecRetrievalException; import feast.serving.interceptors.GrpcMonitoringInterceptor; import feast.serving.service.ServingService; diff --git a/serving/src/main/java/feast/serving/controller/ServingServiceRestController.java b/serving/src/main/java/feast/serving/controller/ServingServiceRestController.java index b0e349fd6b..344ab7cf3a 100644 --- a/serving/src/main/java/feast/serving/controller/ServingServiceRestController.java +++ b/serving/src/main/java/feast/serving/controller/ServingServiceRestController.java @@ -18,11 +18,11 @@ import static feast.serving.util.mappers.ResponseJSONMapper.mapGetOnlineFeaturesResponse; -import feast.serving.FeastProperties; import feast.serving.ServingAPIProto.GetFeastServingInfoRequest; import feast.serving.ServingAPIProto.GetFeastServingInfoResponse; import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.serving.config.FeastProperties; import feast.serving.service.ServingService; import feast.serving.util.RequestHelper; import io.opentracing.Tracer; diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 0bf5363037..dd010e5897 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -19,8 +19,13 @@ import com.google.protobuf.util.JsonFormat; import feast.serving.ServingAPIProto.Job; import feast.serving.ServingAPIProto.Job.Builder; +import feast.serving.config.FeastProperties; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.resource.DefaultClientResources; import java.util.Optional; import org.joda.time.Duration; import org.slf4j.Logger; @@ -37,6 +42,16 @@ public class RedisBackedJobService implements JobService { // and since users normally don't require info about relatively old jobs. private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds(); + public RedisBackedJobService(FeastProperties.JobStoreProperties jobStoreProperties) { + RedisURI uri = + RedisURI.create(jobStoreProperties.getRedisHost(), jobStoreProperties.getRedisPort()); + + this.syncCommand = + RedisClient.create(DefaultClientResources.create(), uri) + .connect(new ByteArrayCodec()) + .sync(); + } + public RedisBackedJobService(StatefulRedisConnection connection) { this.syncCommand = connection.sync(); } diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index 246be8c5fd..2f68711bf2 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -18,7 +18,6 @@ import static feast.serving.util.RefUtil.generateFeatureSetStringRef; import static feast.serving.util.RefUtil.generateFeatureStringRef; -import static feast.serving.util.mappers.YamlToProtoMapper.yamlToStoreProto; import static java.util.Comparator.comparingInt; import static java.util.stream.Collectors.groupingBy; @@ -27,11 +26,10 @@ import com.google.common.cache.LoadingCache; import feast.core.CoreServiceProto.ListFeatureSetsRequest; import feast.core.CoreServiceProto.ListFeatureSetsResponse; -import feast.core.CoreServiceProto.UpdateStoreRequest; -import feast.core.CoreServiceProto.UpdateStoreResponse; import feast.core.FeatureSetProto.FeatureSet; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; +import feast.core.StoreProto; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.Subscription; import feast.serving.ServingAPIProto.FeatureReference; @@ -39,9 +37,6 @@ import feast.storage.api.retriever.FeatureSetRequest; import io.grpc.StatusRuntimeException; import io.prometheus.client.Gauge; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -59,7 +54,6 @@ public class CachedSpecService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(CachedSpecService.class); private final CoreSpecService coreService; - private final Path configPath; private final Map featureToFeatureSetMapping; @@ -80,10 +74,9 @@ public class CachedSpecService { .help("epoch time of the last time the cache was updated") .register(); - public CachedSpecService(CoreSpecService coreService, Path configPath) { - this.configPath = configPath; + public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) { this.coreService = coreService; - this.store = updateStore(readConfig(configPath)); + this.store = store; Map featureSets = getFeatureSetMap(); featureToFeatureSetMapping = @@ -156,7 +149,6 @@ public List getFeatureSets(List featureRefe * from core to preload the cache. */ public void populateCache() { - this.store = updateStore(readConfig(configPath)); Map featureSetMap = getFeatureSetMap(); featureSetCache.putAll(featureSetMap); featureToFeatureSetMapping.putAll(getFeatureToFeatureSetMapping(featureSetMap)); @@ -239,29 +231,4 @@ private Map getFeatureToFeatureSetMapping( }); return mapping; } - - private Store readConfig(Path path) { - try { - List fileContents = Files.readAllLines(path); - String yaml = fileContents.stream().reduce("", (l1, l2) -> l1 + "\n" + l2); - log.info("loaded store config at {}: \n{}", path.toString(), yaml); - return yamlToStoreProto(yaml); - } catch (IOException e) { - throw new RuntimeException( - String.format("Unable to read store config at %s", path.toAbsolutePath()), e); - } - } - - private Store updateStore(Store store) { - UpdateStoreRequest request = UpdateStoreRequest.newBuilder().setStore(store).build(); - try { - UpdateStoreResponse updateStoreResponse = coreService.updateStore(request); - if (!updateStoreResponse.getStore().equals(store)) { - throw new RuntimeException("Core store config not matching current store config"); - } - return updateStoreResponse.getStore(); - } catch (Exception e) { - throw new RuntimeException("Unable to update store configuration", e); - } - } } diff --git a/serving/src/main/java/feast/serving/util/mappers/YamlToProtoMapper.java b/serving/src/main/java/feast/serving/util/mappers/YamlToProtoMapper.java deleted file mode 100644 index 00ad1fabb1..0000000000 --- a/serving/src/main/java/feast/serving/util/mappers/YamlToProtoMapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.util.mappers; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.protobuf.util.JsonFormat; -import feast.core.StoreProto.Store; -import feast.core.StoreProto.Store.Builder; -import java.io.IOException; - -public class YamlToProtoMapper { - private static final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory()); - private static final ObjectMapper jsonWriter = new ObjectMapper(); - - public static Store yamlToStoreProto(String yaml) throws IOException { - Object obj = yamlReader.readValue(yaml, Object.class); - String jsonString = jsonWriter.writeValueAsString(obj); - Builder builder = Store.newBuilder(); - JsonFormat.parser().merge(jsonString, builder); - return builder.build(); - } -} diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 96713c8028..053fddfaff 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -1,12 +1,51 @@ feast: - # This value is retrieved from project.version properties in pom.xml - # https://docs.spring.io/spring-boot/docs/current/reference/html/ - version: @project.version@ # GRPC service address for Feast Core # Feast Serving requires connection to Feast Core to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information) core-host: ${FEAST_CORE_HOST:localhost} core-grpc-port: ${FEAST_CORE_GRPC_PORT:6565} + # Indicates the active store. Only a single store in the last can be active at one time. In the future this key + # will be deprecated in order to allow multiple stores to be served from a single serving instance + active_store: online + + # List of store configurations + stores: + # Below are two store configurations. One for Redis and one for BigQuery. + # Please see https://api.docs.feast.dev/grpc/feast.core.pb.html#Store for configuration options + - name: online # Name of the store (referenced by active_store) + type: REDIS # Type of the store. REDIS, BIGQUERY are available options + config: # Store specific configuration. See + host: localhost + port: 6379 + # Subscriptions indicate which feature sets needs to be retrieved and used to populate this store + subscriptions: + # Wildcards match all options. No filtering is done. + - name: "*" + project: "*" + version: "*" + + - name: historical + type: BIGQUERY + config: # Store specific configuration. + # GCP Project + project_id: my_project + # BigQuery Dataset Id + dataset_id: my_dataset + # staging-location specifies the URI to store intermediate files for batch serving. + # Feast Serving client is expected to have read access to this staging location + # to download the batch features. + # For example: gs://mybucket/myprefix + # Please omit the trailing slash in the URI. + staging-location: gs://mybucket/myprefix + # Retry options for BigQuery retrieval jobs + bigquery-initial-retry-delay-secs: 1 + # BigQuery timeout for retrieval jobs + bigquery-total-timeout-secs: 21600 + subscriptions: + - name: "*" + project: "*" + version: "*" + tracing: # If true, Feast will provide tracing data (using OpenTracing API) for various RPC method calls # which can be useful to debug performance issues and perform benchmarking @@ -17,41 +56,13 @@ feast: # The service name identifier for the tracing data service-name: feast_serving - store: - # Path containing the store configuration for this serving store. - config-path: ${FEAST_STORE_CONFIG_PATH:serving/sample_redis_config.yml} - # If serving redis, the redis pool max size - redis-pool-max-size: ${FEAST_REDIS_POOL_MAX_SIZE:128} - # If serving redis, the redis pool max idle conns - redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16} - - jobs: - # staging-location specifies the URI to store intermediate files for batch serving. - # Feast Serving client is expected to have read access to this staging location - # to download the batch features. - # - # For example: gs://mybucket/myprefix - # Please omit the trailing slash in the URI. - staging-location: ${FEAST_JOB_STAGING_LOCATION:} - # - # Retry options for BigQuery jobs: - bigquery-initial-retry-delay-secs: 1 - bigquery-total-timeout-secs: 21600 - # - # Type of store to store job metadata. This only needs to be set if the - # serving store type is Bigquery. - store-type: ${FEAST_JOB_STORE_TYPE:} - # - # Job store connection options. If the job store is redis, the following items are required: - # - # store-options: - # host: localhost - # port: 6379 - # Optionally, you can configure the connection pool with the following items: - # max-conn: 8 - # max-idle: 8 - # max-wait-millis: 50 - store-options: {} + # The job store is used to maintain job management state for Feast Serving. This is required when using certain + # historical stores like BigQuery. Only Redis is supported as a job store. + job_store: + # Redis host to connect to + redis_host: localhost + # Redis port to connect to + redis_port: 6379 grpc: # The port number Feast Serving GRPC service should listen on diff --git a/serving/src/test/java/feast/serving/controller/ServingServiceGRpcControllerTest.java b/serving/src/test/java/feast/serving/controller/ServingServiceGRpcControllerTest.java index f2c51bc7dd..d23f9da1d2 100644 --- a/serving/src/test/java/feast/serving/controller/ServingServiceGRpcControllerTest.java +++ b/serving/src/test/java/feast/serving/controller/ServingServiceGRpcControllerTest.java @@ -19,11 +19,11 @@ import static org.mockito.MockitoAnnotations.initMocks; import com.google.protobuf.Timestamp; -import feast.serving.FeastProperties; import feast.serving.ServingAPIProto.FeatureReference; import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.serving.config.FeastProperties; import feast.serving.service.ServingService; import feast.types.ValueProto.Value; import io.grpc.StatusRuntimeException; diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java index 01c9304bda..f4f795ed32 100644 --- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java +++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java @@ -38,15 +38,10 @@ import feast.serving.specs.CachedSpecService; import feast.serving.specs.CoreSpecService; import feast.storage.api.retriever.FeatureSetRequest; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -55,7 +50,6 @@ public class CachedSpecServiceTest { - private File configFile; private Store store; @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -66,27 +60,9 @@ public class CachedSpecServiceTest { private CachedSpecService cachedSpecService; @Before - public void setUp() throws IOException { + public void setUp() { initMocks(this); - configFile = File.createTempFile("serving", ".yml"); - String yamlString = - "name: SERVING\n" - + "type: REDIS\n" - + "redis_config:\n" - + " host: localhost\n" - + " port: 6379\n" - + "subscriptions:\n" - + "- project: project\n" - + " name: fs1\n" - + " version: \"*\"\n" - + "- project: project\n" - + " name: fs2\n" - + " version: \"*\""; - BufferedWriter writer = new BufferedWriter(new FileWriter(configFile)); - writer.write(yamlString); - writer.close(); - store = Store.newBuilder() .setName("SERVING") @@ -164,12 +140,7 @@ public void setUp() throws IOException { .build())) .thenReturn(ListFeatureSetsResponse.newBuilder().addAllFeatureSets(fs2FeatureSets).build()); - cachedSpecService = new CachedSpecService(coreService, configFile.toPath()); - } - - @After - public void tearDown() { - configFile.delete(); + cachedSpecService = new CachedSpecService(coreService, store); } @Test diff --git a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java index 34bc31d2c2..23626c2cb8 100644 --- a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java @@ -26,6 +26,7 @@ import redis.embedded.RedisServer; public class RedisBackedJobServiceTest { + private static Integer REDIS_PORT = 51235; private RedisServer redis; @@ -41,7 +42,7 @@ public void teardown() { } @Test - public void shouldRecoverIfRedisConnectionIsLost() throws IOException { + public void shouldRecoverIfRedisConnectionIsLost() { RedisClient client = RedisClient.create(RedisURI.create("localhost", REDIS_PORT)); RedisBackedJobService jobService = new RedisBackedJobService(client.connect(new ByteArrayCodec())); diff --git a/serving/src/test/java/feast/serving/util/mappers/YamlToProtoMapperTest.java b/serving/src/test/java/feast/serving/util/mappers/YamlToProtoMapperTest.java deleted file mode 100644 index 6f95f5307b..0000000000 --- a/serving/src/test/java/feast/serving/util/mappers/YamlToProtoMapperTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.util.mappers; - -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.*; - -import feast.core.StoreProto.Store; -import feast.core.StoreProto.Store.RedisConfig; -import feast.core.StoreProto.Store.StoreType; -import feast.core.StoreProto.Store.Subscription; -import java.io.IOException; -import org.junit.Test; - -public class YamlToProtoMapperTest { - - @Test - public void shouldConvertYamlToProto() throws IOException { - String yaml = - "name: test\n" - + "type: REDIS\n" - + "redis_config:\n" - + " host: localhost\n" - + " port: 6379\n" - + "subscriptions:\n" - + "- project: \"*\"\n" - + " name: \"*\"\n" - + " version: \"*\"\n"; - Store store = YamlToProtoMapper.yamlToStoreProto(yaml); - Store expected = - Store.newBuilder() - .setName("test") - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379)) - .addSubscriptions( - Subscription.newBuilder().setProject("*").setName("*").setVersion("*")) - .build(); - assertThat(store, equalTo(expected)); - } -} diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retriever/BigQueryHistoricalRetriever.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retriever/BigQueryHistoricalRetriever.java index 27ba07e82e..cd372511c0 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retriever/BigQueryHistoricalRetriever.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retriever/BigQueryHistoricalRetriever.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.*; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import feast.serving.ServingAPIProto; import feast.serving.ServingAPIProto.DatasetSource; import feast.storage.api.retriever.FeatureSetRequest; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -48,6 +50,36 @@ public abstract class BigQueryHistoricalRetriever implements HistoricalRetriever public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.ofDays(1).toMillis(); private static final long SUBQUERY_TIMEOUT_SECS = 900; // 15 minutes + public static HistoricalRetriever create(Map config) { + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + Storage storage = StorageOptions.getDefaultInstance().getService(); + + String jobStagingLocation = config.get("staging-location"); + if (!jobStagingLocation.contains("://")) { + throw new IllegalArgumentException( + String.format("jobStagingLocation is not a valid URI: %s", jobStagingLocation)); + } + if (jobStagingLocation.endsWith("/")) { + jobStagingLocation = jobStagingLocation.substring(0, jobStagingLocation.length() - 1); + } + if (!jobStagingLocation.startsWith("gs://")) { + throw new IllegalArgumentException( + "Store type BIGQUERY requires job staging location to be a valid and existing Google Cloud Storage URI. Invalid staging location: " + + jobStagingLocation); + } + + return builder() + .setBigquery(bigquery) + .setDatasetId(config.get("dataset_id")) + .setProjectId(config.get("project_id")) + .setJobStagingLocation(config.get("staging-location")) + .setInitialRetryDelaySecs(Integer.parseInt(config.get("bigquery-initial-retry-delay-secs"))) + .setTotalTimeoutSecs(Integer.parseInt(config.get("bigquery-total-timeout-secs"))) + .setStorage(storage) + .build(); + } + public abstract String projectId(); public abstract String datasetId(); diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java index 8860db2622..d155d3f1f5 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java @@ -57,9 +57,11 @@ public abstract class BigQueryFeatureSink implements FeatureSink { * your own client. * * @param config {@link BigQueryConfig} + * @param featureSetSpecs * @return {@link BigQueryFeatureSink.Builder} */ - public static BigQueryFeatureSink fromConfig(BigQueryConfig config) { + public static FeatureSink fromConfig( + BigQueryConfig config, Map featureSetSpecs) { return builder() .setDatasetId(config.getDatasetId()) .setProjectId(config.getProjectId()) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java index c8bb33de5f..0963731988 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java @@ -29,8 +29,11 @@ import feast.types.FieldProto.Field; import feast.types.ValueProto.Value; import io.grpc.Status; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.ByteArrayCodec; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -41,10 +44,24 @@ public class RedisOnlineRetriever implements OnlineRetriever { private final RedisCommands syncCommands; - public RedisOnlineRetriever(StatefulRedisConnection connection) { + private RedisOnlineRetriever(StatefulRedisConnection connection) { this.syncCommands = connection.sync(); } + public static OnlineRetriever create(Map config) { + + StatefulRedisConnection connection = + RedisClient.create( + RedisURI.create(config.get("host"), Integer.parseInt(config.get("port")))) + .connect(new ByteArrayCodec()); + + return new RedisOnlineRetriever(connection); + } + + public static OnlineRetriever create(StatefulRedisConnection connection) { + return new RedisOnlineRetriever(connection); + } + /** * Gets online features from redis. This method returns a list of {@link FeatureRow}s * corresponding to each feature set spec. Each feature row in the list then corresponds to an diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java index 63c8c68d9b..8801460231 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import feast.core.FeatureSetProto.FeatureSet; import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.StoreProto; import feast.core.StoreProto.Store.RedisConfig; import feast.storage.api.writer.FeatureSink; import feast.storage.api.writer.WriteResult; @@ -33,6 +34,18 @@ @AutoValue public abstract class RedisFeatureSink implements FeatureSink { + /** + * Initialize a {@link RedisFeatureSink.Builder} from a {@link StoreProto.Store.RedisConfig}. + * + * @param redisConfig {@link RedisConfig} + * @param featureSetSpecs + * @return {@link RedisFeatureSink.Builder} + */ + public static FeatureSink fromConfig( + RedisConfig redisConfig, Map featureSetSpecs) { + return builder().setFeatureSetSpecs(featureSetSpecs).setRedisConfig(redisConfig).build(); + } + public abstract RedisConfig getRedisConfig(); public abstract Map getFeatureSetSpecs(); @@ -54,6 +67,7 @@ public abstract static class Builder { @Override public void prepareWrite(FeatureSet featureSet) { + RedisClient redisClient = RedisClient.create(RedisURI.create(getRedisConfig().getHost(), getRedisConfig().getPort())); try { diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisOnlineRetrieverTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisOnlineRetrieverTest.java index 11c216c5a0..41bbfaa74c 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisOnlineRetrieverTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/RedisOnlineRetrieverTest.java @@ -33,6 +33,7 @@ import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; import feast.storage.RedisProto.RedisKey; import feast.storage.api.retriever.FeatureSetRequest; +import feast.storage.api.retriever.OnlineRetriever; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto.Field; import feast.types.ValueProto.Value; @@ -52,14 +53,14 @@ public class RedisOnlineRetrieverTest { @Mock RedisCommands syncCommands; - private RedisOnlineRetriever redisOnlineRetriever; + private OnlineRetriever redisOnlineRetriever; private byte[][] redisKeyList; @Before public void setUp() { initMocks(this); when(connection.sync()).thenReturn(syncCommands); - redisOnlineRetriever = new RedisOnlineRetriever(connection); + redisOnlineRetriever = RedisOnlineRetriever.create(connection); redisKeyList = Lists.newArrayList( RedisKey.newBuilder() @@ -135,7 +136,7 @@ public void shouldReturnResponseWithValuesIfKeysPresent() { .map(x -> KeyValue.from(new byte[1], Optional.of(x.toByteArray()))) .collect(Collectors.toList()); - redisOnlineRetriever = new RedisOnlineRetriever(connection); + redisOnlineRetriever = RedisOnlineRetriever.create(connection); when(connection.sync()).thenReturn(syncCommands); when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes); @@ -211,7 +212,7 @@ public void shouldReturnResponseWithUnsetValuesIfKeysNotPresent() { .collect(Collectors.toList()); featureRowBytes.add(null); - redisOnlineRetriever = new RedisOnlineRetriever(connection); + redisOnlineRetriever = RedisOnlineRetriever.create(connection); when(connection.sync()).thenReturn(syncCommands); when(syncCommands.mget(redisKeyList)).thenReturn(featureRowBytes);