[SPARK-14654][CORE] New accumulator API #12612
Conversation
Test build #56704 has finished for PR 12612 at commit
class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
One difference from the previous API: we can't have a general setValue method, as it needs the intermediate type, which is not exposed by the new API. For example, AverageAccumulator doesn't have setValue.
I think getting rid of setValue is great; in the consistent accumulators built on the old API I had to throw an exception if people used setValue.
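To make the trade-off concrete, here is a hedged sketch, not code from this PR (only the method names add/merge/reset/localValue seen in the diff hunks of this conversation are assumed): an average accumulator whose intermediate state (sum, count) differs from its Double output, which is exactly why a general setValue cannot be offered.

// Hypothetical sketch: the intermediate state is (sum, count), but the exposed
// value is a Double, so setValue(avg: Double) has no unambiguous meaning.
class AverageSketch extends Serializable {
  private var sum: Double = 0.0
  private var count: Long = 0L

  def isZero: Boolean = count == 0L
  def reset(): Unit = { sum = 0.0; count = 0L }
  def add(v: Double): Unit = { sum += v; count += 1 }
  def merge(other: AverageSketch): Unit = { sum += other.sum; count += other.count }

  // The output loses the (sum, count) information needed to reconstruct the
  // internal state, so there is no general way to "set" it from a Double.
  def localValue: Double = if (count == 0L) Double.NaN else sum / count
}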
Test build #56848 has finished for PR 12612 at commit
Test build #56850 has finished for PR 12612 at commit
cc @rxin, several questions need to be discussed:
Test build #56860 has finished for PR 12612 at commit
Test build #56861 has finished for PR 12612 at commit
Force-pushed from d4cc938 to 38cb9a1
Test build #56862 has finished for PR 12612 at commit
Test build #56863 has finished for PR 12612 at commit
metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
definedAccumUpdates.filter(_.internal).foreach { accInfo =>
  metrics.internalAccums.find(_.name == accInfo.name).foreach { acc =>
    acc.asInstanceOf[Accumulator[Any, Any]].add(accInfo.update.get)
This is an example that shows a weakness of the new API: we can't setValue. For this example, we have the final output and we want to set the value of the accumulator so that it can produce the same output. With the new API, we can't guarantee that all accumulators can implement setValue, e.g. the average accumulator. I'm still thinking about how to fix it or work around it, @rxin any ideas?
Just have a reset and "add"?
I'd argue it doesn't make sense to call setValue, since the "set" action is not algebraic (i.e. you cannot compose/merge set operations).
Actually I don't think we need this if we send accumulators back to the driver.
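As a concrete illustration of the reset-plus-add suggestion above, using the AverageSketch sketch from the earlier comment (again, not code from this PR):

// "Setting" via reset-then-add: only possible when the input type can encode the
// desired final state, and not algebraic -- anything accumulated so far is discarded.
val acc = new AverageSketch
acc.add(1.0)
acc.add(3.0)
acc.reset()     // drop everything accumulated so far
acc.add(42.0)   // the accumulator now holds sum = 42.0, count = 1
assert(acc.localValue == 42.0)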
Test build #56900 has finished for PR 12612 at commit
name: Option[String],
countFailedValues: Boolean) extends Serializable

trait UpdatedValue extends Serializable
cc @rxin, I didn't send the accumulator back because of a serialization problem.
Basically, when we send an accumulator from the driver to executors, we don't want to send its current value (think about a list accumulator: we definitely don't want to send the current list to executors).
But when we send an accumulator from executors back to the driver, we do need to send the current value.
One possible solution is to have 2 local variables for each accumulator, one for the driver and one for executors. But it's a lot of trouble when accumulators have a complex intermediate type, e.g. the average accumulator. So I ended up with this approach.
Another potential problem with sending Accumulators over the wire, with the proposed API from the JIRA, is that the Accumulators register themselves inside of readObject.
@cloud-fan Why can't we send the current list? The current list, as far as I understand, will always be zero-sized? We can just create a copy of the accumulator for sending to the executors.
If the accumulator was used in two separate tasks, it could have built up some values from the first task on the driver before the second task. But always sending a zeroed copy to the executor would be an OK solution to that.
Test build #56981 has finished for PR 12612 at commit
Test build #57000 has finished for PR 12612 at commit
This is really a big problem...
We need some serialization hooks to support sending accumulators back from executors, and I tried 2 approaches but both failed:
- Add a writing hook, which resets the accumulator before sending it from the driver to executors. The problem is we can't just reset: the accumulator state should be kept at the driver side. And the Java serialization hook isn't flexible enough to allow us to do a copy or something. One possible workaround is to create an AccumulatorWrapper so that we can have full control of accumulator serialization. But this will complicate the hierarchy.
- Add a reading hook, which resets the accumulator after deserialization. Unfortunately it doesn't work when Accumulator is a base class. By the time readObject is called, the child's fields are not initialized yet. Calling reset here is a no-op; the values of the child's fields will be filled later.
Generally speaking, writeObject and readObject are not good serialization hooks. We'd either have to figure out some trick to work around it, or find other, better serialization hooks. (Or not send accumulators back.)
@rxin any ideas?
as discussed offline, writeReplace
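For readers following the thread: a minimal sketch of the writeReplace idea, illustrative only and not the patch's actual implementation (the class and field names here are made up). The driver substitutes a zero-valued copy when serializing, so its own state is never shipped to executors, while the executor-to-driver direction keeps the accumulated values.

import java.io.Serializable

// Hypothetical list-style accumulator demonstrating the writeReplace hook.
class ZeroOnSendSketch(@transient private val onDriver: Boolean = true)
  extends Serializable {

  private val buf = new java.util.ArrayList[Long]()

  def add(v: Long): Unit = buf.add(v)
  def value: java.util.List[Long] = buf

  // Called by Java serialization instead of writing `this` directly. It must be
  // accessible from the (sub)class being serialized, i.e. not private, or the
  // hook is silently skipped.
  protected def writeReplace(): Any = {
    if (onDriver) new ZeroOnSendSketch(onDriver = false) // driver -> executor: ship a zeroed copy
    else this                                            // executor -> driver: keep the values
  }
}

Because onDriver is transient, it deserializes as false on the executor, so when the accumulator is serialized back with the task result, writeReplace returns this and the accumulated values survive the trip.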
assert(acc.value === Seq(9, 10))
}

test("value is reset on the executors")
This is covered by the new test "accumulator serialization".
assert(newUpdates.size === tm.internalAccums.size + 4)
}

test("from accumulator updates")
This test is not valid anymore. TaskMetrics.fromAccumulatorUpdates will return a TaskMetrics containing only internal accumulators, so there is no need to worry about unregistered external accumulators.
Test build #57130 has finished for PR 12612 at commit
Test build #57132 has finished for PR 12612 at commit
Test build #57135 has finished for PR 12612 at commit
/**
 * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
 * type `OUT`. Implementations must define following methods:
 *  - isZero: tell if this accumulator is zero value or not. e.g. for a counter accumulator,
These should be javadoc on the methods, rather than in the classdoc.
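For example, something along these lines (a sketch of the suggested doc placement, not the final wording):

/**
 * Returns true if this accumulator is at its zero value, e.g. 0 for a counter
 * accumulator or an empty list for a list accumulator.
 */
def isZero: Boolean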
This looks pretty good to me. We should get it to pass tests and then merge it asap. Some of the comments can be addressed later.
def localValue: OUT

// Called by Java when serializing an object
final protected def writeReplace(): Any = {
This should be private; however, this hook won't be called if it's private (not sure why), so I use final protected to work around it.
Test build #57220 has finished for PR 12612 at commit
Merging in master!
if (atDriverSide) {
  if (!isRegistered) {
    throw new UnsupportedOperationException(
      "Accumulator must be registered before send to executor")
@cloud-fan I'm getting intermittent, but regular, test failures in ALSSuite (not sure if there might be others, this just happens to be something I'm working on now).
e.g.
[info] - exact rank-1 matrix *** FAILED *** (4 seconds, 397 milliseconds)
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 74, not attempting to retry it. Exception during serialization: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
[info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1448)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1436)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1435)
[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1435)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:809)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:809)
[info] at scala.Option.foreach(Option.scala:257)
[info] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:809)
[info] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1657)
[info] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1616)
[info] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
[info] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[info] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
[info] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
[info] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1936)
[info] at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:970)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
[info] at org.apache.spark.rdd.RDD.reduce(RDD.scala:952)
[info] at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$stats$1.apply(DoubleRDDFunctions.scala:42)
[info] at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$stats$1.apply(DoubleRDDFunctions.scala:42)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
[info] at org.apache.spark.rdd.DoubleRDDFunctions.stats(DoubleRDDFunctions.scala:41)
[info] at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$mean$1.apply$mcD$sp(DoubleRDDFunctions.scala:47)
[info] at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$mean$1.apply(DoubleRDDFunctions.scala:47)
[info] at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$mean$1.apply(DoubleRDDFunctions.scala:47)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
[info] at org.apache.spark.rdd.DoubleRDDFunctions.mean(DoubleRDDFunctions.scala:46)
[info] at org.apache.spark.ml.recommendation.ALSSuite.testALS(ALSSuite.scala:373)
[info] at org.apache.spark.ml.recommendation.ALSSuite$$anonfun$12.apply$mcV$sp(ALSSuite.scala:385)
[info] at org.apache.spark.ml.recommendation.ALSSuite$$anonfun$12.apply(ALSSuite.scala:383)
[info] at org.apache.spark.ml.recommendation.ALSSuite$$anonfun$12.apply(ALSSuite.scala:383)
[info] at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:56)
[info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info] at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info] at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info] at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info] at org.scalatest.Suite$class.run(Suite.scala:1424)
[info] at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info] at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:28)
[info] at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:28)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
I also found some tests failing because of this non-deterministically; looking into it.
Shall we revert this commit? Got some similar errors. Code:
sc.parallelize(1 until 100, 1).map { i => Array.fill(1e7.toInt)(1.0) }.count()
The job succeeded but error messages got emitted to Spark shell:
Created https://issues.apache.org/jira/browse/SPARK-15010 for the reported issue.
    taskContext.registerAccumulator(this)
  }
} else {
  atDriverSide = true
Why is this assignment needed?
When the accumulator is sent back from executor to driver, we should set the atDriverSide flag.
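A simplified sketch of that round trip (hypothetical class, following the structure of the diff hunk above): the serialized atDriverSide value records where the accumulator came from, so flipping it in readObject records where it is now.

import java.io.ObjectInputStream

class LocationAwareSketch extends Serializable {
  private var atDriverSide: Boolean = true

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    if (atDriverSide) {
      // Deserialized on an executor: clear the flag (the real code also
      // registers with the running TaskContext here so updates are collected).
      atDriverSide = false
    } else {
      // Sent back from an executor: we are on the driver again.
      atDriverSide = true
    }
  }
}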
What changes were proposed in this pull request?
This PR introduces a new accumulator API which is much simpler than before:
- a single Accumulator class
- the initialValue and zeroValue concepts are combined into just one concept: zeroValue
- with one register method, the accumulator registration and cleanup registration are combined
- id, name and countFailedValues are combined into an AccumulatorMetadata, which is provided during registration

SQLMetric is a good example to show the simplicity of this new API (a rough sketch of the resulting shape follows below).
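For readers skimming the description, here is a hedged sketch of the shape described above (an illustration only: the method names isZero/reset/add/merge/localValue and the AccumulatorMetadata fields come from the diff hunks quoted in the conversation, but the exact signatures merged in this PR may differ):

// Illustrative stand-in mirroring the new API's shape; not Spark's class.
case class AccumulatorMetadata(id: Long, name: Option[String], countFailedValues: Boolean)

abstract class SketchAccumulator[IN, OUT] extends Serializable {
  private var metadata: AccumulatorMetadata = _

  // One register call carries all the metadata; it replaces the old separate
  // initialValue/zeroValue setup and cleanup registration.
  def register(meta: AccumulatorMetadata): Unit = { metadata = meta }

  def isZero: Boolean                                 // still at the zero value?
  def reset(): Unit                                   // go back to the zero value
  def add(v: IN): Unit                                // consume one input on an executor
  def merge(other: SketchAccumulator[IN, OUT]): Unit  // combine partial results
  def localValue: OUT                                 // produce the output value
}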
What we break:
- no setValue anymore. In the new API, the intermediate type can be different from the result type, so it's very hard to implement a general setValue.
Problems that need to be addressed in follow-ups:
- AccumulatorInfo doesn't make a lot of sense: the partial output is not partial updates, we need to expose the intermediate value.
- ExceptionFailure should not carry the accumulator updates. Why do users care about accumulator updates for failed cases? It looks like we only use this feature to update the internal metrics; how about sending a heartbeat to update internal metrics after the failure event?
- SparkListenerTaskEnd carries a TaskMetrics. Ideally this TaskMetrics doesn't need to carry external accumulators, as the only method of TaskMetrics that can access external accumulators is private[spark]. However, SQLListener uses it to retrieve SQL metrics.

How was this patch tested?
existing tests