@@ -204,25 +204,10 @@ class DefaultSource extends FileFormat with DataSourceRegister {

val converter = RowEncoder(dataSchema)

val unsafeRowIterator = points.map { pt =>
points.map { pt =>
val features = if (sparse) pt.features.toSparse else pt.features.toDense
converter.toRow(Row(pt.label, features))
}

def toAttribute(f: StructField): AttributeReference =
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()

// Appends partition values
val fullOutput = (dataSchema ++ partitionSchema).map(toAttribute)
val requiredOutput = fullOutput.filter { a =>
requiredSchema.fieldNames.contains(a.name) || partitionSchema.fieldNames.contains(a.name)
}
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)

unsafeRowIterator.map { dataRow =>
appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
}
}
}
}
@@ -106,7 +106,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

val readFile = files.fileFormat.buildReader(
val readFile = files.fileFormat.buildReaderWithPartitionValues(
sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
@@ -117,20 +117,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {

CSVRelation.dropHeaderLine(file, lineIterator, csvOptions)

val unsafeRowIterator = {
val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
tokenizedIterator.flatMap(parser(_).toSeq)
}

// Appends partition values
val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)

unsafeRowIterator.map { dataRow =>
appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
}
val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
tokenizedIterator.flatMap(parser(_).toSeq)
}
}

@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{StringType, StructType}
@@ -238,6 +239,45 @@ trait FileFormat {
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
}

/**
* Exactly the same as [[buildReader]], except that the reader function returned by this method
* also appends partition values to the [[InternalRow]]s produced by the reader function that
* [[buildReader]] returns.
*/
private[sql] def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val dataReader = buildReader(
sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)

new (PartitionedFile => Iterator[InternalRow]) with Serializable {
Review comment (Contributor Author):
@cloud-fan Instead of adding a new ReaderFunction trait with an initialize() method as you suggested, I used an anonymous Function1 class here. Not quite sure how useful the initialize() method can be in more general cases...
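For illustration, a minimal sketch of what such a ReaderFunction trait with an initialize() hook could look like (hypothetical, not part of this PR; InternalRow and PartitionedFile below are simplified stand-ins for the Spark-internal types so the sketch is self-contained):

object ReaderFunctionSketch {
  // Stand-ins for org.apache.spark.sql.catalyst.InternalRow and
  // org.apache.spark.sql.execution.datasources.PartitionedFile.
  type InternalRow = Seq[Any]
  case class PartitionedFile(filePath: String, partitionValues: InternalRow)

  // Hypothetical reader abstraction with a one-time setup hook, as discussed in review.
  trait ReaderFunction extends (PartitionedFile => Iterator[InternalRow]) with Serializable {
    // Called once per reader function (e.g. on the executor, before any file is read),
    // rather than once per file inside apply().
    def initialize(): Unit
  }
}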

Reply (Contributor):
e.g. the text data source, which needs to initialize an UnsafeRowWriter once per reader function (not once per file).

Reply (Contributor Author):
That's a reasonable use case. But we can also use an anonymous Function1 class there.
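A rough sketch of that alternative for the use case above (hypothetical; RowWriter is a stand-in for UnsafeRowWriter and the actual file-reading logic is elided): the per-reader state lives in a lazily initialized field of the anonymous Function1, so it is still created once per reader function rather than once per file.

object AnonymousReaderSketch {
  type InternalRow = Seq[Any]
  case class PartitionedFile(filePath: String, partitionValues: InternalRow)

  // Stand-in for UnsafeRowWriter: something expensive and non-serializable
  // that should be created once per reader function.
  class RowWriter {
    def write(value: String): InternalRow = Seq(value)
  }

  def buildReader(): PartitionedFile => Iterator[InternalRow] =
    new (PartitionedFile => Iterator[InternalRow]) with Serializable {
      // Created lazily on first use (i.e. on the executor), once for this reader
      // function, and shared by all files it reads.
      @transient private lazy val writer = new RowWriter

      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
        // A real implementation would read and parse the file here; this emits one row.
        Iterator(writer.write(file.filePath))
      }
    }
}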

private val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes

private val joinedRow = new JoinedRow()

// Using lazy val to avoid serialization
Review comment (Contributor):
we need to use @transient lazy val to avoid serialization.

Reply (Contributor Author):
Actually in this case it's not necessary, because this UnsafeProjection is only referenced on the executor side.
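A small standalone illustration of the serialization point (hypothetical, not Spark code): a plain lazy val's backing field holds a value only after first access, so a value that is first touched on the executor is never shipped from the driver, while @transient lazy val never serializes the field at all and recomputes it after deserialization.

class LazyValSerializationSketch extends Serializable {
  // Plain lazy val: nothing is captured at serialization time as long as this value
  // has not been accessed on the driver (the situation described for the UnsafeProjection above).
  private lazy val projection: Array[Byte] = new Array[Byte](1 << 20)

  // @transient lazy val: never serialized, recomputed lazily after deserialization;
  // the safer choice when the value might also be touched on the driver.
  @transient private lazy val transientProjection: Array[Byte] = new Array[Byte](1 << 20)

  def use(): Int = projection.length + transientProjection.length
}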

private lazy val appendPartitionColumns =
GenerateUnsafeProjection.generate(fullSchema, fullSchema)

override def apply(file: PartitionedFile): Iterator[InternalRow] = {
// Using a local val to avoid the per-row lazy val check (premature optimization?...)
val converter = appendPartitionColumns

// Note that we have to apply the converter even when `file.partitionValues` is empty.
// This is because the converter is also responsible for converting safe `InternalRow`s into
// `UnsafeRow`s.
dataReader(file).map { dataRow =>
converter(joinedRow(dataRow, file.partitionValues))
}
}
}
}

/**
* Returns an [[OutputWriterFactory]] for generating output writers that can write data.
* This method is currently used only by FileStreamSinkWriter to generate output writers that
@@ -106,22 +106,14 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)

val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()

(file: PartitionedFile) => {
val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)

val rows = JacksonParser.parseJson(
JacksonParser.parseJson(
lines,
requiredSchema,
columnNameOfCorruptRecord,
parsedOptions)

val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
rows.map { row =>
appendPartitionColumns(joinedRow(row, file.partitionValues))
}
}
}

@@ -255,6 +255,20 @@ private[sql] class DefaultSource
schema.forall(_.dataType.isInstanceOf[AtomicType])
}

override private[sql] def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
// For the Parquet data source, `buildReader` already handles appending partition values. Here
// we simply delegate to `buildReader`.
buildReader(
sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
}

override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -83,6 +83,19 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}

override private[sql] def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
// Text data source doesn't support partitioning. Here we simply delegate to `buildReader`.
buildReader(
sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
}

override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -22,9 +22,6 @@ import java.nio.charset.UnsupportedCharsetException
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

@@ -157,20 +157,11 @@ private[sql] class DefaultSource
}

// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
OrcRelation.unwrapOrcStructs(
conf,
requiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
new RecordReaderIterator[OrcStruct](orcRecordReader))

// Appends partition values
val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)

unsafeRowIterator.map { dataRow =>
appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
}
}
}
}