Add Cassandra Store #360

Merged: 5 commits, Jan 8, 2020
99 changes: 69 additions & 30 deletions CONTRIBUTING.md
@@ -54,42 +54,18 @@ mvn --projects core spring-boot:run

# If Feast Core starts successfully, verify the correct Stores are registered
# correctly, for example by using grpc_cli.
-grpc_cli call localhost:6565 GetStores ''
+grpc_cli call localhost:6565 ListStores ''

-# Should return something similar to the following.
-# Note that you should change BigQuery projectId and datasetId accordingly
-# in "$FEAST_HOME/core/src/main/resources/application.yml"
-
-store {
-  name: "SERVING"
-  type: REDIS
-  subscriptions {
-    name: "*"
-    version: ">0"
-  }
-  redis_config {
-    host: "localhost"
-    port: 6379
-  }
-}
-store {
-  name: "WAREHOUSE"
-  type: BIGQUERY
-  subscriptions {
-    name: "*"
-    version: ">0"
-  }
-  bigquery_config {
-    project_id: "my-google-project-id"
-    dataset_id: "my-bigquery-dataset-id"
-  }
-}
+# Should return something similar to the following if you have not updated any stores
+{
+  "store": []
+}
```

#### Starting Feast Serving

Feast Serving requires administrators to provide an **existing** store name in Feast.
An instance of Feast Serving can only retrieve features from a **single** store.
> In order to retrieve features from multiple stores, you must start **multiple**
> instances of Feast Serving. If you start multiple Feast Serving instances on a
> single host, make sure that they are listening on different ports.
@@ -105,6 +81,69 @@ grpc_cli call localhost:6566 GetFeastServingType ''
type: FEAST_SERVING_TYPE_ONLINE
```

#### Updating a store

Create or update a store by sending an `UpdateStore` request to Feast Core.

```
# Example of updating a redis store

grpc_cli call localhost:6565 UpdateStore '
store {
name: "SERVING"
type: REDIS
subscriptions {
name: "*"
version: ">0"
}
redis_config {
host: "localhost"
port: 6379
}
}
'

# Examples for other supported stores (replace redis_config with one of the following):
# BigQuery
bigquery_config {
project_id: "my-google-project-id"
dataset_id: "my-bigquery-dataset-id"
}

# Cassandra: two configuration options, depending on the replication strategy
# See details: https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeReplication.html
#
# Note that the table name must be "feature_store", as specified in the @Table annotation of the
# DataStax object mapper

# SimpleStrategy
cassandra_config {
bootstrap_hosts: "localhost"
port: 9042
keyspace: "feast"
table_name: "feature_store"
replication_options {
class: "SimpleStrategy"
replication_factor: 1
}
}

# NetworkTopologyStrategy
cassandra_config {
bootstrap_hosts: "localhost"
port: 9042
keyspace: "feast"
table_name: "feature_store"
replication_options {
class: "NetworkTopologyStrategy"
east: 2
west: 2
}
}

# Check that the stores have been updated correctly.
grpc_cli call localhost:6565 ListStores ''
```
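The `replication_options` map is handed to Cassandra's keyspace replication settings. As a rough sketch (the helper class below is hypothetical, not code from this PR), the SimpleStrategy example above corresponds to a CQL `CREATE KEYSPACE` statement like the one this builder prints:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: renders a replication_options map as the CQL
// replication clause Cassandra expects for a keyspace.
public class ReplicationCql {
    static String createKeyspaceCql(String keyspace, Map<String, String> opts) {
        // Turn {class=SimpleStrategy, replication_factor=1} into
        // "'class': 'SimpleStrategy', 'replication_factor': '1'"
        String replication = opts.entrySet().stream()
            .map(e -> "'" + e.getKey() + "': '" + e.getValue() + "'")
            .collect(Collectors.joining(", "));
        return "CREATE KEYSPACE IF NOT EXISTS " + keyspace
            + " WITH replication = {" + replication + "};";
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("class", "SimpleStrategy");
        opts.put("replication_factor", "1");
        System.out.println(createKeyspaceCql("feast", opts));
        // CREATE KEYSPACE IF NOT EXISTS feast WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    }
}
```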

#### Registering a FeatureSet

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -114,7 +114,7 @@
<artifactId>protobuf-java-util</artifactId>
</dependency>

-    <!--compile 'com.google.guava:guava:26.0-jre'-->
+    <!--compile 'com.google.guava:guava:25.0-jre'-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -66,7 +66,7 @@ public Source getDefaultSource(FeastProperties feastProperties) {
} catch (InterruptedException | ExecutionException e) {
if (e.getCause().getClass().equals(TopicExistsException.class)) {
log.warn(
-          Strings.lenientFormat(
+          String.format(
"Unable to create topic %s in the feature stream, topic already exists, using existing topic.",
topicName));
} else {
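This PR swaps Guava's `Strings.lenientFormat` for `String.format` throughout the core module. A small standalone sketch of the practical difference: `lenientFormat` understands only `%s` and never throws, while `String.format` validates its format string and arguments, surfacing mistakes early.

```java
import java.util.MissingFormatArgumentException;

// Standalone sketch: contrasts String.format (used after this PR) with the
// lenient behavior Strings.lenientFormat provided before it.
public class FormatDemo {
    public static void main(String[] args) {
        // For simple %s substitution the two behave identically:
        System.out.println(String.format("Unable to drain job with id: %s", "job-42"));

        // Unlike Strings.lenientFormat, String.format throws when an
        // argument is missing instead of leaving "%s" in the message.
        try {
            String.format("job %s failed with code %s", "job-42");
        } catch (MissingFormatArgumentException e) {
            System.out.println("missing argument detected");
        }
    }
}
```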
@@ -121,7 +121,7 @@ public void abortJob(String dataflowJobId) {
} catch (Exception e) {
log.error("Unable to drain job with id: {}, cause: {}", dataflowJobId, e.getMessage());
throw new RuntimeException(
-          Strings.lenientFormat("Unable to drain job with id: %s", dataflowJobId), e);
+          String.format("Unable to drain job with id: %s", dataflowJobId), e);
}
}

@@ -41,7 +41,7 @@ public DirectJobRegistry() {
public void add(DirectJob job) {
if (jobs.containsKey(job.getJobId())) {
throw new IllegalArgumentException(
-          Strings.lenientFormat("Job with id %s already exists and is running", job.getJobId()));
+          String.format("Job with id %s already exists and is running", job.getJobId()));
}
jobs.put(job.getJobId(), job);
}
@@ -149,7 +149,7 @@ public void abortJob(String extId) {
job.abort();
} catch (IOException e) {
throw new RuntimeException(
-          Strings.lenientFormat("Unable to abort DirectRunner job %s", extId), e);
+          String.format("Unable to abort DirectRunner job %s", extId), e);
}
jobs.remove(extId);
}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/log/AuditLogger.java
@@ -44,7 +44,7 @@ public static void log(
map.put("resource", resource.toString());
map.put("id", id);
map.put("action", action.toString());
-    map.put("detail", Strings.lenientFormat(detail, args));
+    map.put("detail", String.format(detail, args));
ObjectMessage msg = new ObjectMessage(map);

log.log(AUDIT_LEVEL, msg);
@@ -179,7 +179,7 @@ private JobInfo updateJob(
public void abortJob(String id) {
Optional<JobInfo> jobOptional = jobInfoRepository.findById(id);
if (!jobOptional.isPresent()) {
-      throw new RetrievalException(Strings.lenientFormat("Unable to retrieve job with id %s", id));
+      throw new RetrievalException(String.format("Unable to retrieve job with id %s", id));
}
JobInfo job = jobOptional.get();
if (JobStatus.getTerminalState().contains(job.getStatus())) {
@@ -66,7 +66,7 @@ public class JobStatusService {
// public JobDetail getJob(String id) {
// Optional<JobInfo> job = jobInfoRepository.findById(id);
// if (!job.isPresent()) {
-  //      throw new RetrievalException(Strings.lenientFormat("Unable to retrieve job with id %s",
+  //      throw new RetrievalException(String.format("Unable to retrieve job with id %s",
// id));
// }
// JobDetail.Builder jobDetailBuilder = job.get().getJobDetail().toBuilder();
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/util/TypeConversion.java
@@ -85,7 +85,7 @@ public static String convertMapToJsonString(Map<String, String> map) {
public static String[] convertMapToArgs(Map<String, String> map) {
List<String> args = new ArrayList<>();
for (Entry<String, String> arg : map.entrySet()) {
-      args.add(Strings.lenientFormat("--%s=%s", arg.getKey(), arg.getValue()));
+      args.add(String.format("--%s=%s", arg.getKey(), arg.getValue()));
}
return args.toArray(new String[] {});
}
4 changes: 2 additions & 2 deletions core/src/test/java/feast/core/validators/MatchersTest.java
@@ -43,7 +43,7 @@ public void checkUpperSnakeCaseShouldPassForLegitUpperSnakeCaseWithNumbers() {
public void checkUpperSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
-        Strings.lenientFormat(
+        String.format(
"invalid value for field %s: %s",
"someField",
"argument must be in upper snake case, and cannot include any special characters."));
@@ -61,7 +61,7 @@ public void checkLowerSnakeCaseShouldPassForLegitLowerSnakeCase() {
public void checkLowerSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
-        Strings.lenientFormat(
+        String.format(
"invalid value for field %s: %s",
"someField",
"argument must be in lower snake case, and cannot include any special characters."));
9 changes: 9 additions & 0 deletions infra/charts/feast/charts/feast-serving/values.yaml
@@ -56,6 +56,15 @@ application.yaml:
config-path: /etc/feast/feast-serving/store.yaml
redis-pool-max-size: 128
redis-pool-max-idle: 64
cassandra-pool-core-local-connections: 1
cassandra-pool-max-local-connections: 1
cassandra-pool-core-remote-connections: 1
cassandra-pool-max-remote-connections: 1
cassandra-pool-max-requests-local-connection: 32768
cassandra-pool-max-requests-remote-connection: 2048
cassandra-pool-new-local-connection-threshold: 30000
cassandra-pool-new-remote-connection-threshold: 400
cassandra-pool-timeout-millis: 0
jobs:
staging-location: ""
store-type: ""
13 changes: 13 additions & 0 deletions ingestion/pom.xml
@@ -213,6 +213,12 @@
<version>${org.apache.beam.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-cassandra</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
@@ -245,6 +251,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit-shaded</artifactId>
<version>3.11.2.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2019 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.transform;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.Mapper.Option;
import feast.store.serving.cassandra.CassandraMutation;
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.Future;
import org.apache.beam.sdk.io.cassandra.Mapper;

/** A {@link Mapper} that supports writing {@code CassandraMutation}s with the Beam Cassandra IO. */
public class CassandraMutationMapper implements Mapper<CassandraMutation>, Serializable {

private com.datastax.driver.mapping.Mapper<CassandraMutation> mapper;

CassandraMutationMapper(com.datastax.driver.mapping.Mapper<CassandraMutation> mapper) {
this.mapper = mapper;
}

@Override
public Iterator<CassandraMutation> map(ResultSet resultSet) {
throw new UnsupportedOperationException("Only supports write operations");
}

@Override
public Future<Void> deleteAsync(CassandraMutation entityClass) {
throw new UnsupportedOperationException("Only supports write operations");
}

  /**
   * Saves records to Cassandra with:
   *
   * <ul>
   *   <li>Cassandra's internal write time set to the timestamp of the record. Cassandra will not
   *       override an existing record with the same partition key if the write time is older.
   *   <li>Expiration (TTL) of the record.
   * </ul>
   *
   * @param entityClass the {@code CassandraMutation} record to save
   */
@Override
public Future<Void> saveAsync(CassandraMutation entityClass) {
return mapper.saveAsync(
entityClass,
Option.timestamp(entityClass.getWriteTime()),
Option.ttl(entityClass.getTtl()));
}
}
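The write-time rule that `saveAsync` relies on can be mimicked in plain Java. The model below is illustrative only (no DataStax driver types, not code from this PR): Cassandra keeps the cell with the highest write time, so replaying a record with an older timestamp never clobbers a newer one, which is why the mapper sets `Option.timestamp(entityClass.getWriteTime())` on each save.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of Cassandra's last-write-wins semantics by write time.
public class WriteTimeDemo {
    static class Cell {
        final String value;
        final long writeTime;
        Cell(String value, long writeTime) { this.value = value; this.writeTime = writeTime; }
    }

    static final Map<String, Cell> store = new HashMap<>();

    // Only accept the write if it carries a newer write time than the
    // existing cell, mirroring Cassandra's reconciliation rule.
    static void save(String key, String value, long writeTime) {
        Cell existing = store.get(key);
        if (existing == null || existing.writeTime < writeTime) {
            store.put(key, new Cell(value, writeTime));
        }
    }

    public static void main(String[] args) {
        save("driver:1", "v1", 100);
        save("driver:1", "stale", 50);  // older write time: ignored
        save("driver:1", "v2", 200);    // newer write time: wins
        System.out.println(store.get("driver:1").value); // prints v2
    }
}
```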
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2019 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.transform;

import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.MappingManager;
import feast.store.serving.cassandra.CassandraMutation;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class CassandraMutationMapperFactory implements SerializableFunction<Session, Mapper> {

private transient MappingManager mappingManager;
private Class<CassandraMutation> entityClass;

public CassandraMutationMapperFactory(Class<CassandraMutation> entityClass) {
this.entityClass = entityClass;
}

@Override
public Mapper apply(Session session) {
if (mappingManager == null) {
this.mappingManager = new MappingManager(session);
}

return new CassandraMutationMapper(mappingManager.mapper(entityClass));
}
}
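The transient `mappingManager` with lazy rebuild in the factory above is a common pattern for Beam `SerializableFunction`s: Beam serializes the function to ship it to workers, and non-serializable state must be dropped and recreated per JVM. A minimal standalone sketch (stand-in field, no Beam or DataStax types involved):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: a transient field survives neither serialization nor the trip to a
// worker JVM, so it is rebuilt lazily on first use after deserialization.
public class TransientDemo implements Serializable {
    private transient StringBuilder heavyState; // stand-in for MappingManager

    StringBuilder state() {
        if (heavyState == null) {
            heavyState = new StringBuilder("rebuilt"); // lazy re-creation
        }
        return heavyState;
    }

    public static void main(String[] args) throws Exception {
        TransientDemo demo = new TransientDemo();
        demo.state(); // initialized in this JVM

        // Round-trip through Java serialization, as Beam does when shipping
        // the function to a worker.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(demo);
        TransientDemo copy = (TransientDemo) new ObjectInputStream(
            new ByteArrayInputStream(bytes.toByteArray())).readObject();

        // The transient field arrived null and was rebuilt on demand.
        System.out.println(copy.state()); // prints rebuilt
    }
}
```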