diff --git a/Makefile b/Makefile index de61fe2892..ef2c54fc17 100644 --- a/Makefile +++ b/Makefile @@ -1,19 +1,20 @@ -# +# # Copyright 2019 The Feast Authors -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # - +REGISTRY := gcr.io/pm-registry/feast +VERSION := latest PROJECT_ROOT := $(shell git rev-parse --show-toplevel) test: @@ -52,4 +53,4 @@ build-html: mkdir -p $(PROJECT_ROOT)/dist/grpc cd $(PROJECT_ROOT)/protos && $(MAKE) gen-docs cd $(PROJECT_ROOT)/sdk/python/docs && $(MAKE) html - cp -r $(PROJECT_ROOT)/sdk/python/docs/html/* $(PROJECT_ROOT)/dist/python \ No newline at end of file + cp -r $(PROJECT_ROOT)/sdk/python/docs/html/* $(PROJECT_ROOT)/dist/python diff --git a/core/pom.xml b/core/pom.xml index e1567ae8fe..b56f74dea2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -110,7 +110,7 @@ protobuf-java-util - + com.google.guava guava diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 45de359ac7..3743b60af4 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -69,7 +69,7 @@ public Source getDefaultSource(FeastProperties feastProperties) { } catch (InterruptedException | ExecutionException e) { if (e.getCause().getClass().equals(TopicExistsException.class)) { log.warn( - Strings.lenientFormat( + String.format( "Unable to create topic %s in the feature stream, topic already exists, using existing topic.", topicName)); } else { diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 7115ee3f66..05873dec8b 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -149,7 +149,7 @@ public void abortJob(String dataflowJobId) { } catch (Exception e) { log.error("Unable to drain job with id: {}, cause: {}", dataflowJobId, e.getMessage()); throw new RuntimeException( - Strings.lenientFormat("Unable to drain job with id: %s", dataflowJobId), e); + String.format("Unable to drain job with id: %s", dataflowJobId), e); } } diff --git a/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java b/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java index f7ded9fec7..8f6c87053f 100644 --- a/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java +++ b/core/src/main/java/feast/core/job/direct/DirectJobRegistry.java @@ -41,7 +41,7 @@ public DirectJobRegistry() { public void add(DirectJob job) { if (jobs.containsKey(job.getJobId())) { throw new IllegalArgumentException( - Strings.lenientFormat("Job with id %s already exists and is running", job.getJobId())); + String.format("Job with id %s already exists and is running", job.getJobId())); } jobs.put(job.getJobId(), job); } diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java 
b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index b01d37d892..61c7eb7e7d 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -149,7 +149,7 @@ public void abortJob(String extId) { job.abort(); } catch (IOException e) { throw new RuntimeException( - Strings.lenientFormat("Unable to abort DirectRunner job %s", extId), e); + String.format("Unable to abort DirectRunner job %s", extId), e); } jobs.remove(extId); } diff --git a/core/src/main/java/feast/core/log/AuditLogger.java b/core/src/main/java/feast/core/log/AuditLogger.java index 5349b5548b..2c60307805 100644 --- a/core/src/main/java/feast/core/log/AuditLogger.java +++ b/core/src/main/java/feast/core/log/AuditLogger.java @@ -44,7 +44,7 @@ public static void log( map.put("resource", resource.toString()); map.put("id", id); map.put("action", action.toString()); - map.put("detail", Strings.lenientFormat(detail, args)); + map.put("detail", String.format(detail, args)); ObjectMessage msg = new ObjectMessage(map); log.log(AUDIT_LEVEL, msg); diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 3678135a52..761d199880 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -170,6 +170,24 @@ private void updateFeatureSetStatuses(List jobUpdateTasks) { } } } + } + /** + * Drain the given job. If this is successful, the job will start the draining process. When the + * draining process is complete, the job will be cleaned up and removed. + * + *
<p>
Batch jobs will be cancelled, as draining these jobs is not supported by beam. + * + * @param id feast-internal id of a job + */ + public void abortJob(String id) { + Optional jobOptional = jobInfoRepository.findById(id); + if (!jobOptional.isPresent()) { + throw new RetrievalException(String.format("Unable to retrieve job with id %s", id)); + } + JobInfo job = jobOptional.get(); + if (JobStatus.getTerminalState().contains(job.getStatus())) { + throw new IllegalStateException("Unable to stop job already in terminal state"); + } ready.removeAll(pending); ready.forEach( fs -> { diff --git a/core/src/main/java/feast/core/service/JobStatusService.java b/core/src/main/java/feast/core/service/JobStatusService.java new file mode 100644 index 0000000000..26d81647fa --- /dev/null +++ b/core/src/main/java/feast/core/service/JobStatusService.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class JobStatusService { + // + // private JobInfoRepository jobInfoRepository; + // private MetricsRepository metricsRepository; + // + // @Autowired + // public JobStatusService( + // JobInfoRepository jobInfoRepository, + // MetricsRepository metricsRepository) { + // this.jobInfoRepository = jobInfoRepository; + // this.metricsRepository = metricsRepository; + // } + // + // /** + // * Lists all jobs registered to the db, sorted by provided orderBy + // * + // * @param orderBy list order + // * @return list of JobDetails + // */ + // @Transactional + // public List listJobs(Sort orderBy) { + // List jobs = jobInfoRepository.findAll(orderBy); + // return jobs.stream().map(JobInfo::getJobDetail).collect(Collectors.toList()); + // } + // + // /** + // * Lists all jobs registered to the db, sorted chronologically by creation time + // * + // * @return list of JobDetails + // */ + // @Transactional + // public List listJobs() { + // return listJobs(Sort.by(Sort.Direction.ASC, "created")); + // } + // + // /** + // * Gets information regarding a single job. 
+ // * + // * @param id feast-internal job id + // * @return JobDetail for that job + // */ + // @Transactional + // public JobDetail getJob(String id) { + // Optional job = jobInfoRepository.findById(id); + // if (!job.isPresent()) { + // throw new RetrievalException(String.format("Unable to retrieve job with id %s", + // id)); + // } + // JobDetail.Builder jobDetailBuilder = job.get().getJobDetail().toBuilder(); + // List metrics = metricsRepository.findByJobInfo_Id(id); + // for (Metrics metric : metrics) { + // jobDetailBuilder.putMetrics(metric.getName(), metric.getValue()); + // } + // return jobDetailBuilder.build(); + // } + +} diff --git a/core/src/main/java/feast/core/util/TypeConversion.java b/core/src/main/java/feast/core/util/TypeConversion.java index e01a551135..a7dd2b0d2a 100644 --- a/core/src/main/java/feast/core/util/TypeConversion.java +++ b/core/src/main/java/feast/core/util/TypeConversion.java @@ -85,7 +85,7 @@ public static String convertMapToJsonString(Map map) { public static String[] convertMapToArgs(Map map) { List args = new ArrayList<>(); for (Entry arg : map.entrySet()) { - args.add(Strings.lenientFormat("--%s=%s", arg.getKey(), arg.getValue())); + args.add(String.format("--%s=%s", arg.getKey(), arg.getValue())); } return args.toArray(new String[] {}); } diff --git a/core/src/test/java/feast/core/validators/MatchersTest.java b/core/src/test/java/feast/core/validators/MatchersTest.java index 774e58c7a8..13c9e006a4 100644 --- a/core/src/test/java/feast/core/validators/MatchersTest.java +++ b/core/src/test/java/feast/core/validators/MatchersTest.java @@ -43,7 +43,7 @@ public void checkUpperSnakeCaseShouldPassForLegitUpperSnakeCaseWithNumbers() { public void checkUpperSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() { exception.expect(IllegalArgumentException.class); exception.expectMessage( - Strings.lenientFormat( + String.format( "invalid value for field %s: %s", "someField", "argument must be in upper snake case, and cannot include any special characters.")); @@ -61,7 +61,7 @@ public void checkLowerSnakeCaseShouldPassForLegitLowerSnakeCase() { public void checkLowerSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForInvalidString() { exception.expect(IllegalArgumentException.class); exception.expectMessage( - Strings.lenientFormat( + String.format( "invalid value for field %s: %s", "someField", "argument must be in lower snake case, and cannot include any special characters.")); diff --git a/docs/contributing.md b/docs/contributing.md index 38caffd654..0ca107ff10 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -6,13 +6,13 @@ The following guide will help you quickly run Feast in your local machine. The main components of Feast are: -* **Feast Core** handles FeatureSpec registration, starts and monitors Ingestion +* **Feast Core** handles FeatureSpec registration, starts and monitors Ingestion jobs and ensures that Feast internal metadata is consistent. * **Feast Ingestion** subscribes to streams of FeatureRow and writes the feature - values to registered Stores. + values to registered Stores. * **Feast Serving** handles requests for features values retrieval from the end users. @@ -29,13 +29,13 @@ The main components of Feast are: > **Assumptions:** > -> 1. Postgres is running in "localhost:5432" and has a database called "postgres" which +> 1. Postgres is running in "localhost:5432" and has a database called "postgres" which > -> can be accessed with credentials user "postgres" and password "password". 
+> can be accessed with credentials user "postgres" and password "password". > -> To use different database name and credentials, please update +> To use a different database name and credentials, please update > -> "$FEAST\_HOME/core/src/main/resources/application.yml" +> "$FEAST\_HOME/core/src/main/resources/application.yml" > > or set these environment variables: DB\_HOST, DB\_USERNAME, DB\_PASSWORD. > @@ -52,16 +53,17 @@ cd feast #### Starting Feast Core ```text -# Please check the default configuration for Feast Core in +# Please check the default configuration for Feast Core in # "$FEAST_HOME/core/src/main/resources/application.yml" and update it accordingly. -# +# # Start Feast Core GRPC server on localhost:6565 mvn --projects core spring-boot:run # If Feast Core starts successfully, verify that the Stores are registered # correctly, for example by using grpc_cli. -grpc_cli call localhost:6565 GetStores '' +grpc_cli call localhost:6565 ListStores '' # Should return something similar to the following. # Note that you should change BigQuery projectId and datasetId accordingly # in "$FEAST_HOME/core/src/main/resources/application.yml" @@ -91,12 +92,19 @@ store { project_id: "my-google-project-id" dataset_id: "my-bigquery-dataset-id" } +# Should return something similar to the following if you have not updated any stores +{ + "store": [] } ``` #### Starting Feast Serving -Feast Serving requires administrators to provide an **existing** store name in Feast. An instance of Feast Serving can only retrieve features from a **single** store. +Feast Serving requires administrators to provide an **existing** store name in Feast. +An instance of Feast Serving can only retrieve features from a **single** store. > In order to retrieve features from multiple stores you must start **multiple** instances of Feast serving. If you start multiple Feast serving on a single host, make sure that they are listening on different ports. @@ -111,12 +119,76 @@ grpc_cli call localhost:6566 GetFeastServingType '' type: FEAST_SERVING_TYPE_ONLINE ``` +#### Updating a store + +Create or update a Store by sending an UpdateStore request to Feast Core.
+ +``` +# Example of updating a Redis store + +grpc_cli call localhost:6565 UpdateStore ' +store { + name: "SERVING" + type: REDIS + subscriptions { + name: "*" + version: ">0" + } + redis_config { + host: "localhost" + port: 6379 + } +} +' + +# Other supported stores examples (replacing redis_config): +# BigQuery +bigquery_config { + project_id: "my-google-project-id" + dataset_id: "my-bigquery-dataset-id" +} + +# Cassandra: two options, depending on the replication strategy +# See details: https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeReplication.html +# +# Please note that the table name must be "feature_store", as specified in the @Table annotation of the +# datastax object mapper + +# SimpleStrategy +cassandra_config { + bootstrap_hosts: "localhost" + port: 9042 + keyspace: "feast" + table_name: "feature_store" + replication_options { + class: "SimpleStrategy" + replication_factor: 1 + } +} + +# NetworkTopologyStrategy +cassandra_config { + bootstrap_hosts: "localhost" + port: 9042 + keyspace: "feast" + table_name: "feature_store" + replication_options { + class: "NetworkTopologyStrategy" + east: 2 + west: 2 + } +} + +# To check that the Store has been updated correctly: +grpc_cli call localhost:6565 ListStores '' +``` + #### Registering a FeatureSet Create a new FeatureSet on Feast by sending a request to Feast Core. When a feature set is successfully registered, Feast Core will start an **ingestion** job that listens for new features in the FeatureSet. Note that Feast currently only supports sources of type "KAFKA", so you must have access to a running Kafka broker to register a FeatureSet successfully. ```text -# Example of registering a new driver feature set +# Example of registering a new driver feature set # Note the source value; it assumes that you have access to a Kafka broker # running on localhost:9092 @@ -156,7 +228,7 @@ grpc_cli call localhost:6565 GetFeatureSets '' # and written to the registered stores. # Make sure the value here is the topic assigned to the feature set # ... producer.send("feast-driver-features" ...) -# +# # Install the Python SDK to help write FeatureRow messages to Kafka cd $FEAST_HOME/sdk/python pip3 install -e .
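As an alternative to the Python SDK, the same step can be done directly from Java. The following is a minimal sketch (not part of this PR) that publishes a serialized FeatureRow to Kafka with the plain `kafka-clients` producer API; the feature set reference `driver:1`, the field names, and the topic name are illustrative assumptions and must match the FeatureSetSpec you actually registered and the topic Feast Core assigned to it.

```java
import com.google.protobuf.util.Timestamps;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto.Field;
import feast.types.ValueProto.Value;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PublishFeatureRow {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    // "driver:1" assumes a feature set named "driver" registered at version 1;
    // field names below are placeholders for the entities/features in its spec.
    FeatureRow row =
        FeatureRow.newBuilder()
            .setFeatureSet("driver:1")
            .setEventTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
            .addFields(
                Field.newBuilder()
                    .setName("driver_id")
                    .setValue(Value.newBuilder().setInt64Val(42)))
            .addFields(
                Field.newBuilder()
                    .setName("completed_trips")
                    .setValue(Value.newBuilder().setInt64Val(7)))
            .build();

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      // The topic must be the one assigned to the feature set by Feast Core.
      producer.send(new ProducerRecord<>("feast-driver-features", row.toByteArray()));
      producer.flush();
    }
  }
}
```

If the row is accepted, the running ingestion job for the feature set will pick it up from the topic and write it to the registered stores.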
diff --git a/infra/charts/feast/charts/feast-serving/values.yaml b/infra/charts/feast/charts/feast-serving/values.yaml index d489a48748..d50cd30885 100644 --- a/infra/charts/feast/charts/feast-serving/values.yaml +++ b/infra/charts/feast/charts/feast-serving/values.yaml @@ -56,6 +56,15 @@ application.yaml: config-path: /etc/feast/feast-serving/store.yaml redis-pool-max-size: 128 redis-pool-max-idle: 64 + cassandra-pool-core-local-connections: 1 + cassandra-pool-max-local-connections: 1 + cassandra-pool-core-remote-connections: 1 + cassandra-pool-max-remote-connections: 1 + cassandra-pool-max-requests-local-connection: 32768 + cassandra-pool-max-requests-remote-connection: 2048 + cassandra-pool-new-local-connection-threshold: 30000 + cassandra-pool-new-remote-connection-threshold: 400 + cassandra-pool-timeout-millis: 0 jobs: staging-location: "" store-type: "" diff --git a/ingestion/pom.xml b/ingestion/pom.xml index c829674a64..a9e982290e 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -215,6 +215,12 @@ ${org.apache.beam.version} + + org.apache.beam + beam-sdks-java-io-cassandra + ${org.apache.beam.version} + + redis.clients jedis @@ -238,6 +244,13 @@ test + + org.cassandraunit + cassandra-unit-shaded + 3.11.2.0 + test + + com.google.guava guava diff --git a/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java new file mode 100644 index 0000000000..b1e5c4f0ce --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapper.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.mapping.Mapper.Option; +import feast.store.serving.cassandra.CassandraMutation; +import java.io.Serializable; +import java.util.Iterator; +import java.util.concurrent.Future; +import org.apache.beam.sdk.io.cassandra.Mapper; + +/** A {@link Mapper} that supports writing {@code CassandraMutation}s with the Beam Cassandra IO. */ +public class CassandraMutationMapper implements Mapper, Serializable { + + private com.datastax.driver.mapping.Mapper mapper; + + CassandraMutationMapper(com.datastax.driver.mapping.Mapper mapper) { + this.mapper = mapper; + } + + @Override + public Iterator map(ResultSet resultSet) { + throw new UnsupportedOperationException("Only supports write operations"); + } + + @Override + public Future deleteAsync(CassandraMutation entityClass) { + throw new UnsupportedOperationException("Only supports write operations"); + } + + /** + * Saves records to Cassandra with: - Cassandra's internal write time set to the timestamp of the + * record. 
Cassandra will not override an existing record with the same partition key if the write + * time is older - Expiration of the record + * + * @param entityClass Cassandra's object mapper + */ + @Override + public Future saveAsync(CassandraMutation entityClass) { + return mapper.saveAsync( + entityClass, + Option.timestamp(entityClass.getWriteTime()), + Option.ttl(entityClass.getTtl())); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java new file mode 100644 index 0000000000..9f34465099 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/CassandraMutationMapperFactory.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.MappingManager; +import feast.store.serving.cassandra.CassandraMutation; +import org.apache.beam.sdk.io.cassandra.Mapper; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class CassandraMutationMapperFactory implements SerializableFunction { + + private transient MappingManager mappingManager; + private Class entityClass; + + public CassandraMutationMapperFactory(Class entityClass) { + this.entityClass = entityClass; + } + + @Override + public Mapper apply(Session session) { + if (mappingManager == null) { + this.mappingManager = new MappingManager(session); + } + + return new CassandraMutationMapper(mappingManager.mapper(entityClass)); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index b7901c2f90..e8aaf1437d 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -16,12 +16,14 @@ */ package feast.ingestion.transform; +import com.datastax.driver.core.Session; import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors; import com.google.api.services.bigquery.model.TableRow; import com.google.auto.value.AutoValue; import feast.core.FeatureSetProto.FeatureSet; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; +import feast.core.StoreProto.Store.CassandraConfig; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.ingestion.options.ImportOptions; @@ -29,11 +31,16 @@ import feast.ingestion.values.FailedElement; import feast.store.serving.bigquery.FeatureRowToTableRow; import feast.store.serving.bigquery.GetTableDestination; +import feast.store.serving.cassandra.CassandraMutation; +import feast.store.serving.cassandra.FeatureRowToCassandraMutationDoFn; import feast.store.serving.redis.FeatureRowToRedisMutationDoFn; import 
feast.store.serving.redis.RedisCustomIO; import feast.types.FeatureRowProto.FeatureRow; import java.io.IOException; +import java.util.Arrays; import java.util.Map; +import org.apache.beam.sdk.io.cassandra.CassandraIO; +import org.apache.beam.sdk.io.cassandra.Mapper; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; @@ -47,6 +54,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TypeDescriptors; @@ -151,6 +159,24 @@ public void processElement(ProcessContext context) { .build()); } break; + case CASSANDRA: + CassandraConfig cassandraConfig = getStore().getCassandraConfig(); + SerializableFunction mapperFactory = + new CassandraMutationMapperFactory(CassandraMutation.class); + input + .apply( + "Create CassandraMutation from FeatureRow", + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + getFeatureSetSpecs(), cassandraConfig.getDefaultTtl()))) + .apply( + CassandraIO.write() + .withHosts(Arrays.asList(cassandraConfig.getBootstrapHosts().split(","))) + .withPort(cassandraConfig.getPort()) + .withKeyspace(cassandraConfig.getKeyspace()) + .withEntity(CassandraMutation.class) + .withMapperFactoryFn(mapperFactory)); + break; default: log.error("Store type '{}' is not supported. No Feature Row will be written.", storeType); break; diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index 7af98fb8f0..cfee48ad0c 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -18,6 +18,14 @@ import static feast.types.ValueProto.ValueType; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.schemabuilder.Create; +import com.datastax.driver.core.schemabuilder.KeyspaceOptions; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.datastax.driver.mapping.MappingManager; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.DatasetId; @@ -40,13 +48,19 @@ import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; import feast.core.StoreProto.Store; +import feast.core.StoreProto.Store.CassandraConfig; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; +import feast.store.serving.cassandra.CassandraMutation; import feast.types.ValueProto.ValueType.Enum; +import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import redis.clients.jedis.JedisPool; @@ -113,6 +127,9 @@ public static void setupStore(Store store, FeatureSet featureSet) { store.getBigqueryConfig().getDatasetId(), BigQueryOptions.getDefaultInstance().getService()); break; + case CASSANDRA: + 
StoreUtil.setupCassandra(store.getCassandraConfig()); + break; default: log.warn("Store type '{}' is unsupported", storeType); break; @@ -250,4 +267,74 @@ public static void checkRedisConnection(RedisConfig redisConfig) { } jedisPool.close(); } + + /** + * Ensures Cassandra is accessible, else throw a RuntimeException. Creates Cassandra keyspace and + * table if it does not already exist + * + * @param cassandraConfig Please refer to feast.core.Store proto + */ + public static void setupCassandra(CassandraConfig cassandraConfig) { + List contactPoints = + Arrays.stream(cassandraConfig.getBootstrapHosts().split(",")) + .map(host -> new InetSocketAddress(host, cassandraConfig.getPort())) + .collect(Collectors.toList()); + Cluster cluster = Cluster.builder().addContactPointsWithPorts(contactPoints).build(); + Session session; + + try { + String keyspace = cassandraConfig.getKeyspace(); + KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace); + if (keyspaceMetadata == null) { + log.info("Creating keyspace '{}'", keyspace); + Map replicationOptions = + cassandraConfig.getReplicationOptionsMap().entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + KeyspaceOptions createKeyspace = + SchemaBuilder.createKeyspace(keyspace) + .ifNotExists() + .with() + .replication(replicationOptions); + session = cluster.newSession(); + session.execute(createKeyspace); + } + + session = cluster.connect(keyspace); + // Currently no support for creating table from entity mapper: + // https://datastax-oss.atlassian.net/browse/JAVA-569 + Create createTable = + SchemaBuilder.createTable(keyspace, cassandraConfig.getTableName()) + .ifNotExists() + .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) + .addClusteringColumn(CassandraMutation.FEATURE, DataType.text()) + .addColumn(CassandraMutation.VALUE, DataType.blob()); + log.info("Create Cassandra table if not exists.."); + session.execute(createTable); + + validateCassandraTable(session); + + session.close(); + } catch (RuntimeException e) { + throw new RuntimeException( + String.format( + "Failed to connect to Cassandra at bootstrap hosts: '%s' port: '%s'. Please check that your Cassandra is running and accessible from Feast.", + contactPoints.stream() + .map(InetSocketAddress::getHostName) + .collect(Collectors.joining(",")), + cassandraConfig.getPort()), + e); + } + cluster.close(); + } + + private static void validateCassandraTable(Session session) { + try { + new MappingManager(session).mapper(CassandraMutation.class).getTableMetadata(); + } catch (RuntimeException e) { + throw new RuntimeException( + String.format( + "Table created does not match the datastax object mapper: %s", + CassandraMutation.class.getSimpleName())); + } + } } diff --git a/ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java b/ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java new file mode 100644 index 0000000000..87a327e772 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/utils/ValueUtil.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.utils; + +import feast.types.ValueProto.Value; + +/** + * Utility class for converting {@link Value} of different types to a string for storing as key in + * data stores + */ +public class ValueUtil { + + public static String toString(Value value) { + String strValue; + switch (value.getValCase()) { + case BYTES_VAL: + strValue = value.getBytesVal().toString(); + break; + case STRING_VAL: + strValue = value.getStringVal(); + break; + case INT32_VAL: + strValue = String.valueOf(value.getInt32Val()); + break; + case INT64_VAL: + strValue = String.valueOf(value.getInt64Val()); + break; + case DOUBLE_VAL: + strValue = String.valueOf(value.getDoubleVal()); + break; + case FLOAT_VAL: + strValue = String.valueOf(value.getFloatVal()); + break; + case BOOL_VAL: + strValue = String.valueOf(value.getBoolVal()); + break; + default: + throw new IllegalArgumentException( + String.format("toString method not supported for type %s", value.getValCase())); + } + return strValue; + } +} diff --git a/ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java b/ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java new file mode 100644 index 0000000000..23bbb9c3b3 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/cassandra/CassandraMutation.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.store.serving.cassandra; + +import com.datastax.driver.mapping.annotations.ClusteringColumn; +import com.datastax.driver.mapping.annotations.Computed; +import com.datastax.driver.mapping.annotations.PartitionKey; +import com.datastax.driver.mapping.annotations.Table; +import feast.core.FeatureSetProto.EntitySpec; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.ingestion.utils.ValueUtil; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; + +/** + * Cassandra's object mapper that handles basic CRUD operations in Cassandra tables More info: + * https://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/ + */ +@DefaultCoder(value = AvroCoder.class) +@Table(name = "feature_store") +public final class CassandraMutation implements Serializable { + + public static final String ENTITIES = "entities"; + public static final String FEATURE = "feature"; + public static final String VALUE = "value"; + + @PartitionKey private final String entities; + + @ClusteringColumn private final String feature; + + private final ByteBuffer value; + + @Computed(value = "writetime(value)") + private final long writeTime; + + @Computed(value = "ttl(value)") + private final int ttl; + + // NoArgs constructor is needed when using Beam's CassandraIO withEntity and specifying this + // class, + // it looks for an init() method + CassandraMutation() { + this.entities = null; + this.feature = null; + this.value = null; + this.writeTime = 0; + this.ttl = 0; + } + + CassandraMutation(String entities, String feature, ByteBuffer value, long writeTime, int ttl) { + this.entities = entities; + this.feature = feature; + this.value = value; + this.writeTime = writeTime; + this.ttl = ttl; + } + + public long getWriteTime() { + return writeTime; + } + + public int getTtl() { + return ttl; + } + + static String keyFromFeatureRow(FeatureSetSpec featureSetSpec, FeatureRow featureRow) { + Set entityNames = + featureSetSpec.getEntitiesList().stream() + .map(EntitySpec::getName) + .collect(Collectors.toSet()); + List entities = new ArrayList<>(); + for (Field field : featureRow.getFieldsList()) { + if (entityNames.contains(field.getName())) { + entities.add(field); + } + } + return featureRow.getFeatureSet() + + ":" + + entities.stream() + .map(f -> f.getName() + "=" + ValueUtil.toString(f.getValue())) + .collect(Collectors.joining("|")); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof CassandraMutation) { + CassandraMutation that = (CassandraMutation) o; + return this.entities.equals(that.entities) + && this.feature.equals(that.feature) + && this.value.equals(that.value) + && this.writeTime == that.writeTime + && this.ttl == that.ttl; + } + return false; + } +} diff --git a/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java new file mode 100644 index 0000000000..177a8703e6 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.store.serving.cassandra; + +import com.google.protobuf.Duration; +import com.google.protobuf.util.Timestamps; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.FeatureSetProto.FeatureSpec; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.transforms.DoFn; +import org.slf4j.Logger; + +public class FeatureRowToCassandraMutationDoFn extends DoFn { + + private static final Logger log = + org.slf4j.LoggerFactory.getLogger(FeatureRowToCassandraMutationDoFn.class); + private Map featureSetSpecs; + private Map maxAges; + + public FeatureRowToCassandraMutationDoFn(Map specs, Duration defaultTtl) { + this.featureSetSpecs = specs; + this.maxAges = new HashMap<>(); + for (FeatureSetSpec spec : specs.values()) { + String featureSetName = spec.getName() + ":" + spec.getVersion(); + if (spec.getMaxAge() != null && spec.getMaxAge().getSeconds() > 0) { + maxAges.put(featureSetName, Math.toIntExact(spec.getMaxAge().getSeconds())); + } else { + maxAges.put(featureSetName, Math.toIntExact(defaultTtl.getSeconds())); + } + } + } + + /** Output a Cassandra mutation object for every feature in the feature row. */ + @ProcessElement + public void processElement(ProcessContext context) { + FeatureRow featureRow = context.element(); + try { + FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet()); + Set featureNames = + featureSetSpec.getFeaturesList().stream() + .map(FeatureSpec::getName) + .collect(Collectors.toSet()); + String key = CassandraMutation.keyFromFeatureRow(featureSetSpec, featureRow); + + Collection mutations = new ArrayList<>(); + for (Field field : featureRow.getFieldsList()) { + if (featureNames.contains(field.getName())) { + mutations.add( + new CassandraMutation( + key, + field.getName(), + ByteBuffer.wrap(field.getValue().toByteArray()), + Timestamps.toMicros(featureRow.getEventTimestamp()), + maxAges.get(featureRow.getFeatureSet()))); + } + } + + mutations.forEach(context::output); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java b/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java new file mode 100644 index 0000000000..da914ca175 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.StoreProto.Store; +import feast.core.StoreProto.Store.CassandraConfig; +import feast.core.StoreProto.Store.StoreType; +import feast.test.TestUtil; +import feast.test.TestUtil.LocalCassandra; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import feast.types.ValueProto.ValueType.Enum; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.thrift.transport.TTransportException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class CassandraWriteToStoreIT implements Serializable { + private FeatureSetSpec featureSetSpec; + private FeatureRow row; + + class FakeCassandraWriteToStore extends WriteToStore { + + private FeatureSetSpec featureSetSpec; + + FakeCassandraWriteToStore(FeatureSetSpec featureSetSpec) { + this.featureSetSpec = featureSetSpec; + } + + @Override + public Store getStore() { + return Store.newBuilder() + .setType(StoreType.CASSANDRA) + .setName("SERVING") + .setCassandraConfig(getCassandraConfig()) + .build(); + } + + @Override + public Map getFeatureSetSpecs() { + return new HashMap() { + { + put(featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), featureSetSpec); + } + }; + } + } + + private static CassandraConfig getCassandraConfig() { + return CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setTableName("feature_store") + .setKeyspace("test") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "1"); + } + }) + .build(); + } + + @BeforeClass + public static void startServer() throws InterruptedException, IOException, TTransportException { + LocalCassandra.start(); + LocalCassandra.createKeyspaceAndTable(getCassandraConfig()); + } + + @Before + public void setUp() { + featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.INT64); + put("feature2", Enum.INT64); + } + }); + row = + TestUtil.createFeatureRow( + featureSetSpec, + 100, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("a")); + put("feature1", TestUtil.intValue(1)); + put("feature2", TestUtil.intValue(2)); + } + }); + } + + @Rule public transient TestPipeline 
testPipeline = TestPipeline.create(); + + @AfterClass + public static void cleanUp() { + LocalCassandra.stop(); + } + + @Test + public void testWriteCassandra_happyPath() throws InvalidProtocolBufferException { + PCollection input = testPipeline.apply(Create.of(row)); + + input.apply(new FakeCassandraWriteToStore(featureSetSpec)); + + testPipeline.run(); + + ResultSet resultSet = LocalCassandra.getSession().execute("SELECT * FROM test.feature_store"); + List actualResults = getResults(resultSet); + + List expectedFields = + Arrays.asList( + Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(), + Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build()); + + assertTrue(actualResults.containsAll(expectedFields)); + assertEquals(expectedFields.size(), actualResults.size()); + } + + @Test(timeout = 30000) + public void testWriteCassandra_shouldNotRetrieveExpiredValues() + throws InvalidProtocolBufferException { + // Set max age to 1 second + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 1, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.INT64); + put("feature2", Enum.INT64); + } + }); + + PCollection input = testPipeline.apply(Create.of(row)); + + input.apply(new FakeCassandraWriteToStore(featureSetSpec)); + + testPipeline.run(); + + while (true) { + ResultSet resultSet = + LocalCassandra.getSession() + .execute("SELECT feature, value, ttl(value) as expiry FROM test.feature_store"); + List results = getResults(resultSet); + if (results.isEmpty()) break; + } + } + + @Test + public void testWriteCassandra_shouldNotOverrideNewerValues() + throws InvalidProtocolBufferException { + FeatureRow olderRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("a")); + put("feature1", TestUtil.intValue(3)); + put("feature2", TestUtil.intValue(4)); + } + }); + + PCollection input = testPipeline.apply(Create.of(row, olderRow)); + + input.apply(new FakeCassandraWriteToStore(featureSetSpec)); + + testPipeline.run(); + + ResultSet resultSet = LocalCassandra.getSession().execute("SELECT * FROM test.feature_store"); + List actualResults = getResults(resultSet); + + List expectedFields = + Arrays.asList( + Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(), + Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build()); + + assertTrue(actualResults.containsAll(expectedFields)); + assertEquals(expectedFields.size(), actualResults.size()); + } + + private List getResults(ResultSet resultSet) throws InvalidProtocolBufferException { + List results = new ArrayList<>(); + while (!resultSet.isExhausted()) { + Row row = resultSet.one(); + results.add( + Field.newBuilder() + .setName(row.getString("feature")) + .setValue(Value.parseFrom(row.getBytes("value"))) + .build()); + } + return results; + } +} diff --git a/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java b/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java new file mode 100644 index 0000000000..8c6874ecac --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.util; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.schemabuilder.Create; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import feast.core.StoreProto.Store.CassandraConfig; +import feast.ingestion.utils.StoreUtil; +import feast.store.serving.cassandra.CassandraMutation; +import feast.test.TestUtil.LocalCassandra; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.thrift.transport.TTransportException; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class CassandraStoreUtilIT { + + @BeforeClass + public static void startServer() throws InterruptedException, IOException, TTransportException { + LocalCassandra.start(); + } + + @After + public void teardown() { + LocalCassandra.stop(); + } + + @Test + public void setupCassandra_shouldCreateKeyspaceAndTable() { + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }) + .build(); + StoreUtil.setupCassandra(config); + + Map actualReplication = + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getReplication(); + Map expectedReplication = + new HashMap() { + { + put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }; + TableMetadata tableMetadata = + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store"); + + Assert.assertEquals(expectedReplication, actualReplication); + Assert.assertNotNull(tableMetadata); + } + + @Test + public void setupCassandra_shouldBeIdempotent_whenTableAlreadyExistsAndSchemaMatches() { + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "2"); + } + }) + .build(); + + LocalCassandra.createKeyspaceAndTable(config); + + // Check table is created + Assert.assertNotNull( + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store")); + + StoreUtil.setupCassandra(config); + + Assert.assertNotNull( + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store")); + } + + @Test(expected = RuntimeException.class) + public void setupCassandra_shouldThrowException_whenTableNameDoesNotMatchObjectMapper() { + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("test_data_store") + .putAllReplicationOptions( + new HashMap() { + { + 
put("class", "NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }) + .build(); + StoreUtil.setupCassandra(config); + } + + @Test(expected = RuntimeException.class) + public void setupCassandra_shouldThrowException_whenTableSchemaDoesNotMatchObjectMapper() { + LocalCassandra.getSession() + .execute( + "CREATE KEYSPACE test " + + "WITH REPLICATION = {" + + "'class': 'SimpleStrategy', 'replication_factor': 2 }"); + + Create createTable = + SchemaBuilder.createTable("test", "feature_store") + .ifNotExists() + .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) + .addClusteringColumn( + "featureName", DataType.text()) // Column name does not match in CassandraMutation + .addColumn(CassandraMutation.VALUE, DataType.blob()); + LocalCassandra.getSession().execute(createTable); + + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "2"); + } + }) + .build(); + + StoreUtil.setupCassandra(config); + } +} diff --git a/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java new file mode 100644 index 0000000000..412c6488f2 --- /dev/null +++ b/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java @@ -0,0 +1,225 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.store.serving.cassandra; + +import com.google.protobuf.Duration; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.test.TestUtil; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.ValueProto.Value; +import feast.types.ValueProto.ValueType.Enum; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.HashMap; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +public class FeatureRowToCassandraMutationDoFnTest implements Serializable { + + @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + + @Test + public void processElement_shouldCreateCassandraMutation_givenFeatureRow() { + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("feature1", TestUtil.strValue("a")); + } + }); + + PCollection input = testPipeline.apply(Create.of(featureRow)); + + PCollection output = + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + Duration.newBuilder().setSeconds(0).build()))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 10) + }; + + PAssert.that(output).containsInAnyOrder(expected); + + testPipeline.run(); + } + + @Test + public void + processElement_shouldCreateCassandraMutations_givenFeatureRowWithMultipleEntitiesAndFeatures() { + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + put("feature2", Enum.INT64); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("b")); + put("feature1", TestUtil.strValue("a")); + put("feature2", TestUtil.intValue(2)); + } + }); + + PCollection input = testPipeline.apply(Create.of(featureRow)); + + PCollection output = + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + Duration.newBuilder().setSeconds(0).build()))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1|entity2=b", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 10), + new CassandraMutation( + "fs:1:entity1=1|entity2=b", + "feature2", + ByteBuffer.wrap(TestUtil.intValue(2).toByteArray()), + 10000000, + 10) + }; + + PAssert.that(output).containsInAnyOrder(expected); + + testPipeline.run(); + } + + @Test + public void processElement_shouldUseDefaultMaxAge_whenMissingMaxAge() { + Duration defaultTtl = Duration.newBuilder().setSeconds(500).build(); + FeatureSetSpec 
featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 0, + new HashMap() { + { + put("entity1", Enum.INT64); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("feature1", TestUtil.strValue("a")); + } + }); + + PCollection input = testPipeline.apply(Create.of(featureRow)); + + PCollection output = + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + defaultTtl))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 500) + }; + + PAssert.that(output).containsInAnyOrder(expected); + + testPipeline.run(); + } +} diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 5c16d7e9e3..d9ce882e9d 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -18,10 +18,15 @@ import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import com.google.protobuf.util.Timestamps; import feast.core.FeatureSetProto.FeatureSet; +import feast.core.StoreProto.Store.CassandraConfig; import feast.ingestion.transform.WriteToStore; +import feast.ingestion.utils.StoreUtil; import feast.storage.RedisProto.RedisKey; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FeatureRowProto.FeatureRow.Builder; @@ -35,13 +40,18 @@ import feast.types.ValueProto.StringList; import feast.types.ValueProto.Value; import feast.types.ValueProto.ValueType; +import feast.types.ValueProto.ValueType.Enum; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import org.apache.beam.sdk.PipelineResult; @@ -53,8 +63,10 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.joda.time.Duration; import redis.embedded.RedisServer; @@ -83,6 +95,37 @@ public static void stop() { } } + public static class LocalCassandra { + + public static void start() throws InterruptedException, IOException, TTransportException { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(); + } + + public static void createKeyspaceAndTable(CassandraConfig config) { + StoreUtil.setupCassandra(config); + } + + public static String getHost() { + return EmbeddedCassandraServerHelper.getHost(); + } + + public static int getPort() { + return 
EmbeddedCassandraServerHelper.getNativeTransportPort(); + } + + public static Cluster getCluster() { + return EmbeddedCassandraServerHelper.getCluster(); + } + + public static Session getSession() { + return EmbeddedCassandraServerHelper.getSession(); + } + + public static void stop() { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); + } + } + public static class LocalKafka { private static KafkaServerStartable server; @@ -165,6 +208,85 @@ public static void publishFeatureRowsToKafka( }); } + /** + * Create a Feature Set Spec. + * + * @param name name of the feature set + * @param version version of the feature set + * @param maxAgeSeconds max age + * @param entities entities provided as map of string to {@link Enum} + * @param features features provided as map of string to {@link Enum} + * @return {@link FeatureSetSpec} + */ + public static FeatureSetSpec createFeatureSetSpec( + String name, + int version, + int maxAgeSeconds, + Map entities, + Map features) { + FeatureSetSpec.Builder featureSetSpec = + FeatureSetSpec.newBuilder() + .setName(name) + .setVersion(version) + .setMaxAge(com.google.protobuf.Duration.newBuilder().setSeconds(maxAgeSeconds).build()); + + for (Entry entity : entities.entrySet()) { + featureSetSpec.addEntities( + EntitySpec.newBuilder().setName(entity.getKey()).setValueType(entity.getValue()).build()); + } + + for (Entry feature : features.entrySet()) { + featureSetSpec.addFeatures( + FeatureSpec.newBuilder() + .setName(feature.getKey()) + .setValueType(feature.getValue()) + .build()); + } + + return featureSetSpec.build(); + } + + /** + * Create a Feature Row. + * + * @param featureSetSpec {@link FeatureSetSpec} + * @param timestampSeconds timestamp given in seconds + * @param fields fields provided as a map name to {@link Value} + * @return {@link FeatureRow} + */ + public static FeatureRow createFeatureRow( + FeatureSetSpec featureSetSpec, long timestampSeconds, Map fields) { + List featureNames = + featureSetSpec.getFeaturesList().stream() + .map(FeatureSpec::getName) + .collect(Collectors.toList()); + List entityNames = + featureSetSpec.getEntitiesList().stream() + .map(EntitySpec::getName) + .collect(Collectors.toList()); + List requiredFields = + Stream.concat(featureNames.stream(), entityNames.stream()).collect(Collectors.toList()); + + if (fields.keySet().containsAll(requiredFields)) { + FeatureRow.Builder featureRow = + FeatureRow.newBuilder() + .setFeatureSet(featureSetSpec.getName() + ":" + featureSetSpec.getVersion()) + .setEventTimestamp(Timestamp.newBuilder().setSeconds(timestampSeconds).build()); + for (Entry field : fields.entrySet()) { + featureRow.addFields( + Field.newBuilder().setName(field.getKey()).setValue(field.getValue()).build()); + } + return featureRow.build(); + } else { + String missingFields = + requiredFields.stream() + .filter(f -> !fields.keySet().contains(f)) + .collect(Collectors.joining(",")); + throw new IllegalArgumentException( + "FeatureRow is missing some fields defined in FeatureSetSpec: " + missingFields); + } + } + /** * Create a Feature Row with random value according to the FeatureSetSpec * @@ -418,4 +540,12 @@ public static void waitUntilAllElementsAreWrittenToStore( } } } + + public static Value intValue(int val) { + return Value.newBuilder().setInt64Val(val).build(); + } + + public static Value strValue(String val) { + return Value.newBuilder().setStringVal(val).build(); + } } diff --git a/pom.xml b/pom.xml index 821d3b7232..3426c73510 100644 --- a/pom.xml +++ b/pom.xml @@ -203,7 +203,7 @@ 
com.google.guava guava - 26.0-jre + 25.0-jre com.google.protobuf diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index 931a9d46b6..ef98f023c1 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -17,6 +17,8 @@ syntax = "proto3"; package feast.core; +import "google/protobuf/duration.proto"; + option java_package = "feast.core"; option java_outer_classname = "StoreProto"; option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core"; @@ -103,7 +105,20 @@ message Store { // BIGQUERY = 2; - // Unsupported in Feast 0.3 + // Cassandra stores entities as a string partition key, with each feature as a clustering column. + // NOTE: This store currently uses the max_age defined in the FeatureSet as the TTL + // + // Columns: + // - entities: concatenated string of the feature set name and all entity keys and values + // entities concatenated format - [feature_set]:[entity_name1=entity_value1]|[entity_name2=entity_value2] + // TODO: the string representation of float or double values may differ across runtimes or platforms + // - feature: clustering column where each feature is a column + // - value: byte array of Value (refer to feast.types.Value) + // + // Internal columns: + // - writeTime: timestamp of the written record. This is used to ensure that new records are not replaced + // by older ones + // - ttl: expiration time of the record. Currently uses max_age from the feature set spec as the TTL CASSANDRA = 3; } @@ -123,8 +138,22 @@ message Store { } message CassandraConfig { - string host = 1; + // - bootstrap_hosts: comma-delimited list of hosts + string bootstrap_hosts = 1; int32 port = 2; + string keyspace = 3; + + // Note that the table name must be "feature_store", as specified in the @Table annotation of the + // DataStax object mapper + string table_name = 4; + + // This specifies the replication strategy to use. Please refer to the docs for more details: + // https://docs.datastax.com/en/dse/6.7/cql/cql/cql_reference/cql_commands/cqlCreateKeyspace.html#cqlCreateKeyspace__cqlCreateKeyspacereplicationmap-Pr3yUQ7t + map replication_options = 5; + + // Default expiration in seconds to use when the FeatureSetSpec does not have max_age defined.
+ // Specify 0 for no default expiration + google.protobuf.Duration default_ttl = 6; } message Subscription { diff --git a/serving/pom.xml b/serving/pom.xml index be573be45c..0b36ad40e5 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -143,7 +143,19 @@ redis.clients jedis - + + + com.datastax.cassandra + cassandra-driver-core + 3.4.0 + + + io.netty + * + + + + com.google.guava guava @@ -229,6 +241,13 @@ spring-boot-starter-test test + + + org.cassandraunit + cassandra-unit-shaded + 3.11.2.0 + test + diff --git a/serving/sample_cassandra_config.yml b/serving/sample_cassandra_config.yml new file mode 100644 index 0000000000..ca3d4cbbcc --- /dev/null +++ b/serving/sample_cassandra_config.yml @@ -0,0 +1,13 @@ +name: serving +type: CASSANDRA +cassandra_config: + bootstrap_hosts: localhost + port: 9042 + keyspace: feast + table_name: feature_store + replication_options: + class: SimpleStrategy + replication_factor: 1 +subscriptions: + - name: "*" + version: ">0" diff --git a/serving/src/main/java/feast/serving/FeastProperties.java b/serving/src/main/java/feast/serving/FeastProperties.java index 505d7d0330..52db658ca4 100644 --- a/serving/src/main/java/feast/serving/FeastProperties.java +++ b/serving/src/main/java/feast/serving/FeastProperties.java @@ -85,6 +85,15 @@ public static class StoreProperties { private String configPath; private int redisPoolMaxSize; private int redisPoolMaxIdle; + private int cassandraPoolCoreLocalConnections; + private int cassandraPoolMaxLocalConnections; + private int cassandraPoolCoreRemoteConnections; + private int cassandraPoolMaxRemoteConnections; + private int cassandraPoolMaxRequestsLocalConnection; + private int cassandraPoolMaxRequestsRemoteConnection; + private int cassandraPoolNewLocalConnectionThreshold; + private int cassandraPoolNewRemoteConnectionThreshold; + private int cassandraPoolTimeoutMillis; public String getConfigPath() { return this.configPath; @@ -98,6 +107,42 @@ public int getRedisPoolMaxIdle() { return this.redisPoolMaxIdle; } + public int getCassandraPoolCoreLocalConnections() { + return this.cassandraPoolCoreLocalConnections; + } + + public int getCassandraPoolMaxLocalConnections() { + return this.cassandraPoolMaxLocalConnections; + } + + public int getCassandraPoolCoreRemoteConnections() { + return this.cassandraPoolCoreRemoteConnections; + } + + public int getCassandraPoolMaxRemoteConnections() { + return this.cassandraPoolMaxRemoteConnections; + } + + public int getCassandraPoolMaxRequestsLocalConnection() { + return this.cassandraPoolMaxRequestsLocalConnection; + } + + public int getCassandraPoolMaxRequestsRemoteConnection() { + return this.cassandraPoolMaxRequestsRemoteConnection; + } + + public int getCassandraPoolNewLocalConnectionThreshold() { + return this.cassandraPoolNewLocalConnectionThreshold; + } + + public int getCassandraPoolNewRemoteConnectionThreshold() { + return this.cassandraPoolNewRemoteConnectionThreshold; + } + + public int getCassandraPoolTimeoutMillis() { + return this.cassandraPoolTimeoutMillis; + } + public void setConfigPath(String configPath) { this.configPath = configPath; } @@ -109,6 +154,46 @@ public void setRedisPoolMaxSize(int redisPoolMaxSize) { public void setRedisPoolMaxIdle(int redisPoolMaxIdle) { this.redisPoolMaxIdle = redisPoolMaxIdle; } + + public void setCassandraPoolCoreLocalConnections(int cassandraPoolCoreLocalConnections) { + this.cassandraPoolCoreLocalConnections = cassandraPoolCoreLocalConnections; + } + + public void setCassandraPoolMaxLocalConnections(int 
cassandraPoolMaxLocalConnections) { + this.cassandraPoolMaxLocalConnections = cassandraPoolMaxLocalConnections; + } + + public void setCassandraPoolCoreRemoteConnections(int cassandraPoolCoreRemoteConnections) { + this.cassandraPoolCoreRemoteConnections = cassandraPoolCoreRemoteConnections; + } + + public void setCassandraPoolMaxRemoteConnections(int cassandraPoolMaxRemoteConnections) { + this.cassandraPoolMaxRemoteConnections = cassandraPoolMaxRemoteConnections; + } + + public void setCassandraPoolMaxRequestsLocalConnection( + int cassandraPoolMaxRequestsLocalConnection) { + this.cassandraPoolMaxRequestsLocalConnection = cassandraPoolMaxRequestsLocalConnection; + } + + public void setCassandraPoolMaxRequestsRemoteConnection( + int cassandraPoolMaxRequestsRemoteConnection) { + this.cassandraPoolMaxRequestsRemoteConnection = cassandraPoolMaxRequestsRemoteConnection; + } + + public void setCassandraPoolNewLocalConnectionThreshold( + int cassandraPoolNewLocalConnectionThreshold) { + this.cassandraPoolNewLocalConnectionThreshold = cassandraPoolNewLocalConnectionThreshold; + } + + public void setCassandraPoolNewRemoteConnectionThreshold( + int cassandraPoolNewRemoteConnectionThreshold) { + this.cassandraPoolNewRemoteConnectionThreshold = cassandraPoolNewRemoteConnectionThreshold; + } + + public void setCassandraPoolTimeoutMillis(int cassandraPoolTimeoutMillis) { + this.cassandraPoolTimeoutMillis = cassandraPoolTimeoutMillis; + } } public static class JobProperties { diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index 3cc115978a..163823c637 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -16,23 +16,37 @@ */ package feast.serving.configuration; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.Session; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; +import feast.core.StoreProto.Store.Builder; +import feast.core.StoreProto.Store.CassandraConfig; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.Subscription; import feast.serving.FeastProperties; import feast.serving.service.BigQueryServingService; +import feast.serving.FeastProperties.JobProperties; +import feast.serving.FeastProperties.StoreProperties; +import feast.serving.service.CachedSpecService; +import feast.serving.service.CassandraServingService; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; import feast.serving.service.RedisServingService; import feast.serving.service.ServingService; import feast.serving.specs.CachedSpecService; import io.opentracing.Tracer; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -61,6 +75,13 @@ private Store setStoreConfig(Store.Builder builder, Map options) .build(); return builder.setBigqueryConfig(bqConfig).build(); case 
CASSANDRA: + CassandraConfig cassandraConfig = + CassandraConfig.newBuilder() + .setBootstrapHosts(options.get("host")) + .setPort(Integer.parseInt(options.get("port"))) + .setKeyspace(options.get("keyspace")) + .build(); + return builder.setCassandraConfig(cassandraConfig).build(); default: throw new IllegalArgumentException( String.format( @@ -122,6 +143,47 @@ public ServingService servingService( storage); break; case CASSANDRA: + StoreProperties storeProperties = feastProperties.getStore(); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions.setCoreConnectionsPerHost( + HostDistance.LOCAL, storeProperties.getCassandraPoolCoreLocalConnections()); + poolingOptions.setCoreConnectionsPerHost( + HostDistance.REMOTE, storeProperties.getCassandraPoolCoreRemoteConnections()); + poolingOptions.setMaxConnectionsPerHost( + HostDistance.LOCAL, storeProperties.getCassandraPoolMaxLocalConnections()); + poolingOptions.setMaxConnectionsPerHost( + HostDistance.REMOTE, storeProperties.getCassandraPoolMaxRemoteConnections()); + poolingOptions.setMaxRequestsPerConnection( + HostDistance.LOCAL, storeProperties.getCassandraPoolMaxRequestsLocalConnection()); + poolingOptions.setMaxRequestsPerConnection( + HostDistance.REMOTE, storeProperties.getCassandraPoolMaxRequestsRemoteConnection()); + poolingOptions.setNewConnectionThreshold( + HostDistance.LOCAL, storeProperties.getCassandraPoolNewLocalConnectionThreshold()); + poolingOptions.setNewConnectionThreshold( + HostDistance.REMOTE, storeProperties.getCassandraPoolNewRemoteConnectionThreshold()); + poolingOptions.setPoolTimeoutMillis(storeProperties.getCassandraPoolTimeoutMillis()); + CassandraConfig cassandraConfig = store.getCassandraConfig(); + List contactPoints = + Arrays.stream(cassandraConfig.getBootstrapHosts().split(",")) + .map(h -> new InetSocketAddress(h, cassandraConfig.getPort())) + .collect(Collectors.toList()); + Cluster cluster = + Cluster.builder() + .addContactPointsWithPorts(contactPoints) + .withPoolingOptions(poolingOptions) + .build(); + // A Cassandra Session is thread-safe and maintains its connections to cluster nodes internally, + // so it is recommended to use one session per keyspace rather than opening and closing a + // connection for each request + Session session = cluster.connect(); + servingService = + new CassandraServingService( + session, + cassandraConfig.getKeyspace(), + cassandraConfig.getTableName(), + specService, + tracer); + break; case UNRECOGNIZED: case INVALID: throw new IllegalArgumentException( diff --git a/serving/src/main/java/feast/serving/service/CassandraServingService.java b/serving/src/main/java/feast/serving/service/CassandraServingService.java new file mode 100644 index 0000000000..e9a7dff8ac --- /dev/null +++ b/serving/src/main/java/feast/serving/service/CassandraServingService.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package feast.serving.service; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Timestamp; +import feast.serving.ServingAPIProto.FeatureSetRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; +import feast.serving.util.ValueUtil; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import io.opentracing.Scope; +import io.opentracing.Tracer; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class CassandraServingService extends OnlineServingService { + + private final Session session; + private final String keyspace; + private final String tableName; + private final Tracer tracer; + + public CassandraServingService( + Session session, + String keyspace, + String tableName, + CachedSpecService specService, + Tracer tracer) { + super(specService, tracer); + this.session = session; + this.keyspace = keyspace; + this.tableName = tableName; + this.tracer = tracer; + } + + @Override + List createLookupKeys( + List featureSetEntityNames, + List entityRows, + FeatureSetRequest featureSetRequest) { + try (Scope scope = tracer.buildSpan("Cassandra-makeCassandraKeys").startActive(true)) { + String featureSetId = + String.format("%s:%s", featureSetRequest.getName(), featureSetRequest.getVersion()); + return entityRows.stream() + .map(row -> createCassandraKey(featureSetId, featureSetEntityNames, row)) + .collect(Collectors.toList()); + } + } + + @Override + protected boolean isEmpty(ResultSet response) { + return response.isExhausted(); + } + + /** + * Issue one SELECT query per key. Note that, unlike the Redis mget path, these lookups are + * executed as individual queries. + * + * @param keys list of string keys + * @return list of {@link ResultSet}, one for each key + */ + @Override + protected List getAll(List keys) { + List results = new ArrayList<>(); + for (String key : keys) { + results.add( + session.execute( + QueryBuilder.select() + .column("entities") + .column("feature") + .column("value") + .writeTime("value") + .as("writetime") + .from(keyspace, tableName) + .where(QueryBuilder.eq("entities", key)))); + } + return results; + } + + @Override + FeatureRow parseResponse(ResultSet resultSet) { + List fields = new ArrayList<>(); + Instant instant = Instant.now(); + while (!resultSet.isExhausted()) { + Row row = resultSet.one(); + // Cassandra write times are in microseconds; convert to seconds plus a nanosecond remainder + long microSeconds = row.getLong("writetime"); + instant = + Instant.ofEpochSecond( + TimeUnit.MICROSECONDS.toSeconds(microSeconds), + TimeUnit.MICROSECONDS.toNanos( + Math.floorMod(microSeconds, TimeUnit.SECONDS.toMicros(1)))); + try { + fields.add( + Field.newBuilder() + .setName(row.getString("feature")) + .setValue(Value.parseFrom(ByteBuffer.wrap(row.getBytes("value").array()))) + .build()); + } catch (InvalidProtocolBufferException e) { + // Skip fields whose serialized Value cannot be parsed + e.printStackTrace(); + } + } + return FeatureRow.newBuilder() + .addAllFields(fields) + .setEventTimestamp( + Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build()) + .build(); + } + + /** + * Create a Cassandra key for a single entity row + * + * @param featureSet feature set reference of the feature. E.g.
feature_set_1:1 + * @param featureSetEntityNames entity names that belong to the featureSet + * @param entityRow entityRow to build the key from + * @return String + */ + private static String createCassandraKey( + String featureSet, List featureSetEntityNames, EntityRow entityRow) { + Map fieldsMap = entityRow.getFieldsMap(); + List res = new ArrayList<>(); + for (String entityName : featureSetEntityNames) { + res.add(entityName + "=" + ValueUtil.toString(fieldsMap.get(entityName))); + } + return featureSet + ":" + String.join("|", res); + } +} diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java new file mode 100644 index 0000000000..699d48c121 --- /dev/null +++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java @@ -0,0 +1,236 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.service; + +import static feast.serving.util.Metrics.missingKeyCount; +import static feast.serving.util.Metrics.requestCount; +import static feast.serving.util.Metrics.requestLatency; +import static feast.serving.util.Metrics.staleKeyCount; + +import com.google.common.collect.Maps; +import com.google.protobuf.Duration; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.FeatureSetProto.EntitySpec; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.serving.ServingAPIProto.FeastServingType; +import feast.serving.ServingAPIProto.FeatureSetRequest; +import feast.serving.ServingAPIProto.GetBatchFeaturesRequest; +import feast.serving.ServingAPIProto.GetBatchFeaturesResponse; +import feast.serving.ServingAPIProto.GetFeastServingInfoRequest; +import feast.serving.ServingAPIProto.GetFeastServingInfoResponse; +import feast.serving.ServingAPIProto.GetJobRequest; +import feast.serving.ServingAPIProto.GetJobResponse; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.ValueProto.Value; +import io.grpc.Status; +import io.opentracing.Scope; +import io.opentracing.Tracer; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +abstract class OnlineServingService implements ServingService { + + private final CachedSpecService specService; + private final Tracer tracer; + + OnlineServingService(CachedSpecService specService, Tracer tracer) { + this.specService = specService; + this.tracer = tracer; + } + + @Override + public GetFeastServingInfoResponse getFeastServingInfo( + GetFeastServingInfoRequest getFeastServingInfoRequest) { + return GetFeastServingInfoResponse.newBuilder() + 
.setType(FeastServingType.FEAST_SERVING_TYPE_ONLINE) + .build(); + } + + @Override + public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) { + throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException(); + } + + @Override + public GetJobResponse getJob(GetJobRequest getJobRequest) { + throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException(); + } + + /** {@inheritDoc} */ + @Override + public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) { + try (Scope scope = + tracer.buildSpan("OnlineServingService-getOnlineFeatures").startActive(true)) { + long startTime = System.currentTimeMillis(); + GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder = + GetOnlineFeaturesResponse.newBuilder(); + + List entityRows = request.getEntityRowsList(); + Map> featureValuesMap = + entityRows.stream() + .collect(Collectors.toMap(er -> er, er -> Maps.newHashMap(er.getFieldsMap()))); + + List featureSetRequests = request.getFeatureSetsList(); + for (FeatureSetRequest featureSetRequest : featureSetRequests) { + + FeatureSetSpec featureSetSpec = + specService.getFeatureSet(featureSetRequest.getName(), featureSetRequest.getVersion()); + + List featureSetEntityNames = + featureSetSpec.getEntitiesList().stream() + .map(EntitySpec::getName) + .collect(Collectors.toList()); + + Duration defaultMaxAge = featureSetSpec.getMaxAge(); + if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) { + featureSetRequest = featureSetRequest.toBuilder().setMaxAge(defaultMaxAge).build(); + } + + sendAndProcessMultiGet( + createLookupKeys(featureSetEntityNames, entityRows, featureSetRequest), + entityRows, + featureValuesMap, + featureSetRequest); + } + List fieldValues = + featureValuesMap.values().stream() + .map(m -> FieldValues.newBuilder().putAllFields(m).build()) + .collect(Collectors.toList()); + requestLatency.labels("getOnlineFeatures").observe(System.currentTimeMillis() - startTime); + return getOnlineFeaturesResponseBuilder.addAllFieldValues(fieldValues).build(); + } + } + + /** + * Create lookup keys for corresponding data stores + * + * @param featureSetEntityNames list of entity names + * @param entityRows list of {@link EntityRow} + * @param featureSetRequest {@link FeatureSetRequest} + * @return list of {@link LookupKeyType} + */ + abstract List createLookupKeys( + List featureSetEntityNames, + List entityRows, + FeatureSetRequest featureSetRequest); + + /** + * Checks whether the response is empty, i.e. 
feature does not exist in the store + * + * @param response {@link ResponseType} + * @return boolean + */ + protected abstract boolean isEmpty(ResponseType response); + + /** + * Send a list of get requests + * + * @param keys list of {@link LookupKeyType} + * @return list of {@link ResponseType} + */ + protected abstract List getAll(List keys); + + /** + * Parse response from data store to FeatureRow + * + * @param response {@link ResponseType} + * @return {@link FeatureRow} + */ + abstract FeatureRow parseResponse(ResponseType response) throws InvalidProtocolBufferException; + + private List getResponses(List keys) { + try (Scope scope = tracer.buildSpan("OnlineServingService-sendMultiGet").startActive(true)) { + long startTime = System.currentTimeMillis(); + try { + return getAll(keys); + } catch (Exception e) { + throw Status.NOT_FOUND + .withDescription("Unable to retrieve feature from online store") + .withCause(e) + .asRuntimeException(); + } finally { + requestLatency.labels("sendMultiGet").observe(System.currentTimeMillis() - startTime); + } + } + } + + private void sendAndProcessMultiGet( + List keys, + List entityRows, + Map> featureValuesMap, + FeatureSetRequest featureSetRequest) { + List responses = getResponses(keys); + + long startTime = System.currentTimeMillis(); + try (Scope scope = tracer.buildSpan("OnlineServingService-processResponse").startActive(true)) { + String featureSetId = + String.format("%s:%d", featureSetRequest.getName(), featureSetRequest.getVersion()); + Map nullValues = + featureSetRequest.getFeatureNamesList().stream() + .collect( + Collectors.toMap( + name -> featureSetId + ":" + name, name -> Value.newBuilder().build())); + for (int i = 0; i < responses.size(); i++) { + EntityRow entityRow = entityRows.get(i); + Map featureValues = featureValuesMap.get(entityRow); + try { + ResponseType response = responses.get(i); + if (isEmpty(response)) { + missingKeyCount.labels(featureSetRequest.getName()).inc(); + featureValues.putAll(nullValues); + continue; + } + + FeatureRow featureRow = parseResponse(response); + boolean stale = isStale(featureSetRequest, entityRow, featureRow); + if (stale) { + staleKeyCount.labels(featureSetRequest.getName()).inc(); + featureValues.putAll(nullValues); + continue; + } + + requestCount.labels(featureSetRequest.getName()).inc(); + featureRow.getFieldsList().stream() + .filter(f -> featureSetRequest.getFeatureNamesList().contains(f.getName())) + .forEach(f -> featureValues.put(featureSetId + ":" + f.getName(), f.getValue())); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + } finally { + requestLatency.labels("processResponse").observe(System.currentTimeMillis() - startTime); + } + } + + private static boolean isStale( + FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) { + if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) { + return false; + } + long givenTimestamp = entityRow.getEntityTimestamp().getSeconds(); + if (givenTimestamp == 0) { + givenTimestamp = System.currentTimeMillis() / 1000; + } + long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds(); + return timeDifference > featureSetRequest.getMaxAge().getSeconds(); + } +} diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java index 48fc485214..0ba10f74c2 100644 --- a/serving/src/main/java/feast/serving/service/RedisServingService.java +++ 
b/serving/src/main/java/feast/serving/service/RedisServingService.java @@ -25,7 +25,6 @@ import com.google.common.collect.Maps; import com.google.protobuf.AbstractMessageLite; -import com.google.protobuf.Duration; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSetSpec; @@ -58,16 +57,15 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; -public class RedisServingService implements ServingService { +public class RedisServingService extends OnlineServingService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisServingService.class); private final JedisPool jedisPool; - private final CachedSpecService specService; private final Tracer tracer; public RedisServingService(JedisPool jedisPool, CachedSpecService specService, Tracer tracer) { + super(specService, tracer); this.jedisPool = jedisPool; - this.specService = specService; this.tracer = tracer; } @@ -142,7 +140,8 @@ public GetJobResponse getJob(GetJobRequest getJobRequest) { * @param featureSetSpec featureSetSpec of the features to retrieve * @return list of RedisKeys */ - private List getRedisKeys( + @Override + List createLookupKeys( List featureSetEntityNames, List entityRows, FeatureSetSpec featureSetSpec) { @@ -156,6 +155,33 @@ private List getRedisKeys( } } + @Override + protected boolean isEmpty(byte[] response) { + return response == null; + } + + /** + * Send a list of get requests as a single mget + * + * @param keys list of {@link RedisKey} + * @return list of {@link FeatureRow} in primitive byte representation for each {@link RedisKey} + */ + @Override + protected List getAll(List keys) { + // Use try-with-resources so the Jedis connection is always returned to the pool + try (Jedis jedis = jedisPool.getResource()) { + byte[][] binaryKeys = + keys.stream() + .map(AbstractMessageLite::toByteArray) + .collect(Collectors.toList()) + .toArray(new byte[0][0]); + return jedis.mget(binaryKeys); + } + } + + @Override + FeatureRow parseResponse(byte[] response) throws InvalidProtocolBufferException { + return FeatureRow.parseFrom(response); + } + /** * Create {@link RedisKey} * diff --git a/serving/src/main/java/feast/serving/util/ValueUtil.java b/serving/src/main/java/feast/serving/util/ValueUtil.java new file mode 100644 index 0000000000..e3ede6af98 --- /dev/null +++ b/serving/src/main/java/feast/serving/util/ValueUtil.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package feast.serving.util; + +import feast.types.ValueProto.Value; + +public class ValueUtil { + + public static String toString(Value value) { + String strValue; + switch (value.getValCase()) { + case BYTES_VAL: + // Note: ByteString.toString() yields a debug representation, not the raw byte contents + strValue = value.getBytesVal().toString(); + break; + case STRING_VAL: + strValue = value.getStringVal(); + break; + case INT32_VAL: + strValue = String.valueOf(value.getInt32Val()); + break; + case INT64_VAL: + strValue = String.valueOf(value.getInt64Val()); + break; + case DOUBLE_VAL: + strValue = String.valueOf(value.getDoubleVal()); + break; + case FLOAT_VAL: + strValue = String.valueOf(value.getFloatVal()); + break; + case BOOL_VAL: + strValue = String.valueOf(value.getBoolVal()); + break; + default: + throw new IllegalArgumentException( + String.format("toString method not supported for type %s", value.getValCase())); + } + return strValue; + } +} diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 96713c8028..8e976f33a4 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -1,7 +1,7 @@ feast: # This value is retrieved from project.version properties in pom.xml # https://docs.spring.io/spring-boot/docs/current/reference/html/ - version: @project.version@ +# version: @project.version@ # GRPC service address for Feast Core # Feast Serving requires connection to Feast Core to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information) core-host: ${FEAST_CORE_HOST:localhost} @@ -24,6 +24,24 @@ feast: redis-pool-max-size: ${FEAST_REDIS_POOL_MAX_SIZE:128} # If serving redis, the redis pool max idle conns redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16} + # If serving cassandra, minimum connections per local host (a host in the same data center) + cassandra-pool-core-local-connections: ${FEAST_CASSANDRA_CORE_LOCAL_CONNECTIONS:1} + # If serving cassandra, maximum connections per local host (a host in the same data center) + cassandra-pool-max-local-connections: ${FEAST_CASSANDRA_MAX_LOCAL_CONNECTIONS:1} + # If serving cassandra, minimum connections per remote host (a host in a remote data center) + cassandra-pool-core-remote-connections: ${FEAST_CASSANDRA_CORE_REMOTE_CONNECTIONS:1} + # If serving cassandra, maximum connections per remote host (a host in a remote data center) + cassandra-pool-max-remote-connections: ${FEAST_CASSANDRA_MAX_REMOTE_CONNECTIONS:1} + # If serving cassandra, maximum number of concurrent requests per connection to a local host + cassandra-pool-max-requests-local-connection: ${FEAST_CASSANDRA_MAX_REQUESTS_LOCAL_CONNECTION:32768} + # If serving cassandra, maximum number of concurrent requests per connection to a remote host + cassandra-pool-max-requests-remote-connection: ${FEAST_CASSANDRA_MAX_REQUESTS_REMOTE_CONNECTION:2048} + # If serving cassandra, the number of in-flight requests that triggers opening a new connection to a local host (if one is available) + cassandra-pool-new-local-connection-threshold: ${FEAST_CASSANDRA_NEW_LOCAL_CONNECTION_THRESHOLD:30000} + # If serving cassandra, the number of in-flight requests that triggers opening a new connection to a remote host (if one is available) + cassandra-pool-new-remote-connection-threshold: ${FEAST_CASSANDRA_NEW_REMOTE_CONNECTION_THRESHOLD:400} + # If serving cassandra, the number of milliseconds to wait to acquire a connection (after which the driver tries the next available host in the query plan) + cassandra-pool-timeout-millis: ${FEAST_CASSANDRA_POOL_TIMEOUT_MILLIS:0} jobs: # staging-location specifies the URI to store intermediate
files for batch serving. diff --git a/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java b/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java new file mode 100644 index 0000000000..a1778251a3 --- /dev/null +++ b/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java @@ -0,0 +1,244 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.service; + +import static feast.serving.test.TestUtil.intValue; +import static feast.serving.test.TestUtil.responseToMapList; +import static feast.serving.test.TestUtil.strValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.utils.Bytes; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Timestamp; +import feast.core.FeatureSetProto.EntitySpec; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.serving.ServingAPIProto.FeatureSetRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; +import feast.serving.test.TestUtil.LocalCassandra; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.ValueProto.Value; +import io.opentracing.Tracer; +import io.opentracing.Tracer.SpanBuilder; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.thrift.transport.TTransportException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class CassandraServingServiceITTest { + + @Mock CachedSpecService specService; + + @Mock Tracer tracer; + + private CassandraServingService cassandraServingService; + private Session session; + + @BeforeClass + public static void startServer() throws InterruptedException, IOException, TTransportException { + LocalCassandra.start(); + LocalCassandra.createKeyspaceAndTable(); + } + + @Before + public void setup() { + initMocks(this); + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .addEntities(EntitySpec.newBuilder().setName("entity1")) + .addEntities(EntitySpec.newBuilder().setName("entity2")) + .build(); + + when(specService.getFeatureSet("featureSet", 
1)).thenReturn(featureSetSpec); + when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); + + session = + new Cluster.Builder() + .addContactPoints(LocalCassandra.getHost()) + .withPort(LocalCassandra.getPort()) + .build() + .connect(); + + populateTable(session); + + cassandraServingService = + new CassandraServingService(session, "test", "feature_store", specService, tracer); + } + + private void populateTable(Session session) { + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=1|entity2=a", "feature1", intValue(1))); + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=1|entity2=a", "feature2", intValue(1))); + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=2|entity2=b", "feature1", intValue(1))); + session.execute( + insertQuery( + "test", "feature_store", "featureSet:1:entity1=2|entity2=b", "feature2", intValue(1))); + } + + @AfterClass + public static void cleanUp() { + LocalCassandra.stop(); + } + + @Test + public void shouldReturnResponseWithValuesIfKeysPresent() { + GetOnlineFeaturesRequest request = + GetOnlineFeaturesRequest.newBuilder() + .addFeatureSets( + FeatureSetRequest.newBuilder() + .setName("featureSet") + .setVersion(1) + .addAllFeatureNames(Lists.newArrayList("feature1", "feature2")) + .build()) + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a"))) + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b"))) + .build(); + + GetOnlineFeaturesResponse expected = + GetOnlineFeaturesResponse.newBuilder() + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + .build(); + GetOnlineFeaturesResponse actual = cassandraServingService.getOnlineFeatures(request); + + assertThat( + responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray())); + } + + @Test + public void shouldReturnResponseWithUnsetValuesIfKeysNotPresent() { + GetOnlineFeaturesRequest request = + GetOnlineFeaturesRequest.newBuilder() + .addFeatureSets( + FeatureSetRequest.newBuilder() + .setName("featureSet") + .setVersion(1) + .addAllFeatureNames(Lists.newArrayList("feature1", "feature2")) + .build()) + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a"))) + // Non-existing entity keys + .addEntityRows( + EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(55)) + .putFields("entity2", strValue("ff"))) + .build(); + + GetOnlineFeaturesResponse expected = + GetOnlineFeaturesResponse.newBuilder() + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + // Missing keys will 
return empty values + .addFieldValues( + FieldValues.newBuilder() + .putFields("entity1", intValue(55)) + .putFields("entity2", strValue("ff")) + .putFields("featureSet:1:feature1", Value.newBuilder().build()) + .putFields("featureSet:1:feature2", Value.newBuilder().build())) + .build(); + GetOnlineFeaturesResponse actual = cassandraServingService.getOnlineFeatures(request); + + assertThat( + responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray())); + } + + // This test should fail if cassandra no longer stores write time as microseconds or if we change + // the way we parse microseconds to com.google.protobuf.Timestamp + @Test + public void shouldInsertAndParseWriteTimestampInMicroSeconds() + throws InvalidProtocolBufferException { + session.execute( + "INSERT INTO test.feature_store (entities, feature, value)\n" + + " VALUES ('ENT1', 'FEAT1'," + + Bytes.toHexString(Value.newBuilder().build().toByteArray()) + + ")\n" + + " USING TIMESTAMP 1574318287123456;"); + + ResultSet resultSet = + session.execute( + QueryBuilder.select() + .column("entities") + .column("feature") + .column("value") + .writeTime("value") + .as("writetime") + .from("test", "feature_store") + .where(QueryBuilder.eq("entities", "ENT1"))); + FeatureRow featureRow = cassandraServingService.parseResponse(resultSet); + + Assert.assertEquals( + Timestamp.newBuilder().setSeconds(1574318287).setNanos(123456000).build(), + featureRow.getEventTimestamp()); + } + + private Insert insertQuery( + String database, String table, String key, String featureName, Value value) { + return QueryBuilder.insertInto(database, table) + .value("entities", key) + .value("feature", featureName) + .value("value", ByteBuffer.wrap(value.toByteArray())); + } +} diff --git a/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java b/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java new file mode 100644 index 0000000000..f965b14a64 --- /dev/null +++ b/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.serving.service; + +import static feast.serving.test.TestUtil.intValue; +import static feast.serving.test.TestUtil.strValue; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.datastax.driver.core.Session; +import feast.serving.ServingAPIProto.FeatureSetRequest; +import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; +import io.opentracing.Tracer; +import io.opentracing.Tracer.SpanBuilder; +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class CassandraServingServiceTest { + + @Mock Session session; + + @Mock CachedSpecService specService; + + @Mock Tracer tracer; + + private CassandraServingService cassandraServingService; + + @Before + public void setUp() { + initMocks(this); + + when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); + + cassandraServingService = + new CassandraServingService(session, "test", "feature_store", specService, tracer); + } + + @Test + public void shouldConstructCassandraKeyCorrectly() { + List cassandraKeys = + cassandraServingService.createLookupKeys( + new ArrayList() { + { + add("entity1"); + add("entity2"); + } + }, + new ArrayList() { + { + add( + EntityRow.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .build()); + add( + EntityRow.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .build()); + } + }, + FeatureSetRequest.newBuilder().setName("featureSet").setVersion(1).build()); + + List expectedKeys = + new ArrayList() { + { + add("featureSet:1:entity1=1|entity2=a"); + add("featureSet:1:entity1=2|entity2=b"); + } + }; + + Assert.assertEquals(expectedKeys, cassandraKeys); + } + + @Test(expected = Exception.class) + public void shouldThrowExceptionWhenCannotConstructCassandraKey() { + List cassandraKeys = + cassandraServingService.createLookupKeys( + new ArrayList() { + { + add("entity1"); + add("entity2"); + } + }, + new ArrayList() { + { + add(EntityRow.newBuilder().putFields("entity1", intValue(1)).build()); + add( + EntityRow.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .build()); + } + }, + FeatureSetRequest.newBuilder().setName("featureSet").setVersion(1).build()); + } +} diff --git a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java index 042107e117..eee4dc6dc3 100644 --- a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java @@ -16,6 +16,9 @@ */ package feast.serving.service; +import static feast.serving.test.TestUtil.intValue; +import static feast.serving.test.TestUtil.responseToMapList; +import static feast.serving.test.TestUtil.strValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.mockito.Mockito.when; @@ -42,7 +45,6 @@ import io.opentracing.Tracer.SpanBuilder; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.junit.Before; import org.junit.Test; @@ -595,20 +597,6 @@ public void shouldFilterOutUndesiredRows() { responseToMapList(actual), 
containsInAnyOrder(responseToMapList(expected).toArray())); } - private List> responseToMapList(GetOnlineFeaturesResponse response) { - return response.getFieldValuesList().stream() - .map(FieldValues::getFieldsMap) - .collect(Collectors.toList()); - } - - private Value intValue(int val) { - return Value.newBuilder().setInt64Val(val).build(); - } - - private Value strValue(String val) { - return Value.newBuilder().setStringVal(val).build(); - } - private FeatureSetSpec getFeatureSetSpec() { return FeatureSetSpec.newBuilder() .setProject("project") diff --git a/serving/src/test/java/feast/serving/test/TestUtil.java b/serving/src/test/java/feast/serving/test/TestUtil.java new file mode 100644 index 0000000000..9c53359071 --- /dev/null +++ b/serving/src/test/java/feast/serving/test/TestUtil.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.test; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; +import feast.types.ValueProto.Value; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.thrift.transport.TTransportException; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.cassandraunit.utils.EmbeddedCassandraServerHelper; + +@SuppressWarnings("WeakerAccess") +public class TestUtil { + + public static class LocalCassandra { + + public static void start() throws InterruptedException, IOException, TTransportException { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(); + } + + public static void createKeyspaceAndTable() { + new ClassPathCQLDataSet("embedded-store/LoadCassandra.cql", true, true) + .getCQLStatements() + .forEach(s -> LocalCassandra.getSession().execute(s)); + } + + public static String getHost() { + return EmbeddedCassandraServerHelper.getHost(); + } + + public static int getPort() { + return EmbeddedCassandraServerHelper.getNativeTransportPort(); + } + + public static Cluster getCluster() { + return EmbeddedCassandraServerHelper.getCluster(); + } + + public static Session getSession() { + return EmbeddedCassandraServerHelper.getSession(); + } + + public static void stop() { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); + } + } + + public static List> responseToMapList(GetOnlineFeaturesResponse response) { + return response.getFieldValuesList().stream() + .map(FieldValues::getFieldsMap) + .collect(Collectors.toList()); + } + + public static Value intValue(int val) { + return Value.newBuilder().setInt64Val(val).build(); + } + + public static Value strValue(String val) { + return Value.newBuilder().setStringVal(val).build(); + } +} diff --git a/serving/src/test/resources/embedded-store/LoadCassandra.cql 
b/serving/src/test/resources/embedded-store/LoadCassandra.cql new file mode 100644 index 0000000000..c80da294b7 --- /dev/null +++ b/serving/src/test/resources/embedded-store/LoadCassandra.cql @@ -0,0 +1,8 @@ +CREATE KEYSPACE test with replication = {'class':'SimpleStrategy','replication_factor':1}; + +CREATE TABLE test.feature_store( + entities text, + feature text, + value blob, + PRIMARY KEY (entities, feature) +) WITH CLUSTERING ORDER BY (feature DESC); \ No newline at end of file
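
As a usage illustration of the schema above: a row written to feature_store can be read back with the same query shape CassandraServingService.getAll uses. The following is a minimal sketch only — it assumes the DataStax 3.x driver declared in serving/pom.xml, the keyspace/table/port from sample_cassandra_config.yml, and a lookup key in the documented [feature_set]:[version]:[entity_name=entity_value]|... format; the class name ReadFeatureRowSketch is invented for the example.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.utils.Bytes;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.ValueProto.Value;

public class ReadFeatureRowSketch {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Contact point and port taken from sample_cassandra_config.yml
    Cluster cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
    Session session = cluster.connect();

    // Key format documented in Store.proto:
    // [feature_set]:[version]:[entity_name1=entity_value1]|[entity_name2=entity_value2]
    String key = "featureSet:1:entity1=1|entity2=a";

    // Same SELECT shape as CassandraServingService.getAll: the feature name, the serialized
    // Value blob, and the cell write time (microseconds) under the alias "writetime"
    ResultSet resultSet =
        session.execute(
            QueryBuilder.select()
                .column("feature")
                .column("value")
                .writeTime("value")
                .as("writetime")
                .from("feast", "feature_store")
                .where(QueryBuilder.eq("entities", key)));

    for (Row row : resultSet) {
      // The value column is a protobuf-serialized feast.types.Value
      Value value = Value.parseFrom(Bytes.getArray(row.getBytes("value")));
      System.out.printf(
          "%s = %s (written at %d us)%n",
          row.getString("feature"), value, row.getLong("writetime"));
    }
    cluster.close();
  }
}

Iterating the ResultSet this way also makes the write-time semantics concrete: the writetime alias carries Cassandra's microsecond cell timestamp, which is what CassandraServingService.parseResponse converts into the FeatureRow event timestamp.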