From b8d1025e9833894ca5f3ef722a3a6ddb7e7be600 Mon Sep 17 00:00:00 2001 From: smadarasmi Date: Fri, 3 Jan 2020 17:33:45 +0700 Subject: [PATCH] docs update, spotless check, and bug fix on cassandra schema --- .../ingestion/transform/WriteToStore.java | 29 ++- .../java/feast/ingestion/utils/StoreUtil.java | 2 +- .../FeatureRowToCassandraMutationDoFn.java | 38 ++- .../transform/CassandraWriteToStoreIT.java | 34 ++- .../ingestion/util/CassandraStoreUtilIT.java | 176 ++++++++----- ...FeatureRowToCassandraMutationDoFnTest.java | 242 ++++++++++++------ .../src/test/java/feast/test/TestUtil.java | 9 +- .../embedded-store/LoadCassandra.cql | 8 - protos/feast/core/Store.proto | 9 + 9 files changed, 350 insertions(+), 197 deletions(-) delete mode 100644 ingestion/src/test/resources/embedded-store/LoadCassandra.cql diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index eb74fffb1c..c89b95bfb2 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -68,8 +68,8 @@ public abstract class WriteToStore extends PTransform, P public static final String METRIC_NAMESPACE = "WriteToStore"; public static final String ELEMENTS_WRITTEN_METRIC = "elements_written"; - private static final Counter elementsWritten = Metrics - .counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC); + private static final Counter elementsWritten = + Metrics.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC); public abstract Store getStore(); @@ -155,32 +155,35 @@ public void processElement(ProcessContext context) { break; case CASSANDRA: CassandraConfig cassandraConfig = getStore().getCassandraConfig(); - SerializableFunction mapperFactory = new CassandraMutationMapperFactory(CassandraMutation.class); + SerializableFunction mapperFactory = + new CassandraMutationMapperFactory(CassandraMutation.class); input .apply( "Create CassandraMutation from FeatureRow", - ParDo.of(new FeatureRowToCassandraMutationDoFn( - getFeatureSetSpecs(), cassandraConfig.getDefaultTtl())) - ) + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + getFeatureSetSpecs(), cassandraConfig.getDefaultTtl()))) .apply( CassandraIO.write() .withHosts(Arrays.asList(cassandraConfig.getBootstrapHosts().split(","))) .withPort(cassandraConfig.getPort()) .withKeyspace(cassandraConfig.getKeyspace()) .withEntity(CassandraMutation.class) - .withMapperFactoryFn(mapperFactory) - ); + .withMapperFactoryFn(mapperFactory)); break; default: log.error("Store type '{}' is not supported. No Feature Row will be written.", storeType); break; } - input.apply("IncrementWriteToStoreElementsWrittenCounter", - MapElements.into(TypeDescriptors.booleans()).via((FeatureRow row) -> { - elementsWritten.inc(); - return true; - })); + input.apply( + "IncrementWriteToStoreElementsWrittenCounter", + MapElements.into(TypeDescriptors.booleans()) + .via( + (FeatureRow row) -> { + elementsWritten.inc(); + return true; + })); return PDone.in(input.getPipeline()); } diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index b92696c66d..3478227165 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -303,7 +303,7 @@ public static void setupCassandra(CassandraConfig cassandraConfig) { .ifNotExists() .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) .addClusteringColumn(CassandraMutation.FEATURE, DataType.text()) - .addStaticColumn(CassandraMutation.VALUE, DataType.blob()); + .addColumn(CassandraMutation.VALUE, DataType.blob()); log.info("Create Cassandra table if not exists.."); session.execute(createTable); diff --git a/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java index 0292735c58..177a8703e6 100644 --- a/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFn.java @@ -1,3 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package feast.store.serving.cassandra; import com.google.protobuf.Duration; @@ -18,8 +34,8 @@ public class FeatureRowToCassandraMutationDoFn extends DoFn { - private static final Logger log = org.slf4j.LoggerFactory - .getLogger(FeatureRowToCassandraMutationDoFn.class); + private static final Logger log = + org.slf4j.LoggerFactory.getLogger(FeatureRowToCassandraMutationDoFn.class); private Map featureSetSpecs; private Map maxAges; @@ -36,16 +52,16 @@ public FeatureRowToCassandraMutationDoFn(Map specs, Dura } } - /** - * Output a Cassandra mutation object for every feature in the feature row. - */ + /** Output a Cassandra mutation object for every feature in the feature row. */ @ProcessElement public void processElement(ProcessContext context) { FeatureRow featureRow = context.element(); try { FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet()); - Set featureNames = featureSetSpec.getFeaturesList().stream() - .map(FeatureSpec::getName).collect(Collectors.toSet()); + Set featureNames = + featureSetSpec.getFeaturesList().stream() + .map(FeatureSpec::getName) + .collect(Collectors.toSet()); String key = CassandraMutation.keyFromFeatureRow(featureSetSpec, featureRow); Collection mutations = new ArrayList<>(); @@ -57,9 +73,7 @@ public void processElement(ProcessContext context) { field.getName(), ByteBuffer.wrap(field.getValue().toByteArray()), Timestamps.toMicros(featureRow.getEventTimestamp()), - maxAges.get(featureRow.getFeatureSet()) - ) - ); + maxAges.get(featureRow.getFeatureSet()))); } } @@ -67,7 +81,5 @@ public void processElement(ProcessContext context) { } catch (Exception e) { log.error(e.getMessage(), e); } - } - -} \ No newline at end of file +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java b/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java index 4d66be089c..da914ca175 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java +++ b/ingestion/src/test/java/feast/ingestion/transform/CassandraWriteToStoreIT.java @@ -66,13 +66,7 @@ public Store getStore() { return Store.newBuilder() .setType(StoreType.CASSANDRA) .setName("SERVING") - .setCassandraConfig( - CassandraConfig.newBuilder() - .setBootstrapHosts(LocalCassandra.getHost()) - .setPort(LocalCassandra.getPort()) - .setTableName("feature_store") - .setKeyspace("test") - .build()) + .setCassandraConfig(getCassandraConfig()) .build(); } @@ -86,10 +80,26 @@ public Map getFeatureSetSpecs() { } } + private static CassandraConfig getCassandraConfig() { + return CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setTableName("feature_store") + .setKeyspace("test") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "1"); + } + }) + .build(); + } + @BeforeClass public static void startServer() throws InterruptedException, IOException, TTransportException { LocalCassandra.start(); - LocalCassandra.createKeyspaceAndTable(); + LocalCassandra.createKeyspaceAndTable(getCassandraConfig()); } @Before @@ -120,7 +130,7 @@ public void setUp() { put("entity1", TestUtil.intValue(1)); put("entity2", TestUtil.strValue("a")); put("feature1", TestUtil.intValue(1)); - put("feature2", TestUtil.intValue(1)); + put("feature2", TestUtil.intValue(2)); } }); } @@ -146,7 +156,7 @@ public void testWriteCassandra_happyPath() throws InvalidProtocolBufferException List expectedFields = Arrays.asList( Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(), - Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(1)).build()); + Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build()); assertTrue(actualResults.containsAll(expectedFields)); assertEquals(expectedFields.size(), actualResults.size()); @@ -201,7 +211,7 @@ public void testWriteCassandra_shouldNotOverrideNewerValues() put("entity1", TestUtil.intValue(1)); put("entity2", TestUtil.strValue("a")); put("feature1", TestUtil.intValue(3)); - put("feature2", TestUtil.intValue(3)); + put("feature2", TestUtil.intValue(4)); } }); @@ -217,7 +227,7 @@ public void testWriteCassandra_shouldNotOverrideNewerValues() List expectedFields = Arrays.asList( Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(), - Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(1)).build()); + Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build()); assertTrue(actualResults.containsAll(expectedFields)); assertEquals(expectedFields.size(), actualResults.size()); diff --git a/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java b/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java index 1f1d11d72c..8c6874ecac 100644 --- a/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java +++ b/ingestion/src/test/java/feast/ingestion/util/CassandraStoreUtilIT.java @@ -1,3 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package feast.ingestion.util; import com.datastax.driver.core.DataType; @@ -18,7 +34,7 @@ import org.junit.Test; public class CassandraStoreUtilIT { - + @BeforeClass public static void startServer() throws InterruptedException, IOException, TTransportException { LocalCassandra.start(); @@ -31,30 +47,35 @@ public void teardown() { @Test public void setupCassandra_shouldCreateKeyspaceAndTable() { - CassandraConfig config = CassandraConfig.newBuilder() - .setBootstrapHosts(LocalCassandra.getHost()) - .setPort(LocalCassandra.getPort()) - .setKeyspace("test") - .setTableName("feature_store") - .putAllReplicationOptions(new HashMap() {{ - put("class", "NetworkTopologyStrategy"); - put("dc1", "2"); - put("dc2", "3"); - }}) - .build(); + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }) + .build(); StoreUtil.setupCassandra(config); - Map actualReplication = LocalCassandra.getCluster() - .getMetadata() - .getKeyspace("test") - .getReplication(); - Map expectedReplication = new HashMap() {{ - put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy"); - put("dc1", "2"); - put("dc2", "3"); - }}; - TableMetadata tableMetadata = LocalCassandra.getCluster() - .getMetadata().getKeyspace("test").getTable("feature_store"); + Map actualReplication = + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getReplication(); + Map expectedReplication = + new HashMap() { + { + put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }; + TableMetadata tableMetadata = + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store"); Assert.assertEquals(expectedReplication, actualReplication); Assert.assertNotNull(tableMetadata); @@ -62,70 +83,85 @@ public void setupCassandra_shouldCreateKeyspaceAndTable() { @Test public void setupCassandra_shouldBeIdempotent_whenTableAlreadyExistsAndSchemaMatches() { - LocalCassandra.createKeyspaceAndTable(); + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "2"); + } + }) + .build(); + + LocalCassandra.createKeyspaceAndTable(config); // Check table is created - Assert.assertNotNull(LocalCassandra.getCluster() - .getMetadata().getKeyspace("test").getTable("feature_store")); - - CassandraConfig config = CassandraConfig.newBuilder() - .setBootstrapHosts(LocalCassandra.getHost()) - .setPort(LocalCassandra.getPort()) - .setKeyspace("test") - .setTableName("feature_store") - .putAllReplicationOptions(new HashMap() {{ - put("class", "SimpleStrategy"); - put("replication_factor", "2"); - }}) - .build(); + Assert.assertNotNull( + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store")); StoreUtil.setupCassandra(config); - Assert.assertNotNull(LocalCassandra.getCluster() - .getMetadata().getKeyspace("test").getTable("feature_store")); + Assert.assertNotNull( + LocalCassandra.getCluster().getMetadata().getKeyspace("test").getTable("feature_store")); } @Test(expected = RuntimeException.class) public void setupCassandra_shouldThrowException_whenTableNameDoesNotMatchObjectMapper() { - CassandraConfig config = CassandraConfig.newBuilder() - .setBootstrapHosts(LocalCassandra.getHost()) - .setPort(LocalCassandra.getPort()) - .setKeyspace("test") - .setTableName("test_data_store") - .putAllReplicationOptions(new HashMap() {{ - put("class", "NetworkTopologyStrategy"); - put("dc1", "2"); - put("dc2", "3"); - }}) - .build(); + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("test_data_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "NetworkTopologyStrategy"); + put("dc1", "2"); + put("dc2", "3"); + } + }) + .build(); StoreUtil.setupCassandra(config); } @Test(expected = RuntimeException.class) public void setupCassandra_shouldThrowException_whenTableSchemaDoesNotMatchObjectMapper() { - LocalCassandra.getSession().execute("CREATE KEYSPACE test " - + "WITH REPLICATION = {" - + "'class': 'SimpleStrategy', 'replication_factor': 2 }"); - - Create createTable = SchemaBuilder.createTable("test", "feature_store") - .ifNotExists() - .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) - .addClusteringColumn("featureName", DataType.text()) // Column name does not match in CassandraMutation - .addStaticColumn(CassandraMutation.VALUE, DataType.blob()); + LocalCassandra.getSession() + .execute( + "CREATE KEYSPACE test " + + "WITH REPLICATION = {" + + "'class': 'SimpleStrategy', 'replication_factor': 2 }"); + + Create createTable = + SchemaBuilder.createTable("test", "feature_store") + .ifNotExists() + .addPartitionKey(CassandraMutation.ENTITIES, DataType.text()) + .addClusteringColumn( + "featureName", DataType.text()) // Column name does not match in CassandraMutation + .addColumn(CassandraMutation.VALUE, DataType.blob()); LocalCassandra.getSession().execute(createTable); - CassandraConfig config = CassandraConfig.newBuilder() - .setBootstrapHosts(LocalCassandra.getHost()) - .setPort(LocalCassandra.getPort()) - .setKeyspace("test") - .setTableName("feature_store") - .putAllReplicationOptions(new HashMap() {{ - put("class", "SimpleStrategy"); - put("replication_factor", "2"); - }}) - .build(); + CassandraConfig config = + CassandraConfig.newBuilder() + .setBootstrapHosts(LocalCassandra.getHost()) + .setPort(LocalCassandra.getPort()) + .setKeyspace("test") + .setTableName("feature_store") + .putAllReplicationOptions( + new HashMap() { + { + put("class", "SimpleStrategy"); + put("replication_factor", "2"); + } + }) + .build(); StoreUtil.setupCassandra(config); } - } diff --git a/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java index a4680408a5..412c6488f2 100644 --- a/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java +++ b/ingestion/src/test/java/feast/store/serving/cassandra/FeatureRowToCassandraMutationDoFnTest.java @@ -1,3 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package feast.store.serving.cassandra; import com.google.protobuf.Duration; @@ -19,35 +35,60 @@ public class FeatureRowToCassandraMutationDoFnTest implements Serializable { - @Rule - public transient TestPipeline testPipeline = TestPipeline.create(); + @Rule public transient TestPipeline testPipeline = TestPipeline.create(); @Test public void processElement_shouldCreateCassandraMutation_givenFeatureRow() { - FeatureSetSpec featureSetSpec = TestUtil.createFeatureSetSpec("fs", 1, 10, - new HashMap() {{ put("entity1", Enum.INT64); }}, - new HashMap() {{ put("feature1", Enum.STRING); }}); - FeatureRow featureRow = TestUtil.createFeatureRow(featureSetSpec, 10, - new HashMap() {{ - put("entity1", TestUtil.intValue(1)); put("feature1", TestUtil.strValue("a")); }}); + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("feature1", TestUtil.strValue("a")); + } + }); PCollection input = testPipeline.apply(Create.of(featureRow)); PCollection output = - input.apply(ParDo.of(new FeatureRowToCassandraMutationDoFn( - new HashMap() {{ - put(featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), featureSetSpec); - }}, Duration.newBuilder().setSeconds(0).build()))); - - CassandraMutation[] expected = new CassandraMutation[] { - new CassandraMutation( - "fs:1:entity1=1", - "feature1", - ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), - 10000000, - 10 - ) - }; + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + Duration.newBuilder().setSeconds(0).build()))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 10) + }; PAssert.that(output).containsInAnyOrder(expected); @@ -55,42 +96,68 @@ public void processElement_shouldCreateCassandraMutation_givenFeatureRow() { } @Test - public void processElement_shouldCreateCassandraMutations_givenFeatureRowWithMultipleEntitiesAndFeatures() { - FeatureSetSpec featureSetSpec = TestUtil.createFeatureSetSpec("fs", 1, 10, - new HashMap() {{ put("entity1", Enum.INT64); put("entity2", Enum.STRING); }}, - new HashMap() {{ put("feature1", Enum.STRING); put("feature2", Enum.INT64); }}); - FeatureRow featureRow = TestUtil.createFeatureRow(featureSetSpec, 10, - new HashMap() {{ - put("entity1", TestUtil.intValue(1)); - put("entity2", TestUtil.strValue("b")); - put("feature1", TestUtil.strValue("a")); - put("feature2", TestUtil.intValue(2)); - }}); + public void + processElement_shouldCreateCassandraMutations_givenFeatureRowWithMultipleEntitiesAndFeatures() { + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 10, + new HashMap() { + { + put("entity1", Enum.INT64); + put("entity2", Enum.STRING); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + put("feature2", Enum.INT64); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("entity2", TestUtil.strValue("b")); + put("feature1", TestUtil.strValue("a")); + put("feature2", TestUtil.intValue(2)); + } + }); PCollection input = testPipeline.apply(Create.of(featureRow)); PCollection output = - input.apply(ParDo.of(new FeatureRowToCassandraMutationDoFn( - new HashMap() {{ - put(featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), featureSetSpec); - }}, Duration.newBuilder().setSeconds(0).build()))); - - CassandraMutation[] expected = new CassandraMutation[] { - new CassandraMutation( - "fs:1:entity1=1|entity2=b", - "feature1", - ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), - 10000000, - 10 - ), - new CassandraMutation( - "fs:1:entity1=1|entity2=b", - "feature2", - ByteBuffer.wrap(TestUtil.intValue(2).toByteArray()), - 10000000, - 10 - ) - }; + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + Duration.newBuilder().setSeconds(0).build()))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1|entity2=b", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 10), + new CassandraMutation( + "fs:1:entity1=1|entity2=b", + "feature2", + ByteBuffer.wrap(TestUtil.intValue(2).toByteArray()), + 10000000, + 10) + }; PAssert.that(output).containsInAnyOrder(expected); @@ -100,34 +167,59 @@ public void processElement_shouldCreateCassandraMutations_givenFeatureRowWithMul @Test public void processElement_shouldUseDefaultMaxAge_whenMissingMaxAge() { Duration defaultTtl = Duration.newBuilder().setSeconds(500).build(); - FeatureSetSpec featureSetSpec = TestUtil.createFeatureSetSpec("fs", 1, 0, - new HashMap() {{ put("entity1", Enum.INT64); }}, - new HashMap() {{ put("feature1", Enum.STRING); }}); - FeatureRow featureRow = TestUtil.createFeatureRow(featureSetSpec, 10, - new HashMap() {{ - put("entity1", TestUtil.intValue(1)); put("feature1", TestUtil.strValue("a")); }}); + FeatureSetSpec featureSetSpec = + TestUtil.createFeatureSetSpec( + "fs", + 1, + 0, + new HashMap() { + { + put("entity1", Enum.INT64); + } + }, + new HashMap() { + { + put("feature1", Enum.STRING); + } + }); + FeatureRow featureRow = + TestUtil.createFeatureRow( + featureSetSpec, + 10, + new HashMap() { + { + put("entity1", TestUtil.intValue(1)); + put("feature1", TestUtil.strValue("a")); + } + }); PCollection input = testPipeline.apply(Create.of(featureRow)); PCollection output = - input.apply(ParDo.of(new FeatureRowToCassandraMutationDoFn( - new HashMap() {{ - put(featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), featureSetSpec); - }}, defaultTtl))); - - CassandraMutation[] expected = new CassandraMutation[] { - new CassandraMutation( - "fs:1:entity1=1", - "feature1", - ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), - 10000000, - 500 - ) - }; + input.apply( + ParDo.of( + new FeatureRowToCassandraMutationDoFn( + new HashMap() { + { + put( + featureSetSpec.getName() + ":" + featureSetSpec.getVersion(), + featureSetSpec); + } + }, + defaultTtl))); + + CassandraMutation[] expected = + new CassandraMutation[] { + new CassandraMutation( + "fs:1:entity1=1", + "feature1", + ByteBuffer.wrap(TestUtil.strValue("a").toByteArray()), + 10000000, + 500) + }; PAssert.that(output).containsInAnyOrder(expected); testPipeline.run(); } - } diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 0fcb9f4e16..51d4712759 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -24,7 +24,9 @@ import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; +import feast.core.StoreProto.Store.CassandraConfig; import feast.ingestion.transform.WriteToStore; +import feast.ingestion.utils.StoreUtil; import feast.storage.RedisProto.RedisKey; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FeatureRowProto.FeatureRow.Builder; @@ -64,7 +66,6 @@ import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.joda.time.Duration; import redis.embedded.RedisServer; @@ -100,10 +101,8 @@ public static void start() throws InterruptedException, IOException, TTransportE EmbeddedCassandraServerHelper.startEmbeddedCassandra(); } - public static void createKeyspaceAndTable() { - new ClassPathCQLDataSet("embedded-store/LoadCassandra.cql", true, true) - .getCQLStatements() - .forEach(s -> LocalCassandra.getSession().execute(s)); + public static void createKeyspaceAndTable(CassandraConfig config) { + StoreUtil.setupCassandra(config); } public static String getHost() { diff --git a/ingestion/src/test/resources/embedded-store/LoadCassandra.cql b/ingestion/src/test/resources/embedded-store/LoadCassandra.cql deleted file mode 100644 index c80da294b7..0000000000 --- a/ingestion/src/test/resources/embedded-store/LoadCassandra.cql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE KEYSPACE test with replication = {'class':'SimpleStrategy','replication_factor':1}; - -CREATE TABLE test.feature_store( - entities text, - feature text, - value blob, - PRIMARY KEY (entities, feature) -) WITH CLUSTERING ORDER BY (feature DESC); \ No newline at end of file diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index 34254cf383..9e1dc33143 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -110,6 +110,8 @@ message Store { // // Columns: // - entities: concatenated string of feature set name and all entities' keys and values + // entities concatenated format - [feature_set]:[entity_name1=entity_value1]|[entity_name2=entity_value2] + // TODO: string representation of float or double types may have different value in different runtime or platform // - feature: clustering column where each feature is a column // - value: byte array of Value (refer to feast.types.Value) // @@ -135,8 +137,15 @@ message Store { string bootstrap_hosts = 1; int32 port = 2; string keyspace = 3; + + // Please note that table name must be "feature_store" as is specified in the @Table annotation of the + // datastax object mapper string table_name = 4; + + // This specifies the replication strategy to use. Please refer to docs for more details: + // https://docs.datastax.com/en/dse/6.7/cql/cql/cql_reference/cql_commands/cqlCreateKeyspace.html#cqlCreateKeyspace__cqlCreateKeyspacereplicationmap-Pr3yUQ7t map replication_options = 5; + // Default expiration in seconds to use when FeatureSetSpec does not have max_age defined. // Specify 0 for no default expiration google.protobuf.Duration default_ttl = 6;