[BEAM-11460] Support reading Parquet files with unknown schema #13554

Closed
wants to merge 6 commits
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGES.md
@@ -58,6 +58,7 @@
* ReadFromMongoDB/WriteToMongoDB will mask password in display_data (Python) ([BEAM-11444](https://issues.apache.org/jira/browse/BEAM-11444).)
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* There is a new transform `ReadAllFromBigQuery` that can receive multiple requests to read data from BigQuery at pipeline runtime. See [PR 13170](https://github.com/apache/beam/pull/13170), and [BEAM-9650](https://issues.apache.org/jira/browse/BEAM-9650).
* ParquetIO can now read files with an unknown schema. See [PR 13554](https://github.com/apache/beam/pull/13554) and [BEAM-11460](https://issues.apache.org/jira/browse/BEAM-11460).
Member


Can you move this up to the 2.28.0 section? The 2.27.0 section was already merged (sorry I missed this in the previous check).


## New Features / Improvements

sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.parquet;

import static java.lang.String.format;
import static org.apache.parquet.Preconditions.checkArgument;
import static org.apache.parquet.Preconditions.checkNotNull;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;

@@ -38,21 +39,30 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.ReadFn;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.SplitReadFn;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
@@ -152,6 +162,38 @@
* *
* }</pre>
*
* <h3>Reading records of an unknown schema</h3>
*
* <p>To read records from files whose schema is unknown at pipeline construction time or differs
* between files, use {@link #parseGenericRecords(SerializableFunction)} - in this case, you will
* need to specify a parsing function for converting each {@link GenericRecord} into a value of your
* custom type.
*
* <p>For example:
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<Foo> records =
*     p.apply(
*         ParquetIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
*           public Foo apply(GenericRecord record) {
*             // If needed, access the schema of the record using record.getSchema()
*             return ...;
*           }
*         }).from(...));
*
* // For reading from filepatterns
* PCollection<String> filepatterns = p.apply(...);
*
* PCollection<Foo> records =
*     filepatterns
*         .apply(FileIO.matchAll())
*         .apply(FileIO.readMatches())
*         .apply(ParquetIO.parseFilesGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
*           public Foo apply(GenericRecord record) {
*             // If needed, access the schema of the record using record.getSchema()
*             return ...;
*           }
*         }));
* }</pre>
*
* <h3>Writing Parquet files</h3>
*
* <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord} into
@@ -202,7 +244,30 @@ public static Read read(Schema schema) {
*/
public static ReadFiles readFiles(Schema schema) {
return new AutoValue_ParquetIO_ReadFiles.Builder()
.setSplittable(false)
.setSchema(schema)
.build();
}

/**
* Reads {@link GenericRecord}s from a Parquet file (or multiple Parquet files matching a
* filepattern) and converts them to a user-defined type using the provided {@code parseFn}.
*/
public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_ParquetIO_Parse.Builder<T>()
.setParseFn(parseFn)
.setSplittable(false)
.build();
}

/**
* Reads {@link GenericRecord}s from Parquet files and converts them to a user-defined type using
* the provided {@code parseFn}.
*/
public static <T> ParseFiles<T> parseFilesGenericRecords(
SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_ParquetIO_ParseFiles.Builder<T>()
.setParseFn(parseFn)
.setSplittable(false)
.build();
}
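
A usage sketch, not part of this diff: the filepattern, the transform names, and the choice of String output are placeholders. String is used so the default CoderRegistry can infer a coder for the parsed output without extra registration (see inferCoder below).

```java
// Sketch only. Assumes standard Beam SDK imports (Pipeline, PCollection, FileIO,
// ParquetIO, SerializableFunction) and Avro's GenericRecord.
Pipeline p = Pipeline.create(options); // options: your PipelineOptions (placeholder)

// Parse from a filepattern.
PCollection<String> fromPattern =
    p.apply(
        "ParseParquet",
        ParquetIO.parseGenericRecords(
                new SerializableFunction<GenericRecord, String>() {
                  @Override
                  public String apply(GenericRecord record) {
                    return record.toString();
                  }
                })
            .from("gs://bucket/path/*.parquet")); // placeholder filepattern

// Equivalent, starting from already matched files.
PCollection<String> fromFiles =
    p.apply(FileIO.match().filepattern("gs://bucket/path/*.parquet"))
        .apply(FileIO.readMatches())
        .apply(
            "ParseParquetFiles",
            ParquetIO.parseFilesGenericRecords(
                new SerializableFunction<GenericRecord, String>() {
                  @Override
                  public String apply(GenericRecord record) {
                    return record.toString();
                  }
                }));
```
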
@@ -300,6 +365,121 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
}

/** Implementation of {@link #parseGenericRecords(SerializableFunction)}. */
@AutoValue
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
abstract @Nullable ValueProvider<String> getFilepattern();

abstract SerializableFunction<GenericRecord, T> getParseFn();

abstract boolean isSplittable();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> inputFiles);

abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);

abstract Builder<T> setSplittable(boolean splittable);

abstract Parse<T> build();
}

public Parse<T> from(ValueProvider<String> inputFiles) {
return toBuilder().setFilepattern(inputFiles).build();
}

public Parse<T> from(String inputFiles) {
return from(ValueProvider.StaticValueProvider.of(inputFiles));
}

public Parse<T> withSplit() {
return toBuilder().setSplittable(true).build();
}

@Override
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "Filepattern cannot be null.");
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(
parseFilesGenericRecords(getParseFn())
.toBuilder()
.setSplittable(isSplittable())
.build());
}
}
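
withSplit() is the opt-in switch to the splittable SplitReadFn path added in this PR; a minimal sketch, using the same placeholder filepattern and String-returning parse function as the sketch above:

```java
// Sketch: enable the splittable read path for the parse-based read.
PCollection<String> records =
    p.apply(
        ParquetIO.parseGenericRecords(
                new SerializableFunction<GenericRecord, String>() {
                  @Override
                  public String apply(GenericRecord record) {
                    return record.toString();
                  }
                })
            .from("gs://bucket/path/*.parquet") // placeholder filepattern
            .withSplit());
```
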

/** Implementation of {@link #parseFilesGenericRecords(SerializableFunction)}. */
@AutoValue
public abstract static class ParseFiles<T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {

abstract SerializableFunction<GenericRecord, T> getParseFn();

abstract boolean isSplittable();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);

abstract Builder<T> setSplittable(boolean split);

abstract ParseFiles<T> build();
}

public ParseFiles<T> withSplit() {
return toBuilder().setSplittable(true).build();
}

@Override
public PCollection<T> expand(PCollection<ReadableFile> input) {
checkArgument(!isGenericRecordOutput(), "Parse can't be used for reading as GenericRecord.");

PCollection<T> parsedRecords =
isSplittable()
? input.apply(ParDo.of(new SplitReadFn<>(null, null, getParseFn())))
: input.apply(ParDo.of(new ReadFn<>(null, getParseFn())));

return parsedRecords.setCoder(inferCoder(input.getPipeline().getCoderRegistry()));
}

/** Returns true if expected output is {@code PCollection<GenericRecord>}. */
private boolean isGenericRecordOutput() {
String outputType = TypeDescriptors.outputOf(getParseFn()).getType().getTypeName();
return outputType.equals(GenericRecord.class.getTypeName());
}
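
For reference, a small sketch of the reflection this check relies on; the toJson name is made up. For an anonymous SerializableFunction the output type is recoverable, which is what isGenericRecordOutput() inspects:

```java
// Sketch: TypeDescriptors.outputOf(...) recovers the concrete output type of an
// anonymous SerializableFunction.
SerializableFunction<GenericRecord, String> toJson =
    new SerializableFunction<GenericRecord, String>() {
      @Override
      public String apply(GenericRecord record) {
        return record.toString();
      }
    };
String outputType = TypeDescriptors.outputOf(toJson).getType().getTypeName();
// outputType is "java.lang.String" here, so the GenericRecord check passes and
// expand() proceeds; a function returning GenericRecord would be rejected.
```
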

/**
* Identifies the {@code Coder} to be used for the output PCollection.
*
* <p>Throws an {@link IllegalArgumentException} if the expected output type is {@link
* GenericRecord}, since {@code Parse}/{@code ParseFiles} can't be used for reading as {@code
* GenericRecord}; use {@link #readFiles(Schema)} for that.
*
* @param coderRegistry the {@link org.apache.beam.sdk.Pipeline}'s {@code CoderRegistry}, used to
*     infer a {@code Coder} for the output type of {@link #getParseFn()}
*/
private Coder<T> inferCoder(CoderRegistry coderRegistry) {
if (isGenericRecordOutput()) {
throw new IllegalArgumentException("Parse can't be used for reading as GenericRecord.");
}

// Otherwise, infer the coder from the parseFn's output type.
try {
return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
e);
}
}
}
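
inferCoder delegates to the pipeline's CoderRegistry, so a custom output type needs a coder the registry can resolve. One workable approach is sketched below; Foo, its field name, and the filepattern are hypothetical and not part of this PR:

```java
// Sketch: a hypothetical Serializable POJO used as the parse output type.
class Foo implements java.io.Serializable {
  final String name;

  Foo(String name) {
    this.name = name;
  }
}

// Register a coder for Foo up front so inferCoder()'s CoderRegistry lookup succeeds.
// (SerializableCoder is org.apache.beam.sdk.coders.SerializableCoder; p is the Pipeline.)
p.getCoderRegistry().registerCoderForClass(Foo.class, SerializableCoder.of(Foo.class));

PCollection<Foo> foos =
    p.apply(
        ParquetIO.parseGenericRecords(
                new SerializableFunction<GenericRecord, Foo>() {
                  @Override
                  public Foo apply(GenericRecord record) {
                    // "name" is a hypothetical field in the file's Avro schema.
                    return new Foo(String.valueOf(record.get("name")));
                  }
                })
            .from("gs://bucket/path/*.parquet")); // placeholder filepattern
```
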

/** Implementation of {@link #readFiles(Schema)}. */
@AutoValue
public abstract static class ReadFiles
@@ -357,26 +537,35 @@ public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input)
if (isSplittable()) {
Schema coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema();
return input
.apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjectionSchema())))
.apply(
ParDo.of(
new SplitReadFn<>(
getAvroDataModel(),
getProjectionSchema(),
GenericRecordPassthroughFn.create())))
.setCoder(AvroCoder.of(coderSchema));
}
return input
.apply(ParDo.of(new ReadFn(getAvroDataModel())))
.apply(ParDo.of(new ReadFn<>(getAvroDataModel(), GenericRecordPassthroughFn.create())))
.setCoder(AvroCoder.of(getSchema()));
}

@DoFn.BoundedPerElement
static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
static class SplitReadFn<T> extends DoFn<FileIO.ReadableFile, T> {
private Class<? extends GenericData> modelClass;
private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
private String requestSchemaString;
// By default, the file is initially split into 64 MB blocks (SPLIT_LIMIT is in bytes).
private static final long SPLIT_LIMIT = 64000000;

SplitReadFn(GenericData model, Schema requestSchema) {
private final SerializableFunction<GenericRecord, T> parseFn;

SplitReadFn(
GenericData model, Schema requestSchema, SerializableFunction<GenericRecord, T> parseFn) {

this.modelClass = model != null ? model.getClass() : null;
this.requestSchemaString = requestSchema != null ? requestSchema.toString() : null;
this.parseFn = checkNotNull(parseFn, "GenericRecord parse function can't be null");
}

ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception {
@@ -388,7 +577,7 @@ ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception {
public void processElement(
@Element FileIO.ReadableFile file,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<GenericRecord> outputReceiver)
OutputReceiver<T> outputReceiver)
throws Exception {
LOG.debug(
"start "
@@ -468,7 +657,7 @@ record = recordReader.read();
file.toString());
continue;
}
outputReceiver.output(record);
outputReceiver.output(parseFn.apply(record));
} catch (RuntimeException e) {

throw new ParquetDecodingException(
@@ -618,12 +807,16 @@ public Progress getProgress() {
}
}

static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
static class ReadFn<T> extends DoFn<FileIO.ReadableFile, T> {

private Class<? extends GenericData> modelClass;

ReadFn(GenericData model) {
private final SerializableFunction<GenericRecord, T> parseFn;

ReadFn(GenericData model, SerializableFunction<GenericRecord, T> parseFn) {

this.modelClass = model != null ? model.getClass() : null;
this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null");
}

@ProcessElement
@@ -647,7 +840,7 @@ public void processElement(ProcessContext processContext) throws Exception {
try (ParquetReader<GenericRecord> reader = builder.build()) {
GenericRecord read;
while ((read = reader.read()) != null) {
processContext.output(read);
processContext.output(parseFn.apply(read));
}
}
}
@@ -838,6 +1031,23 @@ public void close() throws IOException {
}
}

/**
* Passthrough function that keeps ParquetIO's existing {@code GenericRecord}-based read paths
* backward compatible.
*/
@VisibleForTesting
static class GenericRecordPassthroughFn
implements SerializableFunction<GenericRecord, GenericRecord> {

static GenericRecordPassthroughFn create() {
return new GenericRecordPassthroughFn();
}

@Override
public GenericRecord apply(GenericRecord input) {
return input;
}
}
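
For context, the passthrough function is what keeps the pre-existing schema-based entry points emitting GenericRecord unchanged while ReadFn/SplitReadFn became generic; a brief sketch of that unchanged path (the schema string and filepattern are placeholders):

```java
// Sketch: the schema-based API still yields GenericRecord, because readFiles(schema)
// now builds ReadFn<GenericRecord> with GenericRecordPassthroughFn.create().
Schema schema = new Schema.Parser().parse(schemaJson); // schemaJson: placeholder Avro schema string

PCollection<GenericRecord> records =
    p.apply(FileIO.match().filepattern("gs://bucket/path/*.parquet"))
        .apply(FileIO.readMatches())
        .apply(ParquetIO.readFiles(schema));
```
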

/** Disallow construction of utility class. */
private ParquetIO() {}
}