Skip to content

Commit

Permalink
[CsvIO] Changed CsvIOParseConfiguration to include DLQ (#31852)
Browse files Browse the repository at this point in the history
* Create CsvIOParseConfiguration class

* Altered CsvIOParseConfiguration class to include Dead Letter Queue messages

* Altered CsvIOParseConfiguration class to allow for Dead Letter Queue processing

* removed System.out.println(input)

---------

Co-authored-by: Lahari Guduru <lahariguduru@google.com>
  • Loading branch information
lahariguduru and lahariguduru authored Jul 12, 2024
1 parent 441840a commit eb59788
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;

/** Stores parameters needed for CSV record parsing. */
@AutoValue
abstract class CsvIOParseConfiguration {

/** A Dead Letter Queue that returns potential errors with {@link BadRecord}. */
final PTransform<PCollection<BadRecord>, PCollection<BadRecord>> errorHandlerTransform =
new BadRecordOutput();

static Builder builder() {
return new AutoValue_CsvIOParseConfiguration.Builder();
}

/**
* The expected <a
* href="https://javadoc.io/doc/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html">CSVFormat</a>
* of the parsed CSV record.
*/
/** The expected {@link CSVFormat} of the parsed CSV record. */
abstract CSVFormat getCsvFormat();

/** The expected {@link Schema} of the target type. */
Expand Down Expand Up @@ -66,4 +71,20 @@ final CsvIOParseConfiguration build() {
return autoBuild();
}
}

private static class BadRecordOutput
extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {

@Override
public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
return input.apply(ParDo.of(new BadRecordTransformFn()));
}

private static class BadRecordTransformFn extends DoFn<BadRecord, BadRecord> {
@ProcessElement
public void process(@Element BadRecord input, OutputReceiver<BadRecord> receiver) {
receiver.output(input);
}
}
}
}

0 comments on commit eb59788

Please sign in to comment.