Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.parquet;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.parquet.SemanticVersion.SemanticVersionParseException;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.VersionParser.VersionParseException;
Expand All @@ -31,6 +33,8 @@
* and thus its statistics should be ignored / not trusted.
*/
public class CorruptStatistics {
private static final AtomicBoolean alreadyLogged = new AtomicBoolean(false);

private static final Log LOG = Log.getLog(CorruptStatistics.class);

// the version in which the bug described by jira: PARQUET-251 was fixed
Expand All @@ -52,7 +56,7 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
if (Strings.isNullOrEmpty(createdBy)) {
// created_by is not populated, which could have been caused by
// parquet-mr during the same time as PARQUET-251, see PARQUET-297
LOG.info("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
warnOnce("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
return true;
}

Expand All @@ -65,16 +69,16 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
}

if (Strings.isNullOrEmpty(version.version)) {
LOG.warn("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
warnOnce("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
return true;
}

SemanticVersion semver = SemanticVersion.parse(version.version);

if (semver.compareTo(PARQUET_251_FIXED_VERSION) < 0) {
LOG.info("Ignoring statistics because this file was created prior to "
warnOnce("Ignoring statistics because this file was created prior to "
+ PARQUET_251_FIXED_VERSION
+ ", see PARQUET-251" );
+ ", see PARQUET-251");
return true;
}

Expand All @@ -83,22 +87,30 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
} catch (RuntimeException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
warnParseErrorOnce(createdBy, e);
return true;
} catch (SemanticVersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
warnParseErrorOnce(createdBy, e);
return true;
} catch (VersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
warnParseErrorOnce(createdBy, e);
return true;
}
}

private static void warnParseError(String createdBy, Throwable e) {
LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
/**
 * Warns that {@code createdBy} could not be parsed, but at most once per JVM
 * so that repeated corrupt footers do not flood the log.
 */
private static void warnParseErrorOnce(String createdBy, Throwable e) {
  if (alreadyLogged.compareAndSet(false, true)) {
    LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
  }
}

/**
 * Logs {@code message} as a warning the first time any once-only warning in
 * this class fires; every subsequent call is silent.
 */
private static void warnOnce(String message) {
  final boolean firstCall = !alreadyLogged.getAndSet(true);
  if (firstCall) {
    LOG.warn(message);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ public class ScroogeStructConverterTest {
*/
/**
 * Asserts that converting {@code scroogeClass} via the Scrooge converter yields the same
 * Parquet schema as converting its Thrift counterpart via {@code ThriftSchemaConverter}.
 * The Thrift counterpart is located by package-name substitution.
 *
 * NOTE(review): the diff residue left both the old instance-based call and the new static
 * ThriftSchemaConverter.toStructType call in place; only the static call is kept here.
 */
private void shouldConvertConsistentlyWithThriftStructConverter(Class scroogeClass) throws ClassNotFoundException {
  Class<? extends TBase<?, ?>> thriftClass = (Class<? extends TBase<?, ?>>) Class.forName(
      scroogeClass.getName().replaceFirst("org.apache.parquet.scrooge.test", "org.apache.parquet.thrift.test"));
  ThriftType.StructType structFromThriftSchemaConverter = ThriftSchemaConverter.toStructType(thriftClass);
  ThriftType.StructType structFromScroogeSchemaConverter = new ScroogeStructConverter().convert(scroogeClass);

  assertEquals(toParquetSchema(structFromThriftSchemaConverter), toParquetSchema(structFromScroogeSchemaConverter));
}

/**
 * Converts a Thrift struct type to its full (unprojected) Parquet schema.
 *
 * NOTE(review): removed the stale pre-change lines (instance-based converter + duplicate
 * return) that the diff left alongside the new static call; they made the method
 * contain two return statements.
 */
private MessageType toParquetSchema(ThriftType.StructType struct) {
  return ThriftSchemaConverter.convertWithoutProjection(struct);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ protected void init(Class<T> thriftClass) {
this.thriftClass = thriftClass;
this.thriftStruct = getThriftStruct();

ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
this.schema = thriftSchemaConverter.convert(thriftStruct);
this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);

final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData();
// adding the Pig schema as it would have been mapped from thrift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public TBaseWriteSupport(Class<T> thriftClass) {

/**
 * Derives the Thrift struct descriptor for {@code thriftClass} using the static
 * converter API.
 *
 * NOTE(review): removed the stale pre-change lines (local converter instance and its
 * return) that the diff residue left above the new static call.
 */
@Override
protected StructType getThriftStruct() {
  return ThriftSchemaConverter.toStructType(thriftClass);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ public WriteContext init(Configuration configuration) {
} else {
thriftClass = TBaseWriteSupport.getThriftClass(configuration);
}
ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
this.thriftStruct = thriftSchemaConverter.toStructType(thriftClass);
this.schema = thriftSchemaConverter.convert(thriftStruct);
this.thriftStruct = ThriftSchemaConverter.toStructType(thriftClass);
this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);
if (buffered) {
readToWrite = new BufferedProtocolReadToWrite(thriftStruct, errorHandler);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public static Set<String> getThriftClassNames(Map<String, Set<String>> fileMetad

/**
 * Returns a concise representation naming the thrift class and descriptor.
 *
 * NOTE(review): removed the stale pre-change return statement the diff residue left
 * above the new one; two consecutive returns would be an unreachable-code error.
 */
@Override
public String toString() {
  return String.format("ThriftMetaData(thriftClassName: %s, descriptor: %s)", thriftClassName, descriptor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import org.apache.parquet.io.ParquetDecodingException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TList;
Expand Down Expand Up @@ -62,7 +63,7 @@
*/
public class ThriftRecordConverter<T> extends RecordMaterializer<T> {

final ParquetProtocol readFieldEnd = new ParquetProtocol("readFieldEnd()") {
final static ParquetProtocol readFieldEnd = new ParquetProtocol("readFieldEnd()") {
@Override
public void readFieldEnd() throws TException {
}
Expand All @@ -75,7 +76,7 @@ public void readFieldEnd() throws TException {
* @author Julien Le Dem
*
*/
class PrimitiveFieldHandler extends PrimitiveConverter {
static class PrimitiveFieldHandler extends PrimitiveConverter {

private final PrimitiveConverter delegate;
private final List<TProtocol> events;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void addLong(long value) {
* @author Julien Le Dem
*
*/
class GroupFieldhandler extends GroupConverter {
static class GroupFieldhandler extends GroupConverter {

private final GroupConverter delegate;
private final List<TProtocol> events;
Expand Down Expand Up @@ -203,7 +204,7 @@ interface Counter {
* @author Julien Le Dem
*
*/
class GroupCounter extends GroupConverter implements Counter {
static class GroupCounter extends GroupConverter implements Counter {

private final GroupConverter delegate;
private int count;
Expand Down Expand Up @@ -246,7 +247,7 @@ public int getCount() {
* @author Julien Le Dem
*
*/
class PrimitiveCounter extends PrimitiveConverter implements Counter {
static class PrimitiveCounter extends PrimitiveConverter implements Counter {

private final PrimitiveConverter delegate;
private int count;
Expand Down Expand Up @@ -309,7 +310,7 @@ public int getCount() {
* @author Julien Le Dem
*
*/
class FieldPrimitiveConverter extends PrimitiveConverter {
static class FieldPrimitiveConverter extends PrimitiveConverter {

private final List<TProtocol> events;
private ThriftTypeID type;
Expand Down Expand Up @@ -400,7 +401,7 @@ public long readI64() throws TException {
* @author Julien Le Dem
*
*/
class FieldStringConverter extends PrimitiveConverter {
static class FieldStringConverter extends PrimitiveConverter {

private final List<TProtocol> events;

Expand Down Expand Up @@ -429,14 +430,15 @@ public ByteBuffer readBinary() throws TException {
* @author Julien Le Dem
*
*/
class FieldEnumConverter extends PrimitiveConverter {
static class FieldEnumConverter extends PrimitiveConverter {

private final List<TProtocol> events;

private Map<Binary, Integer> enumLookup = new HashMap<Binary, Integer>();
private final Map<Binary, Integer> enumLookup = new HashMap<Binary, Integer>();
private final ThriftField field;

public FieldEnumConverter(List<TProtocol> events, ThriftField field) {
this.events = events;
this.field = field;
final Iterable<EnumValue> values = ((EnumType)field.getType()).getValues();
for (EnumValue enumValue : values) {
enumLookup.put(Binary.fromString(enumValue.getName()), enumValue.getId());
Expand All @@ -445,7 +447,16 @@ public FieldEnumConverter(List<TProtocol> events, ThriftField field) {

@Override
public void addBinary(final Binary value) {
final int id = enumLookup.get(value);
final Integer id = enumLookup.get(value);

if (id == null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, as far as I can tell, this implementation is faster than:

try {
  final int id = enumLookup.get(value);
} catch (NullPointerException e) {
  throw new ParquetDecodingException(...)
}

I thought maybe the above would be better because there's sort of a hint to the branch predictor that the 'normal' case is for this not to throw, so predict that... but I did a small benchmark and the if statement seems to be faster.

throw new ParquetDecodingException("Unrecognized enum value: "
+ value.toStringUsingUTF8()
+ " known values: "
+ enumLookup
+ " in " + this.field);
}

events.add(new ParquetProtocol("readI32() enum") {
@Override
public int readI32() throws TException {
Expand All @@ -461,7 +472,7 @@ public int readI32() throws TException {
* @author Julien Le Dem
*
*/
class MapConverter extends GroupConverter {
static class MapConverter extends GroupConverter {

private final GroupCounter child;
private final List<TProtocol> mapEvents = new ArrayList<TProtocol>();
Expand Down Expand Up @@ -523,7 +534,7 @@ public TMap readMapBegin() throws TException {
* @author Julien Le Dem
*
*/
class MapKeyValueConverter extends GroupConverter {
static class MapKeyValueConverter extends GroupConverter {

private Converter keyConverter;
private Converter valueConverter;
Expand Down Expand Up @@ -561,7 +572,7 @@ public void end() {
* @author Julien Le Dem
*
*/
class SetConverter extends CollectionConverter {
static class SetConverter extends CollectionConverter {

final ParquetProtocol readSetEnd = new ParquetProtocol("readSetEnd()") {
@Override
Expand Down Expand Up @@ -598,7 +609,7 @@ void collectionEnd() {
* @author Julien Le Dem
*
*/
class ListConverter extends CollectionConverter {
static class ListConverter extends CollectionConverter {

final ParquetProtocol readListEnd = new ParquetProtocol("readListEnd()") {
@Override
Expand Down Expand Up @@ -635,7 +646,7 @@ void collectionEnd() {
* @author Julien Le Dem
*
*/
abstract class CollectionConverter extends GroupConverter {
static abstract class CollectionConverter extends GroupConverter {

private final Converter child;
private final Counter childCounter;
Expand Down Expand Up @@ -696,7 +707,7 @@ public void end() {
* @author Julien Le Dem
*
*/
class StructConverter extends GroupConverter {
static class StructConverter extends GroupConverter {

private final int schemaSize;

Expand Down Expand Up @@ -794,7 +805,7 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
this.thriftReader = thriftReader;
this.protocol = new ParquetReadProtocol();
this.thriftType = thriftType;
MessageType fullSchema = new ThriftSchemaConverter().convert(thriftType);
MessageType fullSchema = ThriftSchemaConverter.convertWithoutProjection(thriftType);
missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
}
Expand Down Expand Up @@ -863,7 +874,7 @@ public GroupConverter getRootConverter() {
return structConverter;
}

private Converter newConverter(List<TProtocol> events, Type type, ThriftField field) {
private static Converter newConverter(List<TProtocol> events, Type type, ThriftField field) {
switch (field.getType().getType()) {
case LIST:
return new ListConverter(events, type.asGroupType(), field);
Expand Down
Loading