Skip to content

Commit

Permalink
apidocs & tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pyalex committed Jun 24, 2020
1 parent 8ce55d0 commit daec134
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
/**
 * Serializable value object that identifies a feature set by project name, feature set name and
 * version.
 */
@Data
@AllArgsConstructor
public class FeatureSetReference implements Serializable {
// NOTE(review): getReference() below builds its result from projectName and featureSetName and
// never reads this field - confirm whether the field is still needed.
private String reference;
private String projectName;
private String featureSetName;
private Integer version;

// Empty constructor required for Avro decoding.
@SuppressWarnings("unused")
public FeatureSetReference() {}

/**
 * Builds the textual reference of this feature set.
 *
 * @return the project name and the feature set name joined by a {@code /}
 */
public String getReference() {
return String.format("%s/%s", projectName, featureSetName);
}
}
17 changes: 8 additions & 9 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti

PCollectionView<Map<String, Iterable<FeatureSetSpec>>> globalSpecView =
featureSetSpecs
.apply(
MapElements.via(
new SimpleFunction<
KV<FeatureSetReference, FeatureSetSpec>, KV<String, FeatureSetSpec>>() {
public KV<String, FeatureSetSpec> apply(
KV<FeatureSetReference, FeatureSetSpec> input) {
return KV.of(input.getKey().getReference(), input.getValue());
}
}))
.apply(MapElements.via(new ReferenceToString()))
.apply("GlobalSpecView", View.asMultimap());

// Step 2. Read messages from Feast Source as FeatureRow.
Expand Down Expand Up @@ -214,4 +206,11 @@ public KV<String, FeatureSetSpec> apply(

return pipeline.run();
}

