Skip to content

Commit

Permalink
Merge #12: prefetch specs and validate on job expansion
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilingc committed Dec 31, 2018
2 parents 73a0290 + b604eb0 commit 7f0328c
Show file tree
Hide file tree
Showing 15 changed files with 471 additions and 408 deletions.
29 changes: 17 additions & 12 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@
import feast.ingestion.config.ImportSpecSupplier;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.*;
import feast.ingestion.transform.ErrorsStoreTransform;
import feast.ingestion.transform.ReadFeaturesTransform;
import feast.ingestion.transform.ServingStoreTransform;
import feast.ingestion.transform.ToFeatureRowExtended;
import feast.ingestion.transform.ValidateTransform;
import feast.ingestion.transform.WarehouseStoreTransform;
import feast.ingestion.transform.fn.ConvertTypesDoFn;
import feast.ingestion.transform.fn.LoggerDoFn;
import feast.ingestion.transform.fn.RoundEventTimestampsDoFn;
import feast.ingestion.values.PFeatureRows;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Arrays;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
Expand All @@ -61,10 +68,6 @@
import org.joda.time.Duration;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

@Slf4j
public class ImportJob {
private static Random random = new Random(System.currentTimeMillis());
Expand Down Expand Up @@ -125,6 +128,13 @@ public static PipelineResult mainWithResult(String[] args) {
return job.run();
}

private static String generateName() {
byte[] bytes = new byte[7];
random.nextBytes(bytes);
String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7);
return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex);
}

public void expand() {
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(
Expand All @@ -139,6 +149,8 @@ public void expand() {
// pass
}

specs.validate();

PCollection<FeatureRow> features = pipeline.apply("Read", readFeaturesTransform);
if (options.getLimit() != null && options.getLimit() > 0) {
features = features.apply(Sample.any(options.getLimit()));
Expand Down Expand Up @@ -200,13 +212,6 @@ public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
.apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}

private static String generateName() {
byte[] bytes = new byte[7];
random.nextBytes(bytes);
String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7);
return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex);
}

