@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
@@ -436,23 +436,37 @@ case class FileSourceScanExec(
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
p.files.map { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
val session = fsRelation.sparkSession
val hadoopConf = session.sessionState.newHadoopConf()
val partitionFiles = selectedPartitions.flatMap { partition =>
partition.files.map((_, partition.values))
}
val format = fsRelation.fileFormat
val splitter =
format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
val bucketed = partitionFiles.flatMap { case (file, values) =>
val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
if (format.isSplitable(session, fsRelation.options, file.getPath)) {
val validSplits = splitter(file)
validSplits.map { split =>
val hosts = getBlockHosts(blockLocations, split.getStart, split.getLength)
PartitionedFile(values, filePath, split.getStart, split.getLength, hosts)
}
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(values, filePath, 0, file.getLen, hosts))
}

}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}
val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
new FileScanRDD(session, readFile, filePartitions)
}
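For reference, the groupBy above keys each PartitionedFile by the bucket id parsed out of its file name, so every split of a bucketed file lands in that bucket's FilePartition. A minimal illustration, reusing the BucketingUtils helper called above (the file name is hypothetical, following the usual _NNNNN suffix convention):

  // Hypothetical bucketed output file; "_00003" before the extension is the bucket id.
  BucketingUtils.getBucketId("part-r-00009-ea518ad4-455a-4431-b471-d24e03814677_00003.gz.parquet")
  // => Some(3): all splits of this file are grouped into the bucket-3 FilePartition.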

/**
@@ -467,34 +481,44 @@
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val session = fsRelation.sparkSession
val hadoopConf = session.sessionState.newHadoopConf()
val defaultMaxSplitBytes = session.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = session.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = session.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val partitionFiles = selectedPartitions.flatMap { partition =>
partition.files.map((_, partition.values))
}
val format = fsRelation.fileFormat
val splitter =
format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
val splitFiles = partitionFiles.flatMap { case (file, values) =>
val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
// If the format is splittable, attempt to split and filter the file.
if (format.isSplitable(session, fsRelation.options, file.getPath)) {
val validSplits = splitter(file)
validSplits.flatMap { split =>
val splitOffset = split.getStart
val end = splitOffset + split.getLength
(splitOffset until end by maxSplitBytes).map { offset =>
val remaining = end - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, file.getPath.toUri.toString, offset, size, hosts)
PartitionedFile(values, filePath, offset, size, hosts)
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(
partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
}
} else {
// Take the entire file as one partition.
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(values, filePath, 0, file.getLen, hosts))
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
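To make the bin-packing target concrete, here is a purely illustrative calculation of maxSplitBytes, assuming the stock 128 MB and 4 MB defaults for spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes, and ignoring the per-file open cost that is folded into totalBytes:

  // Illustrative numbers only.
  val defaultMaxSplitBytes = 128L * 1024 * 1024        // 128 MB (assumed default)
  val openCostInBytes      = 4L * 1024 * 1024          // 4 MB (assumed default)
  val totalBytes           = 10L * 1024 * 1024 * 1024  // ~10 GB of selected input
  val defaultParallelism   = 200                       // hypothetical cluster parallelism
  val bytesPerCore         = totalBytes / defaultParallelism                 // ~51 MB
  val maxSplitBytes =
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))  // ~51 MB
  // A 1 GB splittable file (or valid split) is then chopped into roughly twenty
  // PartitionedFiles of at most maxSplitBytes each before bin packing.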

@@ -527,7 +551,7 @@ case class FileSourceScanExec(
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
new FileScanRDD(session, readFile, partitions)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileSplit

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -74,6 +75,18 @@ trait FileFormat {
false
}

/**
* Allows a splittable FileFormat to produce a function that splits individual files
* into [[FileSplit]]s. The default implementation treats each file as a single split
* spanning its entire length.
*/
def buildSplitter(
sparkSession: SparkSession,
fileIndex: FileIndex,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
stat => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
}
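As an illustration of the contract only, nothing here is part of this change: a format that knows its internal block boundaries could return one FileSplit per block instead of the single whole-file split produced above. The splitter name and the 64 MB block size are made up.

  import org.apache.hadoop.fs.FileStatus
  import org.apache.hadoop.mapreduce.lib.input.FileSplit

  // Hypothetical splitter obeying the same FileStatus => Seq[FileSplit] contract,
  // for an imaginary format laid out in fixed 64 MB blocks.
  val fixedBlockSplitter: FileStatus => Seq[FileSplit] = { stat =>
    val blockSize = 64L * 1024 * 1024
    (0L until stat.getLen by blockSize).map { offset =>
      val length = math.min(blockSize, stat.getLen - offset)
      new FileSplit(stat.getPath, offset, length, Array.empty)
    }
  }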

/**
* Returns a function that can be used to read a single file in as an Iterator of InternalRow.
*
@@ -17,21 +17,26 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.FileNotFoundException
import java.net.URI
import java.util.concurrent.{Callable, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Try}

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType

@@ -278,6 +283,68 @@ class ParquetFileFormat
true
}

override def buildSplitter(
sparkSession: SparkSession,
fileIndex: FileIndex,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
val pruningEnabled = sparkSession.sessionState.conf.parquetPartitionPruningEnabled
val defaultSplitter = super.buildSplitter(sparkSession, fileIndex, filters, schema, hadoopConf)
if (!pruningEnabled || filters.isEmpty) {
// Return immediately to save FileSystem overhead
defaultSplitter
} else {
val splitters = fileIndex.rootPaths.map { root =>
val splits = ParquetFileFormat.fileSplits.get(root,
new Callable[ParquetFileSplitter] {
override def call(): ParquetFileSplitter =
createParquetFileSplits(root, hadoopConf, schema)
})
root -> splits.buildSplitter(filters)
}.toMap
val compositeSplitter: (FileStatus => Seq[FileSplit]) = { stat =>
val filePath = stat.getPath
val rootOption: Option[Path] = fileIndex.rootPaths
.find(root => filePath.toString.startsWith(root.toString))
val splitterForPath = rootOption.flatMap { root =>
splitters.get(root)
}.getOrElse(defaultSplitter)
splitterForPath(stat)
}
compositeSplitter
}
}

private def createParquetFileSplits(
root: Path,
hadoopConf: Configuration,
schema: StructType): ParquetFileSplitter = {
getMetadataForPath(root, hadoopConf)
.map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema))
.getOrElse(ParquetDefaultFileSplitter)
}

private def getMetadataForPath(
rootPath: Path,
conf: Configuration): Option[ParquetMetadata] = {
val fs = rootPath.getFileSystem(conf)
try {
val stat = fs.getFileStatus(rootPath)
// Mimic Parquet behavior: if given a directory, find the underlying _metadata file;
// if given a single file, check its parent directory for a _metadata file.
val directory = if (stat.isDirectory) stat.getPath else stat.getPath.getParent
val metadataFile = new Path(directory, ParquetFileWriter.PARQUET_METADATA_FILE)
val metadata =
ParquetFileReader.readFooter(conf, metadataFile, ParquetMetadataConverter.NO_FILTER)
Option(metadata)
} catch {
case notFound: FileNotFoundException =>
log.debug(s"No _metadata file found in root $rootPath")
None
}
}
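To make the expected layout concrete, here is a simplified, hypothetical variant that uses an explicit existence check instead of catching FileNotFoundException; the directory path is made up and the summary file name comes from ParquetFileWriter:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.format.converter.ParquetMetadataConverter
  import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
  import org.apache.parquet.hadoop.metadata.ParquetMetadata

  // Expected layout (hypothetical): /data/events/_metadata sits beside /data/events/part-*.parquet.
  def summaryMetadata(root: Path, conf: Configuration): Option[ParquetMetadata] = {
    val metadataFile = new Path(root, ParquetFileWriter.PARQUET_METADATA_FILE) // "_metadata"
    val fs = metadataFile.getFileSystem(conf)
    if (fs.exists(metadataFile)) {
      Some(ParquetFileReader.readFooter(conf, metadataFile, ParquetMetadataConverter.NO_FILTER))
    } else {
      None // no summary file: fall back to the default whole-file splitter
    }
  }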

override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -424,6 +491,13 @@ class ParquetFileFormat
}

object ParquetFileFormat extends Logging {

@transient private val fileSplits: Cache[Path, ParquetFileSplitter] =
CacheBuilder.newBuilder()
.expireAfterAccess(4, TimeUnit.HOURS)
.concurrencyLevel(1)
.build()

private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
