Skip to content

Commit

Permalink
Remove CsvIOParseResult (#31819)
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas authored Jul 10, 2024
1 parent b129433 commit f72f6ce
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 99 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@
*/
package org.apache.beam.sdk.io.csv;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/**
* Skeleton for error handling in CsvIO that transforms a {@link FileIO.ReadableFile} into the
* result of parsing.
*/
// TODO(https://github.com/apache/beam/issues/31736): Plan completion in future PR after
// dependencies are completed.
class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, CsvIOParseResult<T>> {
// dependencies are completed.
class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
/** Stores required parameters for parsing. */
private final CsvIOParseConfiguration.Builder configBuilder;

Expand All @@ -39,16 +38,11 @@ class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, Csv
}

/** {@link PTransform} that parses and relays the filename associated with each error. */
// TODO: complete expand method to unsure parsing from FileIO.ReadableFile to CsvIOParseResult.
@Override
public CsvIOParseResult<T> expand(PCollection<FileIO.ReadableFile> input) {
public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
// TODO(https://github.com/apache/beam/issues/31736): Needed to prevent check errors, will
// remove with future PR.
// remove with future PR.
configBuilder.build();
TupleTag<T> outputTag = new TupleTag<>();
TupleTag<CsvIOParseError> errorTag = new TupleTag<>();
Pipeline p = input.getPipeline();
PCollectionTuple tuple = PCollectionTuple.empty(p);
return CsvIOParseResult.of(outputTag, errorTag, tuple);
return input.apply(ParDo.of(new DoFn<FileIO.ReadableFile, T>() {}));
}
}

0 comments on commit f72f6ce

Please sign in to comment.