Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for feature set updates and remove versions #676

Merged
merged 19 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Job startJob(Job job) {
featureSetProtos.add(featureSet.toProto());
}
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
getPipelineOptions(job.getId(), featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
DirectJob directJob = new DirectJob(job.getId(), pipelineResult);
jobs.add(directJob);
Expand All @@ -93,14 +93,16 @@ public Job startJob(Job job) {
}

private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setJobName(jobName);
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setProject(""); // set to default value to satisfy validation
Expand Down
44 changes: 18 additions & 26 deletions core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,24 @@ public FeatureSetProto.FeatureSet toProto() throws InvalidProtocolBufferExceptio
return FeatureSetProto.FeatureSet.newBuilder().setMeta(meta).setSpec(spec).build();
}

/**
* Checks if the given featureSet's schema and source has is different from this one.
*
* @param other FeatureSet to compare to
* @return boolean denoting if the source or schema have changed.
*/
public boolean equalTo(FeatureSet other) {
@Override
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder();
hcb.append(project.getName());
hcb.append(getName());
return hcb.toHashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FeatureSet)) {
return false;
}

FeatureSet other = (FeatureSet) obj;
if (!getName().equals(other.getName())) {
return false;
}
Expand Down Expand Up @@ -331,23 +342,4 @@ public boolean equalTo(FeatureSet other) {

return true;
}

@Override
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder();
hcb.append(project.getName());
hcb.append(getName());
return hcb.toHashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FeatureSet)) {
return false;
}
return this.equalTo(((FeatureSet) obj));
}
}
15 changes: 12 additions & 3 deletions core/src/main/java/feast/core/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetStatus;
import feast.core.SourceProto;
import feast.core.StoreProto;
import feast.core.StoreProto.Store.Subscription;
Expand Down Expand Up @@ -248,6 +249,16 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea
throw new IllegalArgumentException(String.format("Project is archived: %s", project_name));
}

// Set source to default if not set in proto
if (newFeatureSet.getSpec().getSource() == SourceProto.Source.getDefaultInstance()) {
newFeatureSet =
newFeatureSet
.toBuilder()
.setSpec(
newFeatureSet.getSpec().toBuilder().setSource(defaultSource.toProto()).build())
.build();
}

// Retrieve existing FeatureSet
FeatureSet featureSet =
featureSetRepository.findFeatureSetByNameAndProject_Name(
Expand All @@ -258,9 +269,6 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea
// Create new feature set since it doesn't exist
newFeatureSet = newFeatureSet.toBuilder().setSpec(newFeatureSet.getSpec()).build();
featureSet = FeatureSet.fromProto(newFeatureSet);
if (newFeatureSet.getSpec().getSource() == SourceProto.Source.getDefaultInstance()) {
featureSet.setSource(defaultSource);
}
status = Status.CREATED;
} else {
// If the featureSet remains unchanged, we do nothing.
Expand All @@ -275,6 +283,7 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea
}

