22 commits
5687a3b
Try to push down filter at RowGroups level for parquet reader.
viirya May 28, 2016
077f7f8
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Jun 9, 2016
97ccacf
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Jun 14, 2016
5711ae4
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Jun 16, 2016
36fd059
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Jun 24, 2016
687d75b
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Jul 7, 2016
a8bae96
Add regression test.
viirya Aug 3, 2016
6c6fc69
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Aug 3, 2016
50095a5
Don't need two SQLConf settings.
viirya Aug 3, 2016
246129c
Improve test cases and revert the change not needed now.
viirya Aug 3, 2016
58b4689
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Aug 4, 2016
f7baf41
Improve test case.
viirya Aug 5, 2016
a52b354
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Aug 5, 2016
3c7afaa
Add SQL metrics for number of row groups for test purposes.
viirya Aug 5, 2016
ea81fdc
A few file formats are not serializable.
viirya Aug 5, 2016
2d34803
Improve the approach to update accumulator.
viirya Aug 6, 2016
462edc7
Add a method in TaskMetrics to look up an accumulator by name.
viirya Aug 8, 2016
0b38ba1
Add comments.
viirya Aug 9, 2016
cee74b7
Remove unneeded changes.
viirya Aug 9, 2016
bbc5f7b
Merge remote-tracking branch 'upstream/master' into vectorized-reader…
viirya Aug 10, 2016
a2ba343
Prevent accumulator from being released early.
viirya Aug 10, 2016
ca074f1
Remove previous accumulator.
viirya Aug 10, 2016
@@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable {
}

private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums

/**
* Looks for a registered accumulator by accumulator name.
*/
private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
accumulators.find { acc =>
acc.name.isDefined && acc.name.get == name
}
}
}
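A minimal sketch of how executor-side code could use this new helper, assuming a LongAccumulator named "numRowGroups" has been registered on the driver; `rowGroupsToRead` is a hypothetical count computed by the caller, and the call site has to live in Spark-internal code since the method is private[spark]. This mirrors the Java usage added to SpecificParquetRecordReaderBase further down.

import org.apache.spark.TaskContext
import org.apache.spark.util.LongAccumulator

// Sketch only: look up a named accumulator from the running task's metrics.
val taskContext = TaskContext.get()
if (taskContext != null) {
  taskContext.taskMetrics().lookForAccumulatorByName("numRowGroups") match {
    case Some(acc: LongAccumulator) => acc.add(rowGroupsToRead)
    case _ => // nothing registered under that name; skip reporting
  }
}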


12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -23,6 +23,8 @@ import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
import org.apache.spark.scheduler.AccumulableInfo

@@ -257,6 +259,16 @@ private[spark] object AccumulatorContext {
originals.clear()
}

/**
* Looks for a registered accumulator by accumulator name.
*/
private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
originals.values().asScala.find { ref =>
val acc = ref.get
acc != null && acc.name.isDefined && acc.name.get == name
}.map(_.get)
}

// Identifier for distinguishing SQL metrics from other accumulators
private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
}
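The driver-side counterpart, as exercised by the new ParquetFilterSuite test below: after an action has run, the test finds the named accumulator in the registry and inspects the value the executors reported. Like the TaskMetrics helper, this is private[spark], so the sketch assumes Spark-internal test code.

import org.apache.spark.util.{AccumulatorContext, LongAccumulator}

// Driver side, after df.collect() or another action has finished.
AccumulatorContext.lookForAccumulatorByName("numRowGroups") match {
  case Some(acc: LongAccumulator) =>
    // e.g. with filter push-down enabled, every row group should have been skipped
    assert(acc.value == 0)
  case None =>
    sys.error("numRowGroups accumulator was not registered")
}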
@@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Set;

import scala.Option;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -59,8 +61,12 @@
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;

/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -145,6 +151,18 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}

// For test purposes only.
// If the predefined accumulator exists, the number of row groups to read will be added
// to it, so tests can check whether row groups were filtered.
TaskContext taskContext = TaskContext$.MODULE$.get();
if (taskContext != null) {
Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics()
.lookForAccumulatorByName("numRowGroups");
if (accu.isDefined()) {
((LongAccumulator)accu.get()).add((long)blocks.size());
}
}
}
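For the lookup above to find anything, the accumulator has to be registered on the driver and then referenced inside a task closure so it is shipped to the executors; a minimal Scala sketch of that setup, mirroring the test added to ParquetFilterSuite below (`sparkContext` and the scanned DataFrame `df` are assumed to be in scope):

import org.apache.spark.util.LongAccumulator

// Driver side: register a LongAccumulator under the name the reader looks up.
val numRowGroups = new LongAccumulator
numRowGroups.register(sparkContext, Some("numRowGroups"))

// Reference it in a closure so it is serialized with the tasks; when deserialized
// on an executor it registers itself with the task's TaskMetrics, where the code
// above can then find it by name.
df.foreachPartition(_.foreach(_ => numRowGroups.add(0)))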

/**
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -357,6 +358,11 @@ class ParquetFileFormat
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

// Try to push down filters when filter push-down is enabled.
// Note: this push-down is at the row group level, not for individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
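For context, `pushed` is the optional Parquet FilterPredicate assembled from the data source filters earlier in the same reader-building code; a rough sketch of that conversion, following the same pattern as the initializeLocalJobFunc code removed below (`sparkSession`, `filters`, and `requiredSchema` are assumed to be in scope, and ParquetFilters is package-private, so this lives inside the parquet datasource package):

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// Not every source filter can be translated into a Parquet predicate, so
// createFilter returns an Option and untranslatable filters are simply dropped;
// the remaining predicates are AND-ed together into a single one.
val pushed: Option[FilterPredicate] =
  if (sparkSession.sessionState.conf.parquetFilterPushDown) {
    filters
      .flatMap(ParquetFilters.createFilter(requiredSchema, _))
      .reduceOption(FilterApi.and)
  } else {
    None
  }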
@@ -563,87 +569,7 @@ private[parquet] class ParquetOutputWriter(
override def close(): Unit = recordWriter.close(context)
}


object ParquetFileFormat extends Logging {
/**
* If parquet's block size (row group size) setting is larger than the min split size,
* we use parquet's block size setting as the min split size. Otherwise, we will create
* tasks processing nothing (because a split does not cover the starting point of a
* parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
*/
private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
val minSplitSize =
math.max(
conf.getLong("mapred.min.split.size", 0L),
conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
if (parquetBlockSize > minSplitSize) {
val message =
s"Parquet's block size (row group size) is larger than " +
s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
s"$parquetBlockSize."
logDebug(message)
conf.set("mapred.min.split.size", parquetBlockSize.toString)
conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
}
}

/** This closure sets various Parquet configurations at both driver side and executor side. */
private[parquet] def initializeLocalJobFunc(
requiredColumns: Array[String],
filters: Array[Filter],
dataSchema: StructType,
parquetBlockSize: Long,
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)

// Try to push down filters when filter push-down is enabled.
if (parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(dataSchema, _))
.reduceOption(FilterApi.and)
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}

conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
ParquetSchemaConverter.checkFieldNames(requestedSchema).json
})

conf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(dataSchema).json)

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)

// Sets flags for `CatalystSchemaConverter`
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)

overrideMinSplitSize(parquetBlockSize, conf)
}

/** This closure sets input paths at the driver side. */
private[parquet] def initializeDriverSideJobFunc(
inputFiles: Array[FileStatus],
parquetBlockSize: Long)(job: Job): Unit = {
// We set the input paths at the driver side.
logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
}

private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {

@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.{AccumulatorContext, LongAccumulator}

/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -368,73 +369,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)

val pathFive = s"${dir.getCanonicalPath}/table5"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
Seq("true", "false").map { vectorized =>
Contributor: Is this change related?

viirya (Member Author): A regression test was added for the optional column (which only exists in some of the Parquet files). Previously it only covered the non-vectorized Parquet reader; now it also covers the vectorized reader. This PR had related changes here before, but they were removed, as discussed in the folded conversation.

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)

val pathFive = s"${dir.getCanonicalPath}/table5"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
}
}
}
}
@@ -527,4 +530,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(df.filter("_1 IS NOT NULL").count() === 4)
}
}

test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
Contributor: What about the non-vectorized reader?

viirya (Member Author): For the non-vectorized reader, we use Parquet's ParquetRecordReader and push the filters into it.

Contributor: @viirya I mean that we could also add a test to check that we correctly push filters into ParquetRecordReader. You know that you're also resolving SPARK-16321 (#14465)?

viirya (Member Author, Aug 3, 2016): We already have a test for the filters pushed down to ParquetRecordReader, but it works at the individual record level. If you mean the row group level, ParquetRecordReader doesn't expose a row group count, so I don't think we can check whether the filter is applied at that level. Besides, that is really ParquetRecordReader's own functionality, and it should be unit tested in the Parquet project.

viirya (Member Author): Yes, since ParquetRecordReader also reads the pushed-down filters from the Configuration, I think this also fixes SPARK-16321.
import testImplicits._

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table"
(1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)

Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
val accu = new LongAccumulator
accu.register(sparkContext, Some("numRowGroups"))

val df = spark.read.parquet(path).filter("a < 100")
df.foreachPartition(_.foreach(v => accu.add(0)))
Contributor: What does this test? Shouldn't accu always be zero?

viirya (Member Author, Apr 14, 2017): SpecificParquetRecordReaderBase looks for an accumulator named "numRowGroups", if one exists, and adds the number of row groups it is about to read. It is for test purposes only. Here we force this trivial foreach function to reference the accumulator without changing its value, so that it is shipped to the executors and becomes visible there.

df.collect

val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
assert(numRowGroups.isDefined)
assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
AccumulatorContext.remove(accu.id)
}
}
}
}
}
}