Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11460] Support reading Parquet files with unknown schema #13616

Merged
merged 9 commits into from
Dec 25, 2020
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

## New Features / Improvements

* ParquetIO add methods _readGenericRecords_ and _readFilesGenericRecords_ can read files with an unknown schema. See [PR-13554](https://github.com/apache/beam/pull/13554) and ([BEAM-11460](https://issues.apache.org/jira/browse/BEAM-11460))
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.parquet;

import static java.lang.String.format;
import static org.apache.parquet.Preconditions.checkArgument;
import static org.apache.parquet.Preconditions.checkNotNull;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;

Expand All @@ -38,21 +39,30 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.ReadFn;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.SplitReadFn;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -152,6 +162,44 @@
* *
* }</pre>
*
* <h3>Reading records of an unknown schema</h3>
*
* <p>To read records from files whose schema is unknown at pipeline construction time or differs
* between files, use {@link #parseGenericRecords(SerializableFunction)} - in this case, you will
* need to specify a parsing function for converting each {@link GenericRecord} into a value of your
* custom type.
*
* <p>For example:
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<Foo> records =
* p.apply(
* ParquetIO.parseGenericRecords(
* new SerializableFunction<GenericRecord, Foo>() {
* public Foo apply(GenericRecord record) {
* // If needed, access the schema of the record using record.getSchema()
* return ...;
* }
* })
* .setFilePattern(...));
*
* // For reading from files
* PCollection<FileIO.ReadableFile> files = p.apply(...);
*
* PCollection<Foo> records =
* files
* .apply(
* ParquetIO.parseFilesGenericRecords(
* new SerializableFunction<GenericRecord, Foo>() {
* public Foo apply(GenericRecord record) {
* // If needed, access the schema of the record using record.getSchema()
* return ...;
* }
* }));
* }</pre>
*
* <h3>Writing Parquet files</h3>
*
* <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord} into
Expand Down Expand Up @@ -202,7 +250,30 @@ public static Read read(Schema schema) {
*/
public static ReadFiles readFiles(Schema schema) {
return new AutoValue_ParquetIO_ReadFiles.Builder()
.setSplittable(false)
.setSchema(schema)
.build();
}

/**
* Reads {@link GenericRecord} from a Parquet file (or multiple Parquet files matching the
* pattern) and converts to user defined type using provided parseFn.
*/
public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_ParquetIO_Parse.Builder<T>()
.setParseFn(parseFn)
.setSplittable(false)
.build();
}

/**
* Reads {@link GenericRecord} from Parquet files and converts to user defined type using provided
* {@code parseFn}.
*/
public static <T> ParseFiles<T> parseFilesGenericRecords(
SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_ParquetIO_ParseFiles.Builder<T>()
.setParseFn(parseFn)
.setSplittable(false)
.build();
}
Expand Down Expand Up @@ -300,6 +371,121 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
}

/** Implementation of {@link #parseGenericRecords(SerializableFunction)}. */
@AutoValue
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
abstract @Nullable ValueProvider<String> getFilepattern();

abstract SerializableFunction<GenericRecord, T> getParseFn();

abstract boolean isSplittable();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> inputFiles);

abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);

abstract Builder<T> setSplittable(boolean splittable);

abstract Parse<T> build();
}

public Parse<T> from(ValueProvider<String> inputFiles) {
return toBuilder().setFilepattern(inputFiles).build();
}

public Parse<T> from(String inputFiles) {
return from(ValueProvider.StaticValueProvider.of(inputFiles));
}

public Parse<T> withSplit() {
return toBuilder().setSplittable(true).build();
}

@Override
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "Filepattern cannot be null.");
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(
parseFilesGenericRecords(getParseFn())
.toBuilder()
.setSplittable(isSplittable())
.build());
}
}

/** Implementation of {@link #parseFilesGenericRecords(SerializableFunction)}. */
@AutoValue
public abstract static class ParseFiles<T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {

abstract SerializableFunction<GenericRecord, T> getParseFn();

abstract boolean isSplittable();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);

abstract Builder<T> setSplittable(boolean split);

abstract ParseFiles<T> build();
}

public ParseFiles<T> withSplit() {
return toBuilder().setSplittable(true).build();
}

@Override
public PCollection<T> expand(PCollection<ReadableFile> input) {
checkArgument(!isGenericRecordOutput(), "Parse can't be used for reading as GenericRecord.");

PCollection<T> parsedRecords =
isSplittable()
? input.apply(ParDo.of(new SplitReadFn<>(null, null, getParseFn())))
: input.apply(ParDo.of(new ReadFn<>(null, getParseFn())));

return parsedRecords.setCoder(inferCoder(input.getPipeline().getCoderRegistry()));
}

