Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Versionless serving option #5

Merged
merged 3 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#
REGISTRY := gcr.io/pm-registry/feast
VERSION := v0.4-cassandra-ordered-notrace
VERSION := v0.4-cassandra-versionless-10
PROJECT_ROOT := $(shell git rev-parse --show-toplevel)

test:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/mock v1.2.0
github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.3.0
github.com/golang/protobuf v1.4.1
github.com/google/go-cmp v0.4.0
github.com/huandu/xstrings v1.2.0 // indirect
github.com/lyft/protoc-gen-validate v0.1.0 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
Expand Down
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,22 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -389,6 +399,7 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e/go.mod h1:kS+toOQn6AQKjmKJ7gzohV1XkqsFehRA2FbsbkopSuQ=
Expand All @@ -408,6 +419,14 @@ google.golang.org/grpc v1.19.1 h1:TrBcJ1yqAl1G++wO39nD/qtgpsW9/1+QGrluyMGEYgM=
google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
public class CassandraMutationMapper implements Mapper<CassandraMutation>, Serializable {

private com.datastax.driver.mapping.Mapper<CassandraMutation> mapper;
private Boolean tracing;

CassandraMutationMapper(com.datastax.driver.mapping.Mapper<CassandraMutation> mapper) {
CassandraMutationMapper(
com.datastax.driver.mapping.Mapper<CassandraMutation> mapper, Boolean tracing) {
this.mapper = mapper;
this.tracing = tracing;
}

@Override
Expand All @@ -57,7 +60,7 @@ public Future<Void> saveAsync(CassandraMutation entityClass) {
entityClass,
Option.timestamp(entityClass.getWriteTime()),
Option.ttl(entityClass.getTtl()),
Option.consistencyLevel(ConsistencyLevel.ALL),
Option.tracing(false));
Option.consistencyLevel(ConsistencyLevel.ONE),
Option.tracing(tracing));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ public class CassandraMutationMapperFactory implements SerializableFunction<Sess

private transient MappingManager mappingManager;
private Class<CassandraMutation> entityClass;
private boolean tracing;

public CassandraMutationMapperFactory(Class<CassandraMutation> entityClass) {
public CassandraMutationMapperFactory(Class<CassandraMutation> entityClass, Boolean tracing) {
this.entityClass = entityClass;
this.tracing = tracing;
}

@Override
Expand All @@ -37,6 +39,6 @@ public Mapper apply(Session session) {
this.mappingManager = new MappingManager(session);
}

return new CassandraMutationMapper(mappingManager.mapper(entityClass));
return new CassandraMutationMapper(mappingManager.mapper(entityClass), this.tracing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,18 @@ public void processElement(ProcessContext context) {
break;
case CASSANDRA:
CassandraConfig cassandraConfig = getStore().getCassandraConfig();
log.info("Tracing Enabled: {}", cassandraConfig.getTracing());
SerializableFunction<Session, Mapper> mapperFactory =
new CassandraMutationMapperFactory(CassandraMutation.class);
new CassandraMutationMapperFactory(
CassandraMutation.class, cassandraConfig.getTracing());
input
.apply(
"Create CassandraMutation from FeatureRow",
ParDo.of(
new FeatureRowToCassandraMutationDoFn(
getFeatureSets(), cassandraConfig.getDefaultTtl())))
getFeatureSets(),
cassandraConfig.getDefaultTtl(),
cassandraConfig.getVersionless())))
.apply(
CassandraIO.<CassandraMutation>write()
.withHosts(Arrays.asList(cassandraConfig.getBootstrapHosts().split(",")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public int getTtl() {
return ttl;
}

static String keyFromFeatureRow(FeatureSetSpec featureSetSpec, FeatureRow featureRow) {
static String keyFromFeatureRow(
FeatureSetSpec featureSetSpec, FeatureRow featureRow, Boolean versionless) {
List<String> entityNames =
featureSetSpec.getEntitiesList().stream()
.map(EntitySpec::getName)
Expand All @@ -95,7 +96,13 @@ static String keyFromFeatureRow(FeatureSetSpec featureSetSpec, FeatureRow featur
entities.put(entityNames.get(entityNames.indexOf(field.getName())), field);
}
}
return featureRow.getFeatureSet()
String fsName;
if (versionless) {
fsName = featureRow.getFeatureSet().split(":")[0];
} else {
fsName = featureRow.getFeatureSet();
}
return fsName
+ ":"
+ entityNames.stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ public class FeatureRowToCassandraMutationDoFn extends DoFn<FeatureRow, Cassandr
org.slf4j.LoggerFactory.getLogger(FeatureRowToCassandraMutationDoFn.class);
private Map<String, FeatureSet> featureSets;
private Map<String, Integer> maxAges;
private Boolean versionless;

public FeatureRowToCassandraMutationDoFn(
Map<String, FeatureSet> featureSets, Duration defaultTtl) {
Map<String, FeatureSet> featureSets, Duration defaultTtl, Boolean versionless) {
this.featureSets = featureSets;
this.maxAges = new HashMap<>();
this.versionless = versionless;
for (FeatureSet set : featureSets.values()) {
FeatureSetSpec spec = set.getSpec();
String featureSetName = spec.getProject() + "/" + spec.getName() + ":" + spec.getVersion();
String featureSetName;
featureSetName = spec.getProject() + "/" + spec.getName() + ":" + spec.getVersion();
if (spec.getMaxAge() != null && spec.getMaxAge().getSeconds() > 0) {
maxAges.put(featureSetName, Math.toIntExact(spec.getMaxAge().getSeconds()));
} else {
Expand All @@ -65,16 +68,20 @@ public void processElement(ProcessContext context) {
featureSetSpec.getFeaturesList().stream()
.map(FeatureSpec::getName)
.collect(Collectors.toSet());
String key = CassandraMutation.keyFromFeatureRow(featureSetSpec, featureRow);
String key = CassandraMutation.keyFromFeatureRow(featureSetSpec, featureRow, versionless);

Collection<CassandraMutation> mutations = new ArrayList<>();
for (Field field : featureRow.getFieldsList()) {
if (featureNames.contains(field.getName())) {
ByteBuffer value = ByteBuffer.wrap(field.getValue().toByteArray());
if (!value.hasRemaining()) {
continue;
}
mutations.add(
new CassandraMutation(
key,
field.getName(),
ByteBuffer.wrap(field.getValue().toByteArray()),
value,
Timestamps.toMicros(featureRow.getEventTimestamp()),
maxAges.get(featureRow.getFeatureSet())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void processElement_shouldCreateCassandraMutation_givenFeatureRow() {
FeatureSet.newBuilder().setSpec(featureSetSpec).build());
}
},
Duration.newBuilder().setSeconds(0).build())));
Duration.newBuilder().setSeconds(0).build(),
false)));

CassandraMutation[] expected =
new CassandraMutation[] {
Expand Down Expand Up @@ -152,7 +153,8 @@ public void processElement_shouldCreateCassandraMutation_givenFeatureRow() {
FeatureSet.newBuilder().setSpec(featureSetSpec).build());
}
},
Duration.newBuilder().setSeconds(0).build())));
Duration.newBuilder().setSeconds(0).build(),
false)));

