Skip to content

Conversation

@andrewor14
Copy link
Contributor

Symptom. If an executor in an application times out, HeartbeatReceiver attempts to kill it. After this happens, however, the application never gets an executor back even when there are cluster resources available.

Cause. The issue is that sc.killExecutor automatically assumes that the application wishes to adjust its resource requirements permanently downwards. This is not the intention in HeartbeatReceiver, however, which simply wants a replacement for the expired executor.

Fix. Differentiate between the intention to kill and the intention to replace an executor with a fresh one. More details can be found in the commit message.

@andrewor14
Copy link
Contributor Author

This is an alternative to #6662. @vanzin @SaintBacchus

@andrewor14 andrewor14 force-pushed the heartbeat-no-kill branch 2 times, most recently from d9c9336 to 18b110e Compare June 30, 2015 02:10
@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36067 has finished for PR 7107 at commit 18b110e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36070 has finished for PR 7107 at commit 18b110e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 andrewor14 changed the title [SPARK-8119] HeartbeatReceiver should replace executors, not kill [SPARK-8119] [WIP] HeartbeatReceiver should replace executors, not kill Jun 30, 2015
@andrewor14
Copy link
Contributor Author

retest this please

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps instead of adding this method, add a boolean flag to killExecutors above?

(Or, because of ExecutorAllocationClient, create a private method that takes the list of executors and a boolean saying whether to update the target executor count, and call it from both of these methods.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to do that but it technically changes public API. The methods in ExecutorAllocationClient are actually public because SparkContext extends it, so we actually can't change the signature there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but you can follow the second suggestion (private method called by the other two).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, sounds good. I didn't do it initially because there isn't really a great name for the private method (we already have doKillExecutors). I'll figure something out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case you haven't done it yet, you might be able to just call this method from killExecutors.

@vanzin
Copy link
Contributor

vanzin commented Jun 30, 2015

Approach looks sane, it just feels like both implementations could share more code.

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36183 timed out for PR 7107 at commit 18b110e after a configured wait of 175m.

@andrewor14
Copy link
Contributor Author

This is awaiting the clean up in HeartbeatReceiverSuite in #7173.

@andrewor14
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 15, 2015

Test build #37276 has finished for PR 7107 at commit 18b110e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 If CoarseGrainedSchedulerBackend.removeExecutor was happened before HeartbeatReceiver.killExecutorThread executorsPendingToRemove will have a zombie executorId
and the newTotal will got wrong number,would you like add some check in there ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already called removeExecutor then executorDataMap will not contain executorId, so we won't reach this if case. I believe the scenario you described cannot happen.

Andrew Or added 2 commits July 16, 2015 14:15
The issue is that sc.killExecutors automatically assumes that
the application wishes to adjust its resource requirements
permanently downwards. This is not the intention in
HeartbeatReceiver, however, which simply wants a replacement
for the expired executor.
@andrewor14 andrewor14 changed the title [SPARK-8119] [WIP] HeartbeatReceiver should replace executors, not kill [SPARK-8119] HeartbeatReceiver should replace executors, not kill Jul 16, 2015
This issue is actually somewhat difficult to test. There are many
components in the picture and each one has to be mocked out. Many
of the executor allocation requests are also done asynchronously
so we need to find ways to block on these requests.
@andrewor14
Copy link
Contributor Author

@vanzin I updated this with your suggestion.

@vanzin
Copy link
Contributor

vanzin commented Jul 16, 2015

LGTM.

@SparkQA
Copy link

SparkQA commented Jul 16, 2015

Test build #37542 has finished for PR 7107 at commit 1cd2cd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan

@SparkQA
Copy link

SparkQA commented Jul 16, 2015

Test build #1088 has finished for PR 7107 at commit 1cd2cd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan

@SparkQA
Copy link

SparkQA commented Jul 16, 2015

Test build #1092 has finished for PR 7107 at commit 1cd2cd7.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
    • class RFormula(override val uid: String)
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • abstract class LeafExpression extends Expression
    • abstract class UnaryExpression extends Expression
    • abstract class BinaryExpression extends Expression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Pmod(left: Expression, right: Expression) extends BinaryArithmetic
    • final class SpecificRow extends $
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • case class Length(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class FormatNumber(x: Expression, d: Expression)
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan
    • abstract class BinaryNode extends LogicalPlan

@SparkQA
Copy link

SparkQA commented Jul 16, 2015

Test build #1090 has finished for PR 7107 at commit 1cd2cd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • case class Length(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class FormatNumber(x: Expression, d: Expression)
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan

@xuchenCN
Copy link
Contributor

+1 LGTM

@andrewor14
Copy link
Contributor Author

Thanks @xuchenCN and @vanzin. I'm merging this into master. A backport for 1.4.2 will come later.

@asfgit asfgit closed this in 96aa334 Jul 17, 2015
@andrewor14 andrewor14 deleted the heartbeat-no-kill branch July 17, 2015 03:15
mingyukim pushed a commit to palantir/spark that referenced this pull request Jul 23, 2015
**Symptom.** If an executor in an application times out, `HeartbeatReceiver` attempts to kill it. After this happens, however, the application never gets an executor back even when there are cluster resources available.

**Cause.** The issue is that `sc.killExecutor` automatically assumes that the application wishes to adjust its resource requirements permanently downwards. This is not the intention in `HeartbeatReceiver`, however, which simply wants a replacement for the expired executor.

**Fix.** Differentiate between the intention to kill and the intention to replace an executor with a fresh one. More details can be found in the commit message.

Author: Andrew Or <andrew@databricks.com>

Closes apache#7107 from andrewor14/heartbeat-no-kill and squashes the following commits:

1cd2cd7 [Andrew Or] Add regression test for SPARK-8119
25a347d [Andrew Or] Reuse more code in scheduler backend
31ebd40 [Andrew Or] Differentiate between kill and replace

Conflicts:
	core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

MKIM: It was not trivial to resolve the conflict
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants