Send acknowledgment on Spec Update only after sinks are ready #822

Merged: 3 commits, Jun 25, 2020
5 changes: 5 additions & 0 deletions common/pom.xml
@@ -50,6 +50,11 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<!--compileOnly 'org.projectlombok:lombok:1.18.12'-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Member

We need to make a decision on Lombok. Personally I am open to using it, but the team previously decided to phase it out and use AutoValue exclusively. Apparently the two are not compatible.

Collaborator Author (@pyalex), Jun 25, 2020

Right, so I think AutoValue is the more heavily used of the two, since it's everywhere in Beam and I also applied it a lot in ingestion, so I switched to AutoValue where I could.
Unfortunately AutoValue cannot be used in this specific case, since Avro needs an empty constructor on the object, which is hard if not impossible to have on an AutoValue class. In general I like AutoValue more.
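
For context, a rough sketch of the two shapes being weighed here (class and field names are illustrative, not code from this PR): AvroCoder decodes reflectively into a mutable bean, which is why the empty constructor matters, whereas an AutoValue type is immutable and only instantiable through its generated constructor.

import com.google.auto.value.AutoValue;
import lombok.AllArgsConstructor;
import lombok.Data;

// Bean-style shape: Lombok generates getters, setters, equals and hashCode, and the
// explicit no-arg constructor is what Avro's reflective decoding needs.
@Data
@AllArgsConstructor
class RefBean {
  private String projectName;
  private String featureSetName;

  public RefBean() {}
}

// AutoValue shape: immutable and without a no-arg constructor, so AvroCoder has no way
// to instantiate it reflectively.
@AutoValue
abstract class RefValue {
  abstract String projectName();
  abstract String featureSetName();

  static RefValue of(String projectName, String featureSetName) {
    return new AutoValue_RefValue(projectName, featureSetName);
  }
}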

Member

OK, so let's continue to use Lombok for the time being. I hope this doesn't interfere with the rest of the code base.

Collaborator

But there is no need to use AvroCoder, right? (@DefaultSchema(AutoValueSchema.class) can be used for serializable AutoValue types.) And even if you do want to use AvroCoder, the object only has 3 parameters in this case, so the methods can easily be written out in code rather than generated with Lombok.
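
For reference, the suggestion would look roughly like this (a sketch only; the type shown is hypothetical and not code from this PR). With the annotation, Beam infers a schema from the abstract getters and supplies a SchemaCoder automatically, so neither AvroCoder nor hand-written methods are needed.

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Beam derives a Schema (and therefore a SchemaCoder) from the abstract getters below.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class FeatureSetRef {
  public abstract String getProjectName();
  public abstract String getFeatureSetName();
  public abstract Integer getVersion();

  public static FeatureSetRef of(String projectName, String featureSetName, Integer version) {
    return new AutoValue_FeatureSetRef(projectName, featureSetName, version);
  }
}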

Collaborator Author (@pyalex), Jun 25, 2020

@zhilingc you mean SerializableCoder? It's not deterministic, so such a class can't be used as a key in KV.
Regarding the second point, I also need equals and hashCode to use it as a key, and I'm not a fan of hand-writing boilerplate code.
I've already produced several enormously huge PRs over the past few weeks; I don't think they were easy to review.

Collaborator Author

"as key" I meant for grouping operations of course

Collaborator Author

OK, I guess @zhilingc is right: I could use SchemaCoder. Of course not through the @DefaultSchema() annotation, since this is the common module and we don't want a dependency on Beam here, but that's still feasible. I guess we can move towards AutoValue.
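
A sketch of how that could be wired (assuming FeatureSetReference were converted to an AutoValue type as discussed, and with the registration living in the ingestion module, which already depends on Beam, so common itself stays Beam-free):

import feast.common.models.FeatureSetReference;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;

class FeatureSetReferenceCoders {
  // Register the schema provider programmatically instead of putting @DefaultSchema on the
  // class, so feast.common needs no Beam imports.
  static SchemaCoder<FeatureSetReference> schemaCoder(Pipeline pipeline)
      throws NoSuchSchemaException {
    pipeline
        .getSchemaRegistry()
        .registerSchemaProvider(FeatureSetReference.class, new AutoValueSchema());
    return pipeline.getSchemaRegistry().getSchemaCoder(FeatureSetReference.class);
  }
}

The resulting coder could then replace the AvroCoder.of(FeatureSetReference.class) used in ReadFeatureSetSpecs below.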

</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
47 changes: 47 additions & 0 deletions common/src/main/java/feast/common/models/FeatureSetReference.java
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.common.models;

import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * FeatureSetReference is a key that uniquely identifies a specific version of a FeatureSet (or its
 * FeatureSetSpec).
*/
@Data
@AllArgsConstructor
public class FeatureSetReference implements Serializable {
/* Name of project to which this featureSet is assigned */
private String projectName;
/* Name of FeatureSet */
private String featureSetName;
/* Version of FeatureSet */
private Integer version;

// Empty constructor required for Avro decoding.
@SuppressWarnings("unused")
public FeatureSetReference() {}

public static FeatureSetReference of(String projectName, String featureSetName, Integer version) {
return new FeatureSetReference(projectName, featureSetName, version);
}

public String getReference() {
return String.format("%s/%s", getProjectName(), getFeatureSetName());
}
}
34 changes: 25 additions & 9 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -19,6 +19,7 @@
import static feast.ingestion.utils.StoreUtil.getFeatureSink;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.common.models.FeatureSetReference;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.transform.FeatureRowToStoreAllocator;
import feast.ingestion.transform.ProcessAndValidateFeatureRows;
@@ -96,7 +97,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
SpecUtil.parseSpecsStreamingUpdateConfig(options.getSpecsStreamingUpdateConfigJson());

// Step 1. Read FeatureSetSpecs from Spec source
PCollection<KV<String, FeatureSetSpec>> featureSetSpecs =
PCollection<KV<FeatureSetReference, FeatureSetSpec>> featureSetSpecs =
pipeline.apply(
"ReadFeatureSetSpecs",
ReadFeatureSetSpecs.newBuilder()
@@ -106,7 +107,9 @@
.build());

PCollectionView<Map<String, Iterable<FeatureSetSpec>>> globalSpecView =
featureSetSpecs.apply("GlobalSpecView", View.asMultimap());
featureSetSpecs
.apply(MapElements.via(new ReferenceToString()))
.apply("GlobalSpecView", View.asMultimap());

// Step 2. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
@@ -145,8 +148,12 @@
.setStoreTags(storeTags)
.build());

PCollectionList<FeatureSetReference> sinkReadiness = PCollectionList.empty(pipeline);

for (Store store : stores) {
FeatureSink featureSink = getFeatureSink(store, featureSetSpecs);
FeatureSink featureSink = getFeatureSink(store);

sinkReadiness = sinkReadiness.and(featureSink.prepareWrite(featureSetSpecs));

// Step 5. Write metrics of successfully validated rows
validatedRows
@@ -188,13 +195,22 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
}

// Step 9. Send ack that FeatureSetSpec state is updated
featureSetSpecs.apply(
"WriteAck",
WriteFeatureSetSpecAck.newBuilder()
.setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig)
.build());
sinkReadiness
.apply(Flatten.pCollections())
.apply(
"WriteAck",
WriteFeatureSetSpecAck.newBuilder()
.setSinksCount(stores.size())
.setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig)
.build());

return pipeline.run();
}

private static class ReferenceToString
extends SimpleFunction<KV<FeatureSetReference, FeatureSetSpec>, KV<String, FeatureSetSpec>> {
public KV<String, FeatureSetSpec> apply(KV<FeatureSetReference, FeatureSetSpec> input) {
return KV.of(input.getKey().getReference(), input.getValue());
}
}
}
ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java
@@ -16,16 +16,23 @@
*/
package feast.ingestion.transform.specs;

import static feast.ingestion.utils.SpecUtil.parseFeatureSetReference;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import feast.common.models.FeatureSetReference;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -35,6 +42,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.joda.time.Duration;

@@ -50,7 +58,7 @@
*/
@AutoValue
public abstract class ReadFeatureSetSpecs
extends PTransform<PBegin, PCollection<KV<String, FeatureSetSpec>>> {
extends PTransform<PBegin, PCollection<KV<FeatureSetReference, FeatureSetSpec>>> {
public abstract IngestionJobProto.SpecsStreamingUpdateConfig getSpecsStreamingUpdateConfig();

public abstract SourceProto.Source getSource();
@@ -74,7 +82,7 @@ public abstract Builder setSpecsStreamingUpdateConfig(
}

@Override
public PCollection<KV<String, FeatureSetSpec>> expand(PBegin input) {
public PCollection<KV<FeatureSetReference, FeatureSetSpec>> expand(PBegin input) {
return input
.apply(
KafkaIO.readBytes()
@@ -102,6 +110,26 @@ public PCollection<KV<String, FeatureSetSpec>> expand(PBegin input) {
featureSetSpecs.sort(
Comparator.comparing(FeatureSetSpec::getVersion).reversed());
return featureSetSpecs.get(0);
}));
}))
.apply("CreateFeatureSetReferenceKey", ParDo.of(new CreateFeatureSetReference()))
.setCoder(
KvCoder.of(
AvroCoder.of(FeatureSetReference.class), ProtoCoder.of(FeatureSetSpec.class)));
}

public static class CreateFeatureSetReference
extends DoFn<
KV<String, FeatureSetProto.FeatureSetSpec>,
KV<FeatureSetReference, FeatureSetProto.FeatureSetSpec>> {
@ProcessElement
public void process(
ProcessContext c, @Element KV<String, FeatureSetProto.FeatureSetSpec> input) {
Pair<String, String> reference = parseFeatureSetReference(input.getKey());
c.output(
KV.of(
FeatureSetReference.of(
reference.getLeft(), reference.getRight(), input.getValue().getVersion()),
input.getValue()));
}
}
}
ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java
@@ -17,29 +17,38 @@
package feast.ingestion.transform.specs;

import com.google.auto.value.AutoValue;
import feast.proto.core.FeatureSetProto;
import feast.common.models.FeatureSetReference;
import feast.proto.core.IngestionJobProto;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

/**
* Converts input {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into {@link
* Collects output from the sinks' prepareWrite (several streams flattened into one). As soon as the
* count for a given FeatureSetReference reaches getSinksCount(), meaning that enough sinks have
* updated their state, an ack is pushed.
*
* <p>Converts input {@link FeatureSetReference} into {@link
* feast.proto.core.IngestionJobProto.FeatureSetSpecAck} message and writes it to kafka (ack-topic).
*/
@AutoValue
public abstract class WriteFeatureSetSpecAck
extends PTransform<PCollection<KV<String, FeatureSetProto.FeatureSetSpec>>, PDone> {
extends PTransform<PCollection<FeatureSetReference>, PDone> {
public abstract IngestionJobProto.SpecsStreamingUpdateConfig getSpecsStreamingUpdateConfig();

public abstract Integer getSinksCount();

public static Builder newBuilder() {
return new AutoValue_WriteFeatureSetSpecAck.Builder();
}
@@ -49,12 +58,15 @@ public abstract static class Builder {
public abstract Builder setSpecsStreamingUpdateConfig(
IngestionJobProto.SpecsStreamingUpdateConfig config);

public abstract Builder setSinksCount(Integer count);

public abstract WriteFeatureSetSpecAck build();
}

@Override
public PDone expand(PCollection<KV<String, FeatureSetProto.FeatureSetSpec>> input) {
public PDone expand(PCollection<FeatureSetReference> input) {
return input
.apply("Prepare", new PrepareWrite(getSinksCount()))
.apply("FeatureSetSpecToAckMessage", ParDo.of(new BuildAckMessage()))
.apply(
"ToKafka",
@@ -66,20 +78,50 @@ public PDone expand(PCollection<KV<String, FeatureSetProto.FeatureSetSpec>> inpu
.withValueSerializer(ByteArraySerializer.class));
}

private static class BuildAckMessage
extends DoFn<KV<String, FeatureSetProto.FeatureSetSpec>, KV<String, byte[]>> {
private static class BuildAckMessage extends DoFn<FeatureSetReference, KV<String, byte[]>> {
@ProcessElement
public void process(ProcessContext c) throws IOException {
ByteArrayOutputStream encodedAck = new ByteArrayOutputStream();

IngestionJobProto.FeatureSetSpecAck.newBuilder()
.setFeatureSetReference(c.element().getKey())
.setFeatureSetReference(c.element().getReference())
.setJobName(c.getPipelineOptions().getJobName())
.setFeatureSetVersion(c.element().getValue().getVersion())
.setFeatureSetVersion(c.element().getVersion())
.build()
.writeTo(encodedAck);

c.output(KV.of(c.element().getKey(), encodedAck.toByteArray()));
c.output(KV.of(c.element().getReference(), encodedAck.toByteArray()));
}
}

/**
* Groups FeatureSetReference elements so that an ack is generated only when the number of
* repeated occurrences reaches sinksCount.
*/
static class PrepareWrite
extends PTransform<PCollection<FeatureSetReference>, PCollection<FeatureSetReference>> {
private final Integer sinksCount;

PrepareWrite(Integer sinksCount) {
this.sinksCount = sinksCount;
}

@Override
public PCollection<FeatureSetReference> expand(PCollection<FeatureSetReference> input) {
return input
.apply(
"OnEveryElementTrigger",
Window.<FeatureSetReference>into(new GlobalWindows())
.accumulatingFiredPanes()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.ZERO))
Collaborator

I'm wondering, what's the effect of AllowedLateness on global windows?

Collaborator Author

I'm not quite sure, but Beam demands that the allowed lateness be specified.

Collaborator

oke

.apply("CountingReadySinks", Count.perElement())
.apply(
"WhenAllReady",
Filter.by(
(SerializableFunction<KV<FeatureSetReference, Long>, Boolean>)
count -> count.getValue() >= sinksCount))
.apply(Keys.create());
}
}
}
12 changes: 4 additions & 8 deletions ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java
@@ -19,7 +19,6 @@
import static feast.proto.types.ValueProto.ValueType;

import com.google.cloud.bigquery.StandardSQLTypeName;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.StoreProto.Store;
import feast.proto.core.StoreProto.Store.StoreType;
import feast.proto.types.ValueProto.ValueType.Enum;
@@ -28,8 +27,6 @@
import feast.storage.connectors.redis.writer.RedisFeatureSink;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;

// TODO: Create partitioned table by default
@@ -80,16 +77,15 @@ public class StoreUtil {
VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(Enum.BOOL_LIST, StandardSQLTypeName.BOOL);
}

public static FeatureSink getFeatureSink(
Store store, PCollection<KV<String, FeatureSetSpec>> featureSetSpecs) {
public static FeatureSink getFeatureSink(Store store) {
StoreType storeType = store.getType();
switch (storeType) {
case REDIS_CLUSTER:
return RedisFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs);
return RedisFeatureSink.fromConfig(store.getRedisClusterConfig());
case REDIS:
return RedisFeatureSink.fromConfig(store.getRedisConfig(), featureSetSpecs);
return RedisFeatureSink.fromConfig(store.getRedisConfig());
case BIGQUERY:
return BigQueryFeatureSink.fromConfig(store.getBigqueryConfig(), featureSetSpecs);
return BigQueryFeatureSink.fromConfig(store.getBigqueryConfig());
default:
throw new RuntimeException(String.format("Store type '%s' is unsupported", storeType));
}
@@ -38,6 +38,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -139,8 +140,10 @@ public void pipelineShouldReadSpecsAndAcknowledge() {
.setStores(ImmutableList.of(store))
.setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig)
.build())
.apply(Keys.create())
.apply(
WriteFeatureSetSpecAck.newBuilder()
.setSinksCount(1)
.setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig)
.build());