private static class ReferenceToString
extends SimpleFunction<KV<FeatureSetReference, FeatureSetSpec>, KV<String, FeatureSetSpec>> {
public KV<String, FeatureSetSpec> apply(KV<FeatureSetReference, FeatureSetSpec> input) {
return KV.of(input.getKey().getReference(), input.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package feast.ingestion.transform.specs;

import static feast.ingestion.utils.SpecUtil.parseFeatureSetReference;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -40,6 +42,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.joda.time.Duration;

Expand Down Expand Up @@ -121,9 +124,11 @@ public static class CreateFeatureSetReference
/**
 * Re-keys an incoming (String, FeatureSetSpec) pair by a {@link FeatureSetReference} and emits
 * it with the spec unchanged as the value.
 *
 * @param c context used to emit the re-keyed pair
 * @param input pair whose key is passed to {@code parseFeatureSetReference} and whose value
 *     supplies the version and is forwarded as-is
 */
@ProcessElement
public void process(
ProcessContext c, @Element KV<String, FeatureSetProto.FeatureSetSpec> input) {
// presumably left = project name, right = feature set name - verify against
// SpecUtil.parseFeatureSetReference
Pair<String, String> reference = parseFeatureSetReference(input.getKey());
c.output(
KV.of(
// NOTE(review): two FeatureSetReference constructions appear back to back here; the
// 2-arg call directly below looks like the pre-change line of the diff this text was
// taken from - confirm which one is current.
new FeatureSetReference(input.getKey(), input.getValue().getVersion()),
new FeatureSetReference(
reference.getLeft(), reference.getRight(), input.getValue().getVersion()),
input.getValue()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

/**
* Converts input {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into {@link
* Collects the output of the sinks' prepareWrite (several streams flattened into one). As soon as
* the count of a FeatureSetReference reaches getSinksCount(), meaning that enough sinks have
* updated their state, an ack is pushed.
*
* <p>Converts input {@link FeatureSetReference} into {@link
* feast.proto.core.IngestionJobProto.FeatureSetSpecAck} message and writes it to kafka (ack-topic).
*/
@AutoValue
Expand All @@ -61,18 +66,7 @@ public abstract Builder setSpecsStreamingUpdateConfig(
@Override
public PDone expand(PCollection<FeatureSetReference> input) {
return input
.apply(
"OnEveryElementTrigger",
Window.<FeatureSetReference>into(new GlobalWindows())
.accumulatingFiredPanes()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
.apply("CountingReadySinks", Count.perElement())
.apply(
"WhenAllReady",
Filter.by(
(SerializableFunction<KV<FeatureSetReference, Long>, Boolean>)
count -> count.getValue() >= getSinksCount()))
.apply(Keys.create())
.apply("Prepare", new PrepareWrite(getSinksCount()))
.apply("FeatureSetSpecToAckMessage", ParDo.of(new BuildAckMessage()))
.apply(
"ToKafka",
Expand All @@ -99,4 +93,35 @@ public void process(ProcessContext c) throws IOException {
c.output(KV.of(c.element().getReference(), encodedAck.toByteArray()));
}
}

/**
 * Counts identical {@link FeatureSetReference} elements and lets a reference through only once
 * its count has reached {@code sinksCount}, so that an ack is generated only after enough
 * repeated elements have arrived.
 */
static class PrepareWrite
    extends PTransform<PCollection<FeatureSetReference>, PCollection<FeatureSetReference>> {
  private final Integer sinksCount;

  /** @param sinksCount minimal number of occurrences of a reference before it is emitted */
  PrepareWrite(Integer sinksCount) {
    this.sinksCount = sinksCount;
  }

  @Override
  public PCollection<FeatureSetReference> expand(PCollection<FeatureSetReference> input) {
    // Single global window with a repeating element-count trigger; fired panes accumulate, so
    // the per-element counts computed below keep growing across firings.
    PCollection<FeatureSetReference> windowed =
        input.apply(
            "OnEveryElementTrigger",
            Window.<FeatureSetReference>into(new GlobalWindows())
                .accumulatingFiredPanes()
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.ZERO));

    PCollection<KV<FeatureSetReference, Long>> occurrences =
        windowed.apply("CountingReadySinks", Count.perElement());

    // Keep only the references whose count has reached the configured threshold.
    PCollection<KV<FeatureSetReference, Long>> ready =
        occurrences.apply(
            "WhenAllReady",
            Filter.by(
                (SerializableFunction<KV<FeatureSetReference, Long>, Boolean>)
                    entry -> entry.getValue() >= sinksCount));

    // Drop the counts; downstream only needs the references themselves.
    return ready.apply(Keys.create());
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.transform.specs;

import com.google.common.collect.ImmutableList;
import feast.common.models.FeatureSetReference;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

/**
 * Checks that {@link WriteFeatureSetSpecAck.PrepareWrite} emits a reference only after it has
 * been reported by all three simulated sinks.
 */
public class WriteFeatureSetSpecAckTest {
  @Rule public transient TestPipeline p = TestPipeline.create();

  @Test
  public void shouldSendAckWhenAllSinksReady() {
    AvroCoder<FeatureSetReference> referenceCoder = AvroCoder.of(FeatureSetReference.class);

    // First sink reports three versions of the same feature set.
    TestStream<FeatureSetReference> firstSink =
        TestStream.create(referenceCoder)
            .addElements(new FeatureSetReference("project", "fs", 1))
            .addElements(new FeatureSetReference("project", "fs", 2))
            .addElements(new FeatureSetReference("project", "fs", 3))
            .advanceWatermarkToInfinity();

    // Second sink reports another feature set plus version 3 of "fs".
    TestStream<FeatureSetReference> secondSink =
        TestStream.create(referenceCoder)
            .addElements(new FeatureSetReference("project", "fs_2", 1))
            .addElements(new FeatureSetReference("project", "fs", 3))
            .advanceWatermarkToInfinity();

    // Third sink reports version 3 of "fs" only after processing time has advanced.
    TestStream<FeatureSetReference> thirdSink =
        TestStream.create(referenceCoder)
            .advanceProcessingTime(Duration.standardSeconds(10))
            .addElements(new FeatureSetReference("project", "fs", 3))
            .advanceWatermarkToInfinity();

    PCollection<FeatureSetReference> firstOutput = p.apply("sink1", firstSink);
    PCollection<FeatureSetReference> secondOutput = p.apply("sink2", secondSink);
    PCollection<FeatureSetReference> thirdOutput = p.apply("sink3", thirdSink);

    PCollectionList<FeatureSetReference> allSinks =
        PCollectionList.of(ImmutableList.of(firstOutput, secondOutput, thirdOutput));

    PCollection<FeatureSetReference> flattened = allSinks.apply(Flatten.pCollections());
    PCollection<FeatureSetReference> acknowledged =
        flattened.apply(new WriteFeatureSetSpecAck.PrepareWrite(3));

    // Only ("project", "fs", 3) was reported by all three sinks.
    FeatureSetReference expected = new FeatureSetReference("project", "fs", 3);
    PAssert.that(acknowledged).inOnTimePane(GlobalWindow.INSTANCE).containsInAnyOrder(expected);

    p.run();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@ public interface FeatureSink extends Serializable {
* Set up storage backend for write. This method will be called once during pipeline
* initialisation.
*
* <p>Examples when schemas need to be updated:
* <p>Should create a transformation that updates the sink's state based on the given
* FeatureSetSpec stream. The returned stream should notify subscribers about the successful
* installation of a new FeatureSetSpec referenced by {@link FeatureSetReference}.
*
* <ul>
* <li>when a new entity is registered, a table usually needs to be created
* <li>when a new feature is registered, a column with appropriate data type usually needs to be
* created
* </ul>
*
* <p>If the storage backend is a key-value or a schema-less database, however, there may not be a
* need to manage any schemas.
*
* @param featureSetSpecs Feature set to be written
* @param featureSetSpecs specs stream
* @return stream of state updated events
*/
PCollection<FeatureSetReference> prepareWrite(
PCollection<KV<FeatureSetReference, FeatureSetProto.FeatureSetSpec>> featureSetSpecs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ public void simpleInsert() {
p.apply(
Create.of(
ImmutableMap.of(
new FeatureSetReference(
String.format("%s/%s", spec.getProject(), spec.getName()), 1),
spec))));
new FeatureSetReference(spec.getProject(), spec.getName(), 1), spec))));
PCollection<FeatureRow> successfulInserts =
p.apply(featureRowTestStream).apply(sink.writer()).getSuccessfulInserts();
PAssert.that(successfulInserts).containsInAnyOrder(row1, row2);
Expand Down Expand Up @@ -251,9 +249,7 @@ public void uniqueJobIdPerWindow() {
"StaticSpecs",
Create.of(
ImmutableMap.of(
new FeatureSetReference(
String.format("%s/%s", spec.getProject(), spec.getName()), 1),
spec))));
new FeatureSetReference(spec.getProject(), spec.getName(), 1), spec))));

p.apply(featureRowTestStream).apply(sink.writer());
p.run();
Expand Down Expand Up @@ -285,9 +281,7 @@ public void expectingJobResult() {
"StaticSpecs",
Create.of(
ImmutableMap.of(
new FeatureSetReference(
String.format("%s/%s", spec.getProject(), spec.getName()), 1),
spec))));
new FeatureSetReference(spec.getProject(), spec.getName(), 1), spec))));

