@@ -133,20 +133,24 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): Option[StructType] = {
val csv: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
val firstLine: String = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).first()
val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption match {
case Some(firstLine) =>
val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
}
Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
case None =>
// If the first line could not be read, just return the empty schema.
Some(StructType(Nil))
}

Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
}

private def createBaseDataset(
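
For context: the old code called first() on the filtered Dataset, which throws NoSuchElementException when the CSV input is empty, while the new code uses take(1).headOption so an empty input falls through to the empty-schema branch. A minimal standalone sketch of that difference (illustrative only; the object name and local SparkSession setup below are not part of this patch):

import org.apache.spark.sql.SparkSession

object FirstVsTakeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("first-vs-take").getOrCreate()
    import spark.implicits._

    val empty = spark.emptyDataset[String]

    // first() assumes at least one row and throws NoSuchElementException on an empty Dataset:
    // empty.first()

    // take(1).headOption turns "no rows" into None instead of an exception,
    // which is what lets inferSchema fall back to an empty schema.
    val maybeFirst: Option[String] = empty.take(1).headOption
    println(maybeFirst) // None

    spark.stop()
  }
}
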
@@ -190,28 +194,28 @@ object WholeFileCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): Option[StructType] = {
val csv: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
val maybeFirstRow: Option[Array[String]] = csv.flatMap { lines =>
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
csv.flatMap { lines =>
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
false,
shouldDropHeader = false,
new CsvParser(parsedOptions.asParserSettings))
}.take(1).headOption

if (maybeFirstRow.isDefined) {
val firstRow = maybeFirstRow.get
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.flatMap { lines =>
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
parsedOptions.headerFlag,
new CsvParser(parsedOptions.asParserSettings))
}
Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
} else {
// If the first row could not be read, just return the empty schema.
Some(StructType(Nil))
}.take(1).headOption match {
Member

IMHO, Option.isDefined with Option.get, Option.map with Option.getOrElse, and Option with match { case Some(...) => ... case None => ... } would all be fine. But how about minimising the change by making the one above use Option.isDefined with Option.get? Then it would not require the changes here.
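
For reference, the three styles mentioned above are equivalent for this case; a minimal self-contained sketch (the inferFrom helper below is a made-up stand-in for the real schema inference, not code from this PR):

import org.apache.spark.sql.types.StructType

object OptionStyleSketch {
  // Hypothetical stand-in for the real schema inference.
  def inferFrom(firstLine: String): StructType = StructType(Nil)

  def main(args: Array[String]): Unit = {
    // Pretend this came from the filtered CSV lines; empty here to mimic an empty file.
    val maybeFirstLine: Option[String] = Seq.empty[String].headOption

    // 1. Option.isDefined with Option.get
    val schema1 =
      if (maybeFirstLine.isDefined) inferFrom(maybeFirstLine.get) else StructType(Nil)

    // 2. Option.map with Option.getOrElse
    val schema2 = maybeFirstLine.map(inferFrom).getOrElse(StructType(Nil))

    // 3. match with case Some(...) / case None (the style used in this patch)
    val schema3 = maybeFirstLine match {
      case Some(firstLine) => inferFrom(firstLine)
      case None => StructType(Nil)
    }

    assert(schema1 == schema2 && schema2 == schema3)
  }
}
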

Contributor Author

I would leave it as it is, since pattern matching still looks a bit clearer than conditionals. If minimizing changes is critical, I can revert to the previous version here and replace pattern matching with conditionals in my fix. @cloud-fan, please advise.

Contributor

I don't have a strong preference; this looks fine.

Member

All three patterns I mentioned are used across the code base. There is no style guide for this in either https://github.com/databricks/scala-style-guide or http://spark.apache.org/contributing.html.

In this case, matching the new code to the other, similar ones is the better choice to reduce changed lines, rather than doing the opposite. Personal taste should be secondary.

Member

#17068 (comment) had not shown up when I wrote my comment. I am fine with it as is; I am not the one who should decide this.

Contributor Author

@HyukjinKwon @cloud-fan, many thanks for your effort. I really appreciate it and will take it into account when working with the codebase.

Member

Thank you both for bearing with me.

case Some(firstRow) =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.flatMap { lines =>
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(
lines.getConfiguration,
lines.getPath()),
parsedOptions.headerFlag,
new CsvParser(parsedOptions.asParserSettings))
}
Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
case None =>
// If the first row could not be read, just return the empty schema.
Some(StructType(Nil))
}
}
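
With both code paths now returning Some(StructType(Nil)) when no first row can be read, loading an empty CSV file yields an empty DataFrame instead of failing during schema inference, which is what the updated test below exercises. A rough usage sketch, assuming a local SparkSession and a temporary empty file (paths and setup are illustrative, not taken from this PR):

import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object EmptyCsvReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("empty-csv").getOrCreate()

    // An empty file on disk, standing in for the test's empty CSV resource.
    val emptyCsv = Files.createTempFile("empty", ".csv").toFile
    emptyCsv.deleteOnExit()

    val df = spark.read
      .format("csv")
      .option("header", true)
      .option("wholeFile", true) // the wholeFile = false path behaves the same way
      .load(emptyCsv.getAbsolutePath)

    println(df.schema)  // empty schema -- same as spark.emptyDataFrame.schema
    println(df.count()) // 0

    spark.stop()
  }
}
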

@@ -1072,14 +1072,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("Empty file produces empty dataframe with empty schema - wholeFile option") {
withTempPath { path =>
path.createNewFile()

test("Empty file produces empty dataframe with empty schema") {
Seq(false, true).foreach { wholeFile =>
val df = spark.read.format("csv")
.option("header", true)
.option("wholeFile", true)
.load(path.getAbsolutePath)
.option("wholeFile", wholeFile)
.load(testFile(emptyFile))

assert(df.schema === spark.emptyDataFrame.schema)
checkAnswer(df, spark.emptyDataFrame)