private String retrieveId(PipelineResult result) {
Class<? extends PipelineRunner<?>> runner = options.getRunner();
if (runner.isAssignableFrom(DataflowRunner.class)) {
Expand Down
19 changes: 11 additions & 8 deletions ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import feast.ingestion.model.Specs;
import feast.ingestion.model.SpecsImpl;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.service.CachedSpecService;
import feast.ingestion.service.CoreSpecService;
import feast.ingestion.service.FileSpecService;
import feast.ingestion.service.SpecService;
import feast.ingestion.service.SpecService.Builder;
import feast.ingestion.service.SpecService.UnsupportedBuilder;
import feast.specs.ImportSpecProto.ImportSpec;
Expand All @@ -37,6 +34,8 @@
import feast.storage.service.ErrorsStoreService;
import feast.storage.service.ServingStoreService;
import feast.storage.service.WarehouseStoreService;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
public class ImportJobModule extends AbstractModule {
Expand All @@ -54,23 +53,27 @@ protected void configure() {
bind(ImportJobOptions.class).toInstance(options);
bind(PipelineOptions.class).toInstance(options);
bind(ImportSpec.class).toInstance(importSpec);
bind(Specs.class).to(SpecsImpl.class);
}

@Provides
@Singleton
Builder provideSpecService(ImportJobOptions options) {
if (options.getCoreApiUri() != null) {
return new CachedSpecService.Builder(new CoreSpecService.Builder(options.getCoreApiUri()));
return new CoreSpecService.Builder(options.getCoreApiUri());
} else if (options.getCoreApiSpecPath() != null) {
return new CachedSpecService.Builder(
new FileSpecService.Builder(options.getCoreApiSpecPath()));
return new FileSpecService.Builder(options.getCoreApiSpecPath());
} else {
return new UnsupportedBuilder(
"Cannot initialise spec service as coreApiHost or specPath was not set.");
}
}

@Provides
@Singleton
Specs provideSpecs(SpecService.Builder specService) {
return Specs.of(options.getJobName(), importSpec, specService.build());
}

@Provides
@Singleton
List<WarehouseStore> provideWarehouseStores() {
Expand Down
124 changes: 112 additions & 12 deletions ingestion/src/main/java/feast/ingestion/model/Specs.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,127 @@

package feast.ingestion.model;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import feast.ingestion.service.SpecRetrievalException;
import com.google.common.base.Preconditions;
import feast.ingestion.service.SpecService;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import lombok.Builder;
import lombok.Getter;

@Builder
@Getter
public class Specs implements Serializable {
private String jobName;
private ImportSpec importSpec;
private Map<String, EntitySpec> entitySpecs;
private Map<String, FeatureSpec> featureSpecs;
private Map<String, StorageSpec> storageSpecs;
private transient SpecService specService;
private RuntimeException error;

public static Specs of(String jobName, ImportSpec importSpec, SpecService specService) {
try {
Specs.SpecsBuilder specsBuilder = Specs.builder().jobName(jobName).importSpec(importSpec);

List<Field> fields = importSpec.getSchema().getFieldsList();
List<String> featureIds = new ArrayList<>();
for (Field field : fields) {
if (!field.getFeatureId().isEmpty()) {
featureIds.add(field.getFeatureId());
}
}
specsBuilder.featureSpecs(specService.getFeatureSpecs(featureIds));

List<String> entityNames = importSpec.getEntitiesList();
for (FeatureSpec featureSpec : specsBuilder.featureSpecs.values()) {
Preconditions.checkArgument(
entityNames.contains(featureSpec.getEntity()),
"Feature has entity not listed in import spec featureSpec=" + featureSpec.toString());
}
specsBuilder.entitySpecs(specService.getEntitySpecs(entityNames));

specsBuilder.storageSpecs(specService.getAllStorageSpecs());

return specsBuilder.build();
} catch (RuntimeException e) {
return Specs.builder().error(e).build();
}
}

public interface Specs extends Serializable {
FeatureSpec getFeatureSpec(String featureId);
public void validate() {
if (error != null) {
throw error;
}

List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) throws SpecRetrievalException;
// Sanity checks that our maps are built correctly
for (Entry<String, FeatureSpec> entry : featureSpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
}
for (Entry<String, EntitySpec> entry : entitySpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()));
}
for (Entry<String, StorageSpec> entry : storageSpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
}

EntitySpec getEntitySpec(String entityName) throws SpecRetrievalException;
for (FeatureSpec featureSpec : featureSpecs.values()) {
// Check that feature has a matching entity
Preconditions.checkArgument(
entitySpecs.containsKey(featureSpec.getEntity()),
String.format(
"Feature %s references unknown entity %s",
featureSpec.getId(), featureSpec.getEntity()));
// Check that feature has a matching serving store
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
String.format(
"Feature %s references unknown serving store %s",
featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
// Check that feature has a matching warehouse store
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
String.format(
"Feature %s references unknown warehouse store %s",
featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
}
}

ImportSpec getImportSpec() throws SpecRetrievalException;
public EntitySpec getEntitySpec(String entityName) {
Preconditions.checkArgument(
entitySpecs.containsKey(entityName),
String.format("Unknown entity %s, spec was not initialized", entityName));
return entitySpecs.get(entityName);
}

Map<String, StorageSpec> getStorageSpecs() throws SpecRetrievalException;
public FeatureSpec getFeatureSpec(String featureId) {
Preconditions.checkArgument(
featureSpecs.containsKey(featureId),
String.format("Unknown feature %s, spec was not initialized", featureId));
return featureSpecs.get(featureId);
}

StorageSpec getStorageSpec(String storeId);
public List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) {
List<FeatureSpec> out = new ArrayList<>();
for (FeatureSpec featureSpec : featureSpecs.values()) {
if (featureSpec.getDataStores().getServing().getId().equals(storeId)) {
out.add(featureSpec);
}
}
return out;
}

String getJobName();
public StorageSpec getStorageSpec(String storeId) {
Preconditions.checkArgument(
storageSpecs.containsKey(storeId),
String.format("Unknown store %s, spec was not initialized", storeId));
return storageSpecs.get(storeId);
}
}
103 changes: 0 additions & 103 deletions ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java

This file was deleted.

Loading

0 comments on commit 7f0328c

Please sign in to comment.