
Conversation


@jinxing64 jinxing64 commented Feb 9, 2017

What changes were proposed in this pull request?

  1. Use a MedianHeap to record the durations of successful tasks. When checking for speculatable tasks, we can then get the median duration in O(1) time.

  2. checkSpeculatableTasks synchronizes on TaskSchedulerImpl. If checkSpeculatableTasks doesn't finish within 100ms, the thread may release the lock and then immediately re-acquire it. Change scheduleAtFixedRate to scheduleWithFixedDelay when scheduling calls to checkSpeculatableTasks (see the sketch below).
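
A minimal sketch of the scheduling change in item 2 (not the actual Spark code; checkSpeculatableTasks here is a placeholder for the real TaskSchedulerImpl method):

import java.util.concurrent.{Executors, TimeUnit}

object SpeculationSchedulingSketch {
  // Placeholder for TaskSchedulerImpl.checkSpeculatableTasks.
  def checkSpeculatableTasks(): Unit = { /* scan running tasks ... */ }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // scheduleAtFixedRate targets a fixed period: if one run overshoots the
    // 100ms interval, the next run fires immediately afterwards, so a slow
    // checkSpeculatableTasks can release and instantly re-acquire the
    // TaskSchedulerImpl lock. scheduleWithFixedDelay instead waits a full
    // 100ms after each run completes, guaranteeing a gap between runs.
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = checkSpeculatableTasks()
    }, 100, 100, TimeUnit.MILLISECONDS)
  }
}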

How was this patch tested?

Added MedianHeapSuite.

@jinxing64 (Author)

@kayousterhout @squito
Would you mind taking a look at this when you have time?

@squito (Contributor) left a comment

The idea is to go from an O(n log n) operation at each call to checkSpeculatableTasks to an O(lg n) operation at the completion of each task? Just in terms of complexity, there isn't a clear benefit. And I pointed out that the implementation would need to be a little more complicated (duplicate times) -- enough that I'm not sure the actual performance will be better. I feel like this needs a benchmark.

If nothing else, your change to only look at running tasks is a good one which seems like a simple, clear improvement.

Contributor:

this can't be a set, since multiple tasks might be running for the same amount of time. you could change it to a set of (time, count) pairs, which would make the update logic a bit more complicated, or just keep a list of all runtimes.

also the name should indicate that it's the durations, eg. successfulTaskDurations

Contributor:

there are some other things you could do to make the original code more efficient:
a) instead of .values.filter.map, use a foreach to directly add into an ArrayBuffer. That will avoid all the intermediate collections that scala would create otherwise.
b) store an approximate distribution of the runtimes, eg. using a tdigest.

aside: what a pain that there is no quick way to get the middle element of a TreeSet -- I couldn't find anything efficient in either the java or scala libs :(
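
A sketch of suggestion (a), using a hypothetical TaskInfo stand-in (the real fields in TaskSetManager may differ):

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the real TaskInfo, for illustration only.
case class TaskInfo(successful: Boolean, duration: Long)

def successfulDurations(taskInfos: Map[Long, TaskInfo]): ArrayBuffer[Long] = {
  val durations = new ArrayBuffer[Long](taskInfos.size)
  // One pass with foreach; this avoids the intermediate collections that
  // .values.filter(_.successful).map(_.duration) would allocate.
  taskInfos.values.foreach { info =>
    if (info.successful) durations += info.duration
  }
  durations
}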

Contributor:

oh, good find, this alone looks like a worthwhile fix.

Contributor:

Echoing what Imran said -- I'm definitely +1 on merging this simple change. The other changes in this PR add a bunch of complexity, so I'd need to see measurements demonstrating a significant improvement in performance to be convinced that we should merge them.

Author:

@kayousterhout
Thanks a lot for your comments :)
I will keep this simple change in this PR. For the time complexity improvement, I will make another PR and try to add some measurements.

Contributor:

@squito @kayousterhout @jinxing64 Just to add, the other change does look interesting - and I can definitely see potential value in it. For example, when I was running jobs with 200k - 400k tasks, this never came up (though probably my configs were different from yours) - so it would be good to see the actual impact of the other changes.

@squito (Contributor) commented Feb 21, 2017

Jenkins, ok to test


SparkQA commented Feb 21, 2017

Test build #73230 has finished for PR 16867 at commit 1169d11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64 jinxing64 changed the title [SPARK-16929] Improve performance when check speculatable tasks. [WIP][SPARK-16929] Improve performance when check speculatable tasks. Feb 24, 2017
@jinxing64 (Author) commented Feb 27, 2017

@squito
Thanks a lot for your comments :)

When checking for speculatable tasks in TaskSetManager, the current code scans all task infos and sorts the durations of successful tasks, which is O(N log N).

checkSpeculatableTasks is scheduled every 100ms by scheduleAtFixedRate (not scheduleWithFixedDelay), so checkSpeculatableTasks can be called more often than every 100ms. In my cluster (yarn-cluster mode), if the size of the task set is over 300000 and the driver is running on a machine with poor CPU performance, the Arrays.sort can easily take more than 100ms. Since checkSpeculatableTasks synchronizes on TaskSchedulerImpl, I suspect that's why my driver hangs.

I get the median duration via TreeSet.slice, which comes from IterableLike and unfortunately cannot jump directly to the middle position, so the time complexity is O(n/2) in this PR.
I could hack into TreeSet by reflection to get the median, but I don't want to do that; I think it would hurt code clarity.

successfulTaskIdsSet stores the tasks' ids. I override the ordering in the TreeSet so that two successful tasks with the same duration are both stored in successfulTaskIdsSet (see the sketch below).
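
A hypothetical sketch of that ordering trick (names and types here are illustrative, not the PR's exact code):

import scala.collection.mutable

// taskId -> duration of the completed task (illustrative).
val taskDurations = mutable.Map.empty[Long, Long]

// Order ids by (duration, id): ties on duration are broken by the unique
// task id, so two tasks with the same duration can coexist in the TreeSet.
val successfulTaskIdsSet = mutable.TreeSet.empty[Long](
  Ordering.by((id: Long) => (taskDurations(id), id)))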

@jinxing64 jinxing64 changed the title [WIP][SPARK-16929] Improve performance when check speculatable tasks. [SPARK-16929] Improve performance when check speculatable tasks. Feb 27, 2017
@mridulm (Contributor) commented Feb 27, 2017:

taskInfos(<slice_output_above>).duration


SparkQA commented Feb 27, 2017

Test build #73514 has finished for PR 16867 at commit 69c2db2.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout (Contributor)

This looks like a real test failure resulting from this change


SparkQA commented Feb 28, 2017

Test build #73562 has finished for PR 16867 at commit f28d900.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 28, 2017

Test build #73563 has finished for PR 16867 at commit cd16008.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 28, 2017

Test build #73575 has started for PR 16867 at commit 9778b67.


SparkQA commented Feb 28, 2017

Test build #73580 has finished for PR 16867 at commit 0abcceb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Feb 28, 2017

@jinxing64 thanks for updating this to be just the simpler fix. Since the original jira has a bit of a longer discussion on it, do you mind opening a new jira for this change, and linking it to the other one?

Then we can continue discussion of your other change when you have a new pr, still on SPARK-16929

@squito (Contributor) commented Feb 28, 2017

other than a bit of jira re-organization, lgtm

@kayousterhout (Contributor)

LGTM, and @squito's JIRA re-organizing sounds perfect

@jinxing64 (Author) commented Mar 1, 2017

@kayousterhout @squito
Opening a new JIRA for the simple change sounds good. Please take a look at #17111.

@jinxing64 jinxing64 force-pushed the SPARK-16929 branch 2 times, most recently from 6825bd7 to 4f2fc48, March 1, 2017 04:12
@jinxing64 (Author) commented Mar 1, 2017

@kayousterhout @squito @mridulm

I added a measurement for this PR in #17112. Results are below: newAlgorithm indicates whether we use the TreeSet to get the median duration, time cost is the time taken to get the median duration of the successful tasks, and tasksNum is the number of tasks in the TaskSetManager. I ran the measurement several times.
If tasksNum = 1000:

newAlgorithm | time cost
false        | 5ms, 3ms, 4ms, 3ms, 3ms
true         | 2ms, 4ms, 2ms, 2ms, 3ms

If tasksNum = 100000:

newAlgorithm | time cost
false        | 107ms, 109ms, 103ms, 100ms, 107ms
true         | 17ms, 14ms, 14ms, 13ms, 14ms

If tasksNum = 150000:

newAlgorithm | time cost
false        | 133ms, 146ms, 127ms, 163ms, 114ms
true         | 14ms, 13ms, 15ms, 16ms, 14ms

As we can see, the new algorithm (TreeSet) performs better than the old one (Arrays.sort). When tasksNum = 100000, Arrays.sort takes over 100ms every time, while the new algorithm stays below 20ms.


SparkQA commented Mar 1, 2017

Test build #73658 has finished for PR 16867 at commit 6825bd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 1, 2017

Test build #73660 has finished for PR 16867 at commit 4f2fc48.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout (Contributor)

I'm a little on the fence about this because of the added complexity, but it does seem to be a significant time improvement. Did you consider implementing this as a median heap (see the last post here: http://stackoverflow.com/questions/15319561/how-to-implement-a-median-heap/15319593)? As long as this is solely to improve performance, it seems like we should do the most efficient implementation, and the heap gives an O(1) median lookup (whereas this is O(log N) to insert and O(N) for the slice).
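
For reference, a minimal sketch of the two-heap median structure (the version eventually merged lives in org.apache.spark.util.collection.MedianHeap; its details may differ from this sketch):

import scala.collection.mutable.PriorityQueue

class MedianHeapSketch {
  // Max-heap of the smaller half: its head is the largest of the small values.
  private val smallerHalf = PriorityQueue.empty[Double]
  // Min-heap of the larger half: its head is the smallest of the large values.
  private val largerHalf = PriorityQueue.empty[Double](Ordering[Double].reverse)

  def size(): Int = smallerHalf.size + largerHalf.size

  def insert(x: Double): Unit = {
    // Route x to the correct half, then rebalance so the two halves
    // differ in size by at most one element.
    if (largerHalf.nonEmpty && x > largerHalf.head) largerHalf.enqueue(x)
    else smallerHalf.enqueue(x)
    if (smallerHalf.size - largerHalf.size > 1) {
      largerHalf.enqueue(smallerHalf.dequeue())
    } else if (largerHalf.size - smallerHalf.size > 1) {
      smallerHalf.enqueue(largerHalf.dequeue())
    }
  }

  def median: Double = {
    if (size() == 0) throw new NoSuchElementException("MedianHeap is empty.")
    // Equal sizes: the median straddles the two heads. Otherwise it is the
    // head of whichever half holds the extra element.
    if (smallerHalf.size == largerHalf.size) (smallerHalf.head + largerHalf.head) / 2.0
    else if (smallerHalf.size > largerHalf.size) smallerHalf.head
    else largerHalf.head
  }
}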

@kayousterhout (Contributor)

Also, thanks for doing the timing measurements!


SparkQA commented Mar 16, 2017

Test build #74640 has finished for PR 16867 at commit 104e867.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 16, 2017

Test build #74649 has finished for PR 16867 at commit 2518a95.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} catch {
  case e: NoSuchElementException =>
    valid = true
}
Contributor:

scalatest has a simpler pattern for this:

intercept[NoSuchElementException] {
  medianHeap.median
}

http://www.scalatest.org/user_guide/using_assertions

(I guess you could use assertThrows in this case, but I tend to always use intercept since it also lets you inspect the thrown exception.)

Author:

Thanks a lot for the recommendation :)

assert(medianHeap.median === (array(4)))
}

test("Size of Median should be correct though there are duplicated numbers inside.") {
Contributor:

I'd remove "Size of" from the name, this is testing more than the size.

Arrays.sort(array)
assert(medianHeap.size === 10)
assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
}
Contributor:

I know Kay asked for tests with some hardcoded data, but I think these tests are too simplistic. All of these tests insert data in order, and none have significant skew.

Can you add a test case which does something like the following (see the sketch after the list):

  1. inserts 10 elements with the same value (eg. 5), check the median
  2. insert 100 elements with a larger value (eg 10) check the median
  3. insert 1000 elements with an even smaller value (eg 0), check the median
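
A sketch of such a test (the exact values and wording merged into MedianHeapSuite may differ):

test("Median should be correct when input data is skewed") {
  val medianHeap = new MedianHeap()
  // 10 copies of 5: median is 5.
  (1 to 10).foreach(_ => medianHeap.insert(5))
  assert(medianHeap.median === 5)
  // Add 100 copies of 10: the tens dominate, so the median becomes 10.
  (1 to 100).foreach(_ => medianHeap.insert(10))
  assert(medianHeap.median === 10)
  // Add 1000 copies of 0: the zeros dominate, so the median becomes 0.
  (1 to 1000).foreach(_ => medianHeap.insert(0))
  assert(medianHeap.median === 0)
}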

Author:

Yes, I added this change.


SparkQA commented Mar 16, 2017

Test build #74673 has finished for PR 16867 at commit 7740d77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 16, 2017

Test build #74674 has finished for PR 16867 at commit c13a198.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 16, 2017

Test build #74675 has finished for PR 16867 at commit 617d5aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Mar 17, 2017

@jinxing64 would you mind repeating your performance experiments with the latest version? Both for checkSpeculatableTasks and also for inserting the duration on each task completion?

@jinxing64 (Author) commented Mar 18, 2017

@squito
Sure. I ran the test 5 times for 100k tasks. The results are as below:

operation              | time cost
insert                 | 135ms, 122ms, 119ms, 120ms, 163ms
checkSpeculatableTasks | 6ms, 6ms, 6ms, 5ms, 6ms

@mridulm (Contributor) commented Mar 18, 2017

LGTM @kayousterhout, @squito.

@jinxing64 (Author)

@mridulm
Thanks a lot for helping review this :) Really appreciated.

@squito (Contributor) left a comment

lgtm, very minor suggestions (which might just be my personal style preferences ...)


// Stores all the numbers less than the current median in a smallerHalf,
// i.e median is the maximum, at the root
private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
Contributor:

very minor -- could you make this comment a doc comment with /**? Even though it's private, I find that helpful, since IDEs will show the text when you hover over a reference.
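
The suggested form would look something like this (adapted from the snippet above):

/**
 * Stores all the numbers less than the current median in a max-heap,
 * i.e. the median is the maximum, at the root.
 */
private[this] var smallerHalf = PriorityQueue.empty[Double](ord)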

* return the average of the two top values of heaps. Otherwise we return the top of the
* heap which has one more element.
*/

Contributor:

nit: delete this blank line

}

// Returns the median of the numbers.
def median: Double = {
Contributor:

minor: I find comments which basically just restate the method name to be pretty pointless. I'd only include them if they add something else, eg. preconditions, or complexity, etc. Mostly I'd say they're not necessary for any of the methods here.

@jinxing64 (Author)

@squito
Thanks :) already refined.


SparkQA commented Mar 20, 2017

Test build #74858 has started for PR 16867 at commit 36c205a.


SparkQA commented Mar 20, 2017

Test build #74870 has finished for PR 16867 at commit 5192a32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64 (Author)

@kayousterhout more comments?

@kayousterhout (Contributor) left a comment

Thanks for the updates @jinxing64! I left a few last very minor comments for the tests.

}

test("Median should be correct though there are duplicated numbers inside.") {
val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)
Contributor:

Can you change this to something like:

Array(0, 0, 1, 1, 2, 3, 4)?

Otherwise the median heap could be handling the duplicates wrong (e.g., by not actually inserting duplicates), and the assertion at the bottom would still hold. Then the check at the end can be medianHeap.median === 1.
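
With that change, the test would look roughly like this (a sketch, not necessarily the exact code that was merged):

test("Median should be correct though there are duplicated numbers inside.") {
  val array = Array(0, 0, 1, 1, 2, 3, 4)
  val medianHeap = new MedianHeap()
  array.foreach(medianHeap.insert(_))
  assert(medianHeap.size() === 7)
  // Sorted: 0, 0, 1, 1, 2, 3, 4 -- the middle (4th) element is 1.
  assert(medianHeap.median === 1)
}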

val medianHeap = new MedianHeap()
array.foreach(medianHeap.insert(_))
assert(medianHeap.size() === 10)
assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
Contributor:

instead of indexing into the array, I think it would be clearer here to just hard-code 4.5 (it's easier to see that the median is 4.5 than to reason about the indices in the array)


package org.apache.spark.util.collection

import java.util.Arrays
Contributor:

super nit: can you combine these into one import (import java.util.{Arrays, NoSuchElementException})

val medianHeap = new MedianHeap()
array.foreach(medianHeap.insert(_))
assert(medianHeap.size() === 9)
assert(medianHeap.median === (array(4)))
Contributor:

similarly here -- just medianHeap.median === 4

assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
}

test("Median should be correct when skew situations.") {
Contributor:

"when skew situations" --> "when input data is skewed"


SparkQA commented Mar 24, 2017

Test build #75137 has finished for PR 16867 at commit b9bdf44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64 (Author)

@kayousterhout
Thanks a lot for the comments. I refined accordingly :)

@kayousterhout (Contributor)

Merged this to master -- thanks for all of the quick updates here @jinxing64!

@asfgit asfgit closed this in 19596c2 Mar 24, 2017
@jinxing64 (Author)

@kayousterhout @squito @mridulm
Thanks for reviewing this !
