Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@
import static java.lang.String.format;
import static org.apache.parquet.Log.DEBUG;
import static org.apache.parquet.Preconditions.checkNotNull;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT;
import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;

class InternalParquetRecordReader<T> {
private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);

private ColumnIOFactory columnIOFactory = null;
private final Filter filter;
private boolean filterRecords = true;

private MessageType requestedSchema;
private MessageType fileSchema;
Expand Down Expand Up @@ -130,7 +133,8 @@ private void checkRead() throws IOException {
if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
recordReader = columnIO.getRecordReader(pages, recordConverter,
filterRecords ? filter : FilterCompat.NOOP);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
++ currentBlock;
Expand Down Expand Up @@ -173,6 +177,8 @@ public void initialize(ParquetFileReader reader, Configuration configuration)
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
this.total = reader.getRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
this.filterRecords = configuration.getBoolean(
RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
LOG.info("RecordReader initialized will read a total of " + total + " records.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -621,11 +625,13 @@ void filterRowGroups(FilterCompat.Filter filter) throws IOException {
// set up data filters based on configured levels
List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();

if (conf.getBoolean("parquet.filter.statistics.enabled", true)) {
if (conf.getBoolean(
STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) {
levels.add(STATISTICS);
}

if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) {
if (conf.getBoolean(
DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) {
levels.add(DICTIONARY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";

/**
* key to configure whether record-level filtering is enabled
*/
public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled";
static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;

/**
* key to configure whether row group stats filtering is enabled
*/
public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;

/**
* key to configure whether row group dictionary filtering is enabled
*/
public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;

/**
* key to turn on or off task side metadata loading (default true)
* if true then metadata is read on the task side and some tasks may finish immediately.
Expand Down