// Persist the FeatureSet object
featureSet.setStatus(FeatureSetStatus.STATUS_PENDING);
project.addFeatureSet(featureSet);
projectRepository.saveAndFlush(project);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {

Printer printer = JsonFormat.printer();

String expectedJobId = "feast-job-0";
ImportOptions expectedPipelineOptions =
PipelineOptionsFactory.fromArgs("").as(ImportOptions.class);
expectedPipelineOptions.setJobName(expectedJobId);
expectedPipelineOptions.setAppName("DirectRunnerJobManager");
expectedPipelineOptions.setRunner(DirectRunner.class);
expectedPipelineOptions.setBlockOnRun(false);
Expand All @@ -130,7 +132,6 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
expectedPipelineOptions.setFeatureSetJson(
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

String expectedJobId = "feast-job-0";
ArgumentCaptor<ImportOptions> pipelineOptionsCaptor =
ArgumentCaptor.forClass(ImportOptions.class);
ArgumentCaptor<DirectJob> directJobCaptor = ArgumentCaptor.forClass(DirectJob.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void processElement(ProcessContext context) {
Field fieldSpec = featureSet.getField(field.getName());
if (fieldSpec == null) {
// skip
break;
continue;
}
// If value is set in the FeatureRow, make sure the value type matches
// that defined in FeatureSetSpec
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/FeatureSet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ message FeatureSetSpec {
// Name of the feature set. Must be unique.
string name = 1;

// Feature set version was removed in v0.5.0.
reserved 2;

// List of entities contained within this featureSet.
// This allows the feature to be used during joins between feature sets.
// If the featureSet is ingested into a store that supports keys, this value
Expand Down
2 changes: 2 additions & 0 deletions protos/feast/core/FeatureSetReference.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ message FeatureSetReference {
string project = 1;
// Name of the FeatureSet
string name = 2;
// Feature set version was removed in v0.5.0.
reserved 3;
}
4 changes: 3 additions & 1 deletion protos/feast/core/Store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ message Store {
// pattern matching.
string project = 3;


// Name of the desired feature set. Asterisks can be used as wildcards in the name.
// Matching on names is only permitted if a specific project is defined. It is disallowed
// If the project name is set to "*"
Expand All @@ -148,6 +147,9 @@ message Store {
// - my-feature-set* can be used to match all features prefixed by "my-feature-set"
// - my-feature-set-6 can be used to select a single feature set
string name = 1;

// Feature set version was removed in v0.5.0.
reserved 2;
}

// Name of the store.
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func buildFeatures(featureReferences []string, defaultProject string) ([]*servin

if len(projectSplit) == 2 {
project = projectSplit[0]
name = strings.Split(projectSplit[1], ":")[0]
name = projectSplit[1]
} else if len(projectSplit) == 1 {
project = defaultProject
name = strings.Split(projectSplit[0], ":")[0]
name = projectSplit[0]
} else {
return nil, fmt.Errorf(ErrInvalidFeatureName, featureRef)
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/java/src/main/java/com/gojek/feast/RequestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ public static List<FeatureReference> createFeatureRefs(

if (projectSplit.length == 1) {
project = defaultProject;
name = projectSplit[0].split(":")[0];
name = projectSplit[0];
} else if (projectSplit.length == 2) {
project = projectSplit[0];
name = projectSplit[1].split(":")[0];
name = projectSplit[1];
} else {
throw new IllegalArgumentException(
String.format(
"Feature id '%s' has invalid format. Expected format: <project>/<feature-name>.",
featureRefString));
}

if (project.isEmpty() || name.isEmpty()) {
if (project.isEmpty() || name.isEmpty() || name.contains(":")) {
throw new IllegalArgumentException(
String.format(
"Feature id '%s' has invalid format. Expected format: <project>/<feature-name>.",
Expand Down
20 changes: 14 additions & 6 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,10 @@ def _apply_feature_set(self, feature_set: FeatureSet):

# If the feature set has changed, update the local copy
if apply_fs_response.status == ApplyFeatureSetResponse.Status.CREATED:
print(f'Feature set updated/created: "{applied_fs.name}"')
print(f'Feature set created: "{applied_fs.name}"')

if apply_fs_response.status == ApplyFeatureSetResponse.Status.UPDATED:
print(f'Feature set updated: "{applied_fs.name}"')

# If no change has been applied, do nothing
if apply_fs_response.status == ApplyFeatureSetResponse.Status.NO_CHANGE:
Expand Down Expand Up @@ -719,7 +722,7 @@ def ingest(
max_workers: int = max(CPU_COUNT - 1, 1),
disable_progress_bar: bool = False,
timeout: int = KAFKA_CHUNK_PRODUCTION_TIMEOUT,
) -> None:
) -> str:
"""
Loads feature data into Feast for a specific feature set.

Expand Down Expand Up @@ -747,8 +750,8 @@ def ingest(
Timeout in seconds to wait for completion.

Returns:
None:
None
str:
ingestion id for this dataset
"""

if isinstance(feature_set, FeatureSet):
Expand Down Expand Up @@ -827,7 +830,7 @@ def ingest(
print("Removing temporary file(s)...")
shutil.rmtree(dir_path)

return None
return ingestion_id


def _build_feature_references(
Expand Down Expand Up @@ -868,7 +871,12 @@ def _build_feature_references(
f'Could not parse feature ref {feature_ref}, expecting "project/feature"'
)

features.append(FeatureReference(project=project, name=name.split(":")[0]))
if ":" in name:
raise ValueError(
f'Could not parse feature ref {feature_ref}, expecting "project/feature". Versions were deprecated in v0.5.0.'
)

features.append(FeatureReference(project=project, name=name))
return features


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ public List<FeatureSetRequest> getFeatureSets(List<FeatureReference> featureRefe
*/
public void populateCache() {
Map<String, FeatureSetSpec> featureSetMap = getFeatureSetMap();

featureSetCache.invalidateAll();
featureSetCache.putAll(featureSetMap);

featureToFeatureSetMapping.clear();
featureToFeatureSetMapping.putAll(getFeatureToFeatureSetMapping(featureSetMap));

featureSetsCount.set(featureSetCache.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import feast.storage.connectors.bigquery.common.TypeUtil;
import feast.types.FeatureRowProto;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -195,10 +194,10 @@ private TableDefinition createBigQueryTableDefinition(
TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("event_timestamp").build();
log.info("Table partitioning: " + timePartitioning.toString());

FieldList fieldsList = FieldList.of(Collections.emptyList());
List<Field> fieldsList = new ArrayList<>();
if (existingTable != null) {
Schema existingSchema = existingTable.getDefinition().getSchema();
fieldsList = existingSchema.getFields();
fieldsList.addAll(existingSchema.getFields());
}

for (Field field : fields) {
Expand All @@ -209,7 +208,7 @@ private TableDefinition createBigQueryTableDefinition(

return StandardTableDefinition.newBuilder()
.setTimePartitioning(timePartitioning)
.setSchema(Schema.of(fields))
.setSchema(Schema.of(FieldList.of(fieldsList)))
.build();
}
}
Loading