diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index f74e57caf6..d43fd7d840 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -46,6 +46,8 @@ import static java.lang.String.format; import static org.apache.parquet.Log.DEBUG; import static org.apache.parquet.Preconditions.checkNotNull; +import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT; import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING; class InternalParquetRecordReader { @@ -53,6 +55,7 @@ class InternalParquetRecordReader { private ColumnIOFactory columnIOFactory = null; private final Filter filter; + private boolean filterRecords = true; private MessageType requestedSchema; private MessageType fileSchema; @@ -130,7 +133,8 @@ private void checkRead() throws IOException { if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking); - recordReader = columnIO.getRecordReader(pages, recordConverter, filter); + recordReader = columnIO.getRecordReader(pages, recordConverter, + filterRecords ? filter : FilterCompat.NOOP); startedAssemblingCurrentBlockAt = System.currentTimeMillis(); totalCountLoadedSoFar += pages.getRowCount(); ++ currentBlock; @@ -173,6 +177,8 @@ public void initialize(ParquetFileReader reader, Configuration configuration) this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true); this.total = reader.getRecordCount(); this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total); + this.filterRecords = configuration.getBoolean( + RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT); LOG.info("RecordReader initialized will read a total of " + total + " records."); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 7ac1706c7b..a761f2e155 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -28,6 +28,10 @@ import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE; +import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT; +import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT; import java.io.Closeable; import java.io.IOException; @@ -621,11 +625,13 @@ void filterRowGroups(FilterCompat.Filter filter) throws IOException { // set up data filters based on configured levels List levels = new ArrayList(); - if (conf.getBoolean("parquet.filter.statistics.enabled", true)) { + if (conf.getBoolean( + STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) { levels.add(STATISTICS); } - if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) { + if (conf.getBoolean( + DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) { levels.add(DICTIONARY); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index e3536d7c56..1fe57f9fc5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -115,6 +115,24 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate"; + /** + * key to configure whether record-level filtering is enabled + */ + public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled"; + static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true; + + /** + * key to configure whether row group stats filtering is enabled + */ + public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled"; + static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; + + /** + * key to configure whether row group dictionary filtering is enabled + */ + public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled"; + static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false; + /** * key to turn on or off task side metadata loading (default true) * if true then metadata is read on the task side and some tasks may finish immediately.