
Conversation

@XuTingjun
Contributor

No description provided.

@XuTingjun XuTingjun changed the title When tasks failed and append new ones, post SparkListenerTaskResubmit event When tasks failed and append new ones, post SparkListenerTaskResubmit event to ExecutorAllocationManager Jun 15, 2015
@XuTingjun XuTingjun changed the title When tasks failed and append new ones, post SparkListenerTaskResubmit event to ExecutorAllocationManager [SPARK-8366] When tasks failed and append new ones, post SparkListenerTaskResubmit event to ExecutorAllocationManager Jun 15, 2015
Contributor Author

  1. When a task fails, a new one is appended, so I post a resubmit event here to let the ExecutorAllocationManager know.
  2. If numFailures is greater than maxTaskFailures, there is no need to append and submit a new task.

@XuTingjun
Contributor Author

@sryza @andrewor14 Can you have a look?

@squito
Contributor

squito commented Jun 15, 2015

Err, sorry @XuTingjun, I was looking at an old version of the page; I've deleted my comment about the JIRA.

@andrewor14
Contributor

Hi @XuTingjun, I'll look at this today, or tomorrow at the latest. Thanks for the ping.

@andrewor14
Contributor

ok to test

@SparkQA

SparkQA commented Jun 18, 2015

Test build #35170 has finished for PR 6817 at commit 51e158c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskResubmit(stageId: Int) extends SparkListenerEvent

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35207 has finished for PR 6817 at commit c6609a2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskResubmit(stageId: Int) extends SparkListenerEvent

@XuTingjun
Contributor Author

I think the failed unit tests are unrelated to this patch; please retest.

@XuTingjun
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 23, 2015

Test build #35506 has finished for PR 6817 at commit c6609a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskResubmit(stageId: Int) extends SparkListenerEvent

@XuTingjun
Contributor Author

@andrewor14

@andrewor14
Contributor

@XuTingjun From the description, I'm still having a hard time trying to understand what the symptom is. From the JIRA:

I use the dynamic executor allocation function. Then one executor is killed, all the tasks on it are failed. When the new tasks are appended, the new executor won't added.

What do you mean by "won't add"? Are the resubmitted tasks not being run on the new executor? Or are we not requesting new executors? Could you give a detailed example of a case where this happens?

@andrewor14
Contributor

Also, echoing @squito's comments, I don't really see why a new TaskResubmit event is necessary. If a task is resubmitted, a new SparkListenerTaskStart event will be posted with a new stage attempt ID, so we already get notified when that happens. Right?

@XuTingjun
Contributor Author

@andrewor14, sorry for my poor English. The problem is:
When an executor is lost, the running tasks on it fail and each posts a SparkListenerTaskEnd. Until maxTaskFailures is reached, the failed tasks are re-run with new task IDs. Yes, each new task posts a SparkListenerTaskStart, and stageIdToTaskIndices is updated. But the total task count is only set at StageSubmitted, so the numTasksScheduled == numTasksTotal condition is not handled correctly, and the pending task calculation is wrong.

@XuTingjun
Contributor Author

@andrewor14, have you understood the problem?

  def totalPendingTasks(): Int = {
    stageIdToNumTasks.map { case (stageId, numTasks) =>
      numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
    }.sum
  }

The new attempt tasks are counted in stageIdToTaskIndices but not in stageIdToNumTasks, so they are not counted in the pending tasks or in maxNumExecutorsNeeded.

@XuTingjun
Contributor Author

@andrewor14, sorry to bother you again. I really think it's a bug; I hope you can take another look, thanks!

@andrewor14
Contributor

Hi @XuTingjun, sorry for letting this slip. I'll have another look at this tomorrow.

@andrewor14
Contributor

@XuTingjun I dug into the scheduler code a little. When a task is resubmitted, it uses a new task ID but not a new task index. To calculate the number of pending tasks we use the task index, not the task ID, so resubmission should be handled correctly, since task indices are the same across multiple attempts of the same task.

Could you clarify what the resulting behavior of this bug is? It would be useful to describe the symptoms without referring to the low-level implementation. I just want to know what consequences the issue has for a Spark user who knows nothing about ExecutorAllocationManager.

Contributor

If anything, I would think we should remove this line. If this task fails, then the next attempt would hit the fallback case of stageIdToNumTasks.getOrElse(stageId, -1), which is not technically correct. It's safe to remove it because we remove it in stageCompleted anyway.

Contributor Author

Can we delete lines 557-585? I think stageCompleted already does this.

Contributor

I'm not following... L557-585 refer to adding executors. stageCompleted does not even deal with executors.

@XuTingjun
Contributor Author

Let me answer the question, “Could you clarify what the resulting behavior of this bug is?”
Suppose an application has only one executor. If that executor is lost, its tasks fail, no new executor is allocated for the new task attempts, and the application hangs.

@andrewor14 I don't agree. Let me give an example: a stage has only one task, with task index 1, so one executor is allocated to it. When that task fails because the executor is lost, a new task attempt starts but no executor is allocated, because numTasks == stageIdToTaskIndices.get(stageId).size, so the pending task count is 0.

def totalPendingTasks(): Int = {
  stageIdToNumTasks.map { case (stageId, numTasks) =>
    numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
  }.sum
}
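
To make the arithmetic concrete, here is a small standalone illustration; the values are made up for this example and only mimic the fields used by ExecutorAllocationManager:

    // Hypothetical state for a one-task stage (stage 0) whose single attempt
    // has started and then failed after the executor was lost.
    val stageIdToNumTasks    = Map(0 -> 1)       // set once, at stage submission
    val stageIdToTaskIndices = Map(0 -> Set(0))  // index 0 recorded at task start

    val totalPending = stageIdToNumTasks.map { case (stageId, numTasks) =>
      numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
    }.sum
    // totalPending == 0, so no executor is requested for the resubmitted attempt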

@andrewor14
Contributor

Yes, so in that case we should just remove the task from stageIdToTaskIndices if the task did not end successfully, right? The resubmitted task still has the same task index, so we can just treat it as a normal task. I believe the fix here can be much simpler.

Do you understand my proposal?

@XuTingjun
Contributor Author

Yeah, I got it. I think we can add the code below to the onTaskEnd method, right?

stageIdToTaskIndices.get(taskEnd.stageId).get.remove(taskIndex)
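
Roughly, it could sit in the listener like this. This is only a sketch, assuming the usual onTaskEnd shape: the surrounding structure is paraphrased rather than copied from the actual patch, and Success is the TaskEndReason for a successful task.

    // Sketch: forget a failed attempt's index so the stage counts it as pending again.
    // Using foreach instead of .get to be safe if the stage entry is already gone.
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      allocationManager.synchronized {
        if (taskEnd.reason != Success) {
          stageIdToTaskIndices.get(taskEnd.stageId).foreach(_.remove(taskEnd.taskInfo.index))
        }
      }
    }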

@SparkQA

SparkQA commented Aug 5, 2015

Test build #233 has finished for PR 6817 at commit 4b2dd75.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 5, 2015

Test build #39862 has finished for PR 6817 at commit 4b2dd75.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@XuTingjun XuTingjun changed the title [SPARK-8366] When tasks failed and append new ones, post SparkListenerTaskResubmit event to ExecutorAllocationManager [SPARK-8366] maxNumExecutorsNeeded should properly handle failed tasks Aug 6, 2015
@andrewor14
Contributor

retest this please; the failing test was already fixed upstream

@SparkQA

SparkQA commented Aug 6, 2015

Test build #40049 has finished for PR 6817 at commit 4b2dd75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 7, 2015

Test build #40160 has finished for PR 6817 at commit 25734c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

update or remove this comment

@andrewor14
Contributor

@XuTingjun The fix here is not correct, because after all tasks have been scheduled we would keep asking for executors even though we don't need them. I would just add back the entire code block you deleted in L602-610 and call onSchedulerBacklogged on task end if the task is not successful (see the sketch after this list). This ensures that:

(1) we stop requesting executors once we have started (not finished) all tasks;
(2) we request executors again if a task fails.

Does that make sense?
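
To illustrate (2), here is a rough sketch of the task-end handling; the names mirror the existing listener, but the code is paraphrased and is not the actual patch:

    // Sketch: when a task ends unsuccessfully, mark the scheduler as backlogged
    // again so the ExecutorAllocationManager requests executors for the retry.
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      allocationManager.synchronized {
        if (taskEnd.reason != Success) {
          allocationManager.onSchedulerBacklogged()
        }
      }
    }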

@markhamstra
Contributor

@andrewor14 Makes better sense to me. Thanks for the explanation, Andrew.

@XuTingjun
Contributor Author

@andrewor14, I understand what you mean.
My concern is that if many stages run in parallel, just deleting L606 may not be correct.

@XuTingjun
Contributor Author

Maybe we can change the code below, right?

        val numTasksScheduled = stageIdToTaskIndices(stageId).size
        val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
        if (numTasksScheduled == numTasksTotal) {
          // No more pending tasks for this stage
          stageIdToNumTasks -= stageId
          if (stageIdToNumTasks.isEmpty) {
            allocationManager.onSchedulerQueueEmpty()
          }
        }

to

        if (totalPendingTasks() == 0) {
          allocationManager.onSchedulerQueueEmpty()
        }

@andrewor14
Contributor

@XuTingjun yes I think that looks fine. Would you mind testing this change on a real cluster? This scenario is somewhat tricky and it would be good to verify that it works outside of unit tests.

@XuTingjun
Contributor Author

@andrewor14, I have tested it on a real cluster, and it works fine.

@andrewor14
Contributor

retest this please

@andrewor14
Contributor

Latest changes LGTM, will merge once tests pass. Thanks for your persistence, @XuTingjun.

@SparkQA

SparkQA commented Aug 11, 2015

Test build #40476 timed out for PR 6817 at commit 9f12fa6 after a configured wait of 175m.

@andrewor14
Contributor

retest this please

@andrewor14
Contributor

retest this please

@SparkQA

SparkQA commented Aug 11, 2015

Test build #40509 timed out for PR 6817 at commit 9f12fa6 after a configured wait of 175m.

@SparkQA

SparkQA commented Aug 12, 2015

Test build #1461 has finished for PR 6817 at commit 9f12fa6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 12, 2015

Test build #40526 has finished for PR 6817 at commit 9f12fa6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Alright, merging into master and 1.5.

asfgit pushed a commit that referenced this pull request Aug 12, 2015
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>

Closes #6817 from XuTingjun/SPARK-8366.

(cherry picked from commit b85f9a2)
Signed-off-by: Andrew Or <andrew@databricks.com>
@asfgit asfgit closed this in b85f9a2 Aug 12, 2015
CodingCat pushed a commit to CodingCat/spark that referenced this pull request Aug 17, 2015
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>

Closes apache#6817 from XuTingjun/SPARK-8366.
