Skip to content

Commit

Permalink
docs update, spotless check, and bug fix on cassandra schema
Browse files Browse the repository at this point in the history
  • Loading branch information
smadarasmi committed Jan 3, 2020
1 parent bc8f3bc commit b8d1025
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 197 deletions.
29 changes: 16 additions & 13 deletions ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public abstract class WriteToStore extends PTransform<PCollection<FeatureRow>, P
public static final String METRIC_NAMESPACE = "WriteToStore";
public static final String ELEMENTS_WRITTEN_METRIC = "elements_written";

private static final Counter elementsWritten = Metrics
.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC);
private static final Counter elementsWritten =
Metrics.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC);

public abstract Store getStore();

Expand Down Expand Up @@ -155,32 +155,35 @@ public void processElement(ProcessContext context) {
break;
case CASSANDRA:
CassandraConfig cassandraConfig = getStore().getCassandraConfig();
SerializableFunction<Session, Mapper> mapperFactory = new CassandraMutationMapperFactory(CassandraMutation.class);
SerializableFunction<Session, Mapper> mapperFactory =
new CassandraMutationMapperFactory(CassandraMutation.class);
input
.apply(
"Create CassandraMutation from FeatureRow",
ParDo.of(new FeatureRowToCassandraMutationDoFn(
getFeatureSetSpecs(), cassandraConfig.getDefaultTtl()))
)
ParDo.of(
new FeatureRowToCassandraMutationDoFn(
getFeatureSetSpecs(), cassandraConfig.getDefaultTtl())))
.apply(
CassandraIO.<CassandraMutation>write()
.withHosts(Arrays.asList(cassandraConfig.getBootstrapHosts().split(",")))
.withPort(cassandraConfig.getPort())
.withKeyspace(cassandraConfig.getKeyspace())
.withEntity(CassandraMutation.class)
.withMapperFactoryFn(mapperFactory)
);
.withMapperFactoryFn(mapperFactory));
break;
default:
log.error("Store type '{}' is not supported. No Feature Row will be written.", storeType);
break;
}

input.apply("IncrementWriteToStoreElementsWrittenCounter",
MapElements.into(TypeDescriptors.booleans()).via((FeatureRow row) -> {
elementsWritten.inc();
return true;
}));
input.apply(
"IncrementWriteToStoreElementsWrittenCounter",
MapElements.into(TypeDescriptors.booleans())
.via(
(FeatureRow row) -> {
elementsWritten.inc();
return true;
}));

return PDone.in(input.getPipeline());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public static void setupCassandra(CassandraConfig cassandraConfig) {
.ifNotExists()
.addPartitionKey(CassandraMutation.ENTITIES, DataType.text())
.addClusteringColumn(CassandraMutation.FEATURE, DataType.text())
.addStaticColumn(CassandraMutation.VALUE, DataType.blob());
.addColumn(CassandraMutation.VALUE, DataType.blob());
log.info("Create Cassandra table if not exists..");
session.execute(createTable);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.store.serving.cassandra;

import com.google.protobuf.Duration;
Expand All @@ -18,8 +34,8 @@

public class FeatureRowToCassandraMutationDoFn extends DoFn<FeatureRow, CassandraMutation> {

private static final Logger log = org.slf4j.LoggerFactory
.getLogger(FeatureRowToCassandraMutationDoFn.class);
private static final Logger log =
org.slf4j.LoggerFactory.getLogger(FeatureRowToCassandraMutationDoFn.class);
private Map<String, FeatureSetSpec> featureSetSpecs;
private Map<String, Integer> maxAges;

Expand All @@ -36,16 +52,16 @@ public FeatureRowToCassandraMutationDoFn(Map<String, FeatureSetSpec> specs, Dura
}
}

/**
* Output a Cassandra mutation object for every feature in the feature row.
*/
/** Output a Cassandra mutation object for every feature in the feature row. */
@ProcessElement
public void processElement(ProcessContext context) {
FeatureRow featureRow = context.element();
try {
FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet());
Set<String> featureNames = featureSetSpec.getFeaturesList().stream()
.map(FeatureSpec::getName).collect(Collectors.toSet());
Set<String> featureNames =
featureSetSpec.getFeaturesList().stream()
.map(FeatureSpec::getName)
.collect(Collectors.toSet());
String key = CassandraMutation.keyFromFeatureRow(featureSetSpec, featureRow);

Collection<CassandraMutation> mutations = new ArrayList<>();
Expand All @@ -57,17 +73,13 @@ public void processElement(ProcessContext context) {
field.getName(),
ByteBuffer.wrap(field.getValue().toByteArray()),
Timestamps.toMicros(featureRow.getEventTimestamp()),
maxAges.get(featureRow.getFeatureSet())
)
);
maxAges.get(featureRow.getFeatureSet())));
}
}

mutations.forEach(context::output);
} catch (Exception e) {
log.error(e.getMessage(), e);
}

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,7 @@ public Store getStore() {
return Store.newBuilder()
.setType(StoreType.CASSANDRA)
.setName("SERVING")
.setCassandraConfig(
CassandraConfig.newBuilder()
.setBootstrapHosts(LocalCassandra.getHost())
.setPort(LocalCassandra.getPort())
.setTableName("feature_store")
.setKeyspace("test")
.build())
.setCassandraConfig(getCassandraConfig())
.build();
}

Expand All @@ -86,10 +80,26 @@ public Map<String, FeatureSetSpec> getFeatureSetSpecs() {
}
}

private static CassandraConfig getCassandraConfig() {
return CassandraConfig.newBuilder()
.setBootstrapHosts(LocalCassandra.getHost())
.setPort(LocalCassandra.getPort())
.setTableName("feature_store")
.setKeyspace("test")
.putAllReplicationOptions(
new HashMap<String, String>() {
{
put("class", "SimpleStrategy");
put("replication_factor", "1");
}
})
.build();
}

@BeforeClass
public static void startServer() throws InterruptedException, IOException, TTransportException {
LocalCassandra.start();
LocalCassandra.createKeyspaceAndTable();
LocalCassandra.createKeyspaceAndTable(getCassandraConfig());
}

@Before
Expand Down Expand Up @@ -120,7 +130,7 @@ public void setUp() {
put("entity1", TestUtil.intValue(1));
put("entity2", TestUtil.strValue("a"));
put("feature1", TestUtil.intValue(1));
put("feature2", TestUtil.intValue(1));
put("feature2", TestUtil.intValue(2));
}
});
}
Expand All @@ -146,7 +156,7 @@ public void testWriteCassandra_happyPath() throws InvalidProtocolBufferException
List<Field> expectedFields =
Arrays.asList(
Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(),
Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(1)).build());
Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build());

assertTrue(actualResults.containsAll(expectedFields));
assertEquals(expectedFields.size(), actualResults.size());
Expand Down Expand Up @@ -201,7 +211,7 @@ public void testWriteCassandra_shouldNotOverrideNewerValues()
put("entity1", TestUtil.intValue(1));
put("entity2", TestUtil.strValue("a"));
put("feature1", TestUtil.intValue(3));
put("feature2", TestUtil.intValue(3));
put("feature2", TestUtil.intValue(4));
}
});

Expand All @@ -217,7 +227,7 @@ public void testWriteCassandra_shouldNotOverrideNewerValues()
List<Field> expectedFields =
Arrays.asList(
Field.newBuilder().setName("feature1").setValue(TestUtil.intValue(1)).build(),
Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(1)).build());
Field.newBuilder().setName("feature2").setValue(TestUtil.intValue(2)).build());

assertTrue(actualResults.containsAll(expectedFields));
assertEquals(expectedFields.size(), actualResults.size());
Expand Down
Loading

0 comments on commit b8d1025

Please sign in to comment.