@@ -249,6 +249,15 @@ public Footer call() throws Exception {
}
}

/**
* Read the footers of all the files under that path (recursively)
* not using summary files.
*/
public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus, boolean skipRowGroups) throws IOException {
List<FileStatus> statuses = listFiles(configuration, fileStatus);
return readAllFootersInParallel(configuration, statuses, skipRowGroups);
}
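For illustration, a minimal sketch of driving the new overload, assuming a Hadoop Configuration and a directory of Parquet files; the driver class and path handling are hypothetical, not part of this patch:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetFileReader;

public class FooterScan {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path(args[0]); // a directory containing Parquet files
    FileStatus status = dir.getFileSystem(conf).getFileStatus(dir);
    // skipRowGroups = true reads only schema and key/value metadata, no row-group info
    List<Footer> footers = ParquetFileReader.readAllFootersInParallel(conf, status, true);
    System.out.println("found " + footers.size() + " footers");
  }
}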

/**
* Read the footers of all the files under that path (recursively)
* not using summary files.
@@ -259,10 +268,10 @@ public Footer call() throws Exception {
* @throws IOException
*/
public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
-    List<FileStatus> statuses = listFiles(configuration, fileStatus);
-    return readAllFootersInParallel(configuration, statuses, false);
+    return readAllFootersInParallel(configuration, fileStatus, false);
}


@Deprecated
public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
return readFooters(configuration, status(configuration, path));
@@ -39,13 +39,15 @@
import org.apache.hadoop.fs.Path;

import org.apache.parquet.Log;
import org.apache.parquet.Preconditions;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -474,30 +476,83 @@ private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream o
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
out.write(MAGIC);
}

/**
* Given a list of metadata files, merge them into a single ParquetMetadata.
* Requires that the schemas be compatible, and the extraMetaData be exactly equal.
*/
public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");

GlobalMetaData globalMetaData = null;
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();

for (Path p : files) {
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER);
FileMetaData fmd = pmd.getFileMetaData();
globalMetaData = mergeInto(fmd, globalMetaData, true);
blocks.addAll(pmd.getBlocks());
}

// collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
return new ParquetMetadata(globalMetaData.merge(), blocks);
}

/**
* Given a list of metadata files, merge them into a single metadata file.
* Requires that the schemas be compatible, and the extraMetaData be exactly equal.
* This is useful when merging 2 directories of parquet files into a single directory, as long
* as both directories were written with compatible schemas and equal extraMetaData.
*/
public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
ParquetMetadata merged = mergeMetadataFiles(files, conf);
writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
}
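A minimal sketch of how the two merge helpers above might be used together, assuming two summary files written with compatible schemas and equal extraMetaData; the paths are placeholders:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class MergeSummaries {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    List<Path> summaries = Arrays.asList(
        new Path("/data/dir1/_metadata"),   // placeholder paths
        new Path("/data/dir2/_metadata"));

    // In-memory merge; throws if the schemas cannot be merged or the extraMetaData differs.
    ParquetMetadata merged = ParquetFileWriter.mergeMetadataFiles(summaries, conf);
    System.out.println(merged.getBlocks().size() + " row groups in the merged metadata");

    // Or write the merged footer straight to a new summary file.
    ParquetFileWriter.writeMergedMetadataFile(summaries, new Path("/data/merged/_metadata"), conf);
  }
}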

/**
* writes a _metadata and _common_metadata file
* @param configuration the configuration to use to get the FileSystem
* @param outputPath the directory to write the _metadata file to
* @param footers the list of footers to merge
* @deprecated use the variant of writeMetadataFile that takes a {@link JobSummaryLevel} as an argument.
* @throws IOException
*/
@Deprecated
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
writeMetadataFile(configuration, outputPath, footers, JobSummaryLevel.ALL);
}

/**
* writes _common_metadata file, and optionally a _metadata file depending on the {@link JobSummaryLevel} provided
*/
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY,
        "Unsupported level: " + level);

Contributor: Since JobSummaryLevel is an enum, I guess it's really to guard against your own code rather than a user mistake.

Contributor Author: It's to guard against adding new enum values to JobSummaryLevel without updating the places where it's used. Similar to having a default clause in all your switch statements. (It's too bad there aren't exhaustive switch statements.)

FileSystem fs = outputPath.getFileSystem(configuration);
outputPath = outputPath.makeQualified(fs);
ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
-    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+
+    if (level == JobSummaryLevel.ALL) {
+      writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+    }

metadataFooter.getBlocks().clear();
writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
}
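A short sketch of calling the level-aware variant directly, for example from a custom committer; conf, outputPath, and footers are assumed to exist in the caller (footers could come from readAllFootersInParallel above):

// COMMON_ONLY writes only _common_metadata (no row-group info); ALL also writes _metadata.
ParquetFileWriter.writeMetadataFile(conf, outputPath, footers,
    ParquetOutputFormat.JobSummaryLevel.COMMON_ONLY);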

-  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+  private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
       throws IOException {
+    Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
+    writeMetadataFile(metaDataPath, metadataFooter, fs);
+  }
+
+  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
+      throws IOException {
-    Path metaDataPath = new Path(outputPath, parquetMetadataFile);
-    FSDataOutputStream metadata = fs.create(metaDataPath);
+    FSDataOutputStream metadata = fs.create(outputPath);
metadata.write(MAGIC);
serializeFooter(metadataFooter, metadata);
metadata.close();
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

import org.apache.parquet.Log;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.util.ContextUtil;

public class ParquetOutputCommitter extends FileOutputCommitter {
@@ -48,30 +49,63 @@ public void commitJob(JobContext jobContext) throws IOException {
writeMetaDataFile(configuration,outputPath);
}

// TODO: This method should propagate errors, and we should clean up
// TODO: all the catching of Exceptions below -- see PARQUET-383
public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
JobSummaryLevel level = ParquetOutputFormat.getJobSummaryLevel(configuration);
if (level == JobSummaryLevel.NONE) {
return;
}

try {
final FileSystem fileSystem = outputPath.getFileSystem(configuration);
FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
List<Footer> footers;

switch (level) {
case ALL:
footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus, false); // don't skip row groups
break;
case COMMON_ONLY:
footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus, true); // skip row groups
break;
default:
throw new IllegalArgumentException("Unrecognized job summary level: " + level);
}

// If there are no footers, _metadata file cannot be written since there is no way to determine schema!
// Onus of writing any summary files lies with the caller in this case.
if (footers.isEmpty()) {
return;
}

try {
final FileSystem fileSystem = outputPath.getFileSystem(configuration);
FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
// If there are no footers, _metadata file cannot be written since there is no way to determine schema!
// Onus of writing any summary files lies with the caller in this case.
if (footers.isEmpty()) {
return;
}
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers, level);
} catch (Exception e) {
LOG.warn("could not write summary file(s) for " + outputPath, e);

final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);

try {
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers);
} catch (Exception e) {
LOG.warn("could not write summary file for " + outputPath, e);
final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);
if (fileSystem.exists(metadataPath)) {
fileSystem.delete(metadataPath, true);
Contributor: If the committer were to fail, I guess cleaning up afterwards would not be necessary?

Contributor Author: Not sure. Do files written by the committer need to be deleted, or are they being written to a temp dir anyway?
}
} catch (Exception e2) {
LOG.warn("could not delete metadata file" + outputPath, e2);
}
} catch (Exception e) {
LOG.warn("could not write summary file for " + outputPath, e);

try {
final Path commonMetadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE);
if (fileSystem.exists(commonMetadataPath)) {
fileSystem.delete(commonMetadataPath, true);
Contributor: If previous fs operations failed, this would never be called either.

Contributor: Looks like it also fixes PARQUET-359 (see #258).

Contributor Author: Haha, nice. Yeah, so we need to wrap the above deletion in a try / catch (though some of those methods return boolean instead of throwing, let me check).
}
} catch (Exception e2) {
LOG.warn("could not delete metadata file" + outputPath, e2);
}

}
} catch (Exception e) {
LOG.warn("could not write summary file for " + outputPath, e);
}
}

}
@@ -103,6 +103,33 @@
public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
private static final Log LOG = Log.getLog(ParquetOutputFormat.class);

public static enum JobSummaryLevel {
/**
* Write no summary files
*/
NONE,
/**
* Write both summary file with row group info and summary file without
* (both _metadata and _common_metadata)
*/
ALL,
/**
* Write only the summary file without the row group info
* (_common_metadata only)
*/
COMMON_ONLY
}

/**
* An alias for JOB_SUMMARY_LEVEL, where true means ALL and false means NONE
*/
@Deprecated
public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata";

/**
* Must be one of the values in {@link JobSummaryLevel} (case insensitive)
*/
public static final String JOB_SUMMARY_LEVEL = "parquet.summary.metadata.level";
public static final String BLOCK_SIZE = "parquet.block.size";
public static final String PAGE_SIZE = "parquet.page.size";
public static final String COMPRESSION = "parquet.compression";
@@ -111,14 +138,36 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public static final String ENABLE_DICTIONARY = "parquet.enable.dictionary";
public static final String VALIDATION = "parquet.validation";
public static final String WRITER_VERSION = "parquet.writer.version";
-  public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata";
public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio";
public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding";

// default to no padding for now
private static final int DEFAULT_MAX_PADDING_SIZE = 0;

public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY);

if (deprecatedFlag != null) {
LOG.warn("Setting " + ENABLE_JOB_SUMMARY + " is deprecated, please use " + JOB_SUMMARY_LEVEL);
}

if (level != null && deprecatedFlag != null) {
LOG.warn("Both " + JOB_SUMMARY_LEVEL + " and " + ENABLE_JOB_SUMMARY + " are set! " + ENABLE_JOB_SUMMARY + " will be ignored.");
}

if (level != null) {
return JobSummaryLevel.valueOf(level.toUpperCase());
}

if (deprecatedFlag != null) {
return Boolean.valueOf(deprecatedFlag) ? JobSummaryLevel.ALL : JobSummaryLevel.NONE;
}

return JobSummaryLevel.ALL;
}
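A minimal sketch of configuring the summary level, showing the precedence implemented above; the surrounding job setup is assumed:

Configuration conf = new Configuration();

// Preferred: the new level setting (parsed case-insensitively into JobSummaryLevel).
conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only");

// Deprecated boolean alias; when both are set, it is ignored with a warning.
conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false);

ParquetOutputFormat.JobSummaryLevel level = ParquetOutputFormat.getJobSummaryLevel(conf); // COMMON_ONLY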

public static void setWriteSupportClass(Job job, Class<?> writeSupportClass) {
getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
}
@@ -27,6 +27,8 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* An example file writer class.
@@ -70,6 +72,7 @@ public static Builder builder(Path file) {

public static class Builder extends ParquetWriter.Builder<Group, Builder> {
private MessageType type = null;
private Map<String, String> extraMetaData = new HashMap<String, String>();

private Builder(Path file) {
super(file);
@@ -80,14 +83,20 @@ public Builder withType(MessageType type) {
return this;
}

public Builder withExtraMetaData(Map<String, String> extraMetaData) {
this.extraMetaData = extraMetaData;
return this;
}

@Override
protected Builder self() {
return this;
}

@Override
protected WriteSupport<Group> getWriteSupport(Configuration conf) {
-      return new GroupWriteSupport(type);
+      return new GroupWriteSupport(type, extraMetaData);
}

}
}
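A sketch of the new builder hook in use; the schema, output path, and metadata key are illustrative only:

import java.util.Collections;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WriteWithExtraMetaData {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; }");

    ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/example.parquet"))
        .withType(schema)
        .withExtraMetaData(Collections.singletonMap("created.by.job", "merge-test"))
        .build();
    writer.close(); // the extra entries end up in the footer's key/value metadata
  }
}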
@@ -22,6 +22,7 @@
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

@@ -45,14 +46,21 @@ public static MessageType getSchema(Configuration configuration) {
return parseMessageType(checkNotNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA));
}

-  private MessageType schema = null;
+  private MessageType schema;
private GroupWriter groupWriter;
private Map<String, String> extraMetaData;

public GroupWriteSupport() {
this(null, new HashMap<String, String>());
}

GroupWriteSupport(MessageType schema) {
this(schema, new HashMap<String, String>());
}

GroupWriteSupport(MessageType schema, Map<String, String> extraMetaData) {
this.schema = schema;
this.extraMetaData = extraMetaData;
}

@Override
@@ -61,7 +69,7 @@ public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuratio
if (schema == null) {
schema = getSchema(configuration);
}
-    return new WriteContext(schema, new HashMap<String, String>());
+    return new WriteContext(schema, this.extraMetaData);
}
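A short sketch of reading the extra metadata back out of a written file's footer, complementing the builder example above; the path and key are the same illustrative placeholders:

Configuration conf = new Configuration();
Path file = new Path("/tmp/example.parquet");
ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER);
Map<String, String> keyValueMetaData = footer.getFileMetaData().getKeyValueMetaData();
System.out.println(keyValueMetaData.get("created.by.job")); // "merge-test" if written as above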

@Override