Skip to content

Commit 972171b

Browse files
committed
Merge pull request alteryx#197 from aarondav/patrick-fix
Fix 'timeWriting' stat for shuffle files Due to concurrent git branches, changes from shuffle file consolidation patch caused the shuffle write timing patch to no longer actually measure the time, since it requires time be measured after the stream has been closed.
2 parents 718cc80 + ccea38b commit 972171b

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
167167
var totalTime = 0L
168168
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
169169
writer.commit()
170+
writer.close()
170171
val size = writer.fileSegment().length
171172
totalBytes += size
172173
totalTime += writer.timeWriting()
@@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
184185
} catch { case e: Exception =>
185186
// If there is an exception from running the task, revert the partial writes
186187
// and throw the exception upstream to Spark.
187-
if (shuffle != null) {
188-
shuffle.writers.foreach(_.revertPartialWrites())
188+
if (shuffle != null && shuffle.writers != null) {
189+
for (writer <- shuffle.writers) {
190+
writer.revertPartialWrites()
191+
writer.close()
192+
}
189193
}
190194
throw e
191195
} finally {
192196
// Release the writers back to the shuffle block manager.
193197
if (shuffle != null && shuffle.writers != null) {
194-
shuffle.writers.foreach(_.close())
195198
shuffle.releaseWriters(success)
196199
}
197200
// Execute the callbacks on task completion.

0 commit comments

Comments
 (0)