
Unserializable FluentBackoff causes null pointer exception in Dataflow Runner #417

Merged · 1 commit · Jan 8, 2020
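Note on the root cause (the PR description was not captured above, so this is inferred from the diff): Beam's FluentBackoff is not Serializable, and the old BackOffExecutor stashed one in a static field populated by its constructor. Dataflow serializes the pipeline's DoFns and deserializes them on workers, where no constructor ever runs, so the static field stays null and the retry path dereferences it. A minimal stand-alone sketch of that failure mode, with a StringBuilder standing in for FluentBackoff:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch only: mirrors the old BackOffExecutor pattern, not Feast code.
public class StaticBackoffDemo implements Serializable {
  // Stand-in for the non-serializable FluentBackoff; static, set by the ctor.
  static StringBuilder backoff;

  public StaticBackoffDemo() {
    backoff = new StringBuilder("maxRetries=3");
  }

  public void execute() {
    // NPE when no constructor has ever run in this JVM.
    System.out.println(backoff.length());
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(bytes);
    out.writeObject(new StaticBackoffDemo());
    out.flush();

    backoff = null; // simulate the fresh JVM of a Dataflow worker
    StaticBackoffDemo onWorker =
        (StaticBackoffDemo)
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
    onWorker.execute(); // throws NullPointerException
  }
}
```

The fix below keeps only the serializable inputs (an Integer and a joda-time Duration) as instance fields and rebuilds the FluentBackoff on each call.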
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/service/SpecService.java
@@ -143,8 +143,8 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) {
* possible if a project name is not set explicitly
*
* <p>The version field can be one of - '*' - This will match all versions - 'latest' - This will
- * match the latest feature set version - '&lt;number&gt;' - This will match a specific feature set
- * version. This property can only be set if both the feature set name and project name are
+ * match the latest feature set version - '&lt;number&gt;' - This will match a specific feature
+ * set version. This property can only be set if both the feature set name and project name are
* explicitly set.
*
* @param filter filter containing the desired featureSet name and version filter
6 changes: 3 additions & 3 deletions core/src/main/java/feast/core/util/PackageUtil.java
@@ -44,9 +44,9 @@ public class PackageUtil {
* points to the resource location. Note that the extraction process can take several minutes to
* complete.
*
- * <p>One use case of this function is to detect the class path of resources to stage when
- * using Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be
- * handled by default in Apache Beam.
+ * <p>One use case of this function is to detect the class path of resources to stage when using
+ * Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be handled by
+ * default in Apache Beam.
*
* <pre>
* <code>
@@ -89,15 +89,14 @@ public PDone expand(PCollection<FeatureRow> input) {
switch (storeType) {
case REDIS:
RedisConfig redisConfig = getStore().getRedisConfig();
-      PCollection<FailedElement> redisWriteResult = input
-          .apply(
-              "FeatureRowToRedisMutation",
-              ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
-          .apply(
-              "WriteRedisMutationToRedis",
-              RedisCustomIO.write(redisConfig));
+      PCollection<FailedElement> redisWriteResult =
+          input
+              .apply(
+                  "FeatureRowToRedisMutation",
+                  ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
+              .apply("WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig));
if (options.getDeadLetterTableSpec() != null) {
      redisWriteResult.apply(
WriteFailedElementToBigQuery.newBuilder()
.setTableSpec(options.getDeadLetterTableSpec())
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
Expand Down
@@ -24,10 +24,8 @@
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

@@ -111,10 +109,7 @@ public void processElement(ProcessContext context) {
}
context.output(getFailureTag(), failedElement.build());
} else {
-        featureRow = featureRow.toBuilder()
-            .clearFields()
-            .addAllFields(fields)
-            .build();
+        featureRow = featureRow.toBuilder().clearFields().addAllFields(fields).build();
context.output(getSuccessTag(), featureRow);
}
}
66 changes: 43 additions & 23 deletions ingestion/src/main/java/feast/retry/BackOffExecutor.java
@@ -1,38 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package feast.retry;

+import java.io.Serializable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

-import java.io.IOException;
-import java.io.Serializable;

public class BackOffExecutor implements Serializable {

-  private static FluentBackoff backoff;
+  private final Integer maxRetries;
+  private final Duration initialBackOff;

-  public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
-    backoff = FluentBackoff.DEFAULT
-        .withMaxRetries(maxRetries)
-        .withInitialBackoff(initialBackOff);
-  }
+  public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
+    this.maxRetries = maxRetries;
+    this.initialBackOff = initialBackOff;
+  }

+  public void execute(Retriable retriable) throws Exception {
+    FluentBackoff backoff =
+        FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
+    execute(retriable, backoff);
+  }

-  public void execute(Retriable retriable) throws Exception {
-    Sleeper sleeper = Sleeper.DEFAULT;
-    BackOff backOff = backoff.backoff();
-    while(true) {
-      try {
-        retriable.execute();
-        break;
-      } catch (Exception e) {
-        if(retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
-          retriable.cleanUpAfterFailure();
-        } else {
-          throw e;
-        }
-      }
+  private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backOff = backoff.backoff();
+    while (true) {
+      try {
+        retriable.execute();
+        break;
+      } catch (Exception e) {
+        if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
+          retriable.cleanUpAfterFailure();
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
}
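One way to guard against regressions of this kind (a sketch, assuming Beam's org.apache.beam.sdk.util.SerializableUtils, which round-trips a value through Java serialization and throws if it cannot be serialized):

```java
import feast.retry.BackOffExecutor;
import org.apache.beam.sdk.util.SerializableUtils;
import org.joda.time.Duration;

public class BackOffExecutorSerializationCheck {
  public static void main(String[] args) {
    // Round-trip the executor the way Dataflow ships DoFn state to workers.
    BackOffExecutor executor = new BackOffExecutor(3, Duration.millis(10));
    BackOffExecutor copy = SerializableUtils.ensureSerializable(executor);
    System.out.println("round-trip ok: " + (copy != null));
    // Caveat: the old static-field version would also pass this in-process
    // check, because the static was already initialized in this JVM; the bug
    // only surfaces in a fresh worker JVM. Keeping all state in serializable
    // instance fields, as this PR does, removes the hazard entirely.
  }
}
```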
24 changes: 21 additions & 3 deletions ingestion/src/main/java/feast/retry/Retriable.java
@@ -1,7 +1,25 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package feast.retry;

public interface Retriable {
-  void execute();
-  Boolean isExceptionRetriable(Exception e);
-  void cleanUpAfterFailure();
+  void execute();
+
+  Boolean isExceptionRetriable(Exception e);
+
+  void cleanUpAfterFailure();
}
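For reference, a usage sketch (illustrative, not part of this PR) of how a Retriable pairs with BackOffExecutor; the RedisCustomIO change below does essentially this inline:

```java
import feast.retry.BackOffExecutor;
import feast.retry.Retriable;
import org.joda.time.Duration;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class RetriableUsageExample {
  public static void main(String[] args) throws Exception {
    // Up to 3 retries, starting from a 100 ms exponential backoff.
    BackOffExecutor executor = new BackOffExecutor(3, Duration.millis(100));
    executor.execute(
        new Retriable() {
          @Override
          public void execute() {
            // Attempt the operation, e.g. flush a batch of Redis mutations.
          }

          @Override
          public Boolean isExceptionRetriable(Exception e) {
            // Only connection-level failures are worth retrying.
            return e instanceof JedisConnectionException;
          }

          @Override
          public void cleanUpAfterFailure() {
            // Reset connections or pipelines before the next attempt.
          }
        });
  }
}
```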
117 changes: 62 additions & 55 deletions ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
@@ -20,6 +20,9 @@
import feast.ingestion.values.FailedElement;
import feast.retry.BackOffExecutor;
import feast.retry.Retriable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -38,10 +41,6 @@
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisConnectionException;

-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;

public class RedisCustomIO {

private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -164,7 +163,8 @@ public void setScore(@Nullable Long score) {
}

/** ServingStoreWrite data to a Redis server. */
-  public static class Write extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
+  public static class Write
+      extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {

private WriteDoFn dofn;

@@ -202,9 +202,10 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
this.host = redisConfig.getHost();
this.port = redisConfig.getPort();
-      long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
-      this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
-          Duration.millis(backoffMs));
+      long backoffMs =
+          redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
+      this.backOffExecutor =
+          new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
}

public WriteDoFn withBatchSize(int batchSize) {
@@ -233,47 +234,50 @@ public void startBundle() {
}

private void executeBatch() throws Exception {
-      backOffExecutor.execute(new Retriable() {
-        @Override
-        public void execute() {
-          pipeline.multi();
-          mutations.forEach(mutation -> {
-            writeRecord(mutation);
-            if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
-              pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+      backOffExecutor.execute(
+          new Retriable() {
+            @Override
+            public void execute() {
+              pipeline.multi();
+              mutations.forEach(
+                  mutation -> {
+                    writeRecord(mutation);
+                    if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
+                      pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+                    }
+                  });
+              pipeline.exec();
+              pipeline.sync();
+              mutations.clear();
+            }
-          });
-          pipeline.exec();
-          pipeline.sync();
-          mutations.clear();
-        }

          @Override
          public Boolean isExceptionRetriable(Exception e) {
            return e instanceof JedisConnectionException;
          }

          @Override
          public void cleanUpAfterFailure() {
            try {
              pipeline.close();
            } catch (IOException e) {
              log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
            }
            jedis = new Jedis(host, port, timeout);
            pipeline = jedis.pipelined();
          }
        });
}

-    private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
+    private FailedElement toFailedElement(
+        RedisMutation mutation, Exception exception, String jobName) {
return FailedElement.newBuilder()
          .setJobName(jobName)
          .setTransformName("RedisCustomIO")
          .setPayload(mutation.getValue().toString())
          .setErrorMessage(exception.getMessage())
          .setStackTrace(ExceptionUtils.getStackTrace(exception))
          .build();
}

@ProcessElement
@@ -284,11 +288,12 @@ public void processElement(ProcessContext context) {
try {
executeBatch();
} catch (Exception e) {
-        mutations.forEach(failedMutation -> {
-          FailedElement failedElement = toFailedElement(
-              failedMutation, e, context.getPipelineOptions().getJobName());
-          context.output(failedElement);
-        });
+        mutations.forEach(
+            failedMutation -> {
+              FailedElement failedElement =
+                  toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
+              context.output(failedElement);
+            });
mutations.clear();
}
}
Expand All @@ -315,16 +320,18 @@ private Response<?> writeRecord(RedisMutation mutation) {
}

@FinishBundle
-      public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
-        if(mutations.size() > 0) {
+      public void finishBundle(FinishBundleContext context)
+          throws IOException, InterruptedException {
+        if (mutations.size() > 0) {
try {
executeBatch();
} catch (Exception e) {
-          mutations.forEach(failedMutation -> {
-            FailedElement failedElement = toFailedElement(
-                failedMutation, e, context.getPipelineOptions().getJobName());
-            context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
-          });
+          mutations.forEach(
+              failedMutation -> {
+                FailedElement failedElement =
+                    toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
+                context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
+              });
mutations.clear();
}
}
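The retry knobs above come from the store's RedisConfig; note the initialBackoffMs > 0 ? ... : 1 guard, which keeps the backoff Duration positive when the field is unset. A configuration sketch (assuming feast.core.StoreProto and the standard protobuf builder setters matching the getters used in the diff):

```java
import feast.core.StoreProto;

public class RedisRetryConfigExample {
  public static void main(String[] args) {
    StoreProto.Store.RedisConfig redisConfig =
        StoreProto.Store.RedisConfig.newBuilder()
            .setHost("localhost")
            .setPort(6379)
            .setMaxRetries(3) // retries before a batch is sent to the dead letter table
            .setInitialBackoffMs(100) // first retry waits ~100 ms, then backs off
            .build();
    // WriteDoFn derives its BackOffExecutor from exactly these two fields.
    System.out.println(redisConfig);
  }
}
```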
@@ -180,12 +180,14 @@ public void shouldExcludeUnregisteredFields() {

FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
expected.add(randomRow);
-    input.add(randomRow.toBuilder()
-        .addFields(Field.newBuilder()
-            .setName("extra")
-            .setValue(Value.newBuilder().setStringVal("hello")))
-        .build()
-    );
+    input.add(
+        randomRow
+            .toBuilder()
+            .addFields(
+                Field.newBuilder()
+                    .setName("extra")
+                    .setValue(Value.newBuilder().setStringVal("hello")))
+            .build());

PCollectionTuple output =
p.apply(Create.of(input))