From 98198d847a48d64177d6fcdf6a137705015f8982 Mon Sep 17 00:00:00 2001
From: smadarasmi
Date: Wed, 8 Jan 2020 09:28:04 +0700
Subject: [PATCH] Add Cassandra Store (#360)

* create cassandra store for registration and ingestion

* Downgraded Guava to 25
  * Beam 2.16 uses Cassandra 3.4.0 (so we cannot use Cassandra 4.x, which shades Guava)
  * Cassandra 3.4.0 uses Guava version 16.0 but has a compatibility check that switches to a different class when a Guava version > 19.0 is used.
  * Guava version 26 (the version previously used) has a breaking change to the method used in that compatibility check in Cassandra's dependency, hence version 25

* Using Cassandra's internal 'writetime' field to handle out-of-order arrivals. When a record is ingested whose primary key already exists in Cassandra with a newer write time, the stale record is set as a tombstone in Cassandra and ignored on retrieval.
  * Aware that this way of handling out-of-order arrivals is specific to Cassandra, but until we have a general way to handle out-of-order arrivals we need to do it this way

* Cassandra's object mapper requires stating the table's name in the @Table annotation
  * table_name is still part of CassandraConfig for use in the serving module
  * if a user registers a CassandraConfig with a table name other than "feature_store", this will throw an exception

* add cassandra serving service

* Abstracted OnlineServingService to hold the implementation common to online serving stores
  * Complete tests remain in RedisServingServiceTest, while the Cassandra tests cover only basic writes and behavior specific to Cassandra

* update documentation to reflect the current API and add the cassandra store to the docs

* add default expiration to cassandra config for when a feature set does not have a max age

* docs update, spotless check, and bug fix on cassandra schema
---
 Makefile | 13 +-
 core/pom.xml | 2 +-
 .../core/config/FeatureStreamConfig.java | 2 +-
 .../core/job/dataflow/DataflowJobManager.java | 2 +-
 .../core/job/direct/DirectJobRegistry.java | 2 +-
 .../job/direct/DirectRunnerJobManager.java | 2 +-
 .../main/java/feast/core/log/AuditLogger.java | 2 +-
 .../core/service/JobCoordinatorService.java | 18 ++
 .../feast/core/service/JobStatusService.java | 80 ++++++
 .../java/feast/core/util/TypeConversion.java | 2 +-
 .../feast/core/validators/MatchersTest.java | 4 +-
 docs/contributing.md | 96 ++++++-
 .../feast/charts/feast-serving/values.yaml | 9 +
 ingestion/pom.xml | 13 +
 .../transform/CassandraMutationMapper.java | 60 +++++
 .../CassandraMutationMapperFactory.java | 42 +++
 .../ingestion/transform/WriteToStore.java | 26 ++
 .../java/feast/ingestion/utils/StoreUtil.java | 87 ++++++
 .../java/feast/ingestion/utils/ValueUtil.java | 57 ++++
 .../serving/cassandra/CassandraMutation.java | 121 +++++++++
 .../FeatureRowToCassandraMutationDoFn.java | 85 ++++++
 .../transform/CassandraWriteToStoreIT.java | 248 ++++++++++++++++++
 .../ingestion/util/CassandraStoreUtilIT.java | 167 ++++++++++++
 ...FeatureRowToCassandraMutationDoFnTest.java | 225 ++++++++++++++++
 .../src/test/java/feast/test/TestUtil.java | 130 +++++++++
 pom.xml | 2 +-
 protos/feast/core/Store.proto | 33 ++-
 serving/pom.xml | 21 +-
 serving/sample_cassandra_config.yml | 13 +
 .../java/feast/serving/FeastProperties.java | 85 ++++++
 .../configuration/ServingServiceConfig.java | 62 +++++
 .../service/CassandraServingService.java | 153 +++++++++++
 .../serving/service/OnlineServingService.java | 236 +++++++++++++++++
 .../serving/service/RedisServingService.java | 36 ++-
 .../java/feast/serving/util/ValueUtil.java | 53 ++++
serving/src/main/resources/application.yml | 20 +- .../CassandraServingServiceITTest.java | 244 +++++++++++++++++ .../service/CassandraServingServiceTest.java | 117 +++++++++ .../service/RedisServingServiceTest.java | 18 +- .../java/feast/serving/test/TestUtil.java | 81 ++++++ .../embedded-store/LoadCassandra.cql | 8 + 41 files changed, 2625 insertions(+), 52 deletions(-) create mode 100644 core/src/main/java/feast/core/service/JobStatusService.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java create mode 100644 ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java create mode 100644 ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java create mode 100644 ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java create mode 100644 ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java create mode 100644 ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java create mode 100644 ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java create mode 100644 serving/sample_cassandra_config.yml create mode 100644 serving/src/main/java/feast/serving/service/CassandraServingService.java create mode 100644 serving/src/main/java/feast/serving/service/OnlineServingService.java create mode 100644 serving/src/main/java/feast/serving/util/ValueUtil.java create mode 100644 serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java create mode 100644 serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java create mode 100644 serving/src/test/java/feast/serving/test/TestUtil.java create mode 100644 serving/src/test/resources/embedded-store/LoadCassandra.cql diff --git a/Makefile b/Makefile index de61fe2892..ef2c54fc17 100644 --- a/Makefile +++ b/Makefile @@ -1,19 +1,20 @@ -# +# # Copyright 2019 The Feast Authors -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
# - +REGISTRY := gcr.io/pm-registry/feast +VERSION := latest PROJECT_ROOT := $(shell git rev-parse --show-toplevel) test: @@ -52,4 +53,4 @@ build-html: mkdir -p $(PROJECT_ROOT)/dist/grpc cd $(PROJECT_ROOT)/protos && $(MAKE) gen-docs cd $(PROJECT_ROOT)/sdk/python/docs && $(MAKE) html - cp -r $(PROJECT_ROOT)/sdk/python/docs/html/* $(PROJECT_ROOT)/dist/python \ No newline at end of file + cp -r $(PROJECT_ROOT)/sdk/python/docs/html/* $(PROJECT_ROOT)/dist/python diff --git a/core/pom.xml b/core/pom.xml index e1567ae8fe..b56f74dea2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -110,7 +110,7 @@ protobuf-java-util - + com.google.guava guava diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 45de359ac7..3743b60af4 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -69,7 +69,7 @@ public Source getDefaultSource(FeastProperties feastProperties) { } catch (InterruptedException | ExecutionException e) { if (e.getCause().getClass().equals(TopicExistsException.class)) { log.warn( - Strings.lenientFormat( + String.format( "Unable to create topic %s in the feature stream, topic already exists, using existing topic.", topicName)); } else { diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 7115ee3f66..05873dec8b 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -149,7 +149,7 @@ public void abortJob(String dataflowJobId) { } catch (Exception e) { log.error("Unable to drain job with id: {}, cause: {}", dataflowJobId, e.getMessage()); throw new RuntimeException( - Strings.lenientFormat("Unable to drain job with id: %s", dataflowJobId), e); + String.format("Unable to drain job with id: %s", dataflowJobId), e); } } diff --git a/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java b/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java index f7ded9fec7..8f6c87053f 100644 --- a/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java +++ b/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java @@ -41,7 +41,7 @@ public DirectJobRegistry() { public void add(DirectJob job) { if (jobs.containsKey(job.getJobId())) { throw new IllegalArgumentException( - Strings.lenientFormat("Job with id %s already exists and is running", job.getJobId())); + String.format("Job with id %s already exists and is running", job.getJobId())); } jobs.put(job.getJobId(), job); } diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index b01d37d892..61c7eb7e7d 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -149,7 +149,7 @@ public void abortJob(String extId) { job.abort(); } catch (IOException e) { throw new RuntimeException( - Strings.lenientFormat("Unable to abort DirectRunner job %s", extId), e); + String.format("Unable to abort DirectRunner job %s", extId), e); } jobs.remove(extId); } diff --git a/core/src/main/java/feast/core/log/AuditLogger.java b/core/src/main/java/feast/core/log/AuditLogger.java index 5349b5548b..2c60307805 100644 --- a/core/src/main/java/feast/core/log/AuditLogger.java +++ 
b/core/src/main/java/feast/core/log/AuditLogger.java @@ -44,7 +44,7 @@ public static void log( map.put("resource", resource.toString()); map.put("id", id); map.put("action", action.toString()); - map.put("detail", Strings.lenientFormat(detail, args)); + map.put("detail", String.format(detail, args)); ObjectMessage msg = new ObjectMessage(map); log.log(AUDIT_LEVEL, msg); diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 3678135a52..761d199880 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -170,6 +170,24 @@ private void updateFeatureSetStatuses(List jobUpdateTasks) { } } } + } + /** + * Drain the given job. If this is successful, the job will start the draining process. When the + * draining process is complete, the job will be cleaned up and removed. + * + *
<p>
Batch jobs will be cancelled, as draining these jobs is not supported by beam. + * + * @param id feast-internal id of a job + */ + public void abortJob(String id) { + Optional jobOptional = jobInfoRepository.findById(id); + if (!jobOptional.isPresent()) { + throw new RetrievalException(String.format("Unable to retrieve job with id %s", id)); + } + JobInfo job = jobOptional.get(); + if (JobStatus.getTerminalState().contains(job.getStatus())) { + throw new IllegalStateException("Unable to stop job already in terminal state"); + } ready.removeAll(pending); ready.forEach( fs -> { diff --git a/core/src/main/java/feast/core/service/JobStatusService.java b/core/src/main/java/feast/core/service/JobStatusService.java new file mode 100644 index 0000000000..26d81647fa --- /dev/null +++ b/core/src/main/java/feast/core/service/JobStatusService.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class JobStatusService { + // + // private JobInfoRepository jobInfoRepository; + // private MetricsRepository metricsRepository; + // + // @Autowired + // public JobStatusService( + // JobInfoRepository jobInfoRepository, + // MetricsRepository metricsRepository) { + // this.jobInfoRepository = jobInfoRepository; + // this.metricsRepository = metricsRepository; + // } + // + // /** + // * Lists all jobs registered to the db, sorted by provided orderBy + // * + // * @param orderBy list order + // * @return list of JobDetails + // */ + // @Transactional + // public List listJobs(Sort orderBy) { + // List jobs = jobInfoRepository.findAll(orderBy); + // return jobs.stream().map(JobInfo::getJobDetail).collect(Collectors.toList()); + // } + // + // /** + // * Lists all jobs registered to the db, sorted chronologically by creation time + // * + // * @return list of JobDetails + // */ + // @Transactional + // public List listJobs() { + // return listJobs(Sort.by(Sort.Direction.ASC, "created")); + // } + // + // /** + // * Gets information regarding a single job. 
+ // * + // * @param id feast-internal job id + // * @return JobDetail for that job + // */ + // @Transactional + // public JobDetail getJob(String id) { + // Optional job = jobInfoRepository.findById(id); + // if (!job.isPresent()) { + // throw new RetrievalException(String.format("Unable to retrieve job with id %s", + // id)); + // } + // JobDetail.Builder jobDetailBuilder = job.get().getJobDetail().toBuilder(); + // List metrics = metricsRepository.findByJobInfo_Id(id); + // for (Metrics metric : metrics) { + // jobDetailBuilder.putMetrics(metric.getName(), metric.getValue()); + // } + // return jobDetailBuilder.build(); + // } + +} diff --git a/core/src/main/java/feast/core/util/TypeConversion.java b/core/src/main/java/feast/core/util/TypeConversion.java index e01a551135..a7dd2b0d2a 100644 --- a/core/src/main/java/feast/core/util/TypeConversion.java +++ b/core/src/main/java/feast/core/util/TypeConversion.java @@ -85,7 +85,7 @@ public static String convertMapToJsonString(Map map) { public static String[] convertMapToArgs(Map map) { List args = new ArrayList<>(); for (Entry arg : map.entrySet()) { - args.add(Strings.lenientFormat("--%s=%s", arg.getKey(), arg.getValue())); + args.add(String.format("--%s=%s", arg.getKey(), arg.getValue())); } return args.toArray(new String[] {}); } diff --git a/core/src/test/java/feast/core/validators/MatchersTest.java b/core/src/test/java/feast/core/validators/MatchersTest.java index 774e58c7a8..13c9e006a4 100644 --- a/core/src/test/java/feast/core/validators/MatchersTest.java +++ b/core/src/test/java/feast/core/validators/MatchersTest.java @@ -43,7 +43,7 @@ public void checkUpperSnakeCaseShouldPassForLegitUpperSnakeCaseWithNumbers() { public void checkUpperSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() { exception.expect(IllegalArgumentException.class); exception.expectMessage( - Strings.lenientFormat( + String.format( "invalid value for field %s: %s", "someField", "argument must be in upper snake case, and cannot include any special characters.")); @@ -61,7 +61,7 @@ public void checkLowerSnakeCaseShouldPassForLegitLowerSnakeCase() { public void checkLowerSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() { exception.expect(IllegalArgumentException.class); exception.expectMessage( - Strings.lenientFormat( + String.format( "invalid value for field %s: %s", "someField", "argument must be in lower snake case, and cannot include any special characters.")); diff --git a/docs/contributing.md b/docs/contributing.md index 38caffd654..0ca107ff10 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -6,13 +6,13 @@ The following guide will help you quickly run Feast in your local machine. The main components of Feast are: -* **Feast Core** handles FeatureSpec registration, starts and monitors Ingestion +* **Feast Core** handles FeatureSpec registration, starts and monitors Ingestion jobs and ensures that Feast internal metadata is consistent. * **Feast Ingestion** subscribes to streams of FeatureRow and writes the feature - values to registered Stores. + values to registered Stores. * **Feast Serving** handles requests for features values retrieval from the end users. @@ -29,13 +29,13 @@ The main components of Feast are: > **Assumptions:** > -> 1. Postgres is running in "localhost:5432" and has a database called "postgres" which +> 1. Postgres is running in "localhost:5432" and has a database called "postgres" which > -> can be accessed with credentials user "postgres" and password "password". 
+> can be accessed with credentials user "postgres" and password "password".
 >
-> To use different database name and credentials, please update
+> To use a different database name and credentials, please update
 >
-> "$FEAST\_HOME/core/src/main/resources/application.yml"
+> "$FEAST\_HOME/core/src/main/resources/application.yml"
 >
 > or set these environment variables: DB\_HOST, DB\_USERNAME, DB\_PASSWORD.
 >
@@ -52,16 +52,17 @@ cd feast

 #### Starting Feast Core

 ```text
-# Please check the default configuration for Feast Core in
+# Please check the default configuration for Feast Core in
 # "$FEAST_HOME/core/src/main/resources/application.yml" and update it accordingly.
-#
+#
 # Start Feast Core GRPC server on localhost:6565
 mvn --projects core spring-boot:run

 # If Feast Core starts successfully, verify the correct Stores are registered
 # correctly, for example by using grpc_cli.
-grpc_cli call localhost:6565 GetStores ''
+grpc_cli call localhost:6565 ListStores ''

 # Should return something similar to the following.
 # Note that you should change BigQuery projectId and datasetId accordingly
 # in "$FEAST_HOME/core/src/main/resources/application.yml"
@@ -91,12 +92,19 @@ store {
   project_id: "my-google-project-id"
   dataset_id: "my-bigquery-dataset-id"
 }
+# Should return something similar to the following if you have not updated any stores
+{
+  "store": []
 }
 ```

 #### Starting Feast Serving

-Feast Serving requires administrators to provide an **existing** store name in Feast. An instance of Feast Serving can only retrieve features from a **single** store.
+Feast Serving requires administrators to provide an **existing** store name in Feast.
+An instance of Feast Serving can only retrieve features from a **single** store.
 > In order to retrieve features from multiple stores you must start **multiple**
 instances of Feast serving. If you start multiple Feast serving on a single host,
 make sure that they are listening on different ports.
@@ -111,12 +119,76 @@ grpc_cli call localhost:6566 GetFeastServingType ''
 type: FEAST_SERVING_TYPE_ONLINE
 ```

+#### Updating a store
+
+Create or update a Store by sending an UpdateStore request to Feast Core.
+
+```
+# Example of updating a redis store
+
+grpc_cli call localhost:6565 UpdateStore '
+store {
+  name: "SERVING"
+  type: REDIS
+  subscriptions {
+    name: "*"
+    version: ">0"
+  }
+  redis_config {
+    host: "localhost"
+    port: 6379
+  }
+}
+'
+
+# Examples for other supported stores (replacing redis_config):
+# BigQuery
+bigquery_config {
+  project_id: "my-google-project-id"
+  dataset_id: "my-bigquery-dataset-id"
+}
+
+# Cassandra: two options, depending on the replication strategy
+# See details: https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeReplication.html
+#
+# Please note that the table name must be "feature_store", as specified in the @Table annotation of the
+# datastax object mapper
+
+# SimpleStrategy
+cassandra_config {
+  bootstrap_hosts: "localhost"
+  port: 9042
+  keyspace: "feast"
+  table_name: "feature_store"
+  replication_options {
+    class: "SimpleStrategy"
+    replication_factor: 1
+  }
+}
+
+# NetworkTopologyStrategy
+cassandra_config {
+  bootstrap_hosts: "localhost"
+  port: 9042
+  keyspace: "feast"
+  table_name: "feature_store"
+  replication_options {
+    class: "NetworkTopologyStrategy"
+    east: 2
+    west: 2
+  }
+}
+
+# To check that the store has been updated correctly:
+grpc_cli call localhost:6565 ListStores ''
+```
+
 #### Registering a FeatureSet

 Create a new FeatureSet on Feast by sending a request to Feast Core. When a feature set is successfully registered, Feast Core will start an **ingestion** job that listens for new features in the FeatureSet. Note that Feast currently only supports source of type "KAFKA", so you must have access to a running Kafka broker to register a FeatureSet successfully.

 ```text
-# Example of registering a new driver feature set
+# Example of registering a new driver feature set
 # Note the source value, it assumes that you have access to a Kafka broker
 # running on localhost:9092
@@ -156,7 +228,7 @@ grpc_cli call localhost:6565 GetFeatureSets ''
 # and written to the registered stores.
 # Make sure the value here is the topic assigned to the feature set
 # ... producer.send("feast-driver-features" ...)
-#
+#
 # Install Python SDK to help writing FeatureRow messages to Kafka
 cd $FEAST_HOME/sdk/python
 pip3 install -e .
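[Editor's note: the diffs that follow implement the write path for the store registered above. The out-of-order handling described in the commit message relies on CQL's `USING TIMESTAMP` upsert semantics, which the mapper below applies via `Option.timestamp`/`Option.ttl`. The following is a hand-written CQL sketch of that behavior against the `feature_store` schema this patch creates; the keyspace, key, timestamps, and blob values are illustrative only, not taken from the patch.]

```
-- Two writes to the same primary key; the second carries an OLDER timestamp.
-- Cassandra keeps the cell with the newest write time, so the stale write
-- never becomes visible. This is what lets ingestion tolerate
-- out-of-order FeatureRow arrivals.
INSERT INTO feast.feature_store (entities, feature, value)
VALUES ('fs:1:entity1=1', 'feature1', 0x0a) USING TIMESTAMP 2000 AND TTL 600;

INSERT INTO feast.feature_store (entities, feature, value)
VALUES ('fs:1:entity1=1', 'feature1', 0x0b) USING TIMESTAMP 1000 AND TTL 600;

SELECT feature, value, writetime(value)
FROM feast.feature_store
WHERE entities = 'fs:1:entity1=1' AND feature = 'feature1';

-- Returns 0x0a with writetime 2000: the older write did not replace it.
```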
diff --git a/infra/charts/feast/charts/feast-serving/values.yaml b/infra/charts/feast/charts/feast-serving/values.yaml index d489a48748..d50cd30885 100644 --- a/infra/charts/feast/charts/feast-serving/values.yaml +++ b/infra/charts/feast/charts/feast-serving/values.yaml @@ -56,6 +56,15 @@ application.yaml: config-path: /etc/feast/feast-serving/store.yaml redis-pool-max-size: 128 redis-pool-max-idle: 64 + cassandra-pool-core-local-connections: 1 + cassandra-pool-max-local-connections: 1 + cassandra-pool-core-remote-connections: 1 + cassandra-pool-max-remote-connections: 1 + cassandra-pool-max-requests-local-connection: 32768 + cassandra-pool-max-requests-remote-connection: 2048 + cassandra-pool-new-local-connection-threshold: 30000 + cassandra-pool-new-remote-connection-threshold: 400 + cassandra-pool-timeout-millis: 0 jobs: staging-location: "" store-type: "" diff --git a/ingestion/pom.xml b/ingestion/pom.xml index c829674a64..a9e982290e 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -215,6 +215,12 @@ ${org.apache.beam.version} + + org.apache.beam + beam-sdks-java-io-cassandra + ${org.apache.beam.version} + + redis.clients jedis @@ -238,6 +244,13 @@ test + + org.cassandraunit + cassandra-unit-shaded + 3.11.2.0 + test + + com.google.guava guava diff --git a/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java new file mode 100644 index 0000000000..b1e5c4f0ce --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.mapping.Mapper.Option; +import feast.store.serving.cassandra.CassandraMutation; +import java.io.Serializable; +import java.util.Iterator; +import java.util.concurrent.Future; +import org.apache.beam.sdk.io.cassandra.Mapper; + +/** A {@link Mapper} that supports writing {@code CassandraMutation}s with the Beam Cassandra IO. */ +public class CassandraMutationMapper implements Mapper, Serializable { + + private com.datastax.driver.mapping.Mapper mapper; + + CassandraMutationMapper(com.datastax.driver.mapping.Mapper mapper) { + this.mapper = mapper; + } + + @Override + public Iterator map(ResultSet resultSet) { + throw new UnsupportedOperationException("Only supports write operations"); + } + + @Override + public Future deleteAsync(CassandraMutation entityClass) { + throw new UnsupportedOperationException("Only supports write operations"); + } + + /** + * Saves records to Cassandra with: - Cassandra's internal write time set to the timestamp of the + * record. 
Cassandra will not override an existing record with the same partition key if the write + * time is older - Expiration of the record + * + * @param entityClass Cassandra's object mapper + */ + @Override + public Future saveAsync(CassandraMutation entityClass) { + return mapper.saveAsync( + entityClass, + Option.timestamp(entityClass.getWriteTime()), + Option.ttl(entityClass.getTtl())); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java new file mode 100644 index 0000000000..9f34465099 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.MappingManager; +import feast.store.serving.cassandra.CassandraMutation; +import org.apache.beam.sdk.io.cassandra.Mapper; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class CassandraMutationMapperFactory implements SerializableFunction { + + private transient MappingManager mappingManager; + private Class entityClass; + + public CassandraMutationMapperFactory(Class entityClass) { + this.entityClass = entityClass; + } + + @Override + public Mapper apply(Session session) { + if (mappingManager == null) { + this.mappingManager = new MappingManager(session); + } + + return new CassandraMutationMapper(mappingManager.mapper(entityClass)); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index b7901c2f90..e8aaf1437d 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -16,12 +16,14 @@ */ package feast.ingestion.transform; +import com.datastax.driver.core.Session; import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors; import com.google.api.services.bigquery.model.TableRow; import com.google.auto.value.AutoValue; import feast.core.FeatureSetProto.FeatureSet; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; +import feast.core.StoreProto.Store.CassandraConfig; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.ingestion.options.ImportOptions; @@ -29,11 +31,16 @@ import feast.ingestion.values.FailedElement; import feast.store.serving.bigquery.FeatureRowToTableRow; import feast.store.serving.bigquery.GetTableDestination; +import feast.store.serving.cassandra.CassandraMutation; +import feast.store.serving.cassandra.FeatureRowToCassandraMutationDoFn; import feast.store.serving.redis.FeatureRowToRedisMutationDoFn; import 
feast.store.serving.redis.RedisCustomIO; import feast.types.FeatureRowProto.FeatureRow; import java.io.IOException; +import java.util.Arrays; import java.util.Map; +import org.apache.beam.sdk.io.cassandra.CassandraIO; +import org.apache.beam.sdk.io.cassandra.Mapper; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; @@ -47,6 +54,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TypeDescriptors; @@ -151,6 +159,24 @@ public void processElement(ProcessContext context) { .build()); } break; + case CASSANDRA: + CassandraConfig cassandraConfig = getStore().getCassandraConfig(); + SerializableFunction mapperFactory = + new CassandraMutationMapperFactory(CassandraMutation.class); + input + .apply( + "Create CassandraMutation from FeatureRow", + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + getFeatureSetSpecs(), cassandraConfig.getDefaultTtl()))) + .apply( + CassandraIO.write() + .withHosts(Arrays.asList(cassandraConfig.getBootstrapHosts().split(","))) + .withPort(cassandraConfig.getPort()) + .withKeyspace(cassandraConfig.getKeyspace()) + .withEntity(CassandraMutation.class) + .withMapperFactoryFn(mapperFactory)); + break; default: log.error("Store type '{}' is not supported. No Feature Row will be written.", storeType); break; diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index 7af98fb8f0..cfee48ad0c 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -18,6 +18,14 @@ import static feast.types.ValueProto.ValueType; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.schemabuilder.Create; +import com.datastax.driver.core.schemabuilder.KeyspaceOptions; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.datastax.driver.mapping.MappingManager; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.DatasetId; @@ -40,13 +48,19 @@ import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; import feast.core.StoreProto.Store; +import feast.core.StoreProto.Store.CassandraConfig; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; +import feast.store.serving.cassandra.CassandraMutation; import feast.types.ValueProto.ValueType.Enum; +import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import redis.clients.jedis.JedisPool; @@ -113,6 +127,9 @@ public static void setupStore(Store store, FeatureSet featureSet) { store.getBigqueryConfig().getDatasetId(), BigQueryOptions.getDefaultInstance().getService()); break; + case CASSANDRA: + 
StoreUtil.setupCassandra(store.getCassandraConfig()); + break; default: log.warn("Store type '{}' is unsupported", storeType); break; @@ -250,4 +267,74 @@ public static void checkRedisConnection(RedisConfig redisConfig) { } jedisPool.close(); } + + /** + * Ensures Cassandra is accessible, else throw a RuntimeException. Creates Cassandra keyspace and + * table if it does not already exist + * + * @param cassandraConfig Please refer to feast.core.Store proto + */ + public static void setupCassandra(CassandraConfig cassandraConfig) { + List contactPoints = + Arrays.stream(cassandraConfig.getBootstrapHosts().split(",")) + .map(host -> new InetSocketAddress(host, cassandraConfig.getPort())) + .collect(Collectors.toList()); + Cluster cluster = Cluster.builder().addContactPointsWithPorts(contactPoints).build(); + Session session; + + try { + String keyspace = cassandraConfig.getKeyspace(); + KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace); + if (keyspaceMetadata == null) { + log.info("Creating keyspace '{}'", keyspace); + Map replicationOptions = + cassandraConfig.getReplicationOptionsMap().entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + KeyspaceOptions createKeyspace = + SchemaBuilder.createKeyspace(keyspace) + .ifNotExists() + .with() + .replication(replicationOptions); + session = cluster.newSession(); + session.execute(createKeyspace); + } + + session = cluster.connect(keyspace); + // Currently no support for creating table from entity mapper: + // https://datastax-oss.atlassian.net/browse/JAVA-569 + Create createTable = + SchemaBuilder.createTable(keyspace, cassandraConfig.getTableName()) + .ifNotExists() + .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) + .addClusteringColumn(CassandraMutation.FEATURE, DataType.text()) + .addColumn(CassandraMutation.VALUE, DataType.blob()); + log.info("Create Cassandra table if not exists.."); + session.execute(createTable); + + validateCassandraTable(session); + + session.close(); + } catch (RuntimeException e) { + throw new RuntimeException( + String.format( + "Failed to connect to Cassandra at bootstrap hosts: '%s' port: '%s'. Please check that your Cassandra is running and accessible from Feast.", + contactPoints.stream() + .map(InetSocketAddress::getHostName) + .collect(Collectors.joining(",")), + cassandraConfig.getPort()), + e); + } + cluster.close(); + } + + private static void validateCassandraTable(Session session) { + try { + new MappingManager(session).mapper(CassandraMutation.class).getTableMetadata(); + } catch (RuntimeException e) { + throw new RuntimeException( + String.format( + "Table created does not match the datastax object mapper: %s", + CassandraMutation.class.getSimpleName())); + } + } } diff --git a/ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java b/ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java new file mode 100644 index 0000000000..87a327e772 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.utils; + +import feast.types.ValueProto.Value; + +/** + * Utility class for converting {@link Value} of different types to a string for storing as key in + * data stores + */ +public class ValueUtil { + + public static String toString(Value value) { + String strValue; + switch (value.getValCase()) { + case BYTES_VAL: + strValue = value.getBytesVal().toString(); + break; + case STRING_VAL: + strValue = value.getStringVal(); + break; + case INT32_VAL: + strValue = String.valueOf(value.getInt32Val()); + break; + case INT64_VAL: + strValue = String.valueOf(value.getInt64Val()); + break; + case DOUBLE_VAL: + strValue = String.valueOf(value.getDoubleVal()); + break; + case FLOAT_VAL: + strValue = String.valueOf(value.getFloatVal()); + break; + case BOOL_VAL: + strValue = String.valueOf(value.getBoolVal()); + break; + default: + throw new IllegalArgumentException( + String.format("toString method not supported for type %s", value.getValCase())); + } + return strValue; + } +} diff --git a/ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java b/ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java new file mode 100644 index 0000000000..23bbb9c3b3 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.store.serving.cassandra; + +import com.datastax.driver.mapping.annotations.ClusteringColumn; +import com.datastax.driver.mapping.annotations.Computed; +import com.datastax.driver.mapping.annotations.PartitionKey; +import com.datastax.driver.mapping.annotations.Table; +import feast.core.FeatureSetProto.EntitySpec; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.ingestion.utils.ValueUtil; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; + +/** + * Cassandra's object mapper that handles basic CRUD operations in Cassandra tables More info: + * https://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/ + */ +@DefaultCoder(value = AvroCoder.class) +@Table(name = "feature_store") +public final class CassandraMutation implements Serializable { + + public static final String ENTITIES = "entities"; + public static final String FEATURE = "feature"; + public static final String VALUE = "value"; + + @PartitionKey private final String entities; + + @ClusteringColumn private final String feature; + + private final ByteBuffer value; + + @Computed(value = "writetime(value)") + private final long writeTime; + + @Computed(value = "ttl(value)") + private final int ttl; + + // NoArgs constructor is needed when using Beam's CassandraIO withEntity and specifying this + // class, + // it looks for an init() method + CassandraMutation() { + this.entities = null; + this.feature = null; + this.value = null; + this.writeTime = 0; + this.ttl = 0; + } + + CassandraMutation(String entities, String feature, ByteBuffer value, long writeTime, int ttl) { + this.entities = entities; + this.feature = feature; + this.value = value; + this.writeTime = writeTime; + this.ttl = ttl; + } + + public long getWriteTime() { + return writeTime; + } + + public int getTtl() { + return ttl; + } + + static String keyFromFeatureRow(FeatureSetSpec featureSetSpec, FeatureRow featureRow) { + Set entityNames = + featureSetSpec.getEntitiesList().stream() + .map(EntitySpec::getName) + .collect(Collectors.toSet()); + List entities = new ArrayList<>(); + for (Field field : featureRow.getFieldsList()) { + if (entityNames.contains(field.getName())) { + entities.add(field); + } + } + return featureRow.getFeatureSet() + + ":" + + entities.stream() + .map(f -> f.getName() + "=" + ValueUtil.toString(f.getValue())) + .collect(Collectors.joining("|")); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof CassandraMutation) { + CassandraMutation that = (CassandraMutation) o; + return this.entities.equals(that.entities) + && this.feature.equals(that.feature) + && this.value.equals(that.value) + && this.writeTime == that.writeTime + && this.ttl == that.ttl; + } + return false; + } +} diff --git a/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java new file mode 100644 index 0000000000..177a8703e6 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.store.serving.cassandra; + +import com.google.protobuf.Duration; +import com.google.protobuf.util.Timestamps; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.FeatureSetProto.FeatureSpec; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.transforms.DoFn; +import org.slf4j.Logger; + +public class FeatureRowToCassandraMutationDoFn extends DoFn { + + private static final Logger log = + org.slf4j.LoggerFactory.getLogger(FeatureRowToCassandraMutationDoFn.class); + private Map featureSetSpecs; + private Map maxAges; + + public FeatureRowToCassandraMutationDoFn(Map specs, Duration defaultTtl) { + this.featureSetSpecs = specs; + this.maxAges = new HashMap<>(); + for (FeatureSetSpec spec : specs.values()) { + String featureSetName = spec.getName() + ":" + spec.getVersion(); + if (spec.getMaxAge() != null && spec.getMaxAge().getSeconds() > 0) { + maxAges.put(featureSetName, Math.toIntExact(spec.getMaxAge().getSeconds())); + } else { + maxAges.put(featureSetName, Math.toIntExact(defaultTtl.getSeconds())); + } + } + } + + /** Output a Cassandra mutation object for every feature in the feature row. */ + @ProcessElement + public void processElement(ProcessContext context) { + FeatureRow featureRow = context.element(); + try { + FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet()); + Set featureNames = + featureSetSpec.getFeaturesList().stream() + .map(FeatureSpec::getName) + .collect(Collectors.toSet()); + String key = CassandraMutation.keyFromFeatureRow(featureSetSpec, featureRow); + + Collection mutations = new ArrayList<>(); + for (Field field : featureRow.getFieldsList()) { + if (featureNames.contains(field.getName())) { + mutations.add( + new CassandraMutation( + key, + field.getName(), + ByteBuffer.wrap(field.getValue().toByteArray()), + Timestamps.toMicros(featureRow.getEventTimestamp()), + maxAges.get(featureRow.getFeatureSet()))); + } + } + + mutations.forEach(context::output); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java b/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java new file mode 100644 index 0000000000..da914ca175 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.StoreProto.Store; +import feast.core.StoreProto.Store.CassandraConfig; +import feast.core.StoreProto.Store.StoreType; +import feast.test.TestUtil; +import feast.test.TestUtil.LocalCassandra; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import feast.types.ValueProto.ValueType.Enum; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.thrift.transport.TTransportException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class CassandraWriteToStoreIT implements Serializable { + private FeatureSetSpec featureSetSpec; + private FeatureRow row; + + class FakeCassandraWriteToStore extends WriteToStore { + + private FeatureSetSpec featureSetSpec; + + FakeCassandraWriteToStore(FeatureSetSpec featureSetSpec) { + this.featureSetSpec = featureSetSpec; + } + + @Override + public Store getStore() { + return Store.newBuilder() + .setType(StoreType.CASSANDRA) + .setName("SERVING") + .setCassandraConfig(getCassandraConfig()) + .build(); + } + + @Override + public Map getFeatureSetSpecs() { + return new HashMap() { + { + put(featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), featureSetSpec); + } + }; + } + } + + private static CassandraConfig getCassandraConfig() { + return CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setTableName("feature_store") + .setKeyspace("test") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "1"); + } + }) + .build(); + } + + @BeforeClass + public static void startServer() throws InterruptedException, IOException, TTransportException { + LocalCassandra.start(); + LocalCassandra.createKeyspaceAndTable(getCassandraConfig()); + } + + @Before + public void setUp() { + featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.INT64); + put("feature2", Enum.INT64); + } + }); + row = + TestUtil.createFeatureRow( + featureSetSpec, + 100, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("a")); + put("feature1", TestUtil.intValue(1)); + put("feature2", TestUtil.intValue(2)); + } + }); + } + + @Rule public transient TestPipeline 
testPipeline = TestPipeline.create(); + + @AfterClass + public static void cleanUp() { + LocalCassandra.stop(); + } + + @Test + public void testWriteCassandra_happyPath() throws InvalidProtocolBufferException { + PCollection input = testPipeline.apply(Create.of(row)); + + input.apply(new FakeCassandraWriteToStore(featureSetSpec)); + + testPipeline.run(); + + ResultSet resultSet = LocalCassandra.getSession().execute("SELECT * FROM test.feature_store"); + List actualResults = getResults(resultSet); + + List expectedFields = + Arrays.asList( + Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(), + Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build()); + + assertTrue(actualResults.containsAll(expectedFields)); + assertEquals(expectedFields.size(), actualResults.size()); + } + + @Test(timeout = 30000) + public void testWriteCassandra_shouldNotRetrieveExpiredValues() + throws InvalidProtocolBufferException { + // Set max age to 1 second + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 1, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.INT64); + put("feature2", Enum.INT64); + } + }); + + PCollection input = testPipeline.apply(Create.of(row)); + + input.apply(new FakeCassandraWriteToStore(featureSetSpec)); + + testPipeline.run(); + + while (true) { + ResultSet resultSet = + LocalCassandra.getSession() + .execute("SELECT feature, value, ttl(value) as expiry FROM test.feature_store"); + List results = getResults(resultSet); + if (results.isEmpty()) break; + } + } + + @Test + public void testWriteCassandra_shouldNotOverrideNewerValues() + throws InvalidProtocolBufferException { + FeatureRow olderRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("a")); + put("feature1", TestUtil.intValue(3)); + put("feature2", TestUtil.intValue(4)); + } + }); + + PCollection input = testPipeline.apply(Create.of(row, olderRow)); + + input.apply(new FakeCassandraWriteToStore(featureSetSpec)); + + testPipeline.run(); + + ResultSet resultSet = LocalCassandra.getSession().execute("SELECT * FROM test.feature_store"); + List actualResults = getResults(resultSet); + + List expectedFields = + Arrays.asList( + Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(), + Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build()); + + assertTrue(actualResults.containsAll(expectedFields)); + assertEquals(expectedFields.size(), actualResults.size()); + } + + private List getResults(ResultSet resultSet) throws InvalidProtocolBufferException { + List results = new ArrayList<>(); + while (!resultSet.isExhausted()) { + Row row = resultSet.one(); + results.add( + Field.newBuilder() + .setName(row.getString("feature")) + .setValue(Value.parseFrom(row.getBytes("value"))) + .build()); + } + return results; + } +} diff --git a/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java b/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java new file mode 100644 index 0000000000..8c6874ecac --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.util; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.schemabuilder.Create; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import feast.core.StoreProto.Store.CassandraConfig; +import feast.ingestion.utils.StoreUtil; +import feast.store.serving.cassandra.CassandraMutation; +import feast.test.TestUtil.LocalCassandra; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.thrift.transport.TTransportException; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class CassandraStoreUtilIT { + + @BeforeClass + public static void startServer() throws InterruptedException, IOException, TTransportException { + LocalCassandra.start(); + } + + @After + public void teardown() { + LocalCassandra.stop(); + } + + @Test + public void setupCassandra_shouldCreateKeyspaceAndTable() { + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }) + .build(); + StoreUtil.setupCassandra(config); + + Map actualReplication = + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getReplication(); + Map expectedReplication = + new HashMap() { + { + put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }; + TableMetadata tableMetadata = + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store"); + + Assert.assertEquals(expectedReplication, actualReplication); + Assert.assertNotNull(tableMetadata); + } + + @Test + public void setupCassandra_shouldBeIdempotent_whenTableAlreadyExistsAndSchemaMatches() { + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "2"); + } + }) + .build(); + + LocalCassandra.createKeyspaceAndTable(config); + + // Check table is created + Assert.assertNotNull( + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store")); + + StoreUtil.setupCassandra(config); + + Assert.assertNotNull( + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store")); + } + + @Test(expected = RuntimeException.class) + public void setupCassandra_shouldThrowException_whenTableNameDoesNotMatchObjectMapper() { + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("test_data_store") + .putAllReplicationOptions( + new HashMap() { + { + 
put("class", "NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }) + .build(); + StoreUtil.setupCassandra(config); + } + + @Test(expected = RuntimeException.class) + public void setupCassandra_shouldThrowException_whenTableSchemaDoesNotMatchObjectMapper() { + LocalCassandra.getSession() + .execute( + "CREATE KEYSPACE test " + + "WITH REPLICATION = {" + + "'class': 'SimpleStrategy', 'replication_factor': 2 }"); + + Create createTable = + SchemaBuilder.createTable("test", "feature_store") + .ifNotExists() + .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) + .addClusteringColumn( + "featureName", DataType.text()) // Column name does not match in CassandraMutation + .addColumn(CassandraMutation.VALUE, DataType.blob()); + LocalCassandra.getSession().execute(createTable); + + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "2"); + } + }) + .build(); + + StoreUtil.setupCassandra(config); + } +} diff --git a/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java new file mode 100644 index 0000000000..412c6488f2 --- /dev/null +++ b/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java @@ -0,0 +1,225 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.store.serving.cassandra; + +import com.google.protobuf.Duration; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.test.TestUtil; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.ValueProto.Value; +import feast.types.ValueProto.ValueType.Enum; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.HashMap; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +public class FeatureRowToCassandraMutationDoFnTest implements Serializable { + + @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + + @Test + public void processElement_shouldCreateCassandraMutation_givenFeatureRow() { + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("feature1", TestUtil.strValue("a")); + } + }); + + PCollection input = testPipeline.apply(Create.of(featureRow)); + + PCollection output = + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + Duration.newBuilder().setSeconds(0).build()))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 10) + }; + + PAssert.that(output).containsInAnyOrder(expected); + + testPipeline.run(); + } + + @Test + public void + processElement_shouldCreateCassandraMutations_givenFeatureRowWithMultipleEntitiesAndFeatures() { + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + put("feature2", Enum.INT64); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("b")); + put("feature1", TestUtil.strValue("a")); + put("feature2", TestUtil.intValue(2)); + } + }); + + PCollection input = testPipeline.apply(Create.of(featureRow)); + + PCollection output = + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + Duration.newBuilder().setSeconds(0).build()))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1|entity2=b", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 10), + new CassandraMutation( + "fs:1:entity1=1|entity2=b", + "feature2", + ByteBuffer.wrap(TestUtil.intValue(2).toByteArray()), + 10000000, + 10) + }; + + PAssert.that(output).containsInAnyOrder(expected); + + testPipeline.run(); + } + + @Test + public void processElement_shouldUseDefaultMaxAge_whenMissingMaxAge() { + Duration defaultTtl = Duration.newBuilder().setSeconds(500).build(); + FeatureSetSpec 
featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 0, + new HashMap() { + { + put("entity1", Enum.INT64); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("feature1", TestUtil.strValue("a")); + } + }); + + PCollection input = testPipeline.apply(Create.of(featureRow)); + + PCollection output = + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + defaultTtl))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 500) + }; + + PAssert.that(output).containsInAnyOrder(expected); + + testPipeline.run(); + } +} diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 5c16d7e9e3..d9ce882e9d 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -18,10 +18,15 @@ import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import com.google.protobuf.util.Timestamps; import feast.core.FeatureSetProto.FeatureSet; +import feast.core.StoreProto.Store.CassandraConfig; import feast.ingestion.transform.WriteToStore; +import feast.ingestion.utils.StoreUtil; import feast.storage.RedisProto.RedisKey; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FeatureRowProto.FeatureRow.Builder; @@ -35,13 +40,18 @@ import feast.types.ValueProto.StringList; import feast.types.ValueProto.Value; import feast.types.ValueProto.ValueType; +import feast.types.ValueProto.ValueType.Enum; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import org.apache.beam.sdk.PipelineResult; @@ -53,8 +63,10 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.joda.time.Duration; import redis.embedded.RedisServer; @@ -83,6 +95,37 @@ public static void stop() { } } + public static class LocalCassandra { + + public static void start() throws InterruptedException, IOException, TTransportException { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(); + } + + public static void createKeyspaceAndTable(CassandraConfig config) { + StoreUtil.setupCassandra(config); + } + + public static String getHost() { + return EmbeddedCassandraServerHelper.getHost(); + } + + public static int getPort() { + return 
EmbeddedCassandraServerHelper.getNativeTransportPort(); + } + + public static Cluster getCluster() { + return EmbeddedCassandraServerHelper.getCluster(); + } + + public static Session getSession() { + return EmbeddedCassandraServerHelper.getSession(); + } + + public static void stop() { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); + } + } + public static class LocalKafka { private static KafkaServerStartable server; @@ -165,6 +208,85 @@ public static void publishFeatureRowsToKafka( }); } + /** + * Create a Feature Set Spec. + * + * @param name name of the feature set + * @param version version of the feature set + * @param maxAgeSeconds max age + * @param entities entities provided as map of string to {@link Enum} + * @param features features provided as map of string to {@link Enum} + * @return {@link FeatureSetSpec} + */ + public static FeatureSetSpec createFeatureSetSpec( + String name, + int version, + int maxAgeSeconds, + Map entities, + Map features) { + FeatureSetSpec.Builder featureSetSpec = + FeatureSetSpec.newBuilder() + .setName(name) + .setVersion(version) + .setMaxAge(com.google.protobuf.Duration.newBuilder().setSeconds(maxAgeSeconds).build()); + + for (Entry entity : entities.entrySet()) { + featureSetSpec.addEntities( + EntitySpec.newBuilder().setName(entity.getKey()).setValueType(entity.getValue()).build()); + } + + for (Entry feature : features.entrySet()) { + featureSetSpec.addFeatures( + FeatureSpec.newBuilder() + .setName(feature.getKey()) + .setValueType(feature.getValue()) + .build()); + } + + return featureSetSpec.build(); + } + + /** + * Create a Feature Row. + * + * @param featureSetSpec {@link FeatureSetSpec} + * @param timestampSeconds timestamp given in seconds + * @param fields fields provided as a map name to {@link Value} + * @return {@link FeatureRow} + */ + public static FeatureRow createFeatureRow( + FeatureSetSpec featureSetSpec, long timestampSeconds, Map fields) { + List featureNames = + featureSetSpec.getFeaturesList().stream() + .map(FeatureSpec::getName) + .collect(Collectors.toList()); + List entityNames = + featureSetSpec.getEntitiesList().stream() + .map(EntitySpec::getName) + .collect(Collectors.toList()); + List requiredFields = + Stream.concat(featureNames.stream(), entityNames.stream()).collect(Collectors.toList()); + + if (fields.keySet().containsAll(requiredFields)) { + FeatureRow.Builder featureRow = + FeatureRow.newBuilder() + .setFeatureSet(featureSetSpec.getName() + ":" + featureSetSpec.getVersion()) + .setEventTimestamp(Timestamp.newBuilder().setSeconds(timestampSeconds).build()); + for (Entry field : fields.entrySet()) { + featureRow.addFields( + Field.newBuilder().setName(field.getKey()).setValue(field.getValue()).build()); + } + return featureRow.build(); + } else { + String missingFields = + requiredFields.stream() + .filter(f -> !fields.keySet().contains(f)) + .collect(Collectors.joining(",")); + throw new IllegalArgumentException( + "FeatureRow is missing some fields defined in FeatureSetSpec: " + missingFields); + } + } + /** * Create a Feature Row with random value according to the FeatureSetSpec * @@ -418,4 +540,12 @@ public static void waitUntilAllElementsAreWrittenToStore( } } } + + public static Value intValue(int val) { + return Value.newBuilder().setInt64Val(val).build(); + } + + public static Value strValue(String val) { + return Value.newBuilder().setStringVal(val).build(); + } } diff --git a/pom.xml b/pom.xml index 821d3b7232..3426c73510 100644 --- a/pom.xml +++ b/pom.xml @@ -203,7 +203,7 @@ 
      <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
-        <version>26.0-jre</version>
+        <version>25.0-jre</version>
      </dependency>
      <dependency>
        <groupId>com.google.protobuf</groupId>
diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto
index 931a9d46b6..ef98f023c1 100644
--- a/protos/feast/core/Store.proto
+++ b/protos/feast/core/Store.proto
@@ -17,6 +17,8 @@ syntax = "proto3";
 
 package feast.core;
 
+import "google/protobuf/duration.proto";
+
 option java_package = "feast.core";
 option java_outer_classname = "StoreProto";
 option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core";
@@ -103,7 +105,20 @@ message Store {
     //
     BIGQUERY = 2;
 
-    // Unsupported in Feast 0.3
+    // Cassandra stores entities as a string partition key and each feature as a clustering column.
+    // NOTE: This store currently uses the max_age defined in the FeatureSet as the TTL.
+    //
+    // Columns:
+    // - entities: concatenated string of the feature set name and all entity keys and values
+    //   entities concatenated format - [feature_set]:[entity_name1=entity_value1]|[entity_name2=entity_value2]
+    //   TODO: the string representation of float or double types may differ across runtimes or platforms
+    // - feature: clustering column where each feature is a column
+    // - value: byte array of Value (refer to feast.types.Value)
+    //
+    // Internal columns:
+    // - writeTime: timestamp of the written record, used to ensure that new records are not replaced
+    //   by older ones
+    // - ttl: expiration time of the record. Currently uses max_age from the feature set spec as the TTL
     CASSANDRA = 3;
   }
 
@@ -123,8 +138,22 @@ message Store {
   }
 
   message CassandraConfig {
-    string host = 1;
+    // bootstrap_hosts: comma-delimited list of hosts
+    string bootstrap_hosts = 1;
     int32 port = 2;
+    string keyspace = 3;
+
+    // Please note that the table name must be "feature_store", as specified in the @Table annotation of the
+    // datastax object mapper
+    string table_name = 4;
+
+    // This specifies the replication strategy to use. Please refer to the docs for more details:
+    // https://docs.datastax.com/en/dse/6.7/cql/cql/cql_reference/cql_commands/cqlCreateKeyspace.html#cqlCreateKeyspace__cqlCreateKeyspacereplicationmap-Pr3yUQ7t
+    map<string, string> replication_options = 5;
+
+    // Default expiration in seconds to use when the FeatureSetSpec does not have max_age defined.
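+    // e.g. a default_ttl of { seconds: 86400 } would expire records from feature sets without a
+    // max_age one day after they are written (illustrative value, not a recommendation)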
+ // Specify 0 for no default expiration + google.protobuf.Duration default_ttl = 6; } message Subscription { diff --git a/serving/pom.xml b/serving/pom.xml index be573be45c..0b36ad40e5 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -143,7 +143,19 @@ redis.clients jedis - + + + com.datastax.cassandra + cassandra-driver-core + 3.4.0 + + + io.netty + * + + + + com.google.guava guava @@ -229,6 +241,13 @@ spring-boot-starter-test test + + + org.cassandraunit + cassandra-unit-shaded + 3.11.2.0 + test + diff --git a/serving/sample_cassandra_config.yml b/serving/sample_cassandra_config.yml new file mode 100644 index 0000000000..ca3d4cbbcc --- /dev/null +++ b/serving/sample_cassandra_config.yml @@ -0,0 +1,13 @@ +name: serving +type: CASSANDRA +cassandra_config: + bootstrap_hosts: localhost + port: 9042 + keyspace: feast + table_name: feature_store + replication_options: + class: SimpleStrategy + replication_factor: 1 +subscriptions: + - name: "*" + version: ">0" diff --git a/serving/src/main/java/feast/serving/FeastProperties.java b/serving/src/main/java/feast/serving/FeastProperties.java index 505d7d0330..52db658ca4 100644 --- a/serving/src/main/java/feast/serving/FeastProperties.java +++ b/serving/src/main/java/feast/serving/FeastProperties.java @@ -85,6 +85,15 @@ public static class StoreProperties { private String configPath; private int redisPoolMaxSize; private int redisPoolMaxIdle; + private int cassandraPoolCoreLocalConnections; + private int cassandraPoolMaxLocalConnections; + private int cassandraPoolCoreRemoteConnections; + private int cassandraPoolMaxRemoteConnections; + private int cassandraPoolMaxRequestsLocalConnection; + private int cassandraPoolMaxRequestsRemoteConnection; + private int cassandraPoolNewLocalConnectionThreshold; + private int cassandraPoolNewRemoteConnectionThreshold; + private int cassandraPoolTimeoutMillis; public String getConfigPath() { return this.configPath; @@ -98,6 +107,42 @@ public int getRedisPoolMaxIdle() { return this.redisPoolMaxIdle; } + public int getCassandraPoolCoreLocalConnections() { + return this.cassandraPoolCoreLocalConnections; + } + + public int getCassandraPoolMaxLocalConnections() { + return this.cassandraPoolMaxLocalConnections; + } + + public int getCassandraPoolCoreRemoteConnections() { + return this.cassandraPoolCoreRemoteConnections; + } + + public int getCassandraPoolMaxRemoteConnections() { + return this.cassandraPoolMaxRemoteConnections; + } + + public int getCassandraPoolMaxRequestsLocalConnection() { + return this.cassandraPoolMaxRequestsLocalConnection; + } + + public int getCassandraPoolMaxRequestsRemoteConnection() { + return this.cassandraPoolMaxRequestsRemoteConnection; + } + + public int getCassandraPoolNewLocalConnectionThreshold() { + return this.cassandraPoolNewLocalConnectionThreshold; + } + + public int getCassandraPoolNewRemoteConnectionThreshold() { + return this.cassandraPoolNewRemoteConnectionThreshold; + } + + public int getCassandraPoolTimeoutMillis() { + return this.cassandraPoolTimeoutMillis; + } + public void setConfigPath(String configPath) { this.configPath = configPath; } @@ -109,6 +154,46 @@ public void setRedisPoolMaxSize(int redisPoolMaxSize) { public void setRedisPoolMaxIdle(int redisPoolMaxIdle) { this.redisPoolMaxIdle = redisPoolMaxIdle; } + + public void setCassandraPoolCoreLocalConnections(int cassandraPoolCoreLocalConnections) { + this.cassandraPoolCoreLocalConnections = cassandraPoolCoreLocalConnections; + } + + public void setCassandraPoolMaxLocalConnections(int 
cassandraPoolMaxLocalConnections) { + this.cassandraPoolMaxLocalConnections = cassandraPoolMaxLocalConnections; + } + + public void setCassandraPoolCoreRemoteConnections(int cassandraPoolCoreRemoteConnections) { + this.cassandraPoolCoreRemoteConnections = cassandraPoolCoreRemoteConnections; + } + + public void setCassandraPoolMaxRemoteConnections(int cassandraPoolMaxRemoteConnections) { + this.cassandraPoolMaxRemoteConnections = cassandraPoolMaxRemoteConnections; + } + + public void setCassandraPoolMaxRequestsLocalConnection( + int cassandraPoolMaxRequestsLocalConnection) { + this.cassandraPoolMaxRequestsLocalConnection = cassandraPoolMaxRequestsLocalConnection; + } + + public void setCassandraPoolMaxRequestsRemoteConnection( + int cassandraPoolMaxRequestsRemoteConnection) { + this.cassandraPoolMaxRequestsRemoteConnection = cassandraPoolMaxRequestsRemoteConnection; + } + + public void setCassandraPoolNewLocalConnectionThreshold( + int cassandraPoolNewLocalConnectionThreshold) { + this.cassandraPoolNewLocalConnectionThreshold = cassandraPoolNewLocalConnectionThreshold; + } + + public void setCassandraPoolNewRemoteConnectionThreshold( + int cassandraPoolNewRemoteConnectionThreshold) { + this.cassandraPoolNewRemoteConnectionThreshold = cassandraPoolNewRemoteConnectionThreshold; + } + + public void setCassandraPoolTimeoutMillis(int cassandraPoolTimeoutMillis) { + this.cassandraPoolTimeoutMillis = cassandraPoolTimeoutMillis; + } } public static class JobProperties { diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index 3cc115978a..163823c637 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -16,23 +16,37 @@ */ package feast.serving.configuration; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.Session; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; +import feast.core.StoreProto.Store.Builder; +import feast.core.StoreProto.Store.CassandraConfig; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.Subscription; import feast.serving.FeastProperties; import feast.serving.service.BigQueryServingService; +import feast.serving.FeastProperties.JobProperties; +import feast.serving.FeastProperties.StoreProperties; +import feast.serving.service.CachedSpecService; +import feast.serving.service.CassandraServingService; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; import feast.serving.service.RedisServingService; import feast.serving.service.ServingService; import feast.serving.specs.CachedSpecService; import io.opentracing.Tracer; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -61,6 +75,13 @@ private Store setStoreConfig(Store.Builder builder, Map options) .build(); return builder.setBigqueryConfig(bqConfig).build(); case 
CASSANDRA: + CassandraConfig cassandraConfig = + CassandraConfig.newBuilder() + .setBootstrapHosts(options.get("host")) + .setPort(Integer.parseInt(options.get("port"))) + .setKeyspace(options.get("keyspace")) + .build(); + return builder.setCassandraConfig(cassandraConfig).build(); default: throw new IllegalArgumentException( String.format( @@ -122,6 +143,47 @@ public ServingService servingService( storage); break; case CASSANDRA: + StoreProperties storeProperties = feastProperties.getStore(); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions.setCoreConnectionsPerHost( + HostDistance.LOCAL, storeProperties.getCassandraPoolCoreLocalConnections()); + poolingOptions.setCoreConnectionsPerHost( + HostDistance.REMOTE, storeProperties.getCassandraPoolCoreRemoteConnections()); + poolingOptions.setMaxConnectionsPerHost( + HostDistance.LOCAL, storeProperties.getCassandraPoolMaxLocalConnections()); + poolingOptions.setMaxConnectionsPerHost( + HostDistance.REMOTE, storeProperties.getCassandraPoolMaxRemoteConnections()); + poolingOptions.setMaxRequestsPerConnection( + HostDistance.LOCAL, storeProperties.getCassandraPoolMaxRequestsLocalConnection()); + poolingOptions.setMaxRequestsPerConnection( + HostDistance.REMOTE, storeProperties.getCassandraPoolMaxRequestsRemoteConnection()); + poolingOptions.setNewConnectionThreshold( + HostDistance.LOCAL, storeProperties.getCassandraPoolNewLocalConnectionThreshold()); + poolingOptions.setNewConnectionThreshold( + HostDistance.REMOTE, storeProperties.getCassandraPoolNewRemoteConnectionThreshold()); + poolingOptions.setPoolTimeoutMillis(storeProperties.getCassandraPoolTimeoutMillis()); + CassandraConfig cassandraConfig = store.getCassandraConfig(); + List contactPoints = + Arrays.stream(cassandraConfig.getBootstrapHosts().split(",")) + .map(h -> new InetSocketAddress(h, cassandraConfig.getPort())) + .collect(Collectors.toList()); + Cluster cluster = + Cluster.builder() + .addContactPointsWithPorts(contactPoints) + .withPoolingOptions(poolingOptions) + .build(); + // Session in Cassandra is thread-safe and maintains connections to cluster nodes internally + // Recommended to use one session per keyspace instead of open and close connection for each + // request + Session session = cluster.connect(); + servingService = + new CassandraServingService( + session, + cassandraConfig.getKeyspace(), + cassandraConfig.getTableName(), + specService, + tracer); + break; case UNRECOGNIZED: case INVALID: throw new IllegalArgumentException( diff --git a/serving/src/main/java/feast/serving/service/CassandraServingService.java b/serving/src/main/java/feast/serving/service/CassandraServingService.java new file mode 100644 index 0000000000..e9a7dff8ac --- /dev/null +++ b/serving/src/main/java/feast/serving/service/CassandraServingService.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package feast.serving.service;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Timestamp;
+import feast.serving.ServingAPIProto.FeatureSetRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
+import feast.serving.util.ValueUtil;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.Value;
+import io.opentracing.Scope;
+import io.opentracing.Tracer;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class CassandraServingService extends OnlineServingService<String, ResultSet> {
+
+  private final Session session;
+  private final String keyspace;
+  private final String tableName;
+  private final Tracer tracer;
+
+  public CassandraServingService(
+      Session session,
+      String keyspace,
+      String tableName,
+      CachedSpecService specService,
+      Tracer tracer) {
+    super(specService, tracer);
+    this.session = session;
+    this.keyspace = keyspace;
+    this.tableName = tableName;
+    this.tracer = tracer;
+  }
+
+  @Override
+  List<String> createLookupKeys(
+      List<String> featureSetEntityNames,
+      List<EntityRow> entityRows,
+      FeatureSetRequest featureSetRequest) {
+    try (Scope scope = tracer.buildSpan("Cassandra-makeCassandraKeys").startActive(true)) {
+      String featureSetId =
+          String.format("%s:%s", featureSetRequest.getName(), featureSetRequest.getVersion());
+      return entityRows.stream()
+          .map(row -> createCassandraKey(featureSetId, featureSetEntityNames, row))
+          .collect(Collectors.toList());
+    }
+  }
+
+  @Override
+  protected boolean isEmpty(ResultSet response) {
+    return response.isExhausted();
+  }
+
+  /**
+   * Issue one SELECT per key, since the Cassandra driver has no multi-get equivalent
+   *
+   * @param keys list of string keys
+   * @return list of {@link ResultSet}s, one per key
+   */
+  @Override
+  protected List<ResultSet> getAll(List<String> keys) {
+    List<ResultSet> results = new ArrayList<>();
+    for (String key : keys) {
+      results.add(
+          session.execute(
+              QueryBuilder.select()
+                  .column("entities")
+                  .column("feature")
+                  .column("value")
+                  .writeTime("value")
+                  .as("writetime")
+                  .from(keyspace, tableName)
+                  .where(QueryBuilder.eq("entities", key))));
+    }
+    return results;
+  }
+
+  @Override
+  FeatureRow parseResponse(ResultSet resultSet) {
+    List<Field> fields = new ArrayList<>();
+    Instant instant = Instant.now();
+    while (!resultSet.isExhausted()) {
+      Row row = resultSet.one();
+      long microSeconds = row.getLong("writetime");
+      instant =
+          Instant.ofEpochSecond(
+              TimeUnit.MICROSECONDS.toSeconds(microSeconds),
+              TimeUnit.MICROSECONDS.toNanos(
+                  Math.floorMod(microSeconds, TimeUnit.SECONDS.toMicros(1))));
+      try {
+        fields.add(
+            Field.newBuilder()
+                .setName(row.getString("feature"))
+                .setValue(Value.parseFrom(ByteBuffer.wrap(row.getBytes("value").array())))
+                .build());
+      } catch (InvalidProtocolBufferException e) {
+        e.printStackTrace();
+      }
+    }
+    return FeatureRow.newBuilder()
+        .addAllFields(fields)
+        .setEventTimestamp(
+            Timestamp.newBuilder()
+                .setSeconds(instant.getEpochSecond())
+                .setNanos(instant.getNano())
+                .build())
+        .build();
+  }
+
+  /**
+   * Create Cassandra keys
+   *
+   * @param featureSet featureSet reference of the feature, e.g. feature_set_1:1
+   * @param featureSetEntityNames entity names that belong to the featureSet
+   * @param entityRow entityRow to build the key from
+   * @return the concatenated key string
+   */
+  private static String createCassandraKey(
+      String featureSet, List<String> featureSetEntityNames, EntityRow entityRow) {
+    Map<String, Value> fieldsMap = entityRow.getFieldsMap();
+    List<String> res = new ArrayList<>();
+    for (String entityName : featureSetEntityNames) {
+      res.add(entityName + "=" + ValueUtil.toString(fieldsMap.get(entityName)));
+    }
+    return featureSet + ":" + String.join("|", res);
+  }
+}
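The writetime handling in parseResponse above is the subtle part: Cassandra reports write times in microseconds since the epoch, while protobuf Timestamps carry whole seconds plus nanoseconds. A minimal sketch of the same conversion, using the illustrative value asserted by the integration test further below (1574318287123456 microseconds):

    import com.google.protobuf.Timestamp;
    import java.util.concurrent.TimeUnit;

    public class WritetimeConversionSketch {
      public static void main(String[] args) {
        long microSeconds = 1574318287123456L; // Cassandra writetime, in microseconds
        // 1574318287123456 us -> 1574318287 whole seconds, with 123456 us = 123456000 ns left over
        Timestamp eventTimestamp =
            Timestamp.newBuilder()
                .setSeconds(TimeUnit.MICROSECONDS.toSeconds(microSeconds))
                .setNanos(
                    (int)
                        TimeUnit.MICROSECONDS.toNanos(
                            Math.floorMod(microSeconds, TimeUnit.SECONDS.toMicros(1))))
                .build();
        System.out.println(eventTimestamp); // seconds: 1574318287, nanos: 123456000
      }
    }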
diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java
new file mode 100644
index 0000000000..699d48c121
--- /dev/null
+++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java
@@ -0,0 +1,236 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.service;
+
+import static feast.serving.util.Metrics.missingKeyCount;
+import static feast.serving.util.Metrics.requestCount;
+import static feast.serving.util.Metrics.requestLatency;
+import static feast.serving.util.Metrics.staleKeyCount;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.Duration;
+import com.google.protobuf.InvalidProtocolBufferException;
+import feast.core.FeatureSetProto.EntitySpec;
+import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.serving.ServingAPIProto.FeastServingType;
+import feast.serving.ServingAPIProto.FeatureSetRequest;
+import feast.serving.ServingAPIProto.GetBatchFeaturesRequest;
+import feast.serving.ServingAPIProto.GetBatchFeaturesResponse;
+import feast.serving.ServingAPIProto.GetFeastServingInfoRequest;
+import feast.serving.ServingAPIProto.GetFeastServingInfoResponse;
+import feast.serving.ServingAPIProto.GetJobRequest;
+import feast.serving.ServingAPIProto.GetJobResponse;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.ValueProto.Value;
+import io.grpc.Status;
+import io.opentracing.Scope;
+import io.opentracing.Tracer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+abstract class OnlineServingService<LookupKeyType, ResponseType> implements ServingService {
+
+  private final CachedSpecService specService;
+  private final Tracer tracer;
+
+  OnlineServingService(CachedSpecService specService, Tracer tracer) {
+    this.specService = specService;
+    this.tracer = tracer;
+  }
+
+  @Override
+  public GetFeastServingInfoResponse getFeastServingInfo(
+      GetFeastServingInfoRequest getFeastServingInfoRequest) {
+    return GetFeastServingInfoResponse.newBuilder()
+        .setType(FeastServingType.FEAST_SERVING_TYPE_ONLINE)
+        .build();
+  }
+
+  @Override
+  public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) {
+    throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
+  }
+
+  @Override
+  public GetJobResponse getJob(GetJobRequest getJobRequest) {
+    throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) {
+    try (Scope scope =
+        tracer.buildSpan("OnlineServingService-getOnlineFeatures").startActive(true)) {
+      long startTime = System.currentTimeMillis();
+      GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder =
+          GetOnlineFeaturesResponse.newBuilder();
+
+      List<EntityRow> entityRows = request.getEntityRowsList();
+      Map<EntityRow, Map<String, Value>> featureValuesMap =
+          entityRows.stream()
+              .collect(Collectors.toMap(er -> er, er -> Maps.newHashMap(er.getFieldsMap())));
+
+      List<FeatureSetRequest> featureSetRequests = request.getFeatureSetsList();
+      for (FeatureSetRequest featureSetRequest : featureSetRequests) {
+
+        FeatureSetSpec featureSetSpec =
+            specService.getFeatureSet(featureSetRequest.getName(), featureSetRequest.getVersion());
+
+        List<String> featureSetEntityNames =
+            featureSetSpec.getEntitiesList().stream()
+                .map(EntitySpec::getName)
+                .collect(Collectors.toList());
+
+        Duration defaultMaxAge = featureSetSpec.getMaxAge();
+        if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) {
+          featureSetRequest = featureSetRequest.toBuilder().setMaxAge(defaultMaxAge).build();
+        }
+
+        sendAndProcessMultiGet(
+            createLookupKeys(featureSetEntityNames, entityRows, featureSetRequest),
+            entityRows,
+            featureValuesMap,
+            featureSetRequest);
+      }
+      List<FieldValues> fieldValues =
+          featureValuesMap.values().stream()
+              .map(m -> FieldValues.newBuilder().putAllFields(m).build())
+              .collect(Collectors.toList());
+      requestLatency.labels("getOnlineFeatures").observe(System.currentTimeMillis() - startTime);
+      return getOnlineFeaturesResponseBuilder.addAllFieldValues(fieldValues).build();
+    }
+  }
+
+  /**
+   * Create lookup keys for corresponding data stores
+   *
+   * @param featureSetEntityNames list of entity names
+   * @param entityRows list of {@link EntityRow}
+   * @param featureSetRequest {@link FeatureSetRequest}
+   * @return list of {@link LookupKeyType}
+   */
+  abstract List<LookupKeyType> createLookupKeys(
+      List<String> featureSetEntityNames,
+      List<EntityRow> entityRows,
+      FeatureSetRequest featureSetRequest);
+
+  /**
+   * Checks whether the response is empty, i.e. the feature does not exist in the store
+   *
+   * @param response {@link ResponseType}
+   * @return boolean
+   */
+  protected abstract boolean isEmpty(ResponseType response);
+
+  /**
+   * Send a list of get requests
+   *
+   * @param keys list of {@link LookupKeyType}
+   * @return list of {@link ResponseType}
+   */
+  protected abstract List<ResponseType> getAll(List<LookupKeyType> keys);
+
+  /**
+   * Parse response from data store to FeatureRow
+   *
+   * @param response {@link ResponseType}
+   * @return {@link FeatureRow}
+   */
+  abstract FeatureRow parseResponse(ResponseType response) throws InvalidProtocolBufferException;
+
+  private List<ResponseType> getResponses(List<LookupKeyType> keys) {
+    try (Scope scope = tracer.buildSpan("OnlineServingService-sendMultiGet").startActive(true)) {
+      long startTime = System.currentTimeMillis();
+      try {
+        return getAll(keys);
+      } catch (Exception e) {
+        throw Status.NOT_FOUND
+            .withDescription("Unable to retrieve feature from online store")
+            .withCause(e)
+            .asRuntimeException();
+      } finally {
+        requestLatency.labels("sendMultiGet").observe(System.currentTimeMillis() - startTime);
+      }
+    }
+  }
+
+  private void sendAndProcessMultiGet(
+      List<LookupKeyType> keys,
+      List<EntityRow> entityRows,
+      Map<EntityRow, Map<String, Value>> featureValuesMap,
+      FeatureSetRequest featureSetRequest) {
+    List<ResponseType> responses = getResponses(keys);
+
+    long startTime = System.currentTimeMillis();
+    try (Scope scope = tracer.buildSpan("OnlineServingService-processResponse").startActive(true)) {
+      String featureSetId =
+          String.format("%s:%d", featureSetRequest.getName(), featureSetRequest.getVersion());
+      Map<String, Value> nullValues =
+          featureSetRequest.getFeatureNamesList().stream()
+              .collect(
+                  Collectors.toMap(
+                      name -> featureSetId + ":" + name, name -> Value.newBuilder().build()));
+      for (int i = 0; i < responses.size(); i++) {
+        EntityRow entityRow = entityRows.get(i);
+        Map<String, Value> featureValues = featureValuesMap.get(entityRow);
+        try {
+          ResponseType response = responses.get(i);
+          if (isEmpty(response)) {
+            missingKeyCount.labels(featureSetRequest.getName()).inc();
+            featureValues.putAll(nullValues);
+            continue;
+          }
+
+          FeatureRow featureRow = parseResponse(response);
+          boolean stale = isStale(featureSetRequest, entityRow, featureRow);
+          if (stale) {
+            staleKeyCount.labels(featureSetRequest.getName()).inc();
+            featureValues.putAll(nullValues);
+            continue;
+          }
+
+          requestCount.labels(featureSetRequest.getName()).inc();
+          featureRow.getFieldsList().stream()
+              .filter(f -> featureSetRequest.getFeatureNamesList().contains(f.getName()))
+              .forEach(f -> featureValues.put(featureSetId + ":" + f.getName(), f.getValue()));
+        } catch (InvalidProtocolBufferException e) {
+          e.printStackTrace();
+        }
+      }
+    } finally {
+      requestLatency.labels("processResponse").observe(System.currentTimeMillis() - startTime);
+    }
+  }
+
+  private static boolean isStale(
+      FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) {
+    if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) {
+      return false;
+    }
+    long givenTimestamp = entityRow.getEntityTimestamp().getSeconds();
+    if (givenTimestamp == 0) {
+      givenTimestamp = System.currentTimeMillis() / 1000;
+    }
+    long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds();
+    return timeDifference > featureSetRequest.getMaxAge().getSeconds();
+  }
+}
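Since OnlineServingService is a template method over LookupKeyType and ResponseType, adding another online store only means implementing the four hooks above; staleness checks, metrics, and response assembly stay in the base class. A minimal sketch of a hypothetical in-memory backend (InMemoryServingService and its backing Map are illustrative only, not part of this patch):

    package feast.serving.service;

    import feast.serving.ServingAPIProto.FeatureSetRequest;
    import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
    import feast.types.FeatureRowProto.FeatureRow;
    import io.opentracing.Tracer;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class InMemoryServingService extends OnlineServingService<String, FeatureRow> {

      // key -> latest FeatureRow; a stand-in for a real storage client
      private final Map<String, FeatureRow> store;

      InMemoryServingService(
          Map<String, FeatureRow> store, CachedSpecService specService, Tracer tracer) {
        super(specService, tracer);
        this.store = store;
      }

      @Override
      List<String> createLookupKeys(
          List<String> featureSetEntityNames,
          List<EntityRow> entityRows,
          FeatureSetRequest featureSetRequest) {
        // Any deterministic encoding of feature set id + entity values works as a key.
        return entityRows.stream()
            .map(
                row ->
                    featureSetRequest.getName()
                        + ":"
                        + featureSetRequest.getVersion()
                        + ":"
                        + row.getFieldsMap())
            .collect(Collectors.toList());
      }

      @Override
      protected boolean isEmpty(FeatureRow response) {
        return response == null;
      }

      @Override
      protected List<FeatureRow> getAll(List<String> keys) {
        return keys.stream().map(store::get).collect(Collectors.toList());
      }

      @Override
      FeatureRow parseResponse(FeatureRow response) {
        return response; // rows are stored already deserialized
      }
    }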
diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java
index 48fc485214..0ba10f74c2 100644
--- a/serving/src/main/java/feast/serving/service/RedisServingService.java
+++ b/serving/src/main/java/feast/serving/service/RedisServingService.java
@@ -25,7 +25,6 @@
 import com.google.common.collect.Maps;
-import com.google.protobuf.Duration;
 import com.google.protobuf.InvalidProtocolBufferException;
 import feast.core.FeatureSetProto.EntitySpec;
 import feast.core.FeatureSetProto.FeatureSetSpec;
@@ -58,16 +57,15 @@
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
-public class RedisServingService implements ServingService {
+public class RedisServingService extends OnlineServingService<RedisKey, byte[]> {
 
   private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisServingService.class);
   private final JedisPool jedisPool;
-  private final CachedSpecService specService;
   private final Tracer tracer;
 
   public RedisServingService(JedisPool jedisPool, CachedSpecService specService, Tracer tracer) {
+    super(specService, tracer);
     this.jedisPool = jedisPool;
-    this.specService = specService;
     this.tracer = tracer;
   }
 
@@ -142,7 +140,8 @@ public GetJobResponse getJob(GetJobRequest getJobRequest) {
    * @param featureSetSpec featureSetSpec of the features to retrieve
    * @return list of RedisKeys
    */
-  private List<RedisKey> getRedisKeys(
+  @Override
+  List<RedisKey> createLookupKeys(
       List<String> featureSetEntityNames,
       List<EntityRow> entityRows,
       FeatureSetSpec featureSetSpec) {
@@ -156,6 +155,33 @@ private List<RedisKey> getRedisKeys(
     }
   }
 
+  @Override
+  protected boolean isEmpty(byte[] response) {
+    return response == null;
+  }
+
+  /**
+   * Send a list of get requests as an mget
+   *
+   * @param keys list of {@link RedisKey}
+   * @return list of {@link FeatureRow} in primitive byte representation for each {@link RedisKey}
+   */
+  @Override
+  protected List<byte[]> getAll(List<RedisKey> keys) {
+    Jedis jedis = jedisPool.getResource();
+    byte[][] binaryKeys =
+        keys.stream()
+            .map(AbstractMessageLite::toByteArray)
+            .collect(Collectors.toList())
+            .toArray(new byte[0][0]);
+    return jedis.mget(binaryKeys);
+  }
+
+  @Override
+  FeatureRow parseResponse(byte[] response) throws InvalidProtocolBufferException {
+    return FeatureRow.parseFrom(response);
+  }
+
   /**
    * Create {@link RedisKey}
    *
diff --git a/serving/src/main/java/feast/serving/util/ValueUtil.java b/serving/src/main/java/feast/serving/util/ValueUtil.java
new file mode 100644
index 0000000000..e3ede6af98
--- /dev/null
+++ b/serving/src/main/java/feast/serving/util/ValueUtil.java
@@ -0,0 +1,53 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.util;
+
+import feast.types.ValueProto.Value;
+
+public class ValueUtil {
+
+  public static String toString(Value value) {
+    String strValue;
+    switch (value.getValCase()) {
+      case BYTES_VAL:
+        strValue = value.getBytesVal().toString();
+        break;
+      case STRING_VAL:
+        strValue = value.getStringVal();
+        break;
+      case INT32_VAL:
+        strValue = String.valueOf(value.getInt32Val());
+        break;
+      case INT64_VAL:
+        strValue = String.valueOf(value.getInt64Val());
+        break;
+      case DOUBLE_VAL:
+        strValue = String.valueOf(value.getDoubleVal());
+        break;
+      case FLOAT_VAL:
+        strValue = String.valueOf(value.getFloatVal());
+        break;
+      case BOOL_VAL:
+        strValue = String.valueOf(value.getBoolVal());
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("toString method not supported for type %s", value.getValCase()));
+    }
+    return strValue;
+  }
+}
diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml
index 96713c8028..8e976f33a4 100644
--- a/serving/src/main/resources/application.yml
+++ b/serving/src/main/resources/application.yml
@@ -1,7 +1,7 @@
 feast:
   # This value is retrieved from project.version properties in pom.xml
   # https://docs.spring.io/spring-boot/docs/current/reference/html/
-  version: @project.version@
+#  version: @project.version@
   # GRPC service address for Feast Core
   # Feast Serving requires connection to Feast Core to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information)
   core-host: ${FEAST_CORE_HOST:localhost}
@@ -24,6 +24,24 @@ feast:
     redis-pool-max-size: ${FEAST_REDIS_POOL_MAX_SIZE:128}
     # If serving redis, the redis pool max idle conns
     redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16}
+    # If serving cassandra, the minimum number of connections per local host (a host in the same data center)
+    cassandra-pool-core-local-connections: ${FEAST_CASSANDRA_CORE_LOCAL_CONNECTIONS:1}
+    # If serving cassandra, the maximum number of connections per local host (a host in the same data center)
+    cassandra-pool-max-local-connections: ${FEAST_CASSANDRA_MAX_LOCAL_CONNECTIONS:1}
+    # If serving cassandra, the minimum number of connections per remote host (a host in a remote data center)
+    cassandra-pool-core-remote-connections: ${FEAST_CASSANDRA_CORE_REMOTE_CONNECTIONS:1}
+    # If serving cassandra, the maximum number of connections per remote host (a host in a remote data center)
+    cassandra-pool-max-remote-connections: ${FEAST_CASSANDRA_MAX_REMOTE_CONNECTIONS:1}
+    # If serving cassandra, the maximum number of concurrent requests per connection to a local host
+    cassandra-pool-max-requests-local-connection: ${FEAST_CASSANDRA_MAX_REQUESTS_LOCAL_CONNECTION:32768}
+    # If serving cassandra, the maximum number of concurrent requests per connection to a remote host
+    cassandra-pool-max-requests-remote-connection: ${FEAST_CASSANDRA_MAX_REQUESTS_REMOTE_CONNECTION:2048}
+    # If serving cassandra, the number of in-flight requests that triggers opening a new connection to a local host (if one is available)
+    cassandra-pool-new-local-connection-threshold: ${FEAST_CASSANDRA_NEW_LOCAL_CONNECTION_THRESHOLD:30000}
+    # If serving cassandra, the number of in-flight requests that triggers opening a new connection to a remote host (if one is available)
+    cassandra-pool-new-remote-connection-threshold: ${FEAST_CASSANDRA_NEW_REMOTE_CONNECTION_THRESHOLD:400}
+    # If serving cassandra, the number of milliseconds to wait to acquire a connection (after which the driver moves to the next host in the query plan)
+    cassandra-pool-timeout-millis: ${FEAST_CASSANDRA_POOL_TIMEOUT_MILLIS:0}
   jobs:
     # staging-location specifies the URI to store intermediate
files for batch serving. diff --git a/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java b/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java new file mode 100644 index 0000000000..a1778251a3 --- /dev/null +++ b/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java @@ -0,0 +1,244 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.service; + +import static feast.serving.test.TestUtil.intValue; +import static feast.serving.test.TestUtil.responseToMapList; +import static feast.serving.test.TestUtil.strValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.utils.Bytes; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Timestamp; +import feast.core.FeatureSetProto.EntitySpec; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.serving.ServingAPIProto.FeatureSetRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; +import feast.serving.test.TestUtil.LocalCassandra; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.ValueProto.Value; +import io.opentracing.Tracer; +import io.opentracing.Tracer.SpanBuilder; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.thrift.transport.TTransportException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class CassandraServingServiceITTest { + + @Mock CachedSpecService specService; + + @Mock Tracer tracer; + + private CassandraServingService cassandraServingService; + private Session session; + + @BeforeClass + public static void startServer() throws InterruptedException, IOException, TTransportException { + LocalCassandra.start(); + LocalCassandra.createKeyspaceAndTable(); + } + + @Before + public void setup() { + initMocks(this); + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .addEntities(EntitySpec.newBuilder().setName("entity1")) + .addEntities(EntitySpec.newBuilder().setName("entity2")) + .build(); + + when(specService.getFeatureSet("featureSet", 
1)).thenReturn(featureSetSpec); + when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); + + session = + new Cluster.Builder() + .addContactPoints(LocalCassandra.getHost()) + .withPort(LocalCassandra.getPort()) + .build() + .connect(); + + populateTable(session); + + cassandraServingService = + new CassandraServingService(session, "test", "feature_store", specService, tracer); + } + + private void populateTable(Session session) { + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=1|entity2=a", "feature1", intValue(1))); + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=1|entity2=a", "feature2", intValue(1))); + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=2|entity2=b", "feature1", intValue(1))); + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=2|entity2=b", "feature2", intValue(1))); + } + + @AfterClass + public static void cleanUp() { + LocalCassandra.stop(); + } + + @Test + public void shouldReturnResponseWithValuesIfKeysPresent() { + GetOnlineFeaturesRequest request = + GetOnlineFeaturesRequest.newBuilder() + .addFeatureSets( + FeatureSetRequest.newBuilder() + .setName("featureSet") + .setVersion(1) + .addAllFeatureNames(Lists.newArrayList("feature1", "feature2")) + .build()) + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a"))) + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b"))) + .build(); + + GetOnlineFeaturesResponse expected = + GetOnlineFeaturesResponse.newBuilder() + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + .build(); + GetOnlineFeaturesResponse actual = cassandraServingService.getOnlineFeatures(request); + + assertThat( + responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray())); + } + + @Test + public void shouldReturnResponseWithUnsetValuesIfKeysNotPresent() { + GetOnlineFeaturesRequest request = + GetOnlineFeaturesRequest.newBuilder() + .addFeatureSets( + FeatureSetRequest.newBuilder() + .setName("featureSet") + .setVersion(1) + .addAllFeatureNames(Lists.newArrayList("feature1", "feature2")) + .build()) + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a"))) + // Non-existing entity keys + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(55)) + .putFields("entity2", strValue("ff"))) + .build(); + + GetOnlineFeaturesResponse expected = + GetOnlineFeaturesResponse.newBuilder() + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + // Missing keys will 
return empty values + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(55)) + .putFields("entity2", strValue("ff")) + .putFields("featureSet:1:feature1", Value.newBuilder().build()) + .putFields("featureSet:1:feature2", Value.newBuilder().build())) + .build(); + GetOnlineFeaturesResponse actual = cassandraServingService.getOnlineFeatures(request); + + assertThat( + responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray())); + } + + // This test should fail if cassandra no longer stores write time as microseconds or if we change + // the way we parse microseconds to com.google.protobuf.Timestamp + @Test + public void shouldInsertAndParseWriteTimestampInMicroSeconds() + throws InvalidProtocolBufferException { + session.execute( + "INSERT INTO test.feature_store (entities, feature, value)\n" + + " VALUES ('ENT1', 'FEAT1'," + + Bytes.toHexString(Value.newBuilder().build().toByteArray()) + + ")\n" + + " USING TIMESTAMP 1574318287123456;"); + + ResultSet resultSet = + session.execute( + QueryBuilder.select() + .column("entities") + .column("feature") + .column("value") + .writeTime("value") + .as("writetime") + .from("test", "feature_store") + .where(QueryBuilder.eq("entities", "ENT1"))); + FeatureRow featureRow = cassandraServingService.parseResponse(resultSet); + + Assert.assertEquals( + Timestamp.newBuilder().setSeconds(1574318287).setNanos(123456000).build(), + featureRow.getEventTimestamp()); + } + + private Insert insertQuery( + String database, String table, String key, String featureName, Value value) { + return QueryBuilder.insertInto(database, table) + .value("entities", key) + .value("feature", featureName) + .value("value", ByteBuffer.wrap(value.toByteArray())); + } +} diff --git a/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java b/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java new file mode 100644 index 0000000000..f965b14a64 --- /dev/null +++ b/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.serving.service; + +import static feast.serving.test.TestUtil.intValue; +import static feast.serving.test.TestUtil.strValue; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.datastax.driver.core.Session; +import feast.serving.ServingAPIProto.FeatureSetRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; +import io.opentracing.Tracer; +import io.opentracing.Tracer.SpanBuilder; +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class CassandraServingServiceTest { + + @Mock Session session; + + @Mock CachedSpecService specService; + + @Mock Tracer tracer; + + private CassandraServingService cassandraServingService; + + @Before + public void setUp() { + initMocks(this); + + when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); + + cassandraServingService = + new CassandraServingService(session, "test", "feature_store", specService, tracer); + } + + @Test + public void shouldConstructCassandraKeyCorrectly() { + List cassandraKeys = + cassandraServingService.createLookupKeys( + new ArrayList() { + { + add("entity1"); + add("entity2"); + } + }, + new ArrayList() { + { + add( + EntityRow.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .build()); + add( + EntityRow.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .build()); + } + }, + FeatureSetRequest.newBuilder().setName("featureSet").setVersion(1).build()); + + List expectedKeys = + new ArrayList() { + { + add("featureSet:1:entity1=1|entity2=a"); + add("featureSet:1:entity1=2|entity2=b"); + } + }; + + Assert.assertEquals(expectedKeys, cassandraKeys); + } + + @Test(expected = Exception.class) + public void shouldThrowExceptionWhenCannotConstructCassandraKey() { + List cassandraKeys = + cassandraServingService.createLookupKeys( + new ArrayList() { + { + add("entity1"); + add("entity2"); + } + }, + new ArrayList() { + { + add(EntityRow.newBuilder().putFields("entity1", intValue(1)).build()); + add( + EntityRow.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .build()); + } + }, + FeatureSetRequest.newBuilder().setName("featureSet").setVersion(1).build()); + } +} diff --git a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java index 042107e117..eee4dc6dc3 100644 --- a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java @@ -16,6 +16,9 @@ */ package feast.serving.service; +import static feast.serving.test.TestUtil.intValue; +import static feast.serving.test.TestUtil.responseToMapList; +import static feast.serving.test.TestUtil.strValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.mockito.Mockito.when; @@ -42,7 +45,6 @@ import io.opentracing.Tracer.SpanBuilder; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.junit.Before; import org.junit.Test; @@ -595,20 +597,6 @@ public void shouldFilterOutUndesiredRows() { responseToMapList(actual), 
containsInAnyOrder(responseToMapList(expected).toArray()));
   }
 
-  private List<Map<String, Value>> responseToMapList(GetOnlineFeaturesResponse response) {
-    return response.getFieldValuesList().stream()
-        .map(FieldValues::getFieldsMap)
-        .collect(Collectors.toList());
-  }
-
-  private Value intValue(int val) {
-    return Value.newBuilder().setInt64Val(val).build();
-  }
-
-  private Value strValue(String val) {
-    return Value.newBuilder().setStringVal(val).build();
-  }
-
   private FeatureSetSpec getFeatureSetSpec() {
     return FeatureSetSpec.newBuilder()
         .setProject("project")
diff --git a/serving/src/test/java/feast/serving/test/TestUtil.java b/serving/src/test/java/feast/serving/test/TestUtil.java
new file mode 100644
index 0000000000..9c53359071
--- /dev/null
+++ b/serving/src/test/java/feast/serving/test/TestUtil.java
@@ -0,0 +1,81 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
+import feast.types.ValueProto.Value;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.thrift.transport.TTransportException;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+
+@SuppressWarnings("WeakerAccess")
+public class TestUtil {
+
+  public static class LocalCassandra {
+
+    public static void start() throws InterruptedException, IOException, TTransportException {
+      EmbeddedCassandraServerHelper.startEmbeddedCassandra();
+    }
+
+    public static void createKeyspaceAndTable() {
+      new ClassPathCQLDataSet("embedded-store/LoadCassandra.cql", true, true)
+          .getCQLStatements()
+          .forEach(s -> LocalCassandra.getSession().execute(s));
+    }
+
+    public static String getHost() {
+      return EmbeddedCassandraServerHelper.getHost();
+    }
+
+    public static int getPort() {
+      return EmbeddedCassandraServerHelper.getNativeTransportPort();
+    }
+
+    public static Cluster getCluster() {
+      return EmbeddedCassandraServerHelper.getCluster();
+    }
+
+    public static Session getSession() {
+      return EmbeddedCassandraServerHelper.getSession();
+    }
+
+    public static void stop() {
+      EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
+    }
+  }
+
+  public static List<Map<String, Value>> responseToMapList(GetOnlineFeaturesResponse response) {
+    return response.getFieldValuesList().stream()
+        .map(FieldValues::getFieldsMap)
+        .collect(Collectors.toList());
+  }
+
+  public static Value intValue(int val) {
+    return Value.newBuilder().setInt64Val(val).build();
+  }
+
+  public static Value strValue(String val) {
+    return Value.newBuilder().setStringVal(val).build();
+  }
+}
diff --git a/serving/src/test/resources/embedded-store/LoadCassandra.cql
b/serving/src/test/resources/embedded-store/LoadCassandra.cql new file mode 100644 index 0000000000..c80da294b7 --- /dev/null +++ b/serving/src/test/resources/embedded-store/LoadCassandra.cql @@ -0,0 +1,8 @@ +CREATE KEYSPACE test with replication = {'class':'SimpleStrategy','replication_factor':1}; + +CREATE TABLE test.feature_store( + entities text, + feature text, + value blob, + PRIMARY KEY (entities, feature) +) WITH CLUSTERING ORDER BY (feature DESC); \ No newline at end of file