@@ -17,14 +17,16 @@

package org.apache.spark.sql.parquet

import java.io.Serializable
import java.nio.ByteBuffer

import com.google.common.io.BaseEncoding
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary

import org.apache.spark.SparkEnv
@@ -41,6 +43,18 @@ private[sql] object ParquetFilters {
}.reduceOption(FilterApi.and).map(FilterCompat.get)
}

case class SetInFilter[T <: Comparable[T]](
valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {

override def keep(value: T): Boolean = {
value != null && valueSet.contains(value)
}

override def canDrop(statistics: Statistics[T]): Boolean = false

override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -153,6 +167,29 @@
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}

private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
case LongType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
case FloatType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
case BinaryType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
}

/**
* Converts data sources filters to Parquet filter predicates.
*/
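Note: SetInFilter and makeInSet together push an IN predicate down to Parquet as a user-defined predicate. Since canDrop and inverseCanDrop always return false, no row groups are skipped based on statistics; keep() is simply evaluated value by value in the reader. A minimal usage sketch, not part of this patch (the column name "a" and the throwaway Configuration are illustrative only):

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.hadoop.ParquetInputFormat

    // Build the same predicate makeInSet produces for an INT32 column named "a".
    val values: Set[java.lang.Integer] = Set(1, 2, 3).map(Int.box)
    val predicate = FilterApi.userDefined(FilterApi.intColumn("a"), SetInFilter(values))

    // Registering the predicate in the Hadoop configuration lets the Parquet
    // record reader apply keep() to every value of column "a".
    ParquetInputFormat.setFilterPredicate(new Configuration(), predicate)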
@@ -284,6 +321,9 @@
case Not(pred) =>
createFilter(pred).map(FilterApi.not)

case InSet(NamedExpression(name, dataType), valueSet) =>
makeInSet.lift(dataType).map(_(name, valueSet))

case _ => None
}
}
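Note: the InSet case above only fires for the six element types makeInSet handles; for any other type, PartialFunction.lift yields None and the IN predicate is left for Spark to evaluate after the scan. A small sketch of that dispatch (as it would look inside ParquetFilters), with made-up column names:

    // Some(predicate) for a supported element type...
    val pushed: Option[FilterPredicate] =
      makeInSet.lift(IntegerType).map(f => f("a", Set[Any](1, 2, 3)))

    // ...but None for an unsupported one, so nothing is pushed down.
    val notPushed: Option[FilterPredicate] =
      makeInSet.lift(TimestampType).map(f => f("ts", Set.empty[Any]))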
@@ -117,6 +117,9 @@ private[sql] case class ParquetTableScan(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))

// Use task-side metadata in Parquet
conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)

val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
sc,
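Note: with TASK_SIDE_METADATA enabled, parquet-mr computes splits from the files' block locations instead of reading every footer on the driver; each task then reads its own footer and applies whatever filter predicate is registered in the job configuration. The constant is just a boolean Hadoop configuration key; a sketch of the equivalent string-keyed form, assuming the key name "parquet.task.side.metadata" (the value of ParquetInputFormat.TASK_SIDE_METADATA in parquet-mr):

    // Equivalent to the setBoolean call above, written against the raw key.
    conf.set("parquet.task.side.metadata", "true")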
@@ -453,190 +456,6 @@ private[parquet] class FilteringParquetRowInputFormat
}
}

// This is only a temporary solution since we need to use fileStatuses in
// both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
// two methods.
override def getSplits(jobContext: JobContext): JList[InputSplit] = {
// First set fileStatuses.
val statuses = listStatus(jobContext)
fileStatuses = statuses.map(file => file.getPath -> file).toMap

super.getSplits(jobContext)
}

// TODO Remove this method and related code once PARQUET-16 is fixed
// This method together with the `getFooters` method and the `fileStatuses` field are just used
// to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
override def getSplits(
configuration: Configuration,
footers: JList[Footer]): JList[ParquetInputSplit] = {

// Use task side strategy by default
val taskSideMetaData = configuration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
val minSplitSize: JLong =
Math.max(getFormatMinSplitSize, configuration.getLong("mapred.min.split.size", 0L))
if (maxSplitSize < 0 || minSplitSize < 0) {
throw new ParquetDecodingException(
s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
s" minSplitSize = $minSplitSize")
}

// Uses strict type checking by default
val getGlobalMetaData =
classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
getGlobalMetaData.setAccessible(true)
var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]

if (globalMetaData == null) {
val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
return splits
}

val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
val mergedMetadata = globalMetaData
.getKeyValueMetaData
.updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata)))

globalMetaData = new GlobalMetaData(globalMetaData.getSchema,
mergedMetadata, globalMetaData.getCreatedBy)

val readContext = ParquetInputFormat.getReadSupportInstance(configuration).init(
new InitContext(configuration,
globalMetaData.getKeyValueMetaData,
globalMetaData.getSchema))

if (taskSideMetaData){
logInfo("Using Task Side Metadata Split Strategy")
getTaskSideSplits(configuration,
footers,
maxSplitSize,
minSplitSize,
readContext)
} else {
logInfo("Using Client Side Metadata Split Strategy")
getClientSideSplits(configuration,
footers,
maxSplitSize,
minSplitSize,
readContext)
}

}

def getClientSideSplits(
configuration: Configuration,
footers: JList[Footer],
maxSplitSize: JLong,
minSplitSize: JLong,
readContext: ReadContext): JList[ParquetInputSplit] = {

import org.apache.parquet.filter2.compat.FilterCompat.Filter
import org.apache.parquet.filter2.compat.RowGroupFilter

import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache

val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)

val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
val filter: Filter = ParquetInputFormat.getFilter(configuration)
var rowGroupsDropped: Long = 0
var totalRowGroups: Long = 0

// Ugly hack, stuck with it until PR:
// https://github.com/apache/incubator-parquet-mr/pull/17
// is resolved
val generateSplits =
Class.forName("org.apache.parquet.hadoop.ClientSideMetadataSplitStrategy")
.getDeclaredMethods.find(_.getName == "generateSplits").getOrElse(
sys.error(s"Failed to reflectively invoke ClientSideMetadataSplitStrategy.generateSplits"))
generateSplits.setAccessible(true)

for (footer <- footers) {
val fs = footer.getFile.getFileSystem(configuration)
val file = footer.getFile
val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val parquetMetaData = footer.getParquetMetadata
val blocks = parquetMetaData.getBlocks
totalRowGroups = totalRowGroups + blocks.size
val filteredBlocks = RowGroupFilter.filterRowGroups(
filter,
blocks,
parquetMetaData.getFileMetaData.getSchema)
rowGroupsDropped = rowGroupsDropped + (blocks.size - filteredBlocks.size)

if (!filteredBlocks.isEmpty){
var blockLocations: Array[BlockLocation] = null
if (!cacheMetadata) {
blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
} else {
blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
})
}
splits.addAll(
generateSplits.invoke(
null,
filteredBlocks,
blockLocations,
status,
readContext.getRequestedSchema.toString,
readContext.getReadSupportMetadata,
minSplitSize,
maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
}
}

if (rowGroupsDropped > 0 && totalRowGroups > 0){
val percentDropped = ((rowGroupsDropped/totalRowGroups.toDouble) * 100).toInt
logInfo(s"Dropping $rowGroupsDropped row groups that do not pass filter predicate "
+ s"($percentDropped %) !")
}
else {
logInfo("There were no row groups that could be dropped due to filter predicates")
}
splits

}

def getTaskSideSplits(
configuration: Configuration,
footers: JList[Footer],
maxSplitSize: JLong,
minSplitSize: JLong,
readContext: ReadContext): JList[ParquetInputSplit] = {

val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]

// Ugly hack, stuck with it until PR:
// https://github.com/apache/incubator-parquet-mr/pull/17
// is resolved
val generateSplits =
Class.forName("org.apache.parquet.hadoop.TaskSideMetadataSplitStrategy")
.getDeclaredMethods.find(_.getName == "generateTaskSideMDSplits").getOrElse(
sys.error(
s"Failed to reflectively invoke TaskSideMetadataSplitStrategy.generateTaskSideMDSplits"))
generateSplits.setAccessible(true)

for (footer <- footers) {
val file = footer.getFile
val fs = file.getFileSystem(configuration)
val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
splits.addAll(
generateSplits.invoke(
null,
blockLocations,
status,
readContext.getRequestedSchema.toString,
readContext.getReadSupportMetadata,
minSplitSize,
maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
}

splits
}

}
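Note: the reflective getSplits / getClientSideSplits / getTaskSideSplits machinery above becomes unnecessary once TASK_SIDE_METADATA is set, because parquet-mr's own ParquetInputFormat then computes the splits, which is why this patch deletes it. Filter pushdown still reaches the reader through the job configuration; a rough sketch of that handoff, assuming conf and filterPredicate are supplied by ParquetTableScan and ParquetFilters respectively (not a literal excerpt of either file):

    // Register the pushed-down predicate; parquet-mr wraps it via FilterCompat
    // and applies it inside each task, pruning row groups where statistics allow
    // and filtering record by record otherwise.
    ParquetInputFormat.setFilterPredicate(conf, filterPredicate)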

private[parquet] object FilteringParquetRowInputFormat {