Skip to content

Commit 1563e03

Browse files
committed
Address comment.
1 parent 5fd215f commit 1563e03

File tree

3 files changed

+20
-35
lines changed

3 files changed

+20
-35
lines changed

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 8 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -217,6 +217,14 @@ class HadoopRDD[K, V](
217217
private val inputMetrics = context.taskMetrics().inputMetrics
218218
private val existingBytesRead = inputMetrics.bytesRead
219219

220+
// Sets InputFileBlockHolder for the file block's information
221+
split.inputSplit.value match {
222+
case fs: FileSplit =>
223+
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
224+
case _ =>
225+
InputFileBlockHolder.unset()
226+
}
227+
220228
// Find a function that will return the FileSystem bytes read by this thread. Do this before
221229
// creating RecordReader, because RecordReader's constructor might read some bytes
222230
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
@@ -255,23 +263,7 @@ class HadoopRDD[K, V](
255263
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
256264
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
257265

258-
private var setInputFileBlockHolder: Boolean = false
259-
260266
override def getNext(): (K, V) = {
261-
if (!setInputFileBlockHolder) {
262-
// Sets InputFileBlockHolder for the file block's information
263-
// We can't set it before consuming this iterator, otherwise some expressions which
264-
// use thread local variables will fail when working with Python UDF. That is because
265-
// the batch of Python UDF is running in individual thread.
266-
split.inputSplit.value match {
267-
case fs: FileSplit =>
268-
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
269-
case _ =>
270-
InputFileBlockHolder.unset()
271-
}
272-
setInputFileBlockHolder = true
273-
}
274-
275267
try {
276268
finished = !reader.next(key, value)
277269
} catch {

core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,10 @@ private[spark] object InputFileBlockHolder {
4141
* The thread variable for the name of the current file being read. This is used by
4242
* the InputFileName function in Spark SQL.
4343
*/
44-
private[this] val inputBlock: ThreadLocal[FileBlock] = new ThreadLocal[FileBlock] {
45-
override protected def initialValue(): FileBlock = new FileBlock
46-
}
44+
private[this] val inputBlock: InheritableThreadLocal[FileBlock] =
45+
new InheritableThreadLocal[FileBlock] {
46+
override protected def initialValue(): FileBlock = new FileBlock
47+
}
4748

4849
/**
4950
* Returns the holding file name or empty string if it is unknown.

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 8 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -139,6 +139,14 @@ class NewHadoopRDD[K, V](
139139
private val inputMetrics = context.taskMetrics().inputMetrics
140140
private val existingBytesRead = inputMetrics.bytesRead
141141

142+
// Sets InputFileBlockHolder for the file block's information
143+
split.serializableHadoopSplit.value match {
144+
case fs: FileSplit =>
145+
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
146+
case _ =>
147+
InputFileBlockHolder.unset()
148+
}
149+
142150
// Find a function that will return the FileSystem bytes read by this thread. Do this before
143151
// creating RecordReader, because RecordReader's constructor might read some bytes
144152
private val getBytesReadCallback: Option[() => Long] =
@@ -209,23 +217,7 @@ class NewHadoopRDD[K, V](
209217
!finished
210218
}
211219

212-
private var setInputFileBlockHolder: Boolean = false
213-
214220
override def next(): (K, V) = {
215-
if (!setInputFileBlockHolder) {
216-
// Sets InputFileBlockHolder for the file block's information.
217-
// We can't set it before consuming this iterator, otherwise some expressions which
218-
// use thread local variables will fail when working with Python UDF. That is because
219-
// the batch of Python UDF is running in individual thread.
220-
split.serializableHadoopSplit.value match {
221-
case fs: FileSplit =>
222-
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
223-
case _ =>
224-
InputFileBlockHolder.unset()
225-
}
226-
setInputFileBlockHolder = true
227-
}
228-
229221
if (!hasNext) {
230222
throw new java.util.NoSuchElementException("End of stream")
231223
}

0 commit comments

Comments
 (0)