-
-
Notifications
You must be signed in to change notification settings - Fork 636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add session end tasks support and use for remote cache writes #16952
Conversation
cc @somdoron |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! It's neat how little code this took
src/rust/engine/src/scheduler.rs
Outdated
@@ -331,6 +336,23 @@ impl Scheduler { | |||
}) | |||
} | |||
|
|||
async fn wait_for_tail_tasks(tasks: Vec<BoxFuture<'static, ()>>) { | |||
if !tasks.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could invert this and early return if is_empty()
. Will make the function easier to understand w/ less nesting
src/rust/engine/src/scheduler.rs
Outdated
let timeout_fut = time::timeout(Duration::from_secs(5), joined_tail_tasks_fut); | ||
match timeout_fut.await { | ||
Ok(_) => { | ||
log::trace!("tail tasks completed successfully"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
src/rust/engine/src/scheduler.rs
Outdated
@@ -331,6 +336,23 @@ impl Scheduler { | |||
}) | |||
} | |||
|
|||
async fn wait_for_tail_tasks(tasks: Vec<BoxFuture<'static, ()>>) { | |||
if !tasks.is_empty() { | |||
log::trace!("waiting for {} tail tasks to complete", tasks.len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, definitely.
Maybe even INFO, honestly? Consider only rendering this if there are tasks which haven't already finished, by polling each of them individually to see whether they have already finished: likely using https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.is_finished
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be useful to print out "names" of the tasks that were delayed? (And then I'd add a newtype to carry the task future and its name.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very, yea.
src/rust/engine/src/scheduler.rs
Outdated
log::trace!("tail tasks completed successfully"); | ||
} | ||
Err(_) => { | ||
log::trace!("tail tasks failed to complete within timeout"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe warn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly. I have another comment suggesting adding names for the tasks. A warning here would be better for the user if it identified what Pants was doing that timed out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot!
let mut tail_tasks = context.tail_tasks.lock(); | ||
tail_tasks.push(write_fut.boxed()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit shorter, and a bit less error prone (because it reduces the chances of lines inserted at the end of this function accidentally being under the lock):
let mut tail_tasks = context.tail_tasks.lock(); | |
tail_tasks.push(write_fut.boxed()); | |
context.tail_tasks.lock().push(write_fut.boxed()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new Rust improvement? I thought before you had to create a distinct variable for the Mutex Guard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly? I'm not sure if this was a NLL improvement, or whether it already worked.
workunit_store: WorkunitStore, | ||
build_id: String, | ||
run_id: RunId, | ||
tail_tasks: Arc<Mutex<Vec<BoxFuture<'static, ()>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably worth a newtype.
src/rust/engine/src/scheduler.rs
Outdated
@@ -331,6 +336,23 @@ impl Scheduler { | |||
}) | |||
} | |||
|
|||
async fn wait_for_tail_tasks(tasks: Vec<BoxFuture<'static, ()>>) { | |||
if !tasks.is_empty() { | |||
log::trace!("waiting for {} tail tasks to complete", tasks.len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, definitely.
Maybe even INFO, honestly? Consider only rendering this if there are tasks which haven't already finished, by polling each of them individually to see whether they have already finished: likely using https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.is_finished
src/rust/engine/src/scheduler.rs
Outdated
// Wait for tail tasks to complete. | ||
let tail_tasks = session.tail_tasks().lock().drain(..).collect(); | ||
Self::wait_for_tail_tasks(tail_tasks).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm! So, this scoping works, and is probably the simplest possible implementation.
It might be slightly more efficient to make it an explicit method called in a superset of the locations where Scheduler.shutdown
is called... something like Session.await_tail_tasks
. It would be called at the end of a run, even if the whole Scheduler
was not being torn down.
But given how simple this is, I'm fine either way.
@@ -471,7 +471,7 @@ impl crate::CommandRunner for CommandRunner { | |||
{ | |||
let command_runner = self.clone(); | |||
let result = result.clone(); | |||
let _write_join = self.executor.spawn(in_workunit!( | |||
let write_fut = self.executor.spawn(in_workunit!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that a tail task should likely be added by an explicit method of Executor
(e.g. executor.spawn_tail(&mut tail_tasks, fut)
), for two reasons:
- If the task is not
spawn
d, it would likely be an error, because the work would not even start until shutdown. That would potentially be a neat edgecase to hack, because you could queue up lazy work until shutdown, but IMO you would want it to be explicit instead. - Being able to poll
is_finished
on aJoinHandle
returned directly by theExecutor
, or a wrapper type if need be would be very helpful in terms of the debug output at the end of the run (see other comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the executor associated 1:1 with a session?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it isn't: an executor may be used across multiple sessions. Hence passing the tail_tasks
as an argument in the snippet: executor.spawn_tail(&mut tail_tasks, fut)
.
But if the return type was specific, then you could do it as:
let tail_task = executor.spawn_tail(fut);
// Only compiles because there is a wrapper type which matches.
session.tail_tasks.add(tail_task);
Just discovered |
18d490b
to
a2ea913
Compare
There has been some significant refactoring of this PR. It is worth a re-review. |
The latest commits introduce a |
let mut inner = match self.inner.lock().take() { | ||
Some(inner) => inner, | ||
None => { | ||
log::debug!("Session end tasks awaited multiple times!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently this line is triggered even when running ./pants --version
. Maybe there is a bug in the session shutdown code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @stuhood
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait
is being called inside of Scheduler.execute
, which is used multiple times within a single Session
. For example: if you run multiple goals sequentially, it is executed as a loop over calls to Scheduler.execute
.
If you wanted to only wait
at the "end" of a Session
(rather than once per execute call), you'd need to do something more like #16952 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm, but probably worth Stu's approval
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
let mut inner = match self.inner.lock().take() { | ||
Some(inner) => inner, | ||
None => { | ||
log::debug!("Session end tasks awaited multiple times!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait
is being called inside of Scheduler.execute
, which is used multiple times within a single Session
. For example: if you run multiple goals sequentially, it is executed as a loop over calls to Scheduler.execute
.
If you wanted to only wait
at the "end" of a Session
(rather than once per execute call), you'd need to do something more like #16952 (comment)
1f596ce
to
605c3b2
Compare
Upgrade Tokio to v1.21.1 plus upgrade related Tokio ecosystem crates. Besides bug fixes, the main motivation to upgrade is to gain access to `tokio::task::JoinSet` for use in #16952.
605c3b2
to
cc58f7c
Compare
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
[ci skip-build-wheels]
cc58f7c
to
3208118
Compare
[ci skip-build-wheels]
[ci skip-build-wheels]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
@@ -417,6 +417,9 @@ thread_local! { | |||
static THREAD_DESTINATION: RefCell<Arc<Destination>> = RefCell::new(Arc::new(Destination(Mutex::new(InnerDestination::Logging)))) | |||
} | |||
|
|||
// Note: The behavior of this task_local! invocation is affected by the `tokio_no_const_thread_local` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stale.
It appears that log_cache_error is not working with tail tasks. https://github.com/pantsbuild/pants/actions/runs/3144144927/jobs/5110386492#step:13:598 I can add regular logging to show that the tail tasks are completing, but errors from |
Mmm... that would be because the task local / thread local information is not being propagated to the new task. See the use of pants/src/rust/engine/task_executor/src/lib.rs Lines 121 to 137 in cdf21fc
|
That was it. Thanks! It apparently worked in an earlier iteration of the PR only due to the double-spawn, one of which was via |
[ci skip-build-wheels]
Add the concept of "session end tasks" to Pants sessions so that Rust code can schedule async tasks for execution where the tasks should complete before a particular Pants run is considered complete. (It is called "tail tasks" internally.)
First use case: Schedule remote cache writes as session end tasks.