Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dead letter queue core #29164

Merged
merged 82 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
c22b8c1
Update 2.50 release notes to include new Kafka topicPattern feature
johnjcasey Jul 5, 2023
f7cf5de
Merge remote-tracking branch 'origin/master'
johnjcasey Jul 24, 2023
ab68ecb
Merge remote-tracking branch 'origin/master'
johnjcasey Aug 9, 2023
40ad1d5
Merge remote-tracking branch 'origin/master'
johnjcasey Aug 31, 2023
6c9c28d
Create groovy class for io performance tests
johnjcasey Aug 31, 2023
520c9d1
delete unnecessary class
johnjcasey Aug 31, 2023
062de23
fix env call
johnjcasey Aug 31, 2023
01fc25b
Merge pull request #181 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
9c9f86b
fix call to gradle
johnjcasey Aug 31, 2023
92306fa
Merge pull request #182 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
925ce55
run on hosted runner for testing
johnjcasey Aug 31, 2023
2dcfb70
Merge pull request #183 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
117ef8b
add additional checkout
johnjcasey Aug 31, 2023
1f73cda
Merge pull request #184 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
cb6e01b
add destination for triggered tests
johnjcasey Aug 31, 2023
a9e86aa
Merge pull request #185 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
8ea6c51
move env variables to correct location
johnjcasey Sep 1, 2023
d8822d7
Merge pull request #186 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
320a4cc
try uploading against separate dataset
johnjcasey Sep 1, 2023
e89b59e
Merge pull request #187 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
1cd4e55
try without a user
johnjcasey Sep 1, 2023
4473f17
Merge pull request #188 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
4fc5b8e
update branch checkout, try to view the failure log
johnjcasey Sep 5, 2023
706650d
Merge pull request #189 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
59069f2
run on failure
johnjcasey Sep 5, 2023
7f79b62
Merge pull request #190 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
6f51976
update to use correct BigQuery instance
johnjcasey Sep 5, 2023
e95d920
Merge pull request #191 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
df716cb
convert to matrix
johnjcasey Sep 5, 2023
4d8eded
Merge pull request #192 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
4bf0826
add result reporting
johnjcasey Sep 5, 2023
403f054
Merge pull request #193 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
d40d04b
add failure clause
johnjcasey Sep 5, 2023
aca4b2e
Merge pull request #194 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
2739e92
remove failure clause, update to run on self-hosted
johnjcasey Sep 5, 2023
bd6efeb
address comments, clean up build
johnjcasey Sep 6, 2023
226a655
clarify branching
johnjcasey Sep 6, 2023
9c7286b
Merge pull request #195 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 6, 2023
5b1b2c2
Merge branch 'apache:master' into master
johnjcasey Sep 15, 2023
f8c2b06
Merge remote-tracking branch 'origin/master'
johnjcasey Oct 24, 2023
d058ac9
Add error handling base implementation & test DLQ enabled class
johnjcasey Oct 27, 2023
8c9dd94
Add test cases
johnjcasey Oct 27, 2023
4d23fe8
apply spotless
johnjcasey Oct 27, 2023
31432b7
Fix Checkstyles
johnjcasey Oct 30, 2023
a9dae91
Fix Checkstyles
johnjcasey Oct 30, 2023
64dde49
make DLH serializable
johnjcasey Oct 30, 2023
c82185d
rename dead letter to bad record
johnjcasey Oct 30, 2023
78d45a8
make DLH serializable
johnjcasey Oct 31, 2023
6c36549
Change bad record router name, and use multioutputreceiver instead of…
johnjcasey Oct 31, 2023
44036be
Refactor BadRecord to be nested
johnjcasey Oct 31, 2023
06ca166
clean up checkstyle
johnjcasey Oct 31, 2023
a26d605
Update error handler test
johnjcasey Oct 31, 2023
5a6e8d0
Add metrics for counting error records, and for measuring feature usage
johnjcasey Nov 1, 2023
70c8991
apply spotless
johnjcasey Nov 1, 2023
36baf98
fix checkstyle
johnjcasey Nov 1, 2023
2119c76
make metric reporting static
johnjcasey Nov 1, 2023
798cfc3
spotless
johnjcasey Nov 1, 2023
c03bb2b
Rework annotations to be an explicit label on a PTransform, instead o…
johnjcasey Nov 2, 2023
bf99363
fix checkstyle
johnjcasey Nov 2, 2023
881f9d8
Address comments
johnjcasey Nov 8, 2023
f8c6d8c
Address comments
johnjcasey Nov 14, 2023
a1b112c
Fix test cases, spotless
johnjcasey Nov 15, 2023
ad1684a
remove flatting without error collections
johnjcasey Nov 15, 2023
074faf2
fix nullness
johnjcasey Nov 16, 2023
17bf295
spotless + encoding issues
johnjcasey Nov 16, 2023
e2ec57f
spotless
johnjcasey Nov 16, 2023
8b3f052
throw error when error handler isn't used
johnjcasey Nov 21, 2023
525d912
add concrete bad record error handler class
johnjcasey Nov 21, 2023
9b4a348
spotless, fix test category
johnjcasey Nov 22, 2023
d6f4097
fix checkstyle
johnjcasey Nov 22, 2023
a067238
clean up comments
johnjcasey Nov 27, 2023
408bc26
fix test case
johnjcasey Nov 27, 2023
0246a80
Merge remote-tracking branch 'upstream/master' into feature/dead-lett…
johnjcasey Nov 27, 2023
ec50bff
remove "failing transform" field on bad record, add note to CHANGES.md
johnjcasey Nov 27, 2023
b092709
fix failing test cases
johnjcasey Nov 27, 2023
3f1e97c
fix failing test cases
johnjcasey Nov 27, 2023
4356f27
apply spotless
johnjcasey Nov 28, 2023
5067bd1
Merge remote-tracking branch 'upstream/master' into feature/dead-lett…
johnjcasey Dec 4, 2023
864c429
apply final comments
johnjcasey Dec 5, 2023
8307b63
apply final comments
johnjcasey Dec 5, 2023
fad9d56
apply final comments
johnjcasey Dec 5, 2023
c20c2d7
Merge remote-tracking branch 'upstream/master' into feature/dead-lett…
johnjcasey Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* Running multi-language pipelines locally no longer requires Docker.
Instead, the same (generally auto-started) subprocess used to perform the
expansion can also be used as the cross-language worker.
* Framework for adding Error Handlers to composite transforms added in Java ([#29164](https://github.com/apache/beam/pull/29164))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
Expand Down Expand Up @@ -502,6 +503,12 @@ public RunnerApi.PTransform translate(
SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray()));
}

