diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 67e5cb7600..e4c1f7fb81 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -30,7 +30,12 @@ import feast.ingestion.config.ImportSpecSupplier; import feast.ingestion.model.Specs; import feast.ingestion.options.ImportJobOptions; -import feast.ingestion.transform.*; +import feast.ingestion.transform.ErrorsStoreTransform; +import feast.ingestion.transform.ReadFeaturesTransform; +import feast.ingestion.transform.ServingStoreTransform; +import feast.ingestion.transform.ToFeatureRowExtended; +import feast.ingestion.transform.ValidateTransform; +import feast.ingestion.transform.WarehouseStoreTransform; import feast.ingestion.transform.fn.ConvertTypesDoFn; import feast.ingestion.transform.fn.LoggerDoFn; import feast.ingestion.transform.fn.RoundEventTimestampsDoFn; @@ -38,6 +43,8 @@ import feast.specs.ImportSpecProto.ImportSpec; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; +import java.util.Arrays; +import java.util.Random; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -61,10 +68,6 @@ import org.joda.time.Duration; import org.slf4j.event.Level; -import java.io.IOException; -import java.util.Arrays; -import java.util.Random; - @Slf4j public class ImportJob { private static Random random = new Random(System.currentTimeMillis()); @@ -125,6 +128,13 @@ public static PipelineResult mainWithResult(String[] args) { return job.run(); } + private static String generateName() { + byte[] bytes = new byte[7]; + random.nextBytes(bytes); + String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7); + return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex); + } + public void expand() { CoderRegistry coderRegistry = pipeline.getCoderRegistry(); coderRegistry.registerCoderForType( @@ -139,6 +149,8 @@ public void expand() { // pass } + specs.validate(); + PCollection features = pipeline.apply("Read", readFeaturesTransform); if (options.getLimit() != null && options.getLimit() > 0) { features = features.apply(Sample.any(options.getLimit())); @@ -200,13 +212,6 @@ public void logNRows(PFeatureRows pFeatureRows, String name, int limit) { .apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS "))); } - private static String generateName() { - byte[] bytes = new byte[7]; - random.nextBytes(bytes); - String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7); - return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex); - } - private String retrieveId(PipelineResult result) { Class> runner = options.getRunner(); if (runner.isAssignableFrom(DataflowRunner.class)) { diff --git a/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java b/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java index 1d953555eb..2f2f5d9feb 100644 --- a/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java +++ b/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java @@ -20,14 +20,11 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Singleton; -import java.util.List; -import org.apache.beam.sdk.options.PipelineOptions; import feast.ingestion.model.Specs; -import 
feast.ingestion.model.SpecsImpl;
 import feast.ingestion.options.ImportJobOptions;
-import feast.ingestion.service.CachedSpecService;
 import feast.ingestion.service.CoreSpecService;
 import feast.ingestion.service.FileSpecService;
+import feast.ingestion.service.SpecService;
 import feast.ingestion.service.SpecService.Builder;
 import feast.ingestion.service.SpecService.UnsupportedBuilder;
 import feast.specs.ImportSpecProto.ImportSpec;
@@ -37,6 +34,8 @@
 import feast.storage.service.ErrorsStoreService;
 import feast.storage.service.ServingStoreService;
 import feast.storage.service.WarehouseStoreService;
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
 
 /** An ImportJobModule is a Guice module for creating dependency injection bindings. */
 public class ImportJobModule extends AbstractModule {
@@ -54,23 +53,27 @@ protected void configure() {
     bind(ImportJobOptions.class).toInstance(options);
     bind(PipelineOptions.class).toInstance(options);
     bind(ImportSpec.class).toInstance(importSpec);
-    bind(Specs.class).to(SpecsImpl.class);
   }
 
   @Provides
   @Singleton
   Builder provideSpecService(ImportJobOptions options) {
     if (options.getCoreApiUri() != null) {
-      return new CachedSpecService.Builder(new CoreSpecService.Builder(options.getCoreApiUri()));
+      return new CoreSpecService.Builder(options.getCoreApiUri());
     } else if (options.getCoreApiSpecPath() != null) {
-      return new CachedSpecService.Builder(
-          new FileSpecService.Builder(options.getCoreApiSpecPath()));
+      return new FileSpecService.Builder(options.getCoreApiSpecPath());
     } else {
       return new UnsupportedBuilder(
           "Cannot initialise spec service as coreApiHost or specPath was not set.");
     }
   }
 
+  @Provides
+  @Singleton
+  Specs provideSpecs(SpecService.Builder specService) {
+    return Specs.of(options.getJobName(), importSpec, specService.build());
+  }
+
   @Provides
   @Singleton
   List<WarehouseStore> provideWarehouseStores() {
diff --git a/ingestion/src/main/java/feast/ingestion/model/Specs.java b/ingestion/src/main/java/feast/ingestion/model/Specs.java
index 8ffa094aa3..a616f95141 100644
--- a/ingestion/src/main/java/feast/ingestion/model/Specs.java
+++ b/ingestion/src/main/java/feast/ingestion/model/Specs.java
@@ -17,27 +17,127 @@
 package feast.ingestion.model;
 
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import feast.ingestion.service.SpecRetrievalException;
+import com.google.common.base.Preconditions;
+import feast.ingestion.service.SpecService;
 import feast.specs.EntitySpecProto.EntitySpec;
 import feast.specs.FeatureSpecProto.FeatureSpec;
+import feast.specs.ImportSpecProto.Field;
 import feast.specs.ImportSpecProto.ImportSpec;
 import feast.specs.StorageSpecProto.StorageSpec;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import lombok.Builder;
+import lombok.Getter;
+
+@Builder
+@Getter
+public class Specs implements Serializable {
+  private String jobName;
+  private ImportSpec importSpec;
+  private Map<String, EntitySpec> entitySpecs;
+  private Map<String, FeatureSpec> featureSpecs;
+  private Map<String, StorageSpec> storageSpecs;
+  private transient SpecService specService;
+  private RuntimeException error;
+
+  public static Specs of(String jobName, ImportSpec importSpec, SpecService specService) {
+    try {
+      Specs.SpecsBuilder specsBuilder = Specs.builder().jobName(jobName).importSpec(importSpec);
+
+      List<Field> fields = importSpec.getSchema().getFieldsList();
+      List<String> featureIds = new ArrayList<>();
+      for (Field field : fields) {
+        if (!field.getFeatureId().isEmpty()) {
+          featureIds.add(field.getFeatureId());
+        }
+      }
+      specsBuilder.featureSpecs(specService.getFeatureSpecs(featureIds));
+
+      List<String> entityNames = importSpec.getEntitiesList();
+      for (FeatureSpec featureSpec : specsBuilder.featureSpecs.values()) {
+        Preconditions.checkArgument(
+            entityNames.contains(featureSpec.getEntity()),
+            "Feature has entity not listed in import spec featureSpec=" + featureSpec.toString());
+      }
+      specsBuilder.entitySpecs(specService.getEntitySpecs(entityNames));
+
+      specsBuilder.storageSpecs(specService.getAllStorageSpecs());
+
+      return specsBuilder.build();
+    } catch (RuntimeException e) {
+      return Specs.builder().error(e).build();
+    }
+  }
 
-public interface Specs extends Serializable {
-  FeatureSpec getFeatureSpec(String featureId);
+  public void validate() {
+    if (error != null) {
+      throw error;
+    }
 
-  List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) throws SpecRetrievalException;
+    // Sanity checks that our maps are built correctly
+    for (Entry<String, FeatureSpec> entry : featureSpecs.entrySet()) {
+      Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
+    }
+    for (Entry<String, EntitySpec> entry : entitySpecs.entrySet()) {
+      Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()));
+    }
+    for (Entry<String, StorageSpec> entry : storageSpecs.entrySet()) {
+      Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
+    }
 
-  EntitySpec getEntitySpec(String entityName) throws SpecRetrievalException;
+    for (FeatureSpec featureSpec : featureSpecs.values()) {
+      // Check that feature has a matching entity
+      Preconditions.checkArgument(
+          entitySpecs.containsKey(featureSpec.getEntity()),
+          String.format(
+              "Feature %s references unknown entity %s",
+              featureSpec.getId(), featureSpec.getEntity()));
+      // Check that feature has a matching serving store
+      Preconditions.checkArgument(
+          storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
+          String.format(
+              "Feature %s references unknown serving store %s",
+              featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
+      // Check that feature has a matching warehouse store
+      Preconditions.checkArgument(
+          storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
+          String.format(
+              "Feature %s references unknown warehouse store %s",
+              featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
+    }
+  }
 
-  ImportSpec getImportSpec() throws SpecRetrievalException;
+  public EntitySpec getEntitySpec(String entityName) {
+    Preconditions.checkArgument(
+        entitySpecs.containsKey(entityName),
+        String.format("Unknown entity %s, spec was not initialized", entityName));
+    return entitySpecs.get(entityName);
+  }
 
-  Map<String, StorageSpec> getStorageSpecs() throws SpecRetrievalException;
+  public FeatureSpec getFeatureSpec(String featureId) {
+    Preconditions.checkArgument(
+        featureSpecs.containsKey(featureId),
+        String.format("Unknown feature %s, spec was not initialized", featureId));
+    return featureSpecs.get(featureId);
+  }
 
-  StorageSpec getStorageSpec(String storeId);
+  public List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) {
+    List<FeatureSpec> out = new ArrayList<>();
+    for (FeatureSpec featureSpec : featureSpecs.values()) {
+      if (featureSpec.getDataStores().getServing().getId().equals(storeId)) {
+        out.add(featureSpec);
+      }
+    }
+    return out;
+  }
 
-  String getJobName();
+  public StorageSpec getStorageSpec(String storeId) {
+    Preconditions.checkArgument(
+        storageSpecs.containsKey(storeId),
+        String.format("Unknown store %s, spec was not initialized", storeId));
+    return storageSpecs.get(storeId);
+  }
 }
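Reviewer note: the net effect of the files above is that spec resolution moves from the lazy SpecsImpl lookups to an eager, serializable Specs value object. A minimal sketch of the new call pattern, assuming a FileSpecService as the tests below use — the class name, spec directory path, job name, and printed output here are hypothetical placeholders, not part of this change:

    import feast.ingestion.model.Specs;
    import feast.ingestion.service.FileSpecService;
    import feast.ingestion.service.SpecService;
    import feast.specs.FeatureSpecProto.FeatureSpec;
    import feast.specs.ImportSpecProto.Field;
    import feast.specs.ImportSpecProto.ImportSpec;
    import feast.specs.ImportSpecProto.Schema;

    public class SpecsUsageSketch {
      public static void main(String[] args) {
        // Hypothetical path; the tests below point this at src/test/resources/core_specs/.
        SpecService specService = new FileSpecService.Builder("/path/to/core_specs").build();

        ImportSpec importSpec =
            ImportSpec.newBuilder()
                .addEntities("testEntity")
                .setSchema(
                    Schema.newBuilder()
                        .addFields(Field.newBuilder().setFeatureId("testEntity.none.testInt32")))
                .build();

        // Specs.of never throws: a RuntimeException raised while fetching specs is held
        // in the error field and only re-thrown when validate() is called.
        Specs specs = Specs.of("my-import-job", importSpec, specService);
        specs.validate(); // IllegalArgumentException here if any reference dangles

        FeatureSpec featureSpec = specs.getFeatureSpec("testEntity.none.testInt32");
        System.out.println(featureSpec.getId());
      }
    }

The of()/validate() split appears intended to let the Guice provider construct Specs unconditionally while deferring failures to the start of ImportJob.expand(), where they surface with an actionable message.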
diff --git a/ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java b/ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java deleted file mode 100644 index ce04730023..0000000000 --- a/ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.model; - -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.options.PipelineOptions; -import feast.ingestion.service.SpecService; -import feast.ingestion.service.SpecService.Builder; -import feast.specs.EntitySpecProto.EntitySpec; -import feast.specs.FeatureSpecProto.FeatureSpec; -import feast.specs.ImportSpecProto.ImportSpec; -import feast.specs.StorageSpecProto.StorageSpec; - -public class SpecsImpl implements Specs { - private String jobName; - private ImportSpec importSpec; - private Builder specServiceBuilder; - private transient SpecService specService; - - @Inject - public SpecsImpl( - PipelineOptions options, ImportSpec importSpec, SpecService.Builder specServiceBuilder) { - if (options != null) { - this.jobName = options.getJobName(); - } - this.importSpec = importSpec; - this.specServiceBuilder = specServiceBuilder; - } - - private SpecService getSpecService() { - if (specService == null) { - specService = specServiceBuilder.build(); - } - return specService; - } - - public FeatureSpec getFeatureSpec(String featureId) { - FeatureSpec spec = - getSpecService().getFeatureSpecs(Collections.singleton(featureId)).get(featureId); - Preconditions.checkNotNull(spec, "Spec not found for feature " + featureId); - return spec; - } - - public EntitySpec getEntitySpec(String entityName) { - EntitySpec spec = - getSpecService().getEntitySpecs(Collections.singleton(entityName)).get(entityName); - Preconditions.checkNotNull(spec, "Spec not found for entity " + entityName); - return spec; - } - - public ImportSpec getImportSpec() { - Preconditions.checkNotNull(importSpec, "Import spec not found"); - return importSpec; - } - - public Map getStorageSpecs() { - return getSpecService().getAllStorageSpecs(); - } - - @Override - public StorageSpec getStorageSpec(String storeId) { - StorageSpec spec = - getSpecService().getStorageSpecs(Collections.singleton(storeId)).get(storeId); - Preconditions.checkNotNull(spec, "Spec not found for storeId " + storeId); - return spec; - } - - @Override - public String getJobName() { - return jobName; - } - - @Override - public List getFeatureSpecByServingStoreId(String storeId) { - List featureSpecs = new ArrayList<>(); - for (FeatureSpec featureSpec : getSpecService().getAllFeatureSpecs().values()) { - if (featureSpec.getDataStores().getServing().getId().equals(storeId)) { - featureSpecs.add(featureSpec); - } - } - return featureSpecs; - } -} diff --git 
a/ingestion/src/main/java/feast/ingestion/service/CachedSpecService.java b/ingestion/src/main/java/feast/ingestion/service/CachedSpecService.java deleted file mode 100644 index ebb4f9ec5b..0000000000 --- a/ingestion/src/main/java/feast/ingestion/service/CachedSpecService.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.service; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import java.time.Duration; -import java.util.Collections; -import java.util.Map; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import feast.specs.EntitySpecProto.EntitySpec; -import feast.specs.FeatureSpecProto.FeatureSpec; -import feast.specs.StorageSpecProto.StorageSpec; - -@Slf4j -public class CachedSpecService implements SpecService { - private static final Duration CACHE_DURATION; - private static final int MAX_SPEC_COUNT = 1000; - - static { - CACHE_DURATION = Duration.ofMinutes(30); - } - - private final SpecService coreService; - private final LoadingCache entitySpecCache; - private final CacheLoader entitySpecLoader; - private final LoadingCache featureSpecCache; - private final CacheLoader featureSpecLoader; - private final LoadingCache storageSpecCache; - private final CacheLoader storageSpecLoader; - - public CachedSpecService(SpecService coreService) { - this.coreService = coreService; - entitySpecLoader = - new CacheLoader() { - @Override - public EntitySpec load(String key) throws Exception { - return coreService.getEntitySpecs(Collections.singletonList(key)).get(key); - } - - @Override - public Map loadAll(Iterable keys) throws Exception { - return coreService.getEntitySpecs((Iterable) keys); - } - }; - entitySpecCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_SPEC_COUNT) - .expireAfterAccess(CACHE_DURATION) - .build(entitySpecLoader); - - featureSpecLoader = - new CacheLoader() { - @Override - public FeatureSpec load(String key) throws Exception { - return coreService.getFeatureSpecs(Collections.singletonList(key)).get(key); - } - - @Override - public Map loadAll(Iterable keys) - throws Exception { - return coreService.getFeatureSpecs((Iterable) keys); - } - }; - featureSpecCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_SPEC_COUNT) - .expireAfterAccess(CACHE_DURATION) - .build(featureSpecLoader); - - storageSpecLoader = - new CacheLoader() { - @Override - public Map loadAll(Iterable keys) - throws Exception { - return coreService.getStorageSpecs((Iterable) keys); - } - - @Override - public StorageSpec load(String key) throws Exception { - return coreService.getStorageSpecs(Collections.singleton(key)).get(key); - } - }; - storageSpecCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_SPEC_COUNT) - .expireAfterAccess(CACHE_DURATION) - .build(storageSpecLoader); - } - - @Override - public Map getEntitySpecs(Iterable entityIds) { - try { - return 
entitySpecCache.getAll(entityIds); - } catch (Exception e) { - log.error("Error while retrieving entity spec: {}", e); - throw new SpecRetrievalException("Error while retrieving entity spec", e); - } - } - - @Override - public Map getAllEntitySpecs() { - try { - Map result = coreService.getAllEntitySpecs(); - entitySpecCache.putAll(result); - return result; - } catch (Exception e) { - log.error("Error while retrieving entity spec: {}", e); - throw new SpecRetrievalException("Error while retrieving entity spec", e); - } - } - - @Override - public Map getFeatureSpecs(Iterable featureIds) { - try { - return featureSpecCache.getAll(featureIds); - } catch (Exception e) { - log.error("Error while retrieving feature spec: {}", e); - throw new SpecRetrievalException("Error while retrieving feature spec", e); - } - } - - @Override - public Map getAllFeatureSpecs() { - try { - Map result = coreService.getAllFeatureSpecs(); - featureSpecCache.putAll(result); - return result; - } catch (Exception e) { - log.error("Error while retrieving feature spec: {}", e); - throw new SpecRetrievalException("Error while retrieving feature spec", e); - } - } - - @Override - public Map getStorageSpecs(Iterable storageIds) { - try { - return storageSpecCache.getAll(storageIds); - } catch (Exception e) { - log.error("Error while retrieving storage spec: {}", e); - throw new SpecRetrievalException("Error while retrieving storage spec", e); - } - } - - @Override - public Map getAllStorageSpecs() { - try { - Map result = coreService.getAllStorageSpecs(); - storageSpecCache.putAll(result); - return result; - } catch (Exception e) { - log.error("Error while retrieving storage spec: {}", e); - throw new SpecRetrievalException("Error while retrieving storage spec", e); - } - } - - @AllArgsConstructor - public static class Builder implements SpecService.Builder { - - private SpecService.Builder specServiceBuilder; - - @Override - public SpecService build() { - return new CachedSpecService(specServiceBuilder.build()); - } - } -} diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java index 9cf2c82a03..1255328847 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java @@ -22,10 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import feast.ingestion.exceptions.ValidationException; import feast.ingestion.metrics.FeastMetrics; import feast.ingestion.model.Specs; @@ -43,6 +39,10 @@ import feast.types.FeatureRowProto.FeatureRow; import feast.types.GranularityProto.Granularity.Enum; import feast.types.ValueProto.ValueType; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; public class ValidateFeatureRowsDoFn extends BaseFeatureDoFn { @@ -85,7 +85,7 @@ public void processElementImpl(ProcessContext context) { checkArgument(!row.getEntityName().isEmpty(), "Entity name must not be empty"); checkArgument( - importSpec.getEntitiesList().contains(row.getEntityName()), + specs.getEntitySpecs().keySet().contains(row.getEntityName()), String.format( "Row entity not found in import spec entities. 
entity=%s", row.getEntityName())); diff --git a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java b/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java index b33b213e58..5384a0744b 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java +++ b/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java @@ -20,9 +20,17 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import com.google.inject.Inject; -import java.util.List; +import feast.ingestion.model.Specs; +import feast.ingestion.transform.FeatureIO; +import feast.ingestion.transform.SplitFeatures.SingleOutputSplit; +import feast.options.OptionsParser; +import feast.specs.EntitySpecProto.EntitySpec; +import feast.specs.FeatureSpecProto.FeatureSpec; +import feast.specs.ImportSpecProto.ImportSpec; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.GranularityProto.Granularity; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; @@ -38,22 +46,13 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; -import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO; -import feast.ingestion.transform.SplitFeatures.SingleOutputSplit; -import feast.options.OptionsParser; -import feast.specs.EntitySpecProto.EntitySpec; -import feast.specs.FeatureSpecProto.FeatureSpec; -import feast.specs.ImportSpecProto.ImportSpec; -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -import feast.types.FeatureRowProto.FeatureRow; -import feast.types.GranularityProto.Granularity; @Slf4j public class FeatureRowBigQueryIO { public static Read read(ImportSpec importSpec) { - // TODO: Allow and convert from /projects/{project}/datasets/{dataset}/tables/{table} + // TODO: Allow and convert from /projects/{project}/datasets/{dataset}/tables/{table}, to + // {project}:{dataset}.{table} return new Read(importSpec); } @@ -64,11 +63,10 @@ public static Read read(ImportSpec importSpec) { * the same entity. The columns names in the import spec will be used for selecting columns from * the BigQuery table, but it still scans the whole row. * - *

<p>The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow
- * FeatureRows}, where each feature and the entity key {@link
- * feast.types.ValueProto.Value Value} in the FeatureRow is taken from a column in
- * the BigQuery table and set with the closest type to the BigQuery column schema that is
- * available in {@link feast.types.ValueProto.ValueType ValueType}.
+ * <p>The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows},
+ * where each feature and the entity key {@link feast.types.ValueProto.Value Value} in the
+ * FeatureRow is taken from a column in the BigQuery table and set with the closest type to the
+ * BigQuery column schema that is available in {@link feast.types.ValueProto.ValueType ValueType}.
 *
 * <p>
Note a small gotcha is that because Integers and Numerics in BigQuery are 64 bits, these are * always cast to INT64 and DOUBLE respectively. @@ -90,7 +88,7 @@ public PCollection expand(PInput input) { String url = String.format("%s:%s.%s", options.project, options.dataset, options.table); Preconditions.checkArgument( - importSpec.getEntitiesList().size() == 1, "BigQuery read must have only one entity"); + importSpec.getEntitiesCount() == 1, "BigQuery read must have only one entity"); return input .getPipeline() .apply( @@ -113,11 +111,6 @@ public Write(BigQueryStoreOptions bigQueryOptions, Specs specs) { @Override public PDone expand(PCollection input) { - List entityInfoList = Lists.newArrayList(); - for (String entityName : specs.getImportSpec().getEntitiesList()) { - entityInfoList.add(specs.getEntitySpec(entityName)); - } - SingleOutputSplit granularitySplitter = new SingleOutputSplit<>(FeatureSpec::getGranularity, specs); PCollection features = diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java index 67481d0a41..2ddc57059e 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java @@ -17,9 +17,10 @@ package feast.ingestion; -import static org.junit.Assert.assertEquals; import static feast.FeastMatchers.hasCount; import static feast.ToOrderedFeatureRows.orderedFeatureRow; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import com.google.common.base.Charsets; import com.google.common.collect.Lists; @@ -28,28 +29,12 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.protobuf.Timestamp; -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import lombok.extern.slf4j.Slf4j; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; import feast.ToOrderedFeatureRows; import feast.ingestion.boot.ImportJobModule; import feast.ingestion.boot.TestPipelineModule; import feast.ingestion.model.Features; import feast.ingestion.model.Values; import feast.ingestion.options.ImportJobOptions; -import feast.ingestion.util.DateUtil; import feast.ingestion.util.ProtoUtil; import feast.specs.ImportSpecProto.ImportSpec; import feast.storage.MockErrorsStore; @@ -61,6 +46,22 @@ import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; import feast.types.GranularityProto.Granularity; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; @Slf4j public class ImportJobCSVTest { @@ -170,6 +171,52 
@@ public void testImportCSV() throws IOException { testPipeline.run(); } + @Test + public void testImportCSVUnknownServingStoreError() throws IOException { + ImportSpec importSpec = + ProtoUtil.decodeProtoYaml( + "---\n" + + "type: file\n" + + "options:\n" + + " format: csv\n" + + " # path: to be overwritten in tests\n" + + "entities:\n" + + " - testEntity\n" + + "schema:\n" + + " entityIdColumn: id\n" + + " timestampValue: 2018-09-25T00:00:00.000Z\n" + + " fields:\n" + + " - name: id\n" + + " - featureId: testEntity.none.redisInt32\n" // Redis is not available by + // default from the json specs + + " - featureId: testEntity.none.testString\n" + + "\n", + ImportSpec.getDefaultInstance()); + + File csvFile = folder.newFile("data.csv"); + Files.asCharSink(csvFile, Charsets.UTF_8).write("1,101,a\n2,202,b\n3,303,c\n"); + importSpec = initImportSpec(importSpec, csvFile.toString()); + + ImportJobOptions options = initOptions(); + + Injector injector = + Guice.createInjector( + new ImportJobModule(options, importSpec), new TestPipelineModule(testPipeline)); + + ImportJob job = injector.getInstance(ImportJob.class); + injector.getInstance(ImportJob.class); + + // Job should fail during expand(), so we don't even need to start the pipeline. + try { + job.expand(); + fail("Should not reach here, we should have thrown an exception"); + } catch (IllegalArgumentException e) { + assertEquals( + "Feature testEntity.none.redisInt32 references unknown serving store REDIS1", + e.getMessage()); + } + } + @Test public void testImportWithErrors() throws IOException { ImportSpec importSpec = diff --git a/ingestion/src/test/java/feast/ingestion/model/SpecsTest.java b/ingestion/src/test/java/feast/ingestion/model/SpecsTest.java new file mode 100644 index 0000000000..17aceba18e --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/model/SpecsTest.java @@ -0,0 +1,192 @@ +package feast.ingestion.model; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Lists; +import com.google.common.io.Resources; +import feast.ingestion.service.FileSpecService; +import feast.specs.FeatureSpecProto.DataStore; +import feast.specs.FeatureSpecProto.DataStores; +import feast.specs.FeatureSpecProto.FeatureSpec; +import feast.specs.ImportSpecProto.Field; +import feast.specs.ImportSpecProto.ImportSpec; +import feast.specs.ImportSpecProto.Schema; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class SpecsTest { + + FileSpecService specService; + + private Field.Builder newField(String featureId) { + return Field.newBuilder().setFeatureId(featureId); + } + + @Before + public void before() { + Path path = Paths.get(Resources.getResource("core_specs/").getPath()); + specService = new FileSpecService(path.toString()); + } + + @Test + public void testSingleFeatureAndEntity() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + assertEquals("testjob", specs.getJobName()); + assertEquals(importSpec, specs.getImportSpec()); + + assertEquals(1, specs.getEntitySpecs().size()); + assertTrue(specs.getEntitySpecs().containsKey("testEntity")); + + assertEquals(1, specs.getFeatureSpecs().size()); + 
assertTrue(specs.getFeatureSpecs().containsKey("testEntity.none.testInt32")); + + assertTrue(specs.getStorageSpecs().containsKey("TEST_SERVING")); + } + + @Test(expected = IllegalArgumentException.class) + public void testErrorOnUnknownEntity() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + specs.getEntitySpec("unknown"); + } + + @Test(expected = IllegalArgumentException.class) + public void testErrorOnUnknownFeature() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + specs.getFeatureSpec("unknown"); + } + + @Test + public void testGetFeatureSpec() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + assertEquals( + "testEntity.none.testInt32", specs.getFeatureSpec("testEntity.none.testInt32").getId()); + } + + @Test + public void testGetEntitySpec() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + assertEquals("testEntity", specs.getEntitySpec("testEntity").getName()); + } + + @Test + public void testGetStorageSpec() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + assertEquals("TEST_SERVING", specs.getStorageSpec("TEST_SERVING").getId()); + } + + @Test(expected = IllegalArgumentException.class) + public void testErrorOnUnknownStore() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + specs.getStorageSpec("Unknown feature unknown, spec was not initialized"); + } + + @Test(expected = IllegalArgumentException.class) + public void testFeatureSpecReferencesUnknownEntity() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("totally_different_entity") + .setSchema(Schema.newBuilder().addFields(newField("testEntity.none.testInt32"))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + } + + @Test + public void testGetFeatureSpecByStoreId() { + ImportSpec importSpec = + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema( + Schema.newBuilder() + .addAllFields( + Lists.newArrayList( + newField("testEntity.none.testInt32").build(), + newField("testEntity.none.testString").build()))) + .build(); + + Specs specs = Specs.of("testjob", importSpec, specService); + specs.validate(); + + FeatureSpec testStringSpec = specs.getFeatureSpec("testEntity.none.testString"); + DataStores dataStores = + testStringSpec + .getDataStores() + .toBuilder() + 
.setServing(DataStore.newBuilder().setId("differentStoreId")) + .build(); + testStringSpec = testStringSpec.toBuilder().setDataStores(dataStores).build(); + + // we change one of the specs to point at a different store id. + specs.getFeatureSpecs().put("testEntity.none.testString", testStringSpec); + + List featureSpecs1 = specs.getFeatureSpecByServingStoreId("TEST_SERVING"); + assertEquals(1, featureSpecs1.size()); + assertEquals("testEntity.none.testInt32", featureSpecs1.get(0).getId()); + + + List featureSpecs2 = specs.getFeatureSpecByServingStoreId("differentStoreId"); + assertEquals(1, featureSpecs2.size()); + assertEquals(testStringSpec, featureSpecs2.get(0)); + } +} diff --git a/ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java b/ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java index 982d8680c3..a5f245ced8 100644 --- a/ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java +++ b/ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java @@ -27,12 +27,15 @@ import com.google.protobuf.Timestamp; import feast.ingestion.model.Features; import feast.ingestion.model.Specs; -import feast.ingestion.model.SpecsImpl; import feast.ingestion.model.Values; import feast.ingestion.service.FileSpecService; +import feast.ingestion.service.SpecService; import feast.ingestion.transform.FeatureIO; import feast.ingestion.util.DateUtil; import feast.options.OptionsParser; +import feast.specs.ImportSpecProto.Field; +import feast.specs.ImportSpecProto.ImportSpec; +import feast.specs.ImportSpecProto.Schema; import feast.specs.StorageSpecProto.StorageSpec; import feast.storage.RedisProto.RedisBucketKey; import feast.storage.RedisProto.RedisBucketValue; @@ -60,11 +63,15 @@ public class FeatureRowRedisIOWriteTest { + private static final String featureNoneInt32 = "testEntity.none.redisInt32"; + private static final String featureNoneString = "testEntity.none.redisString"; + private static final String featureHourInt32 = "testEntity.hour.redisInt32"; + private static final String featureHourString = "testEntity.hour.redisString"; + private static int REDIS_PORT = 51234; private static Redis redis; private static Specs specs; private static Jedis jedis; - @Rule public TestPipeline testPipeline = TestPipeline.create(); @BeforeClass @@ -72,7 +79,20 @@ public static void setup() throws IOException { redis = new RedisServer(REDIS_PORT); redis.start(); Path path = Paths.get(Resources.getResource("core_specs/").getPath()); - specs = new SpecsImpl(null, null, new FileSpecService.Builder(path.toString())); + SpecService specService = new FileSpecService.Builder(path.toString()).build(); + specs = + Specs.of( + "test job", + ImportSpec.newBuilder() + .addEntities("testEntity") + .setSchema( + Schema.newBuilder() + .addFields(Field.newBuilder().setFeatureId(featureHourInt32)) + .addFields(Field.newBuilder().setFeatureId(featureHourString)) + .addFields(Field.newBuilder().setFeatureId(featureNoneInt32)) + .addFields(Field.newBuilder().setFeatureId(featureNoneString))) + .build(), + specService); jedis = new Jedis("localhost", REDIS_PORT); } @@ -83,9 +103,6 @@ public static void teardown() throws IOException { @Test public void testWriteNoneGranularity() throws IOException { - String featureInt32 = "testEntity.none.redisInt32"; - String featureString = "testEntity.none.redisString"; - StorageSpec storageSpec = StorageSpec.newBuilder() .setId("redis1") @@ -108,8 +125,8 @@ public void testWriteNoneGranularity() throws IOException { 
.setEntityKey("1") .setGranularity(Granularity.Enum.NONE) .setEventTimestamp(DateUtil.toTimestamp(DateTime.now())) - .addFeatures(Features.of(featureInt32, Values.ofInt32(1))) - .addFeatures(Features.of(featureString, Values.ofString("a")))) + .addFeatures(Features.of(featureNoneInt32, Values.ofInt32(1))) + .addFeatures(Features.of(featureNoneString, Values.ofString("a")))) .build(); PCollection input = testPipeline.apply(Create.of(rowExtended)); @@ -119,9 +136,9 @@ public void testWriteNoneGranularity() throws IOException { testPipeline.run(); RedisBucketKey featureInt32Key = - getRedisBucketKey("1", getFeatureIdSha1Prefix(featureInt32), 0L); + getRedisBucketKey("1", getFeatureIdSha1Prefix(featureNoneInt32), 0L); RedisBucketKey featureStringKey = - getRedisBucketKey("1", getFeatureIdSha1Prefix(featureString), 0L); + getRedisBucketKey("1", getFeatureIdSha1Prefix(featureNoneString), 0L); RedisBucketValue featureInt32Value = RedisBucketValue.parseFrom(jedis.get(featureInt32Key.toByteArray())); @@ -138,9 +155,6 @@ public void testWriteNoneGranularity() throws IOException { @Test public void testWriteNoneGranularityFromOptions() throws IOException { - String featureInt32 = "testEntity.none.redisInt32"; - String featureString = "testEntity.none.redisString"; - StorageSpec storageSpec = StorageSpec.newBuilder() .setId("redis1") @@ -160,8 +174,8 @@ public void testWriteNoneGranularityFromOptions() throws IOException { .setEntityKey("1") .setGranularity(Granularity.Enum.NONE) .setEventTimestamp(DateUtil.toTimestamp(DateTime.now())) - .addFeatures(Features.of(featureInt32, Values.ofInt32(1))) - .addFeatures(Features.of(featureString, Values.ofString("a")))) + .addFeatures(Features.of(featureNoneInt32, Values.ofInt32(1))) + .addFeatures(Features.of(featureNoneString, Values.ofString("a")))) .build(); PCollection input = testPipeline.apply(Create.of(rowExtended)); @@ -171,9 +185,9 @@ public void testWriteNoneGranularityFromOptions() throws IOException { testPipeline.run(); RedisBucketKey featureInt32Key = - getRedisBucketKey("1", getFeatureIdSha1Prefix(featureInt32), 0L); + getRedisBucketKey("1", getFeatureIdSha1Prefix(featureNoneInt32), 0L); RedisBucketKey featureStringKey = - getRedisBucketKey("1", getFeatureIdSha1Prefix(featureString), 0L); + getRedisBucketKey("1", getFeatureIdSha1Prefix(featureNoneString), 0L); RedisBucketValue featureInt32Value = RedisBucketValue.parseFrom(jedis.get(featureInt32Key.toByteArray())); @@ -190,9 +204,6 @@ public void testWriteNoneGranularityFromOptions() throws IOException { @Test public void testWriteHourGranularity() throws IOException { - String featureInt32 = "testEntity.hour.redisInt32"; - String featureString = "testEntity.hour.redisString"; - FeatureRowRedisIO.Write write = new FeatureRowRedisIO.Write( RedisStoreOptions.builder().host("localhost").port(REDIS_PORT).build(), specs); @@ -205,8 +216,8 @@ public void testWriteHourGranularity() throws IOException { .setEntityKey("1") .setGranularity(Granularity.Enum.HOUR) .setEventTimestamp(DateUtil.toTimestamp(DateTime.now())) - .addFeatures(Features.of(featureInt32, Values.ofInt32(1))) - .addFeatures(Features.of(featureString, Values.ofString("a")))) + .addFeatures(Features.of(featureHourInt32, Values.ofInt32(1))) + .addFeatures(Features.of(featureHourString, Values.ofString("a")))) .build(); PCollection input = testPipeline.apply(Create.of(rowExtended)); @@ -219,32 +230,32 @@ public void testWriteHourGranularity() throws IOException { Timestamp roundedTimestamp = DateUtil.roundToGranularity(rowTimestamp, 
Granularity.Enum.HOUR);
 
     RedisBucketKey featureInt32LatestKey =
-        getRedisBucketKey("1", getFeatureIdSha1Prefix(featureInt32), 0L);
+        getRedisBucketKey("1", getFeatureIdSha1Prefix(featureHourInt32), 0L);
     RedisBucketKey featureStringLatestKey =
-        getRedisBucketKey("1", getFeatureIdSha1Prefix(featureString), 0L);
+        getRedisBucketKey("1", getFeatureIdSha1Prefix(featureHourString), 0L);
     RedisBucketKey featureInt32ValueKey =
-        getRedisBucketKey("1", getFeatureIdSha1Prefix(featureInt32), roundedTimestamp.getSeconds());
+        getRedisBucketKey("1", getFeatureIdSha1Prefix(featureHourInt32), roundedTimestamp.getSeconds());
     RedisBucketKey featureStringValueKey =
         getRedisBucketKey(
-            "1", getFeatureIdSha1Prefix(featureString), roundedTimestamp.getSeconds());
+            "1", getFeatureIdSha1Prefix(featureHourString), roundedTimestamp.getSeconds());
 
     // TODO have a helper func for loading feature store options
     Duration featureInt32BucketSize =
         OptionsParser.parse(
-                specs.getFeatureSpec(featureInt32).getDataStores().getServing().getOptionsMap(),
+                specs.getFeatureSpec(featureHourInt32).getDataStores().getServing().getOptionsMap(),
                 RedisFeatureOptions.class)
             .getBucketSizeDuration();
     RedisBucketKey featureInt32BucketKey =
         getRedisBucketKey(
             "1",
-            getFeatureIdSha1Prefix(featureInt32),
+            getFeatureIdSha1Prefix(featureHourInt32),
             getBucketId(roundedTimestamp, featureInt32BucketSize));
     RedisBucketKey featureStringBucketKey =
         getRedisBucketKey(
             "1",
-            getFeatureIdSha1Prefix(featureString),
+            getFeatureIdSha1Prefix(featureHourString),
             // No bucketsize specified so uses the default.
             getBucketId(
                 roundedTimestamp,
diff --git a/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisInt32.json b/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisInt32.json
index 3d1b654379..67e3663018 100644
--- a/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisInt32.json
+++ b/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisInt32.json
@@ -1,10 +1,10 @@
 {
-  "id": "testEntity.none.testInt32",
+  "id": "testEntity.hour.redisInt32",
   "entity": "testEntity",
   "granularity": "HOUR",
-  "name": "testInt32",
+  "name": "redisInt32",
   "owner": "feast@example.com",
-  "description": "This is test feature of type integer",
+  "description": "This is a test feature of type integer that goes to redis",
   "uri": "https://example.com/",
   "valueType": "INT32",
   "tags": [],
diff --git a/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisString.json b/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisString.json
index 525e912fe6..98726d26bc 100644
--- a/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisString.json
+++ b/ingestion/src/test/resources/core_specs/feature/testEntity.hour.redisString.json
@@ -1,10 +1,10 @@
 {
-  "id": "testEntity.none.testString",
+  "id": "testEntity.hour.redisString",
   "entity": "testEntity",
   "granularity": "HOUR",
-  "name": "testString",
+  "name": "redisString",
   "owner": "feast@example.com",
-  "description": "This is test feature of type integer",
+  "description": "This is a test feature of type string that goes to redis",
   "uri": "https://example.com/",
   "valueType": "STRING",
   "tags": [],
diff --git a/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisInt32.json b/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisInt32.json
index aed8ca205f..0d62bbdaf7 100644
--- a/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisInt32.json
+++ b/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisInt32.json
@@ -1,10 +1,10 @@
 {
-  "id": "testEntity.none.testInt32",
+  "id": "testEntity.none.redisInt32",
   "entity": "testEntity",
   "granularity": "NONE",
-  "name": "testInt32",
+  "name": "redisInt32",
   "owner": "feast@example.com",
-  "description": "This is test feature of type integer",
+  "description": "This is a test feature of type integer that goes to redis",
   "uri": "https://example.com/",
   "valueType": "INT32",
   "tags": [],
diff --git a/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisString.json b/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisString.json
index 00fd038a8e..ed7467001d 100644
--- a/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisString.json
+++ b/ingestion/src/test/resources/core_specs/feature/testEntity.none.redisString.json
@@ -1,10 +1,10 @@
 {
-  "id": "testEntity.none.testString",
+  "id": "testEntity.none.redisString",
   "entity": "testEntity",
   "granularity": "NONE",
-  "name": "testString",
+  "name": "redisString",
   "owner": "feast@example.com",
-  "description": "This is test feature of type integer",
+  "description": "This is a test feature of type string that goes to redis",
   "uri": "https://example.com/",
   "valueType": "STRING",
   "tags": [],
diff --git a/protos/feast/specs/ImportSpec.proto b/protos/feast/specs/ImportSpec.proto
index 329137f7e4..01fb8ec609 100644
--- a/protos/feast/specs/ImportSpec.proto
+++ b/protos/feast/specs/ImportSpec.proto
@@ -35,10 +35,9 @@ message ImportSpec {
 
   map<string, string> options = 2;
 
-  // You should probably only set one of these, but if you set both, they concatenate
   repeated string entities = 3;
 
-  Schema schema = 4; // this is optional depending on the source
+  Schema schema = 4;
 }
 
 message Schema {
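End-to-end, the stricter spec handling is easiest to see on the failure path. A sketch of the behaviour exercised by testImportCSVUnknownServingStoreError above — the class and method names here are hypothetical, and the options, importSpec, and testPipeline values are assumed to be set up as in that test:

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import feast.ingestion.ImportJob;
    import feast.ingestion.boot.ImportJobModule;
    import feast.ingestion.boot.TestPipelineModule;
    import feast.ingestion.options.ImportJobOptions;
    import feast.specs.ImportSpecProto.ImportSpec;
    import org.apache.beam.sdk.testing.TestPipeline;

    public class ValidationSketch {
      // Returns true if the import spec wires up cleanly, false if validation rejects it.
      static boolean specsAreValid(
          ImportJobOptions options, ImportSpec importSpec, TestPipeline testPipeline) {
        Injector injector =
            Guice.createInjector(
                new ImportJobModule(options, importSpec), new TestPipelineModule(testPipeline));
        ImportJob job = injector.getInstance(ImportJob.class);
        try {
          // expand() calls specs.validate() before wiring any transforms, so a dangling
          // store or entity reference fails here rather than minutes into a Dataflow run.
          job.expand();
          return true;
        } catch (IllegalArgumentException e) {
          // e.g. "Feature testEntity.none.redisInt32 references unknown serving store REDIS1"
          return false;
        }
      }
    }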