-
Notifications
You must be signed in to change notification settings - Fork 323
/
task.rs
199 lines (169 loc) · 5.99 KB
/
task.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
use core::fmt::Display;
use core::mem;
use core::time::Duration;
use crossbeam_channel::{bounded, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
use tracing::{error, info, span, warn};
use crate::util::lock::LockExt;
/**
   A task handle holds the endpoints for stopping or waiting for a
   background task to terminate.

   A holder of `TaskHandle` can explicitly stop the background task by
   calling [`shutdown`](TaskHandle::shutdown) or
   [`shutdown_and_wait`](TaskHandle::shutdown_and_wait).

   Otherwise, when the `TaskHandle` is dropped, it will stop the background
   task and wait for the background task to terminate before returning.
*/
pub struct TaskHandle {
    // Sending half of the channel that the task thread polls for a
    // shutdown signal.
    shutdown_sender: Sender<()>,
    // Shared flag that the task thread sets to `true` once its step loop
    // has exited.
    stopped: Arc<RwLock<bool>>,
    // Handle of the task thread; the thread is joined when this field is
    // dropped. Fields are dropped in declaration order, so this happens
    // after the sender above has been released.
    join_handle: DropJoinHandle,
}
/**
   A wrapper around [`std::thread::JoinHandle`] so that the thread is
   joined when the wrapper is dropped.

   The handle is kept inside an `Option` so that it can be moved out
   exactly once, either by [`TaskHandle::join`] or by the `Drop`
   implementation, leaving `None` behind.
*/
struct DropJoinHandle(Option<thread::JoinHandle<()>>);
/**
   A wrapper around the error type returned by a background task step
   function to indicate whether the background task should be terminated
   because of the error.

   The wrapper implements `Debug` whenever the wrapped error does, so that
   a step result of type `Result<_, TaskError<E>>` can be used with
   `unwrap`, `expect` and the assertion macros.
*/
#[derive(Debug)]
pub enum TaskError<E> {
    /**
       Inform the background task runner that an ignorable error has
       occurred, and the background task runner should log the error and
       then continue execution.
    */
    Ignore(E),

    /**
       Inform the background task runner that a fatal error has occurred,
       and the background task runner should log the error and then abort
       execution.
    */
    Fatal(E),
}
/**
   The outcome of a successful step of a background task, telling the
   task runner what to do next.
*/
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /**
       The step completed and the task runner should call the step
       runner again.
    */
    Continue,

    /**
       The task has nothing more to do; the task runner should stop
       calling the step runner and terminate the background task.
    */
    Abort,
}
/**
   Spawn a long-running background task with the given step runner.

   The step runner is a `FnMut` closure that is called repeatedly and
   returns a `Result<Next, TaskError<E>>`:

   - `Ok(Next::Continue)`: the step was executed successfully and the
     step runner will be called again.
   - `Ok(Next::Abort)`: the task is done and the background task
     terminates.
   - `Err(TaskError::Ignore(e))`: the error is logged as a warning and
     the step runner will be called again.
   - `Err(TaskError::Fatal(e))`: the error is logged and the background
     task terminates.

   The function is also given a task name string, which is used for logging
   information about the execution of the task. An optional [`Duration`]
   argument is also given for the task runner to pause for the given
   duration before calling the step runner again. The pause ends early
   if a shutdown signal arrives while the task runner is waiting.

   The function returns a [`TaskHandle`] that can be used to shutdown the
   background task. If the [`TaskHandle`] is dropped or if explicit shutdown
   instruction is sent, the task runner will stop calling the step runner
   and abort the background task.

   The shutdown signal is only observed between two steps. If the step
   runner is receiving commands from other
   [channels](crossbeam_channel::Receiver), it should use the
   [`try_recv`](crossbeam_channel::Receiver::try_recv) function
   so that the step runner does not get stuck indefinitely even
   when shutdown instruction has been sent through the
   [`TaskHandle`].
*/
pub fn spawn_background_task<E: Display>(
    task_name: String,
    interval_pause: Option<Duration>,
    mut step_runner: impl FnMut() -> Result<Next, TaskError<E>> + Send + Sync + 'static,
) -> TaskHandle {
    let span = span!(tracing::Level::ERROR, "task", name = %task_name);
    let _entered = span.enter();

    info!("spawning");

    // Flag shared with the task thread; it is flipped once the step loop
    // has been left.
    let stopped = Arc::new(RwLock::new(false));
    let write_stopped = Arc::clone(&stopped);

    // Capacity 1 is enough: a single pending signal already stops the task.
    let (shutdown_sender, receiver) = bounded(1);

    // The span is entered again inside the task thread so that log lines
    // emitted there carry the task name as well.
    let thread_span = span.clone();

    let join_handle = thread::spawn(move || {
        let _entered = thread_span.enter();

        loop {
            match receiver.try_recv() {
                // Shutdown was requested.
                Ok(()) => {
                    break;
                }
                // No signal pending (or no sender left): run the next step.
                Err(_) => match step_runner() {
                    Ok(Next::Continue) => {}
                    Ok(Next::Abort) => {
                        info!("aborting");
                        break;
                    }
                    Err(TaskError::Ignore(e)) => {
                        warn!("encountered ignorable error: {}", e);
                    }
                    Err(TaskError::Fatal(e)) => {
                        error!("aborting after encountering fatal error: {}", e);
                        break;
                    }
                },
            }

            if let Some(interval) = interval_pause {
                // Pause by waiting on the shutdown channel instead of
                // sleeping blindly, so that a shutdown request interrupts
                // the pause rather than being noticed only afterwards.
                match receiver.recv_timeout(interval) {
                    Ok(()) => break,
                    // A disconnected channel returns immediately and can
                    // never deliver a signal; sleep to keep the pacing.
                    Err(e) if e.is_disconnected() => thread::sleep(interval),
                    // Timed out: the pause is over, run the next step.
                    Err(_) => {}
                }
            }
        }

        *write_stopped.acquire_write() = true;
        info!("terminated");
    });

    TaskHandle {
        shutdown_sender,
        stopped,
        join_handle: DropJoinHandle(Some(join_handle)),
    }
}
impl TaskHandle {
    /**
       Wait for the background task to terminate.

       No shutdown signal is sent before waiting. Because the background
       tasks are meant to run forever, this would likely never return
       unless the step runner fails with [`TaskError::Fatal`] or returns
       [`Next::Abort`] to end the task on its own.
    */
    pub fn join(mut self) {
        // Move the handle out so that the thread is joined here; the
        // wrapper is left holding `None` for its own `Drop`.
        if let Some(handle) = mem::take(&mut self.join_handle.0) {
            // A join error only reports that the task thread panicked.
            let _ = handle.join();
        }
    }