PTransform<PCollection<FeatureRow>, WriteResult> writer =
((BigQueryWrite) sink.writer()).withExpectingResultTime(Duration.standardSeconds(5));
Expand Down Expand Up @@ -334,8 +328,7 @@ public void updateSchemaWithExistingTable() {
p.apply(
Create.of(
ImmutableMap.of(
new FeatureSetReference(
String.format("%s/%s", spec_fs_2.getProject(), spec_fs_2.getName()), 1),
new FeatureSetReference(spec_fs_2.getProject(), spec_fs_2.getName(), 1),
spec_fs_2))));

TestStream<FeatureRow> featureRowTestStream =
Expand Down Expand Up @@ -390,10 +383,10 @@ public void updateSpecInFlight() {
KvCoder.of(
AvroCoder.of(FeatureSetReference.class), ProtoCoder.of(FeatureSetSpec.class)))
.advanceWatermarkTo(Instant.now())
.addElements(KV.of(new FeatureSetReference("myproject/fs", 1), spec))
.addElements(KV.of(new FeatureSetReference("myproject", "fs", 1), spec))
.advanceProcessingTime(Duration.standardSeconds(5))
// .advanceWatermarkTo(Instant.now().plus(Duration.standardSeconds(5)))
.addElements(KV.of(new FeatureSetReference("myproject/fs", 1), spec_fs_2))
.addElements(KV.of(new FeatureSetReference("myproject", "fs", 1), spec_fs_2))
.advanceWatermarkToInfinity();

FeatureSink sink =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public void setUp() throws IOException {

Map<FeatureSetReference, FeatureSetSpec> specMap =
ImmutableMap.of(
new FeatureSetReference("myproject/fs", 1), spec1,
new FeatureSetReference("myproject/feature_set", 1), spec2);
new FeatureSetReference("myproject", "fs", 1), spec1,
new FeatureSetReference("myproject", "feature_set", 1), spec2);
RedisClusterConfig redisClusterConfig =
RedisClusterConfig.newBuilder()
.setConnectionString(CONNECTION_STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public void setUp() throws IOException {

specMap =
ImmutableMap.of(
new FeatureSetReference("myproject/fs", 1), spec1,
new FeatureSetReference("myproject/feature_set", 1), spec2);
new FeatureSetReference("myproject", "fs", 1), spec1,
new FeatureSetReference("myproject", "feature_set", 1), spec2);
StoreProto.Store.RedisConfig redisConfig =
StoreProto.Store.RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build();

Expand Down

0 comments on commit daec134

Please sign in to comment.