for (Entry<String, byte[]> annotation :
appliedPTransform.getTransform().getAnnotations().entrySet()) {
johnjcasey marked this conversation as resolved.
Show resolved Hide resolved
transformBuilder.putAnnotations(
annotation.getKey(), ByteString.copyFrom(annotation.getValue()));
}

return transformBuilder.build();
}
}
Expand Down
24 changes: 24 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables.transform;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand All @@ -43,6 +44,9 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PBegin;
Expand Down Expand Up @@ -318,6 +322,7 @@ public PipelineResult run(PipelineOptions options) {
LOG.debug("Running {} via {}", this, runner);
try {
validate(options);
validateErrorHandlers();
return runner.run(this);
} catch (UserCodeException e) {
// This serves to replace the stack with one that ends here and
Expand All @@ -343,6 +348,13 @@ public SchemaRegistry getSchemaRegistry() {
return schemaRegistry;
}

public <OutputT extends POutput> BadRecordErrorHandler<OutputT> registerBadRecordErrorHandler(
PTransform<PCollection<BadRecord>, OutputT> sinkTransform) {
BadRecordErrorHandler<OutputT> errorHandler = new BadRecordErrorHandler<>(sinkTransform, this);
errorHandlers.add(errorHandler);
return errorHandler;
}

/////////////////////////////////////////////////////////////////////////////
// Below here are operations that aren't normally called by users.

Expand Down Expand Up @@ -511,6 +523,8 @@ public static <InputT extends PInput, OutputT extends POutput> OutputT applyTran
private final Multimap<String, PTransform<?, ?>> instancePerName = ArrayListMultimap.create();
private final PipelineOptions defaultOptions;

private final List<ErrorHandler<?, ?>> errorHandlers = new ArrayList<>();

private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
this.transforms = transforms;
this.defaultOptions = options;
Expand Down Expand Up @@ -715,4 +729,14 @@ public boolean apply(@Nonnull final Map.Entry<K, Collection<V>> input) {
return input != null && input.getValue().size() == 1;
}
}

private void validateErrorHandlers() {
for (ErrorHandler<?, ?> errorHandler : errorHandlers) {
if (!errorHandler.isClosed()) {
throw new IllegalStateException(
"One or more ErrorHandlers aren't closed, and this pipeline"
+ "cannot be run. See the ErrorHandler documentation for expected usage");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
Expand Down Expand Up @@ -216,6 +217,17 @@ public ResourceHints getResourceHints() {
return resourceHints;
}

/** Returns annotations map to provide additional hints to the runner. */
public Map<String, byte[]> getAnnotations() {
return annotations;
}

public PTransform<InputT, OutputT> addAnnotation(
@NonNull String annotationType, byte @NonNull [] annotation) {
annotations.put(annotationType, annotation);
return this;
}

/////////////////////////////////////////////////////////////////////////////

// See the note about about PTransform's fake Serializability, to
Expand All @@ -229,6 +241,8 @@ public ResourceHints getResourceHints() {

protected transient @NonNull ResourceHints resourceHints = ResourceHints.create();

protected transient @NonNull Map<String, byte @NonNull []> annotations = new HashMap<>();

protected PTransform() {
this.name = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.errorhandling;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.auto.value.AutoValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract class BadRecord implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(BadRecord.class);

/** Information about the record that failed. */
public abstract Record getRecord();

/** Information about why the record failed. */
public abstract Failure getFailure();

public static Builder builder() {
return new AutoValue_BadRecord.Builder();
}

public static Coder<BadRecord> getCoder(Pipeline pipeline) {
try {
SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry();
return SchemaCoder.of(
schemaRegistry.getSchema(BadRecord.class),
TypeDescriptor.of(BadRecord.class),
schemaRegistry.getToRowFunction(BadRecord.class),
schemaRegistry.getFromRowFunction(BadRecord.class));
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
}

public static <RecordT> BadRecord fromExceptionInformation(
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description)
throws IOException {
Preconditions.checkArgumentNotNull(record);

// Build up record information
BadRecord.Record.Builder recordBuilder = Record.builder();
recordBuilder.addHumanReadableJson(record).addCoderAndEncodedRecord(coder, record);

// Build up failure information
BadRecord.Failure.Builder failureBuilder = Failure.builder().setDescription(description);

// It's possible for us to want to handle an error scenario where no actual exception object
// exists
if (exception != null) {
failureBuilder.setException(exception.toString()).addExceptionStackTrace(exception);
}

return BadRecord.builder()
.setRecord(recordBuilder.build())
.setFailure(failureBuilder.build())
.build();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setRecord(Record record);

public abstract Builder setFailure(Failure error);

public abstract BadRecord build();
}

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class Record implements Serializable {

/** The failing record, encoded as JSON. Will be null if serialization as JSON fails. */
public abstract @Nullable String getHumanReadableJsonRecord();

/**
* Nullable to account for failing to encode, or if there is no coder for the record at the time
* of failure.
*/
@SuppressWarnings("mutable")
public abstract byte @Nullable [] getEncodedRecord();

/** The coder for the record, or null if there is no coder. */
public abstract @Nullable String getCoder();

public static Builder builder() {
return new AutoValue_BadRecord_Record.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setHumanReadableJsonRecord(@Nullable String jsonRecord);

public Builder addHumanReadableJson(Object record) {
ObjectWriter objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();
try {
this.setHumanReadableJsonRecord(objectWriter.writeValueAsString(record));
} catch (Exception e) {
LOG.error(
"Unable to serialize record as JSON. Human readable record attempted via .toString",
e);
try {
this.setHumanReadableJsonRecord(record.toString());
} catch (Exception e2) {
LOG.error(
"Unable to serialize record via .toString. Human readable record will be null", e2);
}
}
return this;
}

@SuppressWarnings("mutable")
public abstract Builder setEncodedRecord(byte @Nullable [] encodedRecord);

public abstract Builder setCoder(@Nullable String coder);

public <T> Builder addCoderAndEncodedRecord(@Nullable Coder<T> coder, T record) {
// We will sometimes not have a coder for a failing record, for example if it has already
// been
// modified within the dofn.
if (coder != null) {
this.setCoder(coder.toString());
try {
this.setEncodedRecord(CoderUtils.encodeToByteArray(coder, record));
} catch (IOException e) {
LOG.error(
"Unable to encode failing record using provided coder."
+ " BadRecord will be published without encoded bytes",
e);
}
}
return this;
}

public abstract Record build();
}
}

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class Failure implements Serializable {

/** The exception itself, e.g. IOException. Null if there is a failure without an exception. */
johnjcasey marked this conversation as resolved.
Show resolved Hide resolved
public abstract @Nullable String getException();

/** The full stacktrace. Null if there is a failure without an exception. */
public abstract @Nullable String getExceptionStacktrace();

/** The description of what was being attempted when the failure occurred. */
public abstract String getDescription();

public static Builder builder() {
return new AutoValue_BadRecord_Failure.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setException(@Nullable String exception);

public abstract Builder setExceptionStacktrace(@Nullable String stacktrace);

public Builder addExceptionStackTrace(Exception exception) throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(stream, false, Charsets.UTF_8.name());
exception.printStackTrace(printStream);
printStream.close();

this.setExceptionStacktrace(new String(stream.toByteArray(), Charsets.UTF_8));
return this;
}

public abstract Builder setDescription(String description);

public abstract Failure build();
}
}
}
Loading
Loading