/** Returns true if expected output is {@code PCollection<GenericRecord>}. */
private boolean isGenericRecordOutput() {
String outputType = TypeDescriptors.outputOf(getParseFn()).getType().getTypeName();
return outputType.equals(GenericRecord.class.getTypeName());
}

/**
* Identifies the {@code Coder} to be used for the output PCollection.
*
* <p>Returns {@link AvroCoder} if expected output is {@link GenericRecord}.
*
* @param coderRegistry the {@link org.apache.beam.sdk.Pipeline}'s CoderRegistry to identify
* Coder for expected output type of {@link #getParseFn()}
*/
private Coder<T> inferCoder(CoderRegistry coderRegistry) {
if (isGenericRecordOutput()) {
throw new IllegalArgumentException("Parse can't be used for reading as GenericRecord.");
}

// If not GenericRecord infer it from ParseFn.
try {
return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
e);
}
}
}

/** Implementation of {@link #readFiles(Schema)}. */
@AutoValue
public abstract static class ReadFiles
Expand Down Expand Up @@ -357,26 +543,35 @@ public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input)
if (isSplittable()) {
Schema coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema();
return input
.apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjectionSchema())))
.apply(
ParDo.of(
new SplitReadFn<>(
getAvroDataModel(),
getProjectionSchema(),
GenericRecordPassthroughFn.create())))
.setCoder(AvroCoder.of(coderSchema));
}
return input
.apply(ParDo.of(new ReadFn(getAvroDataModel())))
.apply(ParDo.of(new ReadFn<>(getAvroDataModel(), GenericRecordPassthroughFn.create())))
.setCoder(AvroCoder.of(getSchema()));
}

@DoFn.BoundedPerElement
static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
static class SplitReadFn<T> extends DoFn<FileIO.ReadableFile, T> {
private Class<? extends GenericData> modelClass;
private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
private String requestSchemaString;
// Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
private static final long SPLIT_LIMIT = 64000000;

SplitReadFn(GenericData model, Schema requestSchema) {
private final SerializableFunction<GenericRecord, T> parseFn;

SplitReadFn(
GenericData model, Schema requestSchema, SerializableFunction<GenericRecord, T> parseFn) {

this.modelClass = model != null ? model.getClass() : null;
this.requestSchemaString = requestSchema != null ? requestSchema.toString() : null;
this.parseFn = checkNotNull(parseFn, "GenericRecord parse function can't be null");
}

ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception {
Expand All @@ -388,7 +583,7 @@ ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exceptio
public void processElement(
@Element FileIO.ReadableFile file,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<GenericRecord> outputReceiver)
OutputReceiver<T> outputReceiver)
throws Exception {
LOG.debug(
"start "
Expand Down Expand Up @@ -468,7 +663,7 @@ record = recordReader.read();
file.toString());
continue;
}
outputReceiver.output(record);
outputReceiver.output(parseFn.apply(record));
} catch (RuntimeException e) {

throw new ParquetDecodingException(
Expand Down Expand Up @@ -618,12 +813,15 @@ public Progress getProgress() {
}
}

static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
static class ReadFn<T> extends DoFn<FileIO.ReadableFile, T> {

private Class<? extends GenericData> modelClass;

ReadFn(GenericData model) {
private final SerializableFunction<GenericRecord, T> parseFn;

ReadFn(GenericData model, SerializableFunction<GenericRecord, T> parseFn) {
this.modelClass = model != null ? model.getClass() : null;
this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null");
}

@ProcessElement
Expand All @@ -647,7 +845,7 @@ public void processElement(ProcessContext processContext) throws Exception {
try (ParquetReader<GenericRecord> reader = builder.build()) {
GenericRecord read;
while ((read = reader.read()) != null) {
processContext.output(read);
processContext.output(parseFn.apply(read));
}
}
}
Expand Down Expand Up @@ -838,6 +1036,28 @@ public void close() throws IOException {
}
}

/**
* Passthrough function to provide seamless backward compatibility to ParquetIO's functionality.
*/
@VisibleForTesting
static final class GenericRecordPassthroughFn
implements SerializableFunction<GenericRecord, GenericRecord> {

private static final GenericRecordPassthroughFn singleton = new GenericRecordPassthroughFn();

static GenericRecordPassthroughFn create() {
return singleton;
}

@Override
public GenericRecord apply(GenericRecord input) {
return input;
}

/** Enforce singleton pattern, by disallowing construction with {@code new} operator. */
private GenericRecordPassthroughFn() {}
}

/** Disallow construction of utility class. */
private ParquetIO() {}
}
Loading