diff --git a/common/src/main/java/feast/common/models/FeatureSetReference.java b/common/src/main/java/feast/common/models/FeatureSetReference.java index 032f50bd85..b2d5eb21b3 100644 --- a/common/src/main/java/feast/common/models/FeatureSetReference.java +++ b/common/src/main/java/feast/common/models/FeatureSetReference.java @@ -23,10 +23,15 @@ @Data @AllArgsConstructor public class FeatureSetReference implements Serializable { - private String reference; + private String projectName; + private String featureSetName; private Integer version; // Empty constructor required for Avro decoding. @SuppressWarnings("unused") public FeatureSetReference() {} + + public String getReference() { + return String.format("%s/%s", projectName, featureSetName); + } } diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 127924c555..b0b9eaefdf 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -108,15 +108,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti PCollectionView>> globalSpecView = featureSetSpecs - .apply( - MapElements.via( - new SimpleFunction< - KV, KV>() { - public KV apply( - KV input) { - return KV.of(input.getKey().getReference(), input.getValue()); - } - })) + .apply(MapElements.via(new ReferenceToString())) .apply("GlobalSpecView", View.asMultimap()); // Step 2. Read messages from Feast Source as FeatureRow. @@ -214,4 +206,11 @@ public KV apply( return pipeline.run(); } + + private static class ReferenceToString + extends SimpleFunction, KV> { + public KV apply(KV input) { + return KV.of(input.getKey().getReference(), input.getValue()); + } + } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java b/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java index 4e31209abf..b5bb6a2b5e 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java @@ -16,6 +16,8 @@ */ package feast.ingestion.transform.specs; +import static feast.ingestion.utils.SpecUtil.parseFeatureSetReference; + import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -40,6 +42,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.joda.time.Duration; @@ -121,9 +124,11 @@ public static class CreateFeatureSetReference @ProcessElement public void process( ProcessContext c, @Element KV input) { + Pair reference = parseFeatureSetReference(input.getKey()); c.output( KV.of( - new FeatureSetReference(input.getKey(), input.getValue().getVersion()), + new FeatureSetReference( + reference.getLeft(), reference.getRight(), input.getValue().getVersion()), input.getValue())); } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java b/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java index 98953745bb..027f25a52a 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java @@ -32,9 +32,14 @@ import org.apache.beam.sdk.values.PDone; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.joda.time.Duration; /** - * Converts input {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into {@link + * Collects output from sinks prepareWrite (several streams flatten into one). As soon as count of + * each FeatureSetReference reach getSinksCount() - it means that enough amount of sinks updated its + * state - ack is pushed. + * + *

Converts input {@link FeatureSetReference} into {@link * feast.proto.core.IngestionJobProto.FeatureSetSpecAck} message and writes it to kafka (ack-topic). */ @AutoValue @@ -61,18 +66,7 @@ public abstract Builder setSpecsStreamingUpdateConfig( @Override public PDone expand(PCollection input) { return input - .apply( - "OnEveryElementTrigger", - Window.into(new GlobalWindows()) - .accumulatingFiredPanes() - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) - .apply("CountingReadySinks", Count.perElement()) - .apply( - "WhenAllReady", - Filter.by( - (SerializableFunction, Boolean>) - count -> count.getValue() >= getSinksCount())) - .apply(Keys.create()) + .apply("Prepare", new PrepareWrite(getSinksCount())) .apply("FeatureSetSpecToAckMessage", ParDo.of(new BuildAckMessage())) .apply( "ToKafka", @@ -99,4 +93,35 @@ public void process(ProcessContext c) throws IOException { c.output(KV.of(c.element().getReference(), encodedAck.toByteArray())); } } + + /** + * Groups FeatureSetReference to generate ack only when amount of repeating elements reach + * sinksCount + */ + static class PrepareWrite + extends PTransform, PCollection> { + private final Integer sinksCount; + + PrepareWrite(Integer sinksCount) { + this.sinksCount = sinksCount; + } + + @Override + public PCollection expand(PCollection input) { + return input + .apply( + "OnEveryElementTrigger", + Window.into(new GlobalWindows()) + .accumulatingFiredPanes() + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .withAllowedLateness(Duration.ZERO)) + .apply("CountingReadySinks", Count.perElement()) + .apply( + "WhenAllReady", + Filter.by( + (SerializableFunction, Boolean>) + count -> count.getValue() >= sinksCount)) + .apply(Keys.create()); + } + } } diff --git a/ingestion/src/test/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAckTest.java b/ingestion/src/test/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAckTest.java new file mode 100644 index 0000000000..38872c9320 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAckTest.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.specs; + +import com.google.common.collect.ImmutableList; +import feast.common.models.FeatureSetReference; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; + +public class WriteFeatureSetSpecAckTest { + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void shouldSendAckWhenAllSinksReady() { + TestStream sink1 = + TestStream.create(AvroCoder.of(FeatureSetReference.class)) + .addElements(new FeatureSetReference("project", "fs", 1)) + .addElements(new FeatureSetReference("project", "fs", 2)) + .addElements(new FeatureSetReference("project", "fs", 3)) + .advanceWatermarkToInfinity(); + + TestStream sink2 = + TestStream.create(AvroCoder.of(FeatureSetReference.class)) + .addElements(new FeatureSetReference("project", "fs_2", 1)) + .addElements(new FeatureSetReference("project", "fs", 3)) + .advanceWatermarkToInfinity(); + + TestStream sink3 = + TestStream.create(AvroCoder.of(FeatureSetReference.class)) + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements(new FeatureSetReference("project", "fs", 3)) + .advanceWatermarkToInfinity(); + + PCollectionList sinks = + PCollectionList.of( + ImmutableList.of( + p.apply("sink1", sink1), p.apply("sink2", sink2), p.apply("sink3", sink3))); + + PCollection grouped = + sinks.apply(Flatten.pCollections()).apply(new WriteFeatureSetSpecAck.PrepareWrite(3)); + + PAssert.that(grouped) + .inOnTimePane(GlobalWindow.INSTANCE) + .containsInAnyOrder(new FeatureSetReference("project", "fs", 3)); + + p.run(); + } +} diff --git a/storage/api/src/main/java/feast/storage/api/writer/FeatureSink.java b/storage/api/src/main/java/feast/storage/api/writer/FeatureSink.java index 92d4659e77..8734398a68 100644 --- a/storage/api/src/main/java/feast/storage/api/writer/FeatureSink.java +++ b/storage/api/src/main/java/feast/storage/api/writer/FeatureSink.java @@ -31,18 +31,12 @@ public interface FeatureSink extends Serializable { * Set up storage backend for write. This method will be called once during pipeline * initialisation. * - *

Examples when schemas need to be updated: + *

Should create transformation that would update sink's state based on given FeatureSetSpec + * stream. Returning stream should notify subscribers about successful installation of new + * FeatureSetSpec referenced by {@link FeatureSetReference}. * - *

    - *
  • when a new entity is registered, a table usually needs to be created - *
  • when a new feature is registered, a column with appropriate data type usually needs to be - * created - *
- * - *

If the storage backend is a key-value or a schema-less database, however, there may not be a - * need to manage any schemas. - * - * @param featureSetSpecs Feature set to be written + * @param featureSetSpecs specs stream + * @return stream of state updated events */ PCollection prepareWrite( PCollection> featureSetSpecs); diff --git a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java index f43479c68f..b82e500a8c 100644 --- a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java +++ b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java @@ -201,9 +201,7 @@ public void simpleInsert() { p.apply( Create.of( ImmutableMap.of( - new FeatureSetReference( - String.format("%s/%s", spec.getProject(), spec.getName()), 1), - spec)))); + new FeatureSetReference(spec.getProject(), spec.getName(), 1), spec)))); PCollection successfulInserts = p.apply(featureRowTestStream).apply(sink.writer()).getSuccessfulInserts(); PAssert.that(successfulInserts).containsInAnyOrder(row1, row2); @@ -251,9 +249,7 @@ public void uniqueJobIdPerWindow() { "StaticSpecs", Create.of( ImmutableMap.of( - new FeatureSetReference( - String.format("%s/%s", spec.getProject(), spec.getName()), 1), - spec)))); + new FeatureSetReference(spec.getProject(), spec.getName(), 1), spec)))); p.apply(featureRowTestStream).apply(sink.writer()); p.run(); @@ -285,9 +281,7 @@ public void expectingJobResult() { "StaticSpecs", Create.of( ImmutableMap.of( - new FeatureSetReference( - String.format("%s/%s", spec.getProject(), spec.getName()), 1), - spec)))); + new FeatureSetReference(spec.getProject(), spec.getName(), 1), spec)))); PTransform, WriteResult> writer = ((BigQueryWrite) sink.writer()).withExpectingResultTime(Duration.standardSeconds(5)); @@ -334,8 +328,7 @@ public void updateSchemaWithExistingTable() { p.apply( Create.of( ImmutableMap.of( - new FeatureSetReference( - String.format("%s/%s", spec_fs_2.getProject(), spec_fs_2.getName()), 1), + new FeatureSetReference(spec_fs_2.getProject(), spec_fs_2.getName(), 1), spec_fs_2)))); TestStream featureRowTestStream = @@ -390,10 +383,10 @@ public void updateSpecInFlight() { KvCoder.of( AvroCoder.of(FeatureSetReference.class), ProtoCoder.of(FeatureSetSpec.class))) .advanceWatermarkTo(Instant.now()) - .addElements(KV.of(new FeatureSetReference("myproject/fs", 1), spec)) + .addElements(KV.of(new FeatureSetReference("myproject", "fs", 1), spec)) .advanceProcessingTime(Duration.standardSeconds(5)) // .advanceWatermarkTo(Instant.now().plus(Duration.standardSeconds(5))) - .addElements(KV.of(new FeatureSetReference("myproject/fs", 1), spec_fs_2)) + .addElements(KV.of(new FeatureSetReference("myproject", "fs", 1), spec_fs_2)) .advanceWatermarkToInfinity(); FeatureSink sink = diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java index 2a3895d791..6c02364687 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java @@ -119,8 +119,8 @@ public void setUp() throws IOException { Map specMap = ImmutableMap.of( - new FeatureSetReference("myproject/fs", 1), spec1, - new FeatureSetReference("myproject/feature_set", 1), spec2); + new FeatureSetReference("myproject", "fs", 1), spec1, + new FeatureSetReference("myproject", "feature_set", 1), spec2); RedisClusterConfig redisClusterConfig = RedisClusterConfig.newBuilder() .setConnectionString(CONNECTION_STRING) diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java index 3e98447063..123a3d81af 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java @@ -112,8 +112,8 @@ public void setUp() throws IOException { specMap = ImmutableMap.of( - new FeatureSetReference("myproject/fs", 1), spec1, - new FeatureSetReference("myproject/feature_set", 1), spec2); + new FeatureSetReference("myproject", "fs", 1), spec1, + new FeatureSetReference("myproject", "feature_set", 1), spec2); StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build();