diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index a3ce3d1ccc5e3..f557cce3ed3df 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -112,6 +112,12 @@ class TaskMetrics private[spark] () extends Serializable { /** * Storage statuses of any blocks that have been updated as a result of this task. + * + * Tracking the _updatedBlockStatuses can use a lot of memory. + * It is not used anywhere inside of Spark so we would ideally remove it, but it's exposed to + * the user in SparkListenerTaskEnd so the API is kept for compatibility. + * Tracking can be turned off to save memory via config + * TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES. */ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = { // This is called on driver. All accumulator updates have a fixed value. So it's safe to use diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 615497d36fd14..462c1890fd8df 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -322,4 +322,12 @@ package object config { "above this threshold. This is to avoid a giant request takes too much memory.") .bytesConf(ByteUnit.BYTE) .createWithDefaultString("200m") + + private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES = + ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses") + .doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. 
Off by default since " + + "tracking the block statuses can use a lot of memory and its not used anywhere within " + + "spark.") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 74be70348305c..adbe3cfd89ea6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1473,8 +1473,10 @@ private[spark] class BlockManager( } private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = { - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) + if (conf.get(config.TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES)) { + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) + } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 88f18294aa015..086adccea954c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -922,8 +922,38 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } + test("turn off updated block statuses") { + val conf = new SparkConf() + conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false) + store = makeBlockManager(12000, testConf = Some(conf)) + + store.registerTask(0) + val list = List.fill(2)(new Array[Byte](2000)) + + def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = { + val context = TaskContext.empty() + try { + TaskContext.setTaskContext(context) + task + } finally { + TaskContext.unset() + } + context.taskMetrics.updatedBlockStatuses + } + + // 1 updated block (i.e. 
list1), but tracking is disabled so no update should be recorded + val updatedBlocks1 = getUpdatedBlocks { + store.putIterator( + "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } + assert(updatedBlocks1.size === 0) + } + + test("updated block statuses") { - store = makeBlockManager(12000) + val conf = new SparkConf() + conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true) + store = makeBlockManager(12000, testConf = Some(conf)) store.registerTask(0) val list = List.fill(2)(new Array[Byte](2000)) val bigList = List.fill(8)(new Array[Byte](2000))