Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ class TaskMetrics private[spark] () extends Serializable {

/**
* Storage statuses of any blocks that have been updated as a result of this task.
*
* Tracking the _updatedBlockStatuses can use a lot of memory.
* It is not used anywhere inside of Spark so we would ideally remove it, but it's exposed to
* the user in SparkListenerTaskEnd so the API is kept for compatibility.
* Tracking can be turned off to save memory via config
* TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES.
*/
def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = {
// This is called on driver. All accumulator updates have a fixed value. So it's safe to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,12 @@ package object config {
"above this threshold. This is to avoid a giant request takes too much memory.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("200m")

private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we want to document this somewhere? If so, what do you suggest? There isn't any other config quite like this right now. We could put it in either the configuration doc or the monitoring doc.

Copy link
Contributor

@cloud-fan cloud-fan Jun 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can document it in configuration.md.

Actually, can we turn it off by default? I think this feature is useless for most users. cc @JoshRosen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is why I originally had this off by default and you requested it on. I will turn it back off, and if a user finds they need it because they somehow extended the class, they can turn it on. Turning this off will be most beneficial for the majority of users.

.doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. Off by default since " +
"tracking the block statuses can use a lot of memory and its not used anywhere within " +
"spark.")
.booleanConf
.createWithDefault(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1473,8 +1473,10 @@ private[spark] class BlockManager(
}

private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
if (conf.get(config.TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES)) {
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,38 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}

// Checks that TaskMetrics records no updated block statuses when
// TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES is set to false.
test("turn off updated block statuses") {
  val conf = new SparkConf()
  conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)
  store = makeBlockManager(12000, testConf = Some(conf))

  store.registerTask(0)
  val list = List.fill(2)(new Array[Byte](2000))

  // Runs `task` with a fresh, empty TaskContext installed and returns the updated block
  // statuses read from that context's task metrics. The context is unset in `finally`,
  // so it does not leak into later code even if `task` throws.
  def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
    val context = TaskContext.empty()
    try {
      TaskContext.setTaskContext(context)
      task
    } finally {
      TaskContext.unset()
    }
    context.taskMetrics.updatedBlockStatuses
  }

  // With tracking disabled, putting list1 must not report any updated block status.
  val updatedBlocks1 = getUpdatedBlocks {
    store.putIterator(
      "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  }
  assert(updatedBlocks1.size === 0)
}


test("updated block statuses") {
store = makeBlockManager(12000)
val conf = new SparkConf()
conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true)
store = makeBlockManager(12000, testConf = Some(conf))
store.registerTask(0)
val list = List.fill(2)(new Array[Byte](2000))
val bigList = List.fill(8)(new Array[Byte](2000))
Expand Down