Add Cassandra Store (feast-dev#360)
* create cassandra store for registration and ingestion

  * Downgraded Guava to 25
    * Beam 2.16 uses Cassandra 3.4.0, so we cannot use Cassandra 4.x, which shades Guava.
    * Cassandra 3.4.0 uses Guava 16.0 but has a compatibility check that switches to a different class when a Guava version above 19.0 is on the classpath.
    * Guava 26 (the version previously used) introduces a breaking change to the method used in that compatibility check, hence version 25.
  * Using Cassandra's internal field 'writetime' to handle out-of-order arrivals. When an older record is ingested for a primary key that already exists in Cassandra, it is set as a tombstone and ignored on retrieval.
    * We are aware that this way of handling out-of-order arrivals is specific to Cassandra, but until we have a general way to handle them we need to do it this way.
  * Cassandra's object mapper requires the table name to be stated in the @Table annotation.
    * table_name is still part of CassandraConfig for use in the serving module.
    * If a user registers a CassandraConfig with a table name other than "feature_store", an exception is thrown.
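
The out-of-order handling described above can be sketched in plain Java. This is an illustrative model of the semantics only (class and method names are invented for the sketch); the real store delegates this behavior to Cassandra's writetime rather than implementing it in application code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the writetime semantics: a write that is older than
// the value already stored for a primary key is ignored, so late arrivals
// never clobber newer feature values.
public class WriteTimeStore {
    private static final class Versioned {
        final String value;
        final long writeTimeMicros;

        Versioned(String value, long writeTimeMicros) {
            this.value = value;
            this.writeTimeMicros = writeTimeMicros;
        }
    }

    private final Map<String, Versioned> rows = new HashMap<>();

    /** Applies the write only if it is newer; returns whether it was applied. */
    public boolean write(String primaryKey, String value, long writeTimeMicros) {
        Versioned existing = rows.get(primaryKey);
        if (existing != null && existing.writeTimeMicros >= writeTimeMicros) {
            return false; // out-of-order arrival: ignored on retrieval
        }
        rows.put(primaryKey, new Versioned(value, writeTimeMicros));
        return true;
    }

    public String read(String primaryKey) {
        Versioned v = rows.get(primaryKey);
        return v == null ? null : v.value;
    }
}
```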

* add cassandra serving service

  * Abstracted OnlineServingService for a common implementation of online serving stores
  * Complete tests remain in RedisServingServiceTest, while the Cassandra tests contain only basic tests for writes and behavior specific to Cassandra
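
A minimal sketch of what such an abstraction can look like (names and signatures are assumed for illustration and are not the actual Feast classes): the shared retrieval flow lives in the base class, and each store implements only the raw lookup.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch of a shared online-serving base class: common logic
// lives here, and each backend (Redis, Cassandra, ...) implements readRow.
public abstract class OnlineStoreBase {
    /** Store-specific point lookup for a single entity key. */
    protected abstract Optional<String> readRow(String entityKey);

    /** Shared logic: resolve each key, substituting a marker for misses. */
    public List<String> getOnlineFeatures(List<String> entityKeys) {
        return entityKeys.stream()
            .map(key -> readRow(key).orElse("MISSING"))
            .collect(Collectors.toList());
    }
}
```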

* update documentation to reflect current API and add cassandra store to docs

* add default expiration to cassandra config for when a feature set does not have a max age
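
The fallback can be sketched as follows (a hypothetical helper with assumed names; the real config key and code may differ): prefer the feature set's max age, and fall back to the store's default expiration when it is absent.

```java
import java.time.Duration;

// Hypothetical sketch of the default-expiration fallback: use the feature
// set's max age as the row TTL when present, otherwise use the default
// expiration configured on the Cassandra store.
public class ExpirationChooser {
    public static int ttlSeconds(Duration featureSetMaxAge, Duration defaultExpiration) {
        Duration chosen =
            (featureSetMaxAge != null && !featureSetMaxAge.isZero())
                ? featureSetMaxAge
                : defaultExpiration;
        return (int) chosen.getSeconds();
    }
}
```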

* docs update, spotless check, and bug fix on cassandra schema
smadarasmi authored and Wirick committed Jan 23, 2020
1 parent 0a8987e commit 98198d8
Showing 41 changed files with 2,625 additions and 52 deletions.
13 changes: 7 additions & 6 deletions Makefile
@@ -1,19 +1,20 @@
#
#
# Copyright 2019 The Feast Authors
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# https://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

REGISTRY := gcr.io/pm-registry/feast
VERSION := latest
PROJECT_ROOT := $(shell git rev-parse --show-toplevel)

test:
@@ -52,4 +53,4 @@ build-html:
mkdir -p $(PROJECT_ROOT)/dist/grpc
cd $(PROJECT_ROOT)/protos && $(MAKE) gen-docs
cd $(PROJECT_ROOT)/sdk/python/docs && $(MAKE) html
cp -r $(PROJECT_ROOT)/sdk/python/docs/html/* $(PROJECT_ROOT)/dist/python
cp -r $(PROJECT_ROOT)/sdk/python/docs/html/* $(PROJECT_ROOT)/dist/python
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -110,7 +110,7 @@
<artifactId>protobuf-java-util</artifactId>
</dependency>

<!--compile 'com.google.guava:guava:26.0-jre'-->
<!--compile 'com.google.guava:guava:25.0-jre'-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -69,7 +69,7 @@ public Source getDefaultSource(FeastProperties feastProperties) {
} catch (InterruptedException | ExecutionException e) {
if (e.getCause().getClass().equals(TopicExistsException.class)) {
log.warn(
Strings.lenientFormat(
String.format(
"Unable to create topic %s in the feature stream, topic already exists, using existing topic.",
topicName));
} else {
@@ -149,7 +149,7 @@ public void abortJob(String dataflowJobId) {
} catch (Exception e) {
log.error("Unable to drain job with id: {}, cause: {}", dataflowJobId, e.getMessage());
throw new RuntimeException(
Strings.lenientFormat("Unable to drain job with id: %s", dataflowJobId), e);
String.format("Unable to drain job with id: %s", dataflowJobId), e);
}
}

@@ -41,7 +41,7 @@ public DirectJobRegistry() {
public void add(DirectJob job) {
if (jobs.containsKey(job.getJobId())) {
throw new IllegalArgumentException(
Strings.lenientFormat("Job with id %s already exists and is running", job.getJobId()));
String.format("Job with id %s already exists and is running", job.getJobId()));
}
jobs.put(job.getJobId(), job);
}
@@ -149,7 +149,7 @@ public void abortJob(String extId) {
job.abort();
} catch (IOException e) {
throw new RuntimeException(
Strings.lenientFormat("Unable to abort DirectRunner job %s", extId), e);
String.format("Unable to abort DirectRunner job %s", extId), e);
}
jobs.remove(extId);
}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/log/AuditLogger.java
@@ -44,7 +44,7 @@ public static void log(
map.put("resource", resource.toString());
map.put("id", id);
map.put("action", action.toString());
map.put("detail", Strings.lenientFormat(detail, args));
map.put("detail", String.format(detail, args));
ObjectMessage msg = new ObjectMessage(map);

log.log(AUDIT_LEVEL, msg);
18 changes: 18 additions & 0 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
@@ -170,6 +170,24 @@ private void updateFeatureSetStatuses(List<JobUpdateTask> jobUpdateTasks) {
}
}
}
}
/**
* Drain the given job. If this is successful, the job will start the draining process. When the
* draining process is complete, the job will be cleaned up and removed.
*
* <p>Batch jobs will be cancelled, as draining these jobs is not supported by beam.
*
* @param id feast-internal id of a job
*/
public void abortJob(String id) {
Optional<JobInfo> jobOptional = jobInfoRepository.findById(id);
if (!jobOptional.isPresent()) {
throw new RetrievalException(String.format("Unable to retrieve job with id %s", id));
}
JobInfo job = jobOptional.get();
if (JobStatus.getTerminalState().contains(job.getStatus())) {
throw new IllegalStateException("Unable to stop job already in terminal state");
}
ready.removeAll(pending);
ready.forEach(
fs -> {
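
The drain-versus-cancel rule stated in the javadoc above can be sketched as follows (illustrative only; the actual job-manager code differs and these names are assumed):

```java
// Illustrative sketch of the termination rule: streaming jobs are drained so
// in-flight elements finish cleanly, while batch jobs are cancelled outright
// because Beam does not support draining them.
public class JobTermination {
    public enum JobType { BATCH, STREAMING }

    public static String terminationAction(JobType type) {
        return type == JobType.STREAMING ? "drain" : "cancel";
    }
}
```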
80 changes: 80 additions & 0 deletions core/src/main/java/feast/core/service/JobStatusService.java
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2019 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class JobStatusService {
//
// private JobInfoRepository jobInfoRepository;
// private MetricsRepository metricsRepository;
//
// @Autowired
// public JobStatusService(
// JobInfoRepository jobInfoRepository,
// MetricsRepository metricsRepository) {
// this.jobInfoRepository = jobInfoRepository;
// this.metricsRepository = metricsRepository;
// }
//
// /**
// * Lists all jobs registered to the db, sorted by provided <code>orderBy</code>
// *
// * @param orderBy list order
// * @return list of JobDetails
// */
// @Transactional
// public List<JobDetail> listJobs(Sort orderBy) {
// List<JobInfo> jobs = jobInfoRepository.findAll(orderBy);
// return jobs.stream().map(JobInfo::getJobDetail).collect(Collectors.toList());
// }
//
// /**
// * Lists all jobs registered to the db, sorted chronologically by creation time
// *
// * @return list of JobDetails
// */
// @Transactional
// public List<JobDetail> listJobs() {
// return listJobs(Sort.by(Sort.Direction.ASC, "created"));
// }
//
// /**
// * Gets information regarding a single job.
// *
// * @param id feast-internal job id
// * @return JobDetail for that job
// */
// @Transactional
// public JobDetail getJob(String id) {
// Optional<JobInfo> job = jobInfoRepository.findById(id);
// if (!job.isPresent()) {
// throw new RetrievalException(String.format("Unable to retrieve job with id %s",
// id));
// }
// JobDetail.Builder jobDetailBuilder = job.get().getJobDetail().toBuilder();
// List<Metrics> metrics = metricsRepository.findByJobInfo_Id(id);
// for (Metrics metric : metrics) {
// jobDetailBuilder.putMetrics(metric.getName(), metric.getValue());
// }
// return jobDetailBuilder.build();
// }

}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/util/TypeConversion.java
@@ -85,7 +85,7 @@ public static String convertMapToJsonString(Map<String, String> map) {
public static String[] convertMapToArgs(Map<String, String> map) {
List<String> args = new ArrayList<>();
for (Entry<String, String> arg : map.entrySet()) {
args.add(Strings.lenientFormat("--%s=%s", arg.getKey(), arg.getValue()));
args.add(String.format("--%s=%s", arg.getKey(), arg.getValue()));
}
return args.toArray(new String[] {});
}
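
This commit replaces Guava's `Strings.lenientFormat` with `String.format` throughout. The practical difference, sketched below with a hypothetical helper, is that `String.format` honors the full format-specifier syntax and validates its arguments, whereas `lenientFormat` performs only lenient `%s` substitution and never throws.

```java
// Sketch of the replacement made throughout this commit: String.format
// supports full format specifiers and throws on malformed input, unlike
// Guava's Strings.lenientFormat, which only substitutes %s leniently.
public class ArgFormatter {
    public static String toArg(String key, String value) {
        return String.format("--%s=%s", key, value);
    }
}
```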
4 changes: 2 additions & 2 deletions core/src/test/java/feast/core/validators/MatchersTest.java
@@ -43,7 +43,7 @@ public void checkUpperSnakeCaseShouldPassForLegitUpperSnakeCaseWithNumbers() {
public void checkUpperSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
Strings.lenientFormat(
String.format(
"invalid value for field %s: %s",
"someField",
"argument must be in upper snake case, and cannot include any special characters."));
@@ -61,7 +61,7 @@ public void checkLowerSnakeCaseShouldPassForLegitLowerSnakeCase() {
public void checkLowerSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
Strings.lenientFormat(
String.format(
"invalid value for field %s: %s",
"someField",
"argument must be in lower snake case, and cannot include any special characters."));
96 changes: 84 additions & 12 deletions docs/contributing.md
@@ -6,13 +6,13 @@ The following guide will help you quickly run Feast in your local machine.

The main components of Feast are:

* **Feast Core** handles FeatureSpec registration, starts and monitors Ingestion

jobs and ensures that Feast internal metadata is consistent.

* **Feast Ingestion** subscribes to streams of FeatureRow and writes the feature

values to registered Stores.

* **Feast Serving** handles requests for features values retrieval from the end users.

@@ -29,13 +29,13 @@ The main components of Feast are:

> **Assumptions:**
>
> 1. Postgres is running in "localhost:5432" and has a database called "postgres" which
>
> can be accessed with credentials user "postgres" and password "password".
>
> To use a different database name and credentials, please update
>
> "$FEAST\_HOME/core/src/main/resources/application.yml"
>
> or set these environment variables: DB\_HOST, DB\_USERNAME, DB\_PASSWORD.
>
@@ -52,16 +52,17 @@ cd feast
#### Starting Feast Core

```text
# Please check the default configuration for Feast Core in
# "$FEAST_HOME/core/src/main/resources/application.yml" and update it accordingly.
#
# Start Feast Core GRPC server on localhost:6565
mvn --projects core spring-boot:run
# If Feast Core starts successfully, verify the correct Stores are registered
# correctly, for example by using grpc_cli.
grpc_cli call localhost:6565 GetStores ''
grpc_cli call localhost:6565 ListStores ''
# Should return something similar to the following.
# Note that you should change BigQuery projectId and datasetId accordingly
# in "$FEAST_HOME/core/src/main/resources/application.yml"
@@ -91,12 +92,19 @@ store {
project_id: "my-google-project-id"
dataset_id: "my-bigquery-dataset-id"
}
# Should return something similar to the following if you have not updated any stores
{
"store": []
}
```

#### Starting Feast Serving

Feast Serving requires administrators to provide an **existing** store name in Feast.
An instance of Feast Serving can only retrieve features from a **single** store.

> In order to retrieve features from multiple stores you must start **multiple**
instances of Feast serving. If you start multiple Feast serving on a single host,
make sure that they are listening on different ports.
@@ -111,12 +119,76 @@ grpc_cli call localhost:6566 GetFeastServingType ''
type: FEAST_SERVING_TYPE_ONLINE
```

#### Updating a store

Create a new Store by sending a request to Feast Core.

```
# Example of updating a redis store
grpc_cli call localhost:6565 UpdateStore '
store {
name: "SERVING"
type: REDIS
subscriptions {
name: "*"
version: ">0"
}
redis_config {
host: "localhost"
port: 6379
}
}
'
# Other supported stores examples (replacing redis_config):
# BigQuery
bigquery_config {
project_id: "my-google-project-id"
dataset_id: "my-bigquery-dataset-id"
}
# Cassandra: two options in cassandra depending on replication strategy
# See details: https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeReplication.html
#
# Please note that the table name must be "feature_store", as specified in the @Table annotation of the
# datastax object mapper
# SimpleStrategy
cassandra_config {
bootstrap_hosts: "localhost"
port: 9042
keyspace: "feast"
table_name: "feature_store"
replication_options {
class: "SimpleStrategy"
replication_factor: 1
}
}
# NetworkTopologyStrategy
cassandra_config {
bootstrap_hosts: "localhost"
port: 9042
keyspace: "feast"
table_name: "feature_store"
replication_options {
class: "NetworkTopologyStrategy"
east: 2
west: 2
}
}
# To check that the stores have been updated correctly:
grpc_cli call localhost:6565 ListStores ''
```

#### Registering a FeatureSet

Create a new FeatureSet on Feast by sending a request to Feast Core. When a feature set is successfully registered, Feast Core will start an **ingestion** job that listens for new features in the FeatureSet. Note that Feast currently only supports sources of type "KAFKA", so you must have access to a running Kafka broker to register a FeatureSet successfully.

```text
# Example of registering a new driver feature set
# Note the source value, it assumes that you have access to a Kafka broker
# running on localhost:9092
@@ -156,7 +228,7 @@ grpc_cli call localhost:6565 GetFeatureSets ''
# and written to the registered stores.
# Make sure the value here is the topic assigned to the feature set
# ... producer.send("feast-driver-features" ...)
#
#
# Install Python SDK to help writing FeatureRow messages to Kafka
cd $FEAST_HOME/sdk/python
pip3 install -e .
9 changes: 9 additions & 0 deletions infra/charts/feast/charts/feast-serving/values.yaml
Expand Up @@ -56,6 +56,15 @@ application.yaml:
config-path: /etc/feast/feast-serving/store.yaml
redis-pool-max-size: 128
redis-pool-max-idle: 64
cassandra-pool-core-local-connections: 1
cassandra-pool-max-local-connections: 1
cassandra-pool-core-remote-connections: 1
cassandra-pool-max-remote-connections: 1
cassandra-pool-max-requests-local-connection: 32768
cassandra-pool-max-requests-remote-connection: 2048
cassandra-pool-new-local-connection-threshold: 30000
cassandra-pool-new-remote-connection-threshold: 400
cassandra-pool-timeout-millis: 0
jobs:
staging-location: ""
store-type: ""