ParquetMetadataConverter.java
@@ -72,7 +72,6 @@
// TODO: This file has become too long!
// TODO: Let's split it up: https://issues.apache.org/jira/browse/PARQUET-310
public class ParquetMetadataConverter {
private ParquetMetadataConverter() { }

public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
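
Note: the hunk above removes the private no-argument constructor, which is what makes ParquetMetadataConverter instantiable; the rest of this file turns its static methods into instance methods. A minimal sketch of the resulting call pattern (illustrative only, not part of the diff; CURRENT_VERSION and footer are assumed to be in scope, as in ParquetFileWriter.serializeFooter further down):

    // Callers now construct and hold a converter instance
    // instead of calling static utility methods.
    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    org.apache.parquet.format.FileMetaData thriftFooter =
        converter.toParquetMetadata(CURRENT_VERSION, footer);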
@@ -87,7 +86,7 @@ private ParquetMetadataConverter() { }
private static final ConcurrentHashMap<Set<org.apache.parquet.column.Encoding>, Set<org.apache.parquet.column.Encoding>>
cachedEncodingSets = new ConcurrentHashMap<Set<org.apache.parquet.column.Encoding>, Set<org.apache.parquet.column.Encoding>>();

public static FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
List<BlockMetaData> blocks = parquetMetadata.getBlocks();
List<RowGroup> rowGroups = new ArrayList<RowGroup>();
int numRows = 0;
@@ -111,13 +110,13 @@ public static FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata
}

// Visible for testing
static List<SchemaElement> toParquetSchema(MessageType schema) {
List<SchemaElement> toParquetSchema(MessageType schema) {
List<SchemaElement> result = new ArrayList<SchemaElement>();
addToList(result, schema);
return result;
}

private static void addToList(final List<SchemaElement> result, org.apache.parquet.schema.Type field) {
private void addToList(final List<SchemaElement> result, org.apache.parquet.schema.Type field) {
field.accept(new TypeVisitor() {
@Override
public void visit(PrimitiveType primitiveType) {
@@ -164,7 +163,7 @@ private void visitChildren(final List<SchemaElement> result,
});
}

private static void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
//rowGroup.total_byte_size = ;
List<ColumnChunkMetaData> columns = block.getColumns();
List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
@@ -193,7 +192,7 @@ private static void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup>
rowGroups.add(rowGroup);
}

private static List<Encoding> toFormatEncodings(Set<org.apache.parquet.column.Encoding> encodings) {
private List<Encoding> toFormatEncodings(Set<org.apache.parquet.column.Encoding> encodings) {
List<Encoding> converted = new ArrayList<Encoding>(encodings.size());
for (org.apache.parquet.column.Encoding encoding : encodings) {
converted.add(getEncoding(encoding));
@@ -202,7 +201,7 @@ private static List<Encoding> toFormatEncodings(Set<org.apache.parquet.column.En
}

// Visible for testing
static Set<org.apache.parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
Set<org.apache.parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
Set<org.apache.parquet.column.Encoding> converted = new HashSet<org.apache.parquet.column.Encoding>();

for (Encoding encoding : encodings) {
@@ -225,11 +224,11 @@ static Set<org.apache.parquet.column.Encoding> fromFormatEncodings(List<Encoding
return cached;
}

public static org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
return org.apache.parquet.column.Encoding.valueOf(encoding.name());
}

public static Encoding getEncoding(org.apache.parquet.column.Encoding encoding) {
public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) {
return Encoding.valueOf(encoding.name());
}

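A note on the two getEncoding overloads above: org.apache.parquet.format.Encoding (the Thrift enum) and org.apache.parquet.column.Encoding mirror each other's constant names, so each conversion is a plain enum-name lookup. A small sketch (assuming the usual imports):

    // PLAIN is defined in both enums, so the name round-trips unchanged.
    org.apache.parquet.column.Encoding columnSide =
        org.apache.parquet.column.Encoding.valueOf(Encoding.PLAIN.name());
    Encoding formatSide = Encoding.valueOf(columnSide.name());
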
@@ -270,7 +269,7 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist
return stats;
}

public static PrimitiveTypeName getPrimitive(Type type) {
public PrimitiveTypeName getPrimitive(Type type) {
switch (type) {
case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
return PrimitiveTypeName.BINARY;
@@ -294,7 +293,7 @@ public static PrimitiveTypeName getPrimitive(Type type) {
}

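By contrast, the primitive-type enums do not line up by name everywhere (org.apache.parquet.format.Type.BYTE_ARRAY versus PrimitiveTypeName.BINARY, per the TODO above), so getPrimitive and the getType method below need explicit switches rather than the valueOf(name()) shortcut used for encodings and repetitions.
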
// Visible for testing
static Type getType(PrimitiveTypeName type) {
Type getType(PrimitiveTypeName type) {
switch (type) {
case INT64:
return Type.INT64;
@@ -318,7 +317,7 @@ static Type getType(PrimitiveTypeName type) {
}

// Visible for testing
static OriginalType getOriginalType(ConvertedType type) {
OriginalType getOriginalType(ConvertedType type) {
switch (type) {
case UTF8:
return OriginalType.UTF8;
@@ -366,7 +365,7 @@ static OriginalType getOriginalType(ConvertedType type) {
}

// Visible for testing
static ConvertedType getConvertedType(OriginalType type) {
ConvertedType getConvertedType(OriginalType type) {
switch (type) {
case UTF8:
return ConvertedType.UTF8;
@@ -487,7 +486,7 @@ public String toString() {
}

@Deprecated
public static ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
return readParquetMetadata(from, NO_FILTER);
}

@@ -524,7 +523,7 @@ static long getOffset(ColumnChunk columnChunk) {
return offset;
}

public static ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
@@ -547,7 +546,7 @@ public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return parquetMetadata;
}

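The MetadataFilter argument controls how much of the footer is deserialized; NO_FILTER and SKIP_ROW_GROUPS are the public constants declared at the top of this class. A hedged usage sketch (the InputStream in is assumed to be positioned at the footer):

    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    // NO_FILTER keeps everything; SKIP_ROW_GROUPS drops per-row-group metadata.
    ParquetMetadata footer =
        converter.readParquetMetadata(in, ParquetMetadataConverter.NO_FILTER);
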
public static ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -606,15 +605,15 @@ private static ColumnPath getPath(ColumnMetaData metaData) {
}

// Visible for testing
static MessageType fromParquetSchema(List<SchemaElement> schema) {
MessageType fromParquetSchema(List<SchemaElement> schema) {
Iterator<SchemaElement> iterator = schema.iterator();
SchemaElement root = iterator.next();
Types.MessageTypeBuilder builder = Types.buildMessage();
buildChildren(builder, iterator, root.getNum_children());
return builder.named(root.name);
}

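fromParquetSchema and the buildChildren helper below rebuild the MessageType from Parquet's flat Thrift schema: a depth-first list of SchemaElement entries in which each group records how many children follow it. Roughly, under that reading:

    // message m { required int32 a; optional group b { required int64 c; } }
    // travels as the flat list:
    //   m (num_children = 2), a, b (num_children = 1), c
    // buildChildren walks the iterator recursively, consuming num_children
    // elements (and their descendants) for every group it meets.
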
private static void buildChildren(Types.GroupBuilder builder,
private void buildChildren(Types.GroupBuilder builder,
Iterator<SchemaElement> schema,
int childrenCount) {
for (int i = 0; i < childrenCount; i++) {
@@ -654,17 +653,17 @@ private static void buildChildren(Types.GroupBuilder builder,
}

// Visible for testing
static FieldRepetitionType toParquetRepetition(Repetition repetition) {
FieldRepetitionType toParquetRepetition(Repetition repetition) {
return FieldRepetitionType.valueOf(repetition.name());
}

// Visible for testing
static Repetition fromParquetRepetition(FieldRepetitionType repetition) {
Repetition fromParquetRepetition(FieldRepetitionType repetition) {
return Repetition.valueOf(repetition.name());
}

@Deprecated
public static void writeDataPageHeader(
public void writeDataPageHeader(
int uncompressedSize,
int compressedSize,
int valueCount,
@@ -681,7 +680,7 @@ public static void writeDataPageHeader(
valuesEncoding), to);
}

public static void writeDataPageHeader(
public void writeDataPageHeader(
int uncompressedSize,
int compressedSize,
int valueCount,
@@ -696,7 +695,7 @@ public static void writeDataPageHeader(
to);
}

private static PageHeader newDataPageHeader(
private PageHeader newDataPageHeader(
int uncompressedSize, int compressedSize,
int valueCount,
org.apache.parquet.column.statistics.Statistics statistics,
@@ -717,7 +716,7 @@ private static PageHeader newDataPageHeader(
return pageHeader;
}

public static void writeDataPageV2Header(
public void writeDataPageV2Header(
int uncompressedSize, int compressedSize,
int valueCount, int nullCount, int rowCount,
org.apache.parquet.column.statistics.Statistics statistics,
@@ -733,7 +732,7 @@ public static void writeDataPageV2Header(
rlByteLength, dlByteLength), to);
}

private static PageHeader newDataPageV2Header(
private PageHeader newDataPageV2Header(
int uncompressedSize, int compressedSize,
int valueCount, int nullCount, int rowCount,
org.apache.parquet.column.statistics.Statistics<?> statistics,
@@ -753,7 +752,7 @@ private static PageHeader newDataPageV2Header(
return pageHeader;
}

public static void writeDictionaryPageHeader(
public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
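The page-header writers at the end of this file keep their parameter lists and only lose static. A hedged usage sketch for the dictionary-page variant, whose full signature is visible above (assuming the usual java.io imports; the sizes and count are made up for illustration):

    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    converter.writeDictionaryPageHeader(
        /* uncompressedSize */ 128,
        /* compressedSize */ 96,
        /* valueCount */ 10,
        org.apache.parquet.column.Encoding.PLAIN_DICTIONARY,
        out);
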
ColumnChunkPageWriteStore.java
@@ -46,6 +46,8 @@
class ColumnChunkPageWriteStore implements PageWriteStore {
private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);

private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

private static final class ColumnChunkPageWriter implements PageWriter {

private final ColumnDescriptor path;
@@ -92,7 +94,7 @@ public void writePage(BytesInput bytes,
+ compressedSize);
}
tempOutputStream.reset();
ParquetMetadataConverter.writeDataPageHeader(
parquetMetadataConverter.writeDataPageHeader(
(int)uncompressedSize,
(int)compressedSize,
valueCount,
@@ -131,7 +133,7 @@ public void writePageV2(
compressedData.size() + repetitionLevels.size() + definitionLevels.size()
);
tempOutputStream.reset();
ParquetMetadataConverter.writeDataPageV2Header(
parquetMetadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
statistics,
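ColumnChunkPageWriteStore adopts the same call-site pattern used throughout this change: one ParquetMetadataConverter held in a static field, with the former static calls routed through it. Sharing a single instance looks safe here because the converter keeps no per-call state; the only mutable field visible in this diff is the ConcurrentHashMap it uses to cache encoding sets.
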
ParquetFileReader.java
@@ -89,7 +89,9 @@ public class ParquetFileReader implements Closeable {

private static final Log LOG = Log.getLog(ParquetFileReader.class);

public static final String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

private static ParquetMetadataConverter converter = new ParquetMetadataConverter();

/**
* for files provided, check if there's a summary file.
@@ -426,7 +428,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
throw new RuntimeException("corrupted file: the footer index is not within the file");
}
f.seek(footerIndex);
return ParquetMetadataConverter.readParquetMetadata(f, filter);
return converter.readParquetMetadata(f, filter);
} finally {
f.close();
}
@@ -573,7 +575,7 @@ public ColumnChunkPageReader readAllPages() throws IOException {
this.readAsBytesInput(compressedPageSize),
uncompressedPageSize,
dicHeader.getNum_values(),
ParquetMetadataConverter.getEncoding(dicHeader.getEncoding())
converter.getEncoding(dicHeader.getEncoding())
);
break;
case DATA_PAGE:
@@ -587,9 +589,9 @@ public ColumnChunkPageReader readAllPages() throws IOException {
createdBy,
dataHeaderV1.getStatistics(),
descriptor.col.getType()),
ParquetMetadataConverter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
ParquetMetadataConverter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
ParquetMetadataConverter.getEncoding(dataHeaderV1.getEncoding())
converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
converter.getEncoding(dataHeaderV1.getEncoding())
));
valuesCountReadSoFar += dataHeaderV1.getNum_values();
break;
@@ -603,7 +605,7 @@ public ColumnChunkPageReader readAllPages() throws IOException {
dataHeaderV2.getNum_values(),
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
ParquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
converter.getEncoding(dataHeaderV2.getEncoding()),
this.readAsBytesInput(dataSize),
uncompressedPageSize,
fromParquetStatistics(
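In ParquetFileReader the converter field is itself static because it is reached from static methods such as readFooter, where converter.readParquetMetadata(f, filter) replaces the old static call. Note that the same file also drops the final modifier from PARQUET_READ_PARALLELISM, turning a constant into a mutable public field; nothing else in this hunk appears to require that.
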
ParquetFileWriter.java
@@ -67,6 +67,8 @@
public class ParquetFileWriter {
private static final Log LOG = Log.getLog(ParquetFileWriter.class);

private static ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();

public static final String PARQUET_METADATA_FILE = "_metadata";
public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
@@ -302,7 +304,7 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOExceptio
currentChunkDictionaryPageOffset = out.getPos();
int uncompressedSize = dictionaryPage.getUncompressedSize();
int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
ParquetMetadataConverter.writeDictionaryPageHeader(
metadataConverter.writeDictionaryPageHeader(
uncompressedSize,
compressedPageSize,
dictionaryPage.getDictionarySize(),
@@ -337,7 +339,7 @@ public void writeDataPage(
long beforeHeader = out.getPos();
if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
int compressedPageSize = (int)bytes.size();
ParquetMetadataConverter.writeDataPageHeader(
metadataConverter.writeDataPageHeader(
uncompressedPageSize, compressedPageSize,
valueCount,
rlEncoding,
@@ -374,7 +376,7 @@ public void writeDataPage(
long beforeHeader = out.getPos();
if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
int compressedPageSize = (int)bytes.size();
ParquetMetadataConverter.writeDataPageHeader(
metadataConverter.writeDataPageHeader(
uncompressedPageSize, compressedPageSize,
valueCount,
statistics,
@@ -467,7 +469,7 @@ public void end(Map<String, String> extraMetaData) throws IOException {

private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
long footerIndex = out.getPos();
org.apache.parquet.format.FileMetaData parquetMetadata = ParquetMetadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
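ParquetFileWriter follows suit: metadataConverter is static so that the static serializeFooter helper can reach it, and the footer path is unchanged apart from the receiver, i.e. metadataConverter.toParquetMetadata(CURRENT_VERSION, footer) followed by writeFileMetaData(parquetMetadata, out) and the little-endian footer length.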