sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (7 additions, 1 deletion)
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._

import java.util.Properties


private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"

val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"

// This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt

+/** When true predicates will be passed to the parquet record reader when possible. */
+private[spark] def parquetFilterPushDown =
+  getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

/**
* When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
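
The new spark.sql.parquet.filterPushdown key defaults to "false", so pushdown stays off unless it is switched on per context. A minimal usage sketch (not part of this patch), assuming an existing SQLContext named sqlContext and a Parquet-backed temp table events, both names illustrative:

    // Opt in to Parquet filter pushdown for this SQLContext.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

    // With the flag on, a simple column-vs-literal predicate like this one becomes a
    // candidate for evaluation inside the Parquet record reader instead of in Spark.
    val rows = sqlContext.sql("SELECT * FROM events WHERE id < 100").collect()
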
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
-if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+if (sqlContext.parquetFilterPushDown) {
(filters: Seq[Expression]) => {
filters.filter { filter =>
// Note: filters cannot be pushed down to Parquet if they contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
projectList,
filters,
prunePushedDownFilters,
-ParquetTableScan(_, relation, filters)) :: Nil
+ParquetTableScan(
+  _,
+  relation,
+  if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil

case _ => Nil
}
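
The strategy now consults the flag in two places: predicates that can be pushed down are pruned from the filters evaluated above the scan, and the filter list reaches ParquetTableScan at all only when sqlContext.parquetFilterPushDown is true. A hedged illustration of which predicates typically qualify, assuming a registered table t with an integer column myint (hypothetical names):

    // A plain comparison between a column and a literal can be translated into a
    // Parquet record filter, so with the flag enabled it is a pushdown candidate.
    sqlContext.sql("SELECT * FROM t WHERE myint < 100")

    // A predicate over a computed expression falls under the "more complex" case in
    // the strategy's comment, so it is not pushed down and Spark evaluates it itself.
    sqlContext.sql("SELECT * FROM t WHERE myint + 1 < 100")
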
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._

private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-// set this to false if pushdown should be disabled
-val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"

def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
val filters: Seq[CatalystFilter] = filterExpressions.collect {
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
import java.util.concurrent.{Callable, TimeUnit}
import java.util.{ArrayList, Collections, Date, List => JList}

+import org.apache.spark.annotation.DeveloperApi
+
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}

/**
+* :: DeveloperApi ::
* Parquet table scan operator. Imports the file that backs the given
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
@@ -108,15 +111,11 @@ case class ParquetTableScan(
// Note 1: the input format ignores all predicates that cannot be expressed
// as simple column predicate filters in Parquet. Here we just record
// the whole pruning predicate.
-// Note 2: you can disable filter predicate pushdown by setting
-// "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
-if (columnPruningPred.length > 0 &&
-  sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-
+if (columnPruningPred.length > 0) {
// Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
if (filter != null){
-val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
ParquetInputFormat.setFilterPredicate(conf, filterPredicate)
}
}
@@ -193,6 +192,7 @@ case class ParquetTableScan(
}

/**
+* :: DeveloperApi ::
* Operator that acts as a sink for queries on RDDs and can be used to
* store the output inside a directory of Parquet files. This operator
* is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
* cause unpredicted behaviour and therefore results in a RuntimeException
* (only detected via filename pattern so will not catch all cases).
*/
+@DeveloperApi
case class InsertIntoParquetTable(
relation: ParquetRelation,
child: SparkPlan,
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
testRDD.registerTempTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
.registerTempTable("testfiltersource")

+setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
}

override def afterAll() {
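
Because the key defaults to "false", the suite opts in once in beforeAll so every test in the file exercises the pushdown path. A hedged sketch of such a test, assuming the suite's unqualified sql(...) helper and that testfiltersource has an integer column myint in position 0 (the schema is not visible in this diff):

    test("simple comparison predicate with pushdown enabled") {
      // The predicate is also handed to the Parquet record reader, but the result
      // must match what Spark-side filtering would produce.
      val result = sql("SELECT * FROM testfiltersource WHERE myint < 100").collect()
      assert(result.forall(_.getInt(0) < 100))
    }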