@@ -137,7 +137,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
.getOrElse(sqlContext.conf.columnNameOfCorruptRecord)

val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()

file => {
val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
@@ -148,10 +147,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
columnNameOfCorruptRecord,
parsedOptions)

val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
rows.map { row =>
appendPartitionColumns(joinedRow(row, file.partitionValues))
}
FileFormat.appendPartitionValues(rows, fullSchema, file.partitionValues)
Contributor

We will send the reader function to executors and run it many times, once per file in that RDD partition. However, the projection used to append partition values only needs to be initialized once, not rebuilt for every file.

Should we extend the reader function with an initialize method? E.g., we could create a ReaderFunction trait that defines initialize and execute APIs.
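
A minimal sketch of that suggestion (ReaderFunction, initialize, and execute are hypothetical names from the comment above, not an existing Spark API):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Hypothetical API following the suggestion above; none of these names exist in Spark.
trait ReaderFunction extends Serializable {
  /** Called once per task, before reading any file — the place to build expensive,
   *  reusable state such as the generated projection for appending partition values. */
  def initialize(): Unit

  /** Called once for every file in the RDD partition. */
  def execute(file: PartitionedFile): Iterator[InternalRow]
}
```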

Contributor Author


Makes sense. We may also add an appendPartitionValues method in buildReader, so that partition value appending is abstracted away behind a default implementation. Only special data sources like Parquet would need to override it.
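
A rough sketch of that default, assuming a protected hook on FileFormat (the hook itself is hypothetical; its body mirrors the FileFormat.appendPartitionValues helper this PR adds):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection

trait FileFormat {
  // ...existing buildReader and friends elided...

  // Hypothetical default hook: most sources would inherit this, while formats like
  // Parquet, which append partition values inside their own readers, would override
  // it (e.g. as a no-op that returns `rows` unchanged).
  protected def appendPartitionValues(
      rows: Iterator[InternalRow],
      output: Seq[Attribute],
      partitionValues: InternalRow): Iterator[InternalRow] = {
    val joinedRow = new JoinedRow()
    val appendPartitionColumns = GenerateUnsafeProjection.generate(output, output)
    rows.map(row => appendPartitionColumns(joinedRow(row, partitionValues)))
  }
}
```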

}
}

@@ -395,14 +395,13 @@ private[sql] class DefaultSource
iter.asInstanceOf[Iterator[InternalRow]]
} else {
val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

// This is a horrible erasure hack... if we type the iterator above, then it actually checks
// the type in next() and we get a class cast exception. If we make that function return
// Object, then we can defer the cast until later!
iter.asInstanceOf[Iterator[InternalRow]]
.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
FileFormat.appendPartitionValues(
// This is a horrible erasure hack... if we type the iterator above, then it actually
// checks the type in next() and we get a class cast exception. If we make that function
// return Object, then we can defer the cast until later!
iter.asInstanceOf[Iterator[InternalRow]],
fullSchema,
file.partitionValues)
}
}
}
@@ -33,6 +33,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -484,6 +485,17 @@ trait FileFormat {
}
}

private[sql] object FileFormat {
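/**
 * Appends `partitionValues` to every row in `rows`, returning unsafe rows in the full
 * `output` schema (data columns followed by partition columns).
 */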
def appendPartitionValues(
rows: Iterator[InternalRow],
output: Seq[Attribute],
partitionValues: InternalRow): Iterator[InternalRow] = {
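// Reuse one mutable JoinedRow to concatenate each data row with the partition values;
// the generated unsafe projection then flattens the joined row into the output schema.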
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(output, output)
rows.map { row => appendPartitionColumns(joinedRow(row, partitionValues)) }
}
}

/**
* A collection of data files from a partitioned relation, along with the partition values in the
* form of an [[InternalRow]].
@@ -174,14 +174,10 @@ private[sql] class DefaultSource
file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
)

// Appends partition values
val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)

unsafeRowIterator.map { dataRow =>
appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
}
FileFormat.appendPartitionValues(
unsafeRowIterator,
dataSchema.toAttributes ++ partitionSchema.toAttributes,
file.partitionValues)
}
}
}