    /**
       Send the shutdown signal to the background task without waiting
       for it to terminate.

       Note that the waiting will still happen when the [`TaskHandle`] is
       dropped.

       This can be used to shutdown multiple tasks in parallel, and then
       wait for them to all terminate concurrently.

       Calling this more than once is harmless and never blocks.
    */
    pub fn shutdown(&self) {
        // `try_send` never blocks. A full buffer means that a signal is
        // already pending, and a disconnected channel means that the task
        // thread has already dropped its receiver; both can be ignored.
        let _ = self.shutdown_sender.try_send(());
    }

    /**
       Send the shutdown signal and wait for the task to terminate.

       This is done implicitly by the [`TaskHandle`] when it is dropped.
    */
    pub fn shutdown_and_wait(self) {
        self.shutdown();

        // Dropping the handle is what waits for the task thread.
        drop(self);
    }

    /**
       Check whether the background task has stopped.

       Returns `true` once the task thread has left its step loop, be it
       because of a shutdown signal, [`Next::Abort`] or
       [`TaskError::Fatal`]. The flag is set after the loop, so it stays
       `false` if the step runner panics.
    */
    pub fn is_stopped(&self) -> bool {
        *self.stopped.acquire_read()
    }
}
impl Drop for DropJoinHandle {
fn drop(&mut self) {
if let Some(handle) = mem::take(&mut self.0) {
let _ = handle.join();
}
}
}
/**
   Sends the shutdown signal when the handle goes out of scope.

   The fields are dropped right after this method returns, and dropping
   the `join_handle` field is what waits for the task thread.
*/
impl Drop for TaskHandle {
    fn drop(&mut self) {
        // The send result is discarded on purpose: an error only means
        // that the task thread has already dropped its receiver.
        let _send_result = self.shutdown_sender.send(());
    }
}