
[SPARK-33088][CORE] Enhance ExecutorPlugin API to include callbacks on task start and end events #29977

Closed
wants to merge 3 commits

Conversation

fsamuel-bs
Contributor

What changes were proposed in this pull request?

This PR proposes a new set of APIs for ExecutorPlugin, providing callbacks invoked at the start and end of each task of a job (sketched below). I'm not very opinionated on the shape of the API and have tried to keep it as minimal as possible for now.
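For orientation, the callback surface being proposed looks roughly like this. This is a Scala sketch with a hypothetical trait name; the real interface is the Java `org.apache.spark.api.plugin.ExecutorPlugin`, and the exact signatures were still being settled during this review:

```scala
import org.apache.spark.TaskFailedReason

// Hypothetical Scala rendering of the proposed per-task callbacks.
trait TaskLifecycleCallbacks {
  // Called on the task thread just before the task body runs.
  def onTaskStart(): Unit = {}

  // Called on the task thread once the task has finished successfully.
  def onTaskSucceeded(): Unit = {}

  // Called on the task thread when the task fails; the version this review
  // converges on passes the TaskFailedReason reported to the scheduler.
  def onTaskFailed(failureReason: TaskFailedReason): Unit = {}
}
```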

Why are the changes needed?

Changes are described in detail in SPARK-33088, but this mostly boils down to:

  1. This feature was considered when the ExecutorPlugin API was initially introduced in [SPARK-24918][Core] Executor Plugin api #21923, but it was never implemented.
  2. The use-case that requires this feature is propagating tracing information from the driver to the executors, so that calls from the same job can all be traced; see the sketch after this list.
    a. Tracing frameworks are usually set up in thread-locals, so it's important for the setup to happen on the same thread that runs the tasks.
    b. Executors can be shared across multiple jobs, so it's not sufficient to set tracing information at executor startup time -- it needs to happen every time a task starts or ends.
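To make the use-case concrete, here is a minimal sketch of such a tracing plugin, assuming the callback names this review converges on. `TraceContext` is a stand-in for whatever thread-local machinery a tracing framework provides, not a Spark or library API:

```scala
import org.apache.spark.TaskFailedReason
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}

// Stand-in for a tracing framework's thread-local context.
object TraceContext {
  private val current = new ThreadLocal[String]
  def attach(traceId: String): Unit = current.set(traceId)
  def detach(): Unit = current.remove()
}

// Hypothetical plugin wiring the new callbacks to that thread-local state.
class TracingPlugin extends SparkPlugin {
  // Executor-side only; returning null opts out of the driver component.
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    // Runs on the task thread, so the thread-local attaches to the same
    // thread that executes the task body.
    override def onTaskStart(): Unit = TraceContext.attach("example-trace-id")

    // Tear the context down however the task ends.
    override def onTaskSucceeded(): Unit = TraceContext.detach()
    override def onTaskFailed(reason: TaskFailedReason): Unit = TraceContext.detach()
  }
}
```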

Does this PR introduce any user-facing change?

No. This PR introduces new features for future developers to use.

How was this patch tested?

Unit tests on PluginContainerSuite.

@fsamuel-bs
Contributor Author

@vanzin, @tgravescs, @LucaCanali tagging you all, as you seem to have worked on the latest refactor of this area of the code in #26170.

@tgravescs
Contributor

Thanks for adding this. I can definitely see use cases for this and have thought it would be nice to have something like this.

I want to look at the API details more. You also have other situations where tasks are killed, which luckily, I guess, the TaskContext does have an isInterrupted field for. The executor also ends up making some pass/fail decisions that go to the scheduler.

@@ -332,7 +332,8 @@ private[spark] class Executor(

  class TaskRunner(
      execBackend: ExecutorBackend,
-     private val taskDescription: TaskDescription)
+     private val taskDescription: TaskDescription,
+     private val plugins: Option[PluginContainer])
Contributor

Maybe we can make the plugins parameter optional, or default it to some EmptyPluginContainer?

Suggested change
private val plugins: Option[PluginContainer])
private val plugins: Option[PluginContainer] = None)

Contributor

Same for Task#run.

Contributor

@rshkv, what is the reason to make this default to None? This is an internal API and is only called from here. It's an Option already, so people can check it easily. In some ways it's nice to force it, so you make sure all uses of it have been updated.
Are there cases where you know this is used outside Spark?

@fsamuel-bs
Contributor Author

You also have other situations where tasks are killed, which luckily, I guess, the TaskContext does have an isInterrupted field for. The executor also ends up making some pass/fail decisions that go to the scheduler.

I see. For my use-case, I don't really care about the distinction between success and failure, only that a method is called at least once when the task finishes, on the same thread the task is executed on (so I can dismantle the tracing state I've built up in onTaskStart).

We could instead expose an onTaskCompleted method, which does not leak whether a task succeeded or failed. Or we could move triggering onTaskSucceeded/onTaskFailed into the Executor. I think the latter is better, because it seems useful to have the distinction between a task succeeding and failing, but I'm happy to implement whatever the maintainers prefer.

@@ -123,8 +125,12 @@ private[spark] abstract class Task[T](
      Option(taskAttemptId),
      Option(attemptNumber)).setCurrentContext()

+   plugins.foreach(_.onTaskStart())
Contributor

What is the expectation in case onTaskStart fails? Do we want to invoke succeeded/failed?

Contributor Author

That's what I documented in https://github.com/apache/spark/pull/29977/files#diff-6a99ec9983962323b4e0c1899134b5d6R76-R78 -- the argument that came to mind is that it's easy for a plugin developer to track some state in a thread-local and cleanly decide whether it wants to perform the succeeded/failed action or not (sketched below).

Happy to change it if we prefer not to put this burden on the plugin owner, though.
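A sketch of the pattern described above, with hypothetical names: the plugin sets a thread-local flag as the last step of its onTaskStart, and its end-of-task callbacks only do teardown when the flag is set.

```scala
// Hypothetical guard a plugin could keep for itself (not a Spark API).
object StartGuard {
  private val started = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }
  def markStarted(): Unit = started.set(true)
  def isStarted: Boolean = started.get()
  def clear(): Unit = started.remove()
}

object PluginCallbacks {
  // In onTaskStart: set the flag only after setup succeeds.
  def onTaskStartBody(): Unit = {
    // ... tracing setup that might throw ...
    StartGuard.markStarted()
  }

  // In onTaskSucceeded/onTaskFailed: only tear down if setup completed.
  def onTaskEndBody(): Unit = {
    if (StartGuard.isStarted) {
      // ... teardown ...
      StartGuard.clear()
    }
  }
}
```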

Contributor

Maybe I'm misunderstanding, but the documentation states "Exceptions thrown from this method do not propagate", and there is nothing here preventing that. I think perhaps you meant to say the user needs to make sure they don't propagate?

Contributor Author

We catch Throwable in ExecutorPluginContainer#onTaskStart and its siblings (see https://github.com/apache/spark/pull/29977/files#diff-5e4d939e9bb53b4be2c48d4eb53b885c162c729b9adc874f918f7701a352cdbbR157), so that's what I meant by "do not propagate". I.e., if a plugin's onTaskStart throws, Spark will log the exception but won't fail the associated Spark task.
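Paraphrasing that guard as a self-contained sketch (illustrative logger wiring, not the exact Spark source):

```scala
import org.slf4j.LoggerFactory

import org.apache.spark.api.plugin.ExecutorPlugin

// Sketch of the guard in ExecutorPluginContainer#onTaskStart: each plugin
// callback is wrapped so a throwing plugin is logged, not propagated.
object SafePluginCalls {
  private val logger = LoggerFactory.getLogger(getClass)

  def safeOnTaskStart(executorPlugins: Seq[(String, ExecutorPlugin)]): Unit = {
    executorPlugins.foreach { case (name, plugin) =>
      try {
        plugin.onTaskStart()
      } catch {
        case t: Throwable =>
          // Logged and swallowed; the Spark task itself is unaffected.
          logger.info(s"Exception while calling onTaskStart on plugin $name.", t)
      }
    }
  }
}
```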

Contributor

Perhaps reword it to say exceptions are ignored?

@mridulm
Contributor

mridulm commented Oct 8, 2020

The changes mostly look good, thanks for working on this @fsamuel-bs!
Given that TaskContext.get() exposes pretty much everything about the task that the plugin could require, we are in pretty good shape.
Do we want to include additional metadata, passed in from the driver for a subset of stages/tasks, to the plugin?
Currently, global info can be passed in via init, and job properties can be used to pass in metadata; but I'm wondering if we need something more fine-grained ... thoughts?
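For reference, the job-properties route mentioned here would look something like the following sketch, where "traceId" is a made-up key: the driver sets a local property, Spark propagates it to tasks, and the executor side reads it back through TaskContext.

```scala
import org.apache.spark.{SparkContext, TaskContext}

object TraceProperties {
  // Driver side: local properties set here propagate to the tasks of jobs
  // subsequently submitted from this thread.
  def tagJobs(sc: SparkContext): Unit =
    sc.setLocalProperty("traceId", "trace-1234")

  // Executor side (e.g. from inside onTaskStart): read the property back.
  def readTraceId(): Option[String] =
    Option(TaskContext.get()).flatMap(ctx => Option(ctx.getLocalProperty("traceId")))
}
```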

@mridulm
Contributor

mridulm commented Oct 8, 2020

You also have other situations where tasks are killed, which luckily, I guess, the TaskContext does have an isInterrupted field for. The executor also ends up making some pass/fail decisions that go to the scheduler.

Currently, we document isInterrupted as a task kill by the platform (as opposed to a task failure related to user code). Essentially we have:

a) Succeeded.
b) Failed.
  b.1) Task killed by the platform.
  b.2) Task failed due to user code.

Do we want to distinguish b.1 from b.2 explicitly via the API? (isInterrupted currently does that; imo that should suffice.)
Are there any other failure flows? (Ignoring executor crashes here.)
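A minimal sketch of consuming that distinction from an end-of-task callback; the classification labels are made up, but TaskContext.isInterrupted() is the field being discussed:

```scala
import org.apache.spark.TaskContext

object FailureKind {
  // Inside a plugin's failure callback: separate "killed by the platform"
  // (b.1) from "failed due to user code" (b.2) via the task's interrupt flag.
  def classify(): String = {
    val ctx = TaskContext.get()
    if (ctx != null && ctx.isInterrupted()) "killed-by-platform" // b.1
    else "user-code-failure"                                     // b.2
  }
}
```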

@tgravescs
Contributor

That is exactly what I wanted to look at some more.
There are a few corner cases the Executor handles, like failing on a memory leak. The other thing is that the fetchFailed error is private[spark] in the TaskContext. If something happens such that user or Spark code wraps that exception, you might not be able to tell. There is also the task-commit-denied exception, and then failures due to just stopping early.
Most of these are corner cases, but really it comes down to whether we care that users of this API can infer what the real Spark status was in these few cases; they would mostly just have to reproduce some of the logic the Executor does, like checking for the commit-denied exception, for instance. This is a developer API, and I think it would be OK as long as we document it appropriately.

I think we should definitely keep something that allows the user to tell whether a task passed or failed.

@mridulm, thoughts on leaving this here vs. moving the task-end notification into the Executor, which would still be in the same thread? That feels like it would be more reliable, with the plugin seeing the same task status that the scheduler will see.


@fsamuel-bs
Contributor Author

Happy to move onTaskSucceeded/onTaskFailed to the Executor -- shouldn't be that much work; just pending agreement from @mridulm.


@mridulm
Contributor

mridulm commented Oct 15, 2020

Thanks @tgravescs, I did miss the other cases where a task can fail due to Spark infra causing the failure (commit denied is a very good example).

If it helps with catching more corner cases, without SPI implementations having to duplicate what Spark does, that is a great direction to take.

@fsamuel-bs
Contributor Author

@mridulm @tgravescs: I've moved triggering the methods from Task to Executor. I've also modified the signature of onTaskFailed to receive a TaskFailedReason instead of a Throwable, to better match what is sent to the scheduler. Let me know your thoughts.
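With TaskFailedReason in the signature, a plugin can match on the same reasons the scheduler sees, including the commit-denied corner case raised earlier. A hedged sketch (the handling shown is illustrative):

```scala
import org.apache.spark.{TaskCommitDenied, TaskFailedReason, TaskKilled}
import org.apache.spark.api.plugin.ExecutorPlugin

// Hypothetical plugin distinguishing failure reasons.
class ReasonAwarePlugin extends ExecutorPlugin {
  override def onTaskFailed(reason: TaskFailedReason): Unit = reason match {
    case _: TaskKilled       => () // killed by the platform
    case _: TaskCommitDenied => () // output commit was denied
    case other               => println(s"task failed: ${other.toErrorString}")
  }
}
```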

@tgravescs
Contributor

The test that failed was the barrier task context one; I re-kicked the tests. If it fails again, we should look at it to ensure something here didn't break it.

@tgravescs
Contributor

ok to test

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34446/

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34446/

@SparkQA

SparkQA commented Oct 15, 2020

Test build #129841 has finished for PR 29977 at commit 8a5e436.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 8f4fc22 Oct 16, 2020
@mridulm
Contributor

mridulm commented Oct 16, 2020

Thanks for the reviews @rshkv and @tgravescs!
Merging to master.

Thanks for the contribution @fsamuel-bs!
I was not able to assign the JIRA to you @fsamuel-bs -- your id was not showing up. Can you reassign it to yourself, please? Thanks

@tgravescs
Contributor

tgravescs commented Oct 16, 2020

I added @fsamuel-bs as a contributor in JIRA and assigned it to him. Thanks.


assert(TestSparkPlugin.executorPlugin.numOnTaskStart == 2)
assert(TestSparkPlugin.executorPlugin.numOnTaskSucceeded == 0)
assert(TestSparkPlugin.executorPlugin.numOnTaskFailed == 2)
@dongjoon-hyun
Member
dongjoon-hyun commented Oct 17, 2020

Hi, folks.
It turns out that this is a flaky test. I filed a JIRA issue and made a PR.


rshkv pushed a commit to palantir/spark that referenced this pull request Nov 26, 2020
jdcasale pushed a commit to palantir/spark that referenced this pull request Jan 15, 2021
rshkv pushed a commit to palantir/spark that referenced this pull request Feb 25, 2021
rshkv pushed a commit to palantir/spark that referenced this pull request Feb 26, 2021
rshkv pushed a commit to palantir/spark that referenced this pull request Feb 26, 2021
jdcasale pushed a commit to palantir/spark that referenced this pull request Mar 3, 2021
rshkv pushed a commit to palantir/spark that referenced this pull request Mar 4, 2021