CassandraMutation[] expected =
new CassandraMutation[] {
Expand Down Expand Up @@ -222,7 +224,8 @@ public void processElement_shouldUseDefaultMaxAge_whenMissingMaxAge() {
FeatureSet.newBuilder().setSpec(featureSetSpec).build());
}
},
defaultTtl)));
defaultTtl,
false)));

CassandraMutation[] expected =
new CassandraMutation[] {
Expand Down
Binary file added out
Binary file not shown.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@
It is assumed that the GPG command used is version 2.x.
-->
<configuration>
<skip>true</skip>
<gpgArguments>
<arg>--pinentry-mode</arg>
<arg>loopback</arg>
Expand Down
6 changes: 3 additions & 3 deletions protos/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ gen-go:
gen-python:
pip install grpcio-tools
pip install mypy-protobuf
@$(foreach dir,$(dirs),python -m grpc_tools.protoc -I. --python_out=../sdk/python/ --mypy_out=../sdk/python/ feast/$(dir)/*.proto;)
@$(foreach dir,$(service_dirs),python -m grpc_tools.protoc -I. --grpc_python_out=../sdk/python/ feast/$(dir)/*.proto;)
@$(foreach dir,$(dirs),python3 -m grpc_tools.protoc -I. --python_out=../sdk/python/ --mypy_out=../sdk/python/ feast/$(dir)/*.proto;)
@$(foreach dir,$(service_dirs),python3 -m grpc_tools.protoc -I. --grpc_python_out=../sdk/python/ feast/$(dir)/*.proto;)

install-dependencies-docs:
mkdir -p $$HOME/bin
Expand All @@ -30,4 +30,4 @@ install-dependencies-docs:

gen-docs:
protoc --docs_out=../dist/grpc feast/*/*.proto || \
$(MAKE) install-dependencies-docs && PATH=$$HOME/bin:$$PATH protoc -I $$HOME/include/ -I . --docs_out=../dist/grpc feast/*/*.proto
$(MAKE) install-dependencies-docs && PATH=$$HOME/bin:$$PATH protoc -I $$HOME/include/ -I . --docs_out=../dist/grpc feast/*/*.proto
51 changes: 27 additions & 24 deletions protos/feast/core/Store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core";
// The way FeatureRow is encoded and decoded when it is written to and read from
// the Store depends on the type of the Store.
//
// For example, a FeatureRow will materialize as a row in a table in
// For example, a FeatureRow will materialize as a row in a table in
// BigQuery but it will materialize as a key, value pair element in Redis.
//
message Store {
Expand All @@ -41,30 +41,30 @@ message Store {
// The Redis data types used (https://redis.io/topics/data-types):
// - key: STRING
// - value: STRING
//
//
// Encodings:
// - key: byte array of RedisKey (refer to feast.storage.RedisKey)
// - value: byte array of FeatureRow (refer to feast.types.FeatureRow)
//
//
REDIS = 1;

// BigQuery stores a FeatureRow element as a row in a BigQuery table.
//
//
// Table name is derived from the feature set name and version as:
// [feature_set_name]_v[feature_set_version]
//
// [feature_set_name]_v[feature_set_version]
//
// For example:
// A feature row for feature set "driver" and version "1" will be written
// to table "driver_v1".
//
// The entities and features in a FeatureSetSpec corresponds to the
// fields in the BigQuery table (these make up the BigQuery schema).
// The name of the entity spec and feature spec corresponds to the column
// names, and the value_type of entity spec and feature spec corresponds
// to BigQuery standard SQL data type of the column.
//
// The following BigQuery fields are reserved for Feast internal use.
// Ingestion of entity or feature spec with names identical
//
// The entities and features in a FeatureSetSpec corresponds to the
// fields in the BigQuery table (these make up the BigQuery schema).
// The name of the entity spec and feature spec corresponds to the column
// names, and the value_type of entity spec and feature spec corresponds
// to BigQuery standard SQL data type of the column.
//
// The following BigQuery fields are reserved for Feast internal use.
// Ingestion of entity or feature spec with names identical
// to the following field names will raise an exception during ingestion.
//
// column_name | column_data_type | description
Expand All @@ -77,21 +77,21 @@ message Store {
// of the FeatureRow (https://cloud.google.com/bigquery/docs/partitioned-tables).
//
// Since newer version of feature set can introduce breaking, non backward-
// compatible BigQuery schema updates, incrementing the version of a
// compatible BigQuery schema updates, incrementing the version of a
// feature set will result in the creation of a new empty BigQuery table
// with the new schema.
//
// The following table shows how ValueType in Feast is mapped to
// BigQuery Standard SQL data types
//
// The following table shows how ValueType in Feast is mapped to
// BigQuery Standard SQL data types
// (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types):
//
// BYTES : BYTES
// STRING : STRING
// BYTES : BYTES
// STRING : STRING
// INT32 : INT64
// INT64 : IN64
// INT64 : IN64
// DOUBLE : FLOAT64
// FLOAT : FLOAT64
// BOOL : BOOL
// FLOAT : FLOAT64
// BOOL : BOOL
// BYTES_LIST : ARRAY
// STRING_LIST : ARRAY
// INT32_LIST : ARRAY
Expand Down Expand Up @@ -154,6 +154,9 @@ message Store {
// Default expiration in seconds to use when FeatureSetSpec does not have max_age defined.
// Specify 0 for no default expiration
google.protobuf.Duration default_ttl = 6;
bool versionless = 7;
string consistency = 8;
bool tracing = 9;
}

message Subscription {
Expand Down
12 changes: 7 additions & 5 deletions sdk/go/protos/feast/core/CoreService.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading