@@ -49,6 +49,7 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

@@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
initialize(inputSplit, taskAttemptContext, Option.empty());
initialize(inputSplit, taskAttemptContext, Option.empty(), Option.empty(), Option.empty());
}

public void initialize(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext,
Option<HadoopInputFile> inputFile,
Option<SeekableInputStream> inputStream,
Option<ParquetMetadata> fileFooter) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
FileSplit split = (FileSplit) inputSplit;
this.file = split.getPath();
ParquetReadOptions options = HadoopReadOptions
.builder(configuration, file)
.withRange(split.getStart(), split.getStart() + split.getLength())
.build();
ParquetFileReader fileReader;
if (fileFooter.isDefined()) {
fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
Member Author (pan3793):
This constructor internally calls HadoopInputFile.fromPath(file, configuration), which produces an unnecessary GetFileInfo RPC

  public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
  }
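
For contrast, a minimal sketch of the RPC-free path this change enables (editorial, not part of the diff; status and configuration are assumed to already be in scope):

  // HadoopInputFile.fromStatus reuses a FileStatus the caller already holds,
  // so no additional getFileStatus()/GetFileInfo call is made.
  HadoopInputFile inputFile = HadoopInputFile.fromStatus(status, configuration);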

if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) {
fileReader = new ParquetFileReader(
inputFile.get(), fileFooter.get(), options, inputStream.get());
} else {
ParquetReadOptions options = HadoopReadOptions
.builder(configuration, file)
.withRange(split.getStart(), split.getStart() + split.getLength())
.build();
fileReader = new ParquetFileReader(
HadoopInputFile.fromPath(file, configuration), options);
}
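
A hedged sketch of how a caller could use the new signature (editorial; reader, status, footer, conf, split, and context are illustrative names, not code from this PR):

// Build the input file from an already-resolved FileStatus (no GetFileInfo RPC),
// open the stream once, and pass both along with the previously read footer.
HadoopInputFile inputFile = HadoopInputFile.fromStatus(status, conf);
SeekableInputStream stream = inputFile.newStream();
reader.initialize(
    split, context,
    Option.apply(inputFile),
    Option.apply(stream),
    Option.apply(footer));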
@@ -24,8 +24,6 @@
import java.util.List;
import java.util.Set;

import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;

@@ -35,11 +33,15 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
@@ -190,9 +192,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
public void initialize(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext,
Option<HadoopInputFile> inputFile,
Option<SeekableInputStream> inputStream,
Option<ParquetMetadata> fileFooter)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext, fileFooter);
super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream, fileFooter);
initializeInternal();
}

@@ -640,7 +640,7 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper {
override def toPartitionArray: Array[PartitionedFile] = {
partitionDirectories.flatMap { p =>
p.files.map { f =>
PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen)
PartitionedFileUtil.getPartitionedFile(f, p.values, 0, f.getLen)
}
}
}
@@ -876,7 +876,6 @@ case class FileSourceScanExec(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partitionVals
@@ -17,39 +17,35 @@

package org.apache.spark.sql.execution

import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._

object PartitionedFileUtil {
def splitFiles(
file: FileStatusWithMetadata,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
if (isSplitable) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
getPartitionedFile(file, filePath, partitionValues, offset, size)
getPartitionedFile(file, partitionValues, offset, size)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues, 0, file.getLen))
Seq(getPartitionedFile(file, partitionValues, 0, file.getLen))
}
}

def getPartitionedFile(
file: FileStatusWithMetadata,
filePath: Path,
partitionValues: InternalRow,
start: Long,
length: Long): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length)
PartitionedFile(partitionValues, SparkPath.fromPath(filePath), start, length, hosts,
file.getModificationTime, file.getLen, file.metadata)
PartitionedFile(partitionValues, start, length, file.fileStatus, hosts, file.metadata)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -315,9 +314,7 @@ object FileFormat {
def createMetadataInternalRow(
partitionValues: InternalRow,
fieldNames: Seq[String],
filePath: SparkPath,
fileSize: Long,
fileModificationTime: Long): InternalRow = {
fileStatus: FileStatus): InternalRow = {
// When scanning files directly from the filesystem, we only support file-constant metadata
// fields whose values can be derived from a file status. In particular, we don't have accurate
// file split information yet, nor do we have a way to provide custom metadata column values.
@@ -327,12 +324,10 @@
assert(fieldNames.forall(validFieldNames.contains))
val pf = PartitionedFile(
partitionValues = partitionValues,
filePath = filePath,
start = 0L,
length = fileSize,
length = fileStatus.getLen,
fileStatus = fileStatus,
locations = Array.empty,
modificationTime = fileModificationTime,
fileSize = fileSize,
otherConstantMetadataColumnValues = Map.empty)
updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors)
}
@@ -21,7 +21,6 @@ import scala.collection.mutable

import org.apache.hadoop.fs._

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType
@@ -84,8 +83,7 @@ class FilePruningRunner(filters: Seq[Expression]) {
// use option.forall, so if there is no filter no metadata struct, return true
boundedFilterMetadataStructOpt.forall { boundedFilter =>
val row =
FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq,
SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime)
FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, f)
boundedFilter.eval(row)
}
}
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.{Closeable, FileNotFoundException}
import java.net.URI

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.BlockMissingException
import org.apache.hadoop.security.AccessControlException

@@ -47,26 +47,25 @@ import org.apache.spark.util.NextIterator
* that need to be prepended to each row.
*
* @param partitionValues value of partition columns to be prepended to each row.
* @param filePath URI of the file to read
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @param modificationTime The modification time of the input file, in milliseconds.
* @param fileSize The length of the input file (not the block), in bytes.
* @param fileStatus The FileStatus instance of the file to read.
* @param otherConstantMetadataColumnValues The values of any additional constant metadata columns.
*/
case class PartitionedFile(
partitionValues: InternalRow,
filePath: SparkPath,
start: Long,
length: Long,
fileStatus: FileStatus,
Contributor:
Similarly, due to the addition of fileStatus, the constructor of PartitionedFile can also be further simplified, right?

Contributor:
In addition, since fileStatus will hold more state and also participate in serialization, will this lead to additional memory overhead and serialization pressure?

Member Author (pan3793), Sep 9, 2025:
The fileStatus should occupy a bit more memory, but I haven't seen any OOM issues during the rollout of this change to our online cluster.

Contributor:
@cloud-fan Are there also risks of breaking internal APIs with modifications similar to those made here and in FileFormat.createMetadataInternalRow?

Contributor:
what's the cost of serializing file status?

Member Author (pan3793), Sep 15, 2025:
@cloud-fan I think path contributes the majority of the size.

public class FileStatus implements Writable, Comparable<Object>,
    Serializable, ObjectInputValidation {
  ...
  private Path path;           // backed by URI
  private long length;
  private Boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;
  private Path symlink;        // likely be NULL
  private Set<AttrFlags> attr; // AttrFlags is enum
  ...
}

public class FsPermission implements Writable, Serializable,
    ObjectInputValidation {
  ...
  private FsAction useraction = null;  // FsAction is enum
  private FsAction groupaction = null;
  private FsAction otheraction = null;
  private Boolean stickyBit = false;
  ...
}

https://github.com/apache/hadoop/blob/branch-3.4.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java

https://github.com/apache/hadoop/blob/branch-3.4.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
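
A rough, editorial way to put a number on this (not from the thread): Java-serialize a single FileStatus and print its size; the path below is only an example.

// FileStatus implements Serializable, so its Java-serialized footprint can be measured directly.
FileStatus status = FileSystem.get(conf).getFileStatus(new Path("/tmp/example"));
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
  out.writeObject(status);
}
System.out.println("Java-serialized FileStatus: " + buffer.size() + " bytes");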

Contributor:
Is it possible to have a custom serde for it and only send the path string? This reminds me of SerializableConfiguration as these Hadoop classes are usually not optimized for serialization and transport.

Member Author (pan3793):
Seems not feasible, because FileStatus has many subclasses, e.g. S3AFileStatus, ViewFsFileStatus.

Member Author (pan3793):
@cloud-fan The change basically moves the RPC cost from executor => storage service to driver => executors. In my env (HDFS with RBF), the latter is much cheaper than the former. I don't have a cloud env, so I can't give numbers for object storage services like S3.

Contributor:
Hmm, then this may cause a regression for short queries?

Member (sunchao):
Hmm, not sure how much difference this will make in terms of driver memory usage. Is it easy to make the FileStatus optional in PartitionedFile and controllable via a flag?

It seems in Parquet Java the file status is only used in one case: https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java#L109-L132

Mostly we just need the file path and length. But yeah, this one use case seems critical to avoid a duplicated NN call to get the file status again.

Member Author (pan3793), Sep 18, 2025:
@sunchao thanks for your suggestion. After an offline discussion with @cloud-fan, I understand his concerns about the overhead of FileStatus; let me summarize the conclusion and my thoughts:

  1. There are different Hadoop FileSystem implementations; getFileStatus might be cheap or have an executor-side cache in some of them, but for our case (HDFS with RBF) it's relatively heavy.
  2. There is an upcoming optimization to replace FileStatusCache with PathCache (carrying only the necessary metadata) on the driver side to reduce driver memory.
  3. @cloud-fan suggests constructing the FileStatus directly on the executor side.

So I'm going to split this PR into two parts:

  1. I will experiment with (3), but I can only do it for HDFS cases (w/ and w/o RBF, w/ and w/o EC); see the sketch below.
  2. Move the rest of the executor-side changes into a dedicated PR.
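
An editorial sketch of idea (3), reconstructing a FileStatus on the executor from fields a slimmer PartitionedFile would still carry (the constructor arguments and placeholder values are assumptions, not the final design):

// Rebuild a minimal FileStatus on the executor without any RPC.
// Block replication and block size are unknown here, so placeholders are used.
FileStatus reconstructed = new FileStatus(
    fileSize,          // length, already carried by PartitionedFile
    false,             // isDirectory
    0,                 // block replication (unknown on the executor)
    0L,                // block size (unknown on the executor)
    modificationTime,  // already carried by PartitionedFile
    path);             // org.apache.hadoop.fs.Path of the file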

@transient locations: Array[String] = Array.empty,
modificationTime: Long = 0L,
fileSize: Long = 0L,
otherConstantMetadataColumnValues: Map[String, Any] = Map.empty) {

@transient lazy val filePath: SparkPath = SparkPath.fromFileStatus(fileStatus)
Member Author (pan3793):
If SparkPath.fromFileStatus is cheap enough, we don't need to materialize filePath, so we can save some memory

def pathUri: URI = filePath.toUri
def toPath: Path = filePath.toPath
def urlEncodedPath: String = filePath.urlEncoded
def modificationTime: Long = fileStatus.getModificationTime
def fileSize: Long = fileStatus.getLen

override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"