@@ -192,9 +192,9 @@ public Builder<T> withCompatibility(boolean enableCompatibility) {
     @Override
     protected ReadSupport<T> getReadSupport() {
       if (isReflect) {
-        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
       } else {
-        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
+        configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
       }
       return new AvroReadSupport<T>(model);
     }
@@ -86,7 +86,9 @@ private Binary() {}

   public abstract ByteBuffer toByteBuffer();

-  public abstract short get2BytesLittleEndian();
+  public short get2BytesLittleEndian() {
+    throw new UnsupportedOperationException("Not implemented");
+  }

   @Override
   public boolean equals(Object obj) {
@@ -51,9 +51,13 @@ public class CodecFactory implements CompressionCodecFactory {
   private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
   private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();

-  protected final ParquetConfiguration configuration;
+  protected final ParquetConfiguration conf;
   protected final int pageSize;

+  // May be null if parquetConfiguration is not an instance of org.apache.parquet.conf.HadoopParquetConfiguration
+  @Deprecated
+  protected final Configuration configuration;
+
   static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
     @Override
     public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {

PR author's review comment on the restored `Configuration configuration` field: "I've added it back to be backward compatible. @amousavigourabi @Fokko"
@@ -115,7 +119,12 @@ public CodecFactory(Configuration configuration, int pageSize) {
    *     decompressors this parameter has no impact on the function of the factory
    */
   public CodecFactory(ParquetConfiguration configuration, int pageSize) {
-    this.configuration = configuration;
+    if (configuration instanceof HadoopParquetConfiguration) {
+      this.configuration = ((HadoopParquetConfiguration) configuration).getConfiguration();
+    } else {
+      this.configuration = null;
+    }
+    this.conf = configuration;
     this.pageSize = pageSize;
   }

@@ -293,7 +302,7 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
       }
       codec = (CompressionCodec)
-          ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));
+          ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(conf));
       CODEC_BY_NAME.put(codecCacheKey, codec);
       return codec;
     } catch (ClassNotFoundException e) {
@@ -305,13 +314,13 @@ private String cacheKey(CompressionCodecName codecName) {
     String level = null;
     switch (codecName) {
       case GZIP:
-        level = configuration.get("zlib.compress.level");
+        level = conf.get("zlib.compress.level");
         break;
       case BROTLI:
-        level = configuration.get("compression.brotli.quality");
+        level = conf.get("compression.brotli.quality");
         break;
       case ZSTD:
-        level = configuration.get("parquet.compression.codec.zstd.level");
+        level = conf.get("parquet.compression.codec.zstd.level");
         break;
       default:
         // compression level is not supported; ignore it
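The author's comment above explains why the deprecated Hadoop `Configuration` field is kept next to the new `ParquetConfiguration conf`: per the hunks above it is populated when a `HadoopParquetConfiguration` is supplied and left `null` otherwise. A minimal sketch of what this preserves, assuming a hypothetical downstream subclass that was written against the old `CodecFactory` API (the subclass, method, sentinel value, and default level below are illustrative assumptions, not part of this PR):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.CodecFactory;

// Hypothetical legacy subclass, for illustration only: it predates the
// ParquetConfiguration abstraction and reads the protected Hadoop field.
public class LegacyCodecFactory extends CodecFactory {

  public LegacyCodecFactory(Configuration configuration, int pageSize) {
    super(configuration, pageSize);
  }

  int configuredZstdLevel() {
    // `configuration` is the deprecated protected field kept for backward
    // compatibility; it may be null when the factory was constructed from a
    // non-Hadoop ParquetConfiguration.
    if (configuration == null) {
      return -1; // assumed sentinel for "not available" in this sketch
    }
    // key taken from the diff above; 3 is an assumed default for the sketch
    return configuration.getInt("parquet.compression.codec.zstd.level", 3);
  }
}
```

Because the field is still declared (even though deprecated), such code keeps compiling and behaving as before when a Hadoop-backed configuration is in play.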
@@ -50,6 +50,7 @@
 import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
 import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
@@ -513,6 +514,20 @@ public void writeBloomFilter(BloomFilter bloomFilter) {
       new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;

+  @Deprecated
+  public ColumnChunkPageWriteStore(
+      BytesCompressor compressor,
+      MessageType schema,
+      ByteBufferAllocator allocator,
+      int columnIndexTruncateLength) {
+    this(
+        (BytesInputCompressor) compressor,
+        schema,
+        allocator,
+        columnIndexTruncateLength,
+        ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+  }
+
   public ColumnChunkPageWriteStore(
       BytesInputCompressor compressor,
       MessageType schema,
@@ -526,6 +541,16 @@ public ColumnChunkPageWriteStore(
         ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
   }

+  @Deprecated
+  public ColumnChunkPageWriteStore(
+      BytesCompressor compressor,
+      MessageType schema,
+      ByteBufferAllocator allocator,
+      int columnIndexTruncateLength,
+      boolean pageWriteChecksumEnabled) {
+    this((BytesInputCompressor) compressor, schema, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled);
+  }
+
   public ColumnChunkPageWriteStore(
       BytesInputCompressor compressor,
       MessageType schema,
@@ -550,6 +575,25 @@ public ColumnChunkPageWriteStore(
     }
   }

+  @Deprecated
+  public ColumnChunkPageWriteStore(
+      BytesCompressor compressor,
+      MessageType schema,
+      ByteBufferAllocator allocator,
+      int columnIndexTruncateLength,
+      boolean pageWriteChecksumEnabled,
+      InternalFileEncryptor fileEncryptor,
+      int rowGroupOrdinal) {
+    this(
+        (BytesInputCompressor) compressor,
+        schema,
+        allocator,
+        columnIndexTruncateLength,
+        pageWriteChecksumEnabled,
+        fileEncryptor,
+        rowGroupOrdinal);
+  }
+
   public ColumnChunkPageWriteStore(
       BytesInputCompressor compressor,
       MessageType schema,
@@ -410,9 +410,9 @@ private class ZstdCompressor extends BaseCompressor {

     ZstdCompressor() {
       context = new ZstdCompressCtx();
-      context.setLevel(configuration.getInt(
+      context.setLevel(conf.getInt(
           ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL));
-      context.setWorkers(configuration.getInt(
+      context.setWorkers(conf.getInt(
           ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
     }

@@ -196,27 +196,31 @@ public static class Builder<T> {
     private final Path path;
     private Filter filter = null;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-    protected ParquetConfiguration conf;
+    protected ParquetConfiguration configuration;
     private ParquetReadOptions.Builder optionsBuilder;

+    // May be null if parquetConfiguration is not an instance of org.apache.parquet.conf.HadoopParquetConfiguration
+    @Deprecated
+    protected Configuration conf;
+
     @Deprecated
     private Builder(ReadSupport<T> readSupport, Path path) {
       this.readSupport = Objects.requireNonNull(readSupport, "readSupport cannot be null");
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
-      Configuration hadoopConf = new Configuration();
-      this.conf = new HadoopParquetConfiguration(hadoopConf);
-      this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
+      this.conf = new Configuration();
+      this.configuration = new HadoopParquetConfiguration(this.conf);
+      this.optionsBuilder = HadoopReadOptions.builder(this.conf, path);
     }

     @Deprecated
     protected Builder(Path path) {
       this.readSupport = null;
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
-      Configuration hadoopConf = new Configuration();
-      this.conf = new HadoopParquetConfiguration(hadoopConf);
-      this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
+      this.conf = new Configuration();
+      this.configuration = new HadoopParquetConfiguration(this.conf);
+      this.optionsBuilder = HadoopReadOptions.builder(this.conf, path);
     }

     protected Builder(InputFile file) {
@@ -225,9 +229,9 @@ protected Builder(InputFile file) {
       this.path = null;
       if (file instanceof HadoopInputFile) {
         HadoopInputFile hadoopFile = (HadoopInputFile) file;
-        Configuration hadoopConf = hadoopFile.getConfiguration();
-        this.conf = new HadoopParquetConfiguration(hadoopConf);
-        optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath());
+        this.conf = hadoopFile.getConfiguration();
+        this.configuration = new HadoopParquetConfiguration(this.conf);
+        optionsBuilder = HadoopReadOptions.builder(this.conf, hadoopFile.getPath());
       } else {
         optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration());
       }
@@ -237,19 +241,20 @@ protected Builder(InputFile file, ParquetConfiguration conf) {
       this.readSupport = null;
       this.file = Objects.requireNonNull(file, "file cannot be null");
       this.path = null;
-      this.conf = conf;
+      this.configuration = conf;
       if (file instanceof HadoopInputFile) {
+        this.conf = ConfigurationUtil.createHadoopConfiguration(conf);
         HadoopInputFile hadoopFile = (HadoopInputFile) file;
-        optionsBuilder = HadoopReadOptions.builder(
-            ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath());
+        optionsBuilder = HadoopReadOptions.builder(this.conf, hadoopFile.getPath());
       } else {
         optionsBuilder = ParquetReadOptions.builder(conf);
       }
     }

     // when called, resets options to the defaults from conf
     public Builder<T> withConf(Configuration conf) {
-      this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null"));
+      this.conf = Objects.requireNonNull(conf, "conf cannot be null");
+      this.configuration = new HadoopParquetConfiguration(this.conf);

       // previous versions didn't use the builder, so may set filter before conf. this maintains
       // compatibility for filter. other options are reset by a new conf.
@@ -262,7 +267,7 @@ public Builder<T> withConf(Configuration conf) {
     }

     public Builder<T> withConf(ParquetConfiguration conf) {
-      this.conf = conf;
+      this.configuration = conf;
       this.optionsBuilder = ParquetReadOptions.builder(conf);
       if (filter != null) {
         optionsBuilder.withRecordFilter(filter);
@@ -383,7 +388,7 @@ public ParquetReader<T> build() throws IOException {
       ParquetReadOptions options = optionsBuilder.withAllocator(allocator).build();

       if (path != null) {
-        Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
+        Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(configuration);
         FileSystem fs = path.getFileSystem(hadoopConf);
         FileStatus stat = fs.getFileStatus(path);

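The builder changes above keep two views of the same settings in sync: the deprecated Hadoop `conf` field and the `ParquetConfiguration`-backed `configuration` field, with each `withConf` overload resetting the read options from whichever type is supplied. A small usage sketch, under assumptions not stated in the diff (an Avro reader as the concrete builder, a local file path, and `GenericRecord` as the record type):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ReadExample {
  public static void main(String[] args) throws Exception {
    Configuration hadoopConf = new Configuration();
    // Building from a HadoopInputFile goes through the Builder(InputFile)
    // constructor shown above, so both conf and configuration are initialized.
    try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(new Path("file:///tmp/example.parquet"), hadoopConf))
        .withConf(hadoopConf) // resets options to the defaults from this Hadoop conf
        .build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}
```

Callers that only ever pass a Hadoop `Configuration`, as here, see no behavior change; callers on the new `ParquetConfiguration` path simply leave the deprecated field unset except where the diff derives it explicitly.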
@@ -27,6 +27,7 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
@@ -43,6 +44,33 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
   private final MemoryManager memoryManager;
   private final CodecFactory codecFactory;

+  @Deprecated
+  public ParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      int blockSize,
+      int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion) {
+    this(
+        w,
+        writeSupport,
+        schema,
+        extraMetaData,
+        blockSize,
+        pageSize,
+        (BytesInputCompressor) compressor,
+        dictionaryPageSize,
+        enableDictionary,
+        validating,
+        writerVersion);
+  }
+
   /**
    * @param w the file to write to
    * @param writeSupport the class to convert incoming records
@@ -81,6 +109,35 @@ public ParquetRecordWriter(
     this.codecFactory = null;
   }

+  @Deprecated
+  public ParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      long blockSize,
+      int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion,
+      MemoryManager memoryManager) {
+    this(
+        w,
+        writeSupport,
+        schema,
+        extraMetaData,
+        blockSize,
+        pageSize,
+        (BytesInputCompressor) compressor,
+        dictionaryPageSize,
+        enableDictionary,
+        validating,
+        writerVersion,
+        memoryManager);
+  }
+
   /**
    * @param w the file to write to
    * @param writeSupport the class to convert incoming records
@@ -134,7 +134,9 @@ public interface StateVisitor<R, S> {

     R visit(StringType stringType, S state);

-    R visit(UUIDType uuidType, S state);
+    default R visit(UUIDType uuidType, S state) {
+      throw new UnsupportedOperationException("Not implemented");
+    }
   }

   /**
@@ -166,7 +168,9 @@ public interface TypeVisitor {

     void visit(StringType stringType);

-    void visit(UUIDType uuidType);
+    default void visit(UUIDType uuidType) {
+      throw new UnsupportedOperationException("Not implemented");
+    }
   }

   /**