Skip to content

Commit b5f207f

Browse files
committed
PARQUET-212: Add Configuration to the ThriftRecordConverter ctor.
Previously, this added Configurable to ThriftRecordConverter, but that caused problems with semver and test failures because a separate initialize method had to be called. This is brittle because callers might not know to call initialize and the class is part of the API because subclassing is allowed. To avoid the issue, this replaces the Configurable interface and initialize method with a new constructor that takes a Configuration.
1 parent b87eb65 commit b5f207f

File tree

5 files changed

+84
-40
lines changed

5 files changed

+84
-40
lines changed

parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeRecordConverter.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.parquet.scrooge;
2020

21+
import org.apache.hadoop.conf.Configuration;
2122
import org.apache.thrift.TException;
2223
import org.apache.thrift.protocol.TProtocol;
2324

@@ -31,16 +32,24 @@
3132

3233
public class ScroogeRecordConverter<T extends ThriftStruct> extends ThriftRecordConverter<T> {
3334

34-
35+
/**
36+
* This is for compatibility only.
37+
* @deprecated will be removed in 2.x
38+
*/
39+
@Deprecated
3540
public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType) {
41+
this(thriftClass, parquetSchema, thriftType, null);
42+
}
43+
44+
public ScroogeRecordConverter(final Class<T> thriftClass, MessageType parquetSchema, StructType thriftType, Configuration conf) {
3645
super(new ThriftReader<T>() {
3746
@SuppressWarnings("unchecked")
3847
ThriftStructCodec<T> codec = (ThriftStructCodec<T>) getCodec(thriftClass);
3948
@Override
4049
public T readOneRecord(TProtocol protocol) throws TException {
4150
return codec.decode(protocol);
4251
}
43-
}, thriftClass.getSimpleName(), parquetSchema, thriftType);
52+
}, thriftClass.getSimpleName(), parquetSchema, thriftType, conf);
4453
}
4554

4655
private static ThriftStructCodec<?> getCodec(Class<?> klass) {

parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.parquet.hadoop.thrift;
2020

2121
import java.lang.reflect.Constructor;
22+
import java.lang.reflect.InvocationTargetException;
2223
import java.util.Map;
2324
import java.util.Set;
2425

@@ -225,23 +226,55 @@ public RecordMaterializer<T> prepareForRead(Configuration configuration,
225226
ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData);
226227
try {
227228
initThriftClass(thriftMetaData, configuration);
229+
} catch (ClassNotFoundException e) {
230+
throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e);
231+
}
232+
233+
// if there was not metadata in the file, get it from requested class
234+
if (thriftMetaData == null) {
235+
thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
236+
}
228237

229-
// if there was not metadata in the file, get it from requested class
230-
if (thriftMetaData == null) {
231-
thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
238+
String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT);
239+
return getRecordConverterInstance(converterClassName, thriftClass,
240+
readContext.getRequestedSchema(), thriftMetaData.getDescriptor(),
241+
configuration);
242+
}
243+
244+
@SuppressWarnings("unchecked")
245+
private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
246+
String converterClassName, Class<T> thriftClass,
247+
MessageType requestedSchema, StructType descriptor, Configuration conf) {
248+
Class<ThriftRecordConverter<T>> converterClass;
249+
try {
250+
converterClass = (Class<ThriftRecordConverter<T>>) Class.forName(converterClassName);
251+
} catch (ClassNotFoundException e) {
252+
throw new RuntimeException("Cannot find Thrift converter class: " + converterClassName, e);
253+
}
254+
255+
try {
256+
// first try the new version that accepts a Configuration
257+
try {
258+
Constructor<ThriftRecordConverter<T>> constructor =
259+
converterClass.getConstructor(Class.class, MessageType.class, StructType.class, Configuration.class);
260+
return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf);
261+
} catch (IllegalAccessException e) {
262+
// try the other constructor pattern
263+
} catch (NoSuchMethodException e) {
264+
// try to find the other constructor pattern
232265
}
233266

234-
String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT);
235-
@SuppressWarnings("unchecked")
236-
Class<ThriftRecordConverter<T>> converterClass = (Class<ThriftRecordConverter<T>>) Class.forName(converterClassName);
237267
Constructor<ThriftRecordConverter<T>> constructor =
238268
converterClass.getConstructor(Class.class, MessageType.class, StructType.class);
239-
ThriftRecordConverter<T> converter = constructor.newInstance(thriftClass, readContext.getRequestedSchema(), thriftMetaData.getDescriptor());
240-
converter.setConf(configuration);
241-
converter.initialize();
242-
return converter;
243-
} catch (Exception t) {
244-
throw new RuntimeException("Unable to create Thrift Converter for Thrift metadata " + thriftMetaData, t);
269+
return constructor.newInstance(thriftClass, requestedSchema, descriptor);
270+
} catch (InstantiationException e) {
271+
throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e);
272+
} catch (InvocationTargetException e) {
273+
throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e);
274+
} catch (IllegalAccessException e) {
275+
throw new RuntimeException("Cannot access constructor for Thrift converter class: " + converterClassName, e);
276+
} catch (NoSuchMethodException e) {
277+
throw new RuntimeException("Cannot find constructor for Thrift converter class: " + converterClassName, e);
245278
}
246279
}
247280
}

parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.parquet.thrift;
2020

21+
import org.apache.hadoop.conf.Configuration;
2122
import org.apache.thrift.TBase;
2223
import org.apache.thrift.TException;
2324
import org.apache.thrift.protocol.TProtocol;
@@ -28,7 +29,16 @@
2829

2930
public class TBaseRecordConverter<T extends TBase<?,?>> extends ThriftRecordConverter<T> {
3031

32+
/**
33+
* This is for compatibility only.
34+
* @deprecated will be removed in 2.x
35+
*/
36+
@Deprecated
3137
public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType) {
38+
this(thriftClass, requestedParquetSchema, thriftType, null);
39+
}
40+
41+
public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType, Configuration conf) {
3242
super(new ThriftReader<T>() {
3343
@Override
3444
public T readOneRecord(TProtocol protocol) throws TException {
@@ -42,7 +52,7 @@ public T readOneRecord(TProtocol protocol) throws TException {
4252
throw new ParquetDecodingException("Thrift class or constructor not public " + thriftClass, e);
4353
}
4454
}
45-
}, thriftClass.getSimpleName(), requestedParquetSchema, thriftType);
55+
}, thriftClass.getSimpleName(), requestedParquetSchema, thriftType, conf);
4656
}
4757

4858
}

parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
*
6666
* @param <T>
6767
*/
68-
public class ThriftRecordConverter<T> extends RecordMaterializer<T> implements Configurable {
68+
public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
6969

7070
private static final Log LOG = Log.getLog(ThriftRecordConverter.class);
7171

@@ -861,49 +861,41 @@ public void end() {
861861
}
862862
private final ThriftReader<T> thriftReader;
863863
private final ParquetReadProtocol protocol;
864-
private final MessageType requestedParquetSchema;
865-
private final String name;
866864
private GroupConverter structConverter;
867865
private List<TProtocol> rootEvents = new ArrayList<TProtocol>();
868866
private boolean missingRequiredFieldsInProjection = false;
869-
private Configuration conf = null;
870867
private boolean ignoreNullElements = IGNORE_NULL_LIST_ELEMENTS_DEFAULT;
871868

869+
/**
870+
* This is for compatibility only.
871+
* @deprecated will be removed in 2.x
872+
*/
873+
@Deprecated
874+
public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) {
875+
this(thriftReader, name, requestedParquetSchema, thriftType, null);
876+
}
877+
872878
/**
873879
*
874880
* @param thriftReader the class responsible for instantiating the final object and read from the protocol
875881
* @param name the name of that type ( the thrift class simple name)
876882
* @param requestedParquetSchema the schema for the incoming columnar events
877883
* @param thriftType the thrift type descriptor
884+
* @param conf a Configuration
878885
*/
879-
public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) {
886+
public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, Configuration conf) {
880887
super();
881888
this.thriftReader = thriftReader;
882-
this.name = name;
883-
this.requestedParquetSchema = requestedParquetSchema;
884889
this.protocol = new ParquetReadProtocol();
885890
this.thriftType = thriftType;
886-
}
887-
888-
public void initialize() {
889-
MessageType fullSchema = ThriftSchemaConverter.convertWithoutProjection(thriftType);
890-
missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
891-
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
892-
}
893-
894-
@Override
895-
public void setConf(Configuration configuration) {
896-
this.conf = configuration;
897891
if (conf != null) {
898892
this.ignoreNullElements = conf.getBoolean(
899893
IGNORE_NULL_LIST_ELEMENTS,
900894
IGNORE_NULL_LIST_ELEMENTS_DEFAULT);
901895
}
902-
}
903-
904-
@Override
905-
public Configuration getConf() {
906-
return conf;
896+
MessageType fullSchema = ThriftSchemaConverter.convertWithoutProjection(thriftType);
897+
missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
898+
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
907899
}
908900

909901
private boolean hasMissingRequiredFieldInGroupType(GroupType requested, GroupType fullSchema) {

parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestArrayCompatibility.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public void write(RecordConsumer rc) {
331331
fail("Should fail: locations are optional and not ignored");
332332
} catch (RuntimeException e) {
333333
// e is a RuntimeException wrapping the decoding exception
334-
assertTrue(e.getCause().getMessage().contains("locations"));
334+
assertTrue(e.getCause().getCause().getMessage().contains("locations"));
335335
}
336336

337337
assertReaderContains(readerIgnoreNulls(test, ListOfLocations.class), expected);
@@ -615,7 +615,7 @@ public void write(RecordConsumer rc) {
615615
fail("Should fail: locations are optional and not ignored");
616616
} catch (RuntimeException e) {
617617
// e is a RuntimeException wrapping the decoding exception
618-
assertTrue(e.getCause().getMessage().contains("locations"));
618+
assertTrue(e.getCause().getCause().getMessage().contains("locations"));
619619
}
620620

621621
assertReaderContains(readerIgnoreNulls(test, ListOfLocations.class), expected);

0 commit comments

Comments
 (0)