Skip to content

Commit 71370ac

Browse files
committed
proxy: improve graceful shutdown process
- The listener is immediately closed on receipt of a shutdown signal. - All in-progress server connections are now counted, and the process will not shutdown until the connection count has dropped to zero. - In the case of HTTP1, idle connections are closed. In the case of HTTP2, the HTTP2 graceful shutdown steps are followed of sending various GOAWAYs. Signed-off-by: Sean McArthur <sean@seanmonstar.com>
1 parent 91c359e commit 71370ac

14 files changed

+704
-67
lines changed

Cargo.lock

+7-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ members = [
99

1010
[patch.crates-io]
1111
prost-derive = { git = "https://github.com/danburkert/prost" }
12+

proxy/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ bytes = "0.4"
1919
domain = "0.2.3"
2020
env_logger = { version = "0.5", default-features = false }
2121
futures = "0.1"
22-
h2 = "0.1"
22+
h2 = "0.1.5"
2323
http = "0.1"
2424
httparse = "1.2"
2525
hyper = { version = "0.11.22", default-features = false, features = ["compat"] }

proxy/src/drain.rs

+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
use std::mem;
2+
3+
use futures::{Async, Future, Poll, Stream};
4+
use futures::future::Shared;
5+
use futures::sync::{mpsc, oneshot};
6+
7+
/// Creates a drain channel.
8+
///
9+
/// The `Signal` is used to start a drain, and the `Watch` will be notified
10+
/// when a drain is signaled.
11+
pub fn channel() -> (Signal, Watch) {
12+
let (tx, rx) = oneshot::channel();
13+
let (drained_tx, drained_rx) = mpsc::channel(0);
14+
(
15+
Signal {
16+
drained_rx,
17+
tx,
18+
},
19+
Watch {
20+
drained_tx,
21+
rx: rx.shared(),
22+
},
23+
)
24+
}
25+
26+
/// Send a drain command to all watchers.
27+
///
28+
/// When a drain is started, this returns a `Drained` future which resolves
29+
/// when all `Watch`ers have been dropped.
30+
#[derive(Debug)]
31+
pub struct Signal {
32+
drained_rx: mpsc::Receiver<Never>,
33+
tx: oneshot::Sender<()>,
34+
}
35+
36+
/// Watch for a drain command.
37+
///
38+
/// This wraps another future and callback to be called when drain is triggered.
39+
#[derive(Clone, Debug)]
40+
pub struct Watch {
41+
drained_tx: mpsc::Sender<Never>,
42+
rx: Shared<oneshot::Receiver<()>>,
43+
}
44+
45+
/// The wrapped watching `Future`.
46+
#[derive(Debug)]
47+
pub struct Watching<A, F> {
48+
future: A,
49+
state: State<F>,
50+
watch: Watch,
51+
}
52+
53+
#[derive(Debug)]
54+
enum State<F> {
55+
Watch(F),
56+
Draining,
57+
}
58+
59+
//TODO: in Rust 1.26, replace this with `!`.
60+
#[derive(Debug)]
61+
enum Never {}
62+
63+
/// A future that resolves when all `Watch`ers have been dropped (drained).
64+
pub struct Drained {
65+
drained_rx: mpsc::Receiver<Never>,
66+
}
67+
68+
// ===== impl Signal =====
69+
70+
impl Signal {
71+
/// Start the draining process.
72+
///
73+
/// A signal is sent to all futures watching for the signal. A new future
74+
/// is returned from this method that resolves when all watchers have
75+
/// completed.
76+
pub fn drain(self) -> Drained {
77+
let _ = self.tx.send(());
78+
Drained {
79+
drained_rx: self.drained_rx,
80+
}
81+
}
82+
}
83+
84+
// ===== impl Watch =====
85+
86+
impl Watch {
87+
/// Wrap a future and a callback that is triggered when drain is received.
88+
///
89+
/// The callback receives a mutable reference to the original future, and
90+
/// should be used to trigger any shutdown process for it.
91+
pub fn watch<A, F>(self, future: A, on_drain: F) -> Watching<A, F>
92+
where
93+
A: Future,
94+
F: FnOnce(&mut A),
95+
{
96+
Watching {
97+
future,
98+
state: State::Watch(on_drain),
99+
watch: self,
100+
}
101+
}
102+
}
103+
104+
// ===== impl Watching =====
105+
106+
impl<A, F> Future for Watching<A, F>
107+
where
108+
A: Future,
109+
F: FnOnce(&mut A),
110+
{
111+
type Item = A::Item;
112+
type Error = A::Error;
113+
114+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
115+
loop {
116+
match mem::replace(&mut self.state, State::Draining) {
117+
State::Watch(on_drain) => {
118+
match self.watch.rx.poll() {
119+
Ok(Async::Ready(_)) | Err(_) => {
120+
// Drain has been triggered!
121+
on_drain(&mut self.future);
122+
},
123+
Ok(Async::NotReady) => {
124+
self.state = State::Watch(on_drain);
125+
return self.future.poll();
126+
}
127+
}
128+
},
129+
State::Draining => {
130+
return self.future.poll();
131+
},
132+
}
133+
}
134+
}
135+
}
136+
137+
// ===== impl Drained =====
138+
139+
impl Future for Drained {
140+
type Item = ();
141+
type Error = ();
142+
143+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
144+
match try_ready!(self.drained_rx.poll()) {
145+
Some(never) => match never {},
146+
None => Ok(Async::Ready(())),
147+
}
148+
}
149+
}
150+
151+
#[cfg(test)]
152+
mod tests {
153+
use futures::{future, Async, Future, Poll};
154+
use super::*;
155+
156+
struct TestMe {
157+
draining: bool,
158+
finished: bool,
159+
poll_cnt: usize,
160+
}
161+
162+
impl Future for TestMe {
163+
type Item = ();
164+
type Error = ();
165+
166+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
167+
self.poll_cnt += 1;
168+
if self.finished {
169+
Ok(Async::Ready(()))
170+
} else {
171+
Ok(Async::NotReady)
172+
}
173+
}
174+
}
175+
176+
#[test]
177+
fn watch() {
178+
future::lazy(|| {
179+
let (tx, rx) = channel();
180+
let fut = TestMe {
181+
draining: false,
182+
finished: false,
183+
poll_cnt: 0,
184+
};
185+
186+
let mut watch = rx.watch(fut, |fut| {
187+
fut.draining = true;
188+
});
189+
190+
assert_eq!(watch.future.poll_cnt, 0);
191+
192+
// First poll should poll the inner future
193+
assert!(watch.poll().unwrap().is_not_ready());
194+
assert_eq!(watch.future.poll_cnt, 1);
195+
196+
// Second poll should poll the inner future again
197+
assert!(watch.poll().unwrap().is_not_ready());
198+
assert_eq!(watch.future.poll_cnt, 2);
199+
200+
let mut draining = tx.drain();
201+
// Drain signaled, but needs another poll to be noticed.
202+
assert!(!watch.future.draining);
203+
assert_eq!(watch.future.poll_cnt, 2);
204+
205+
// Now, poll after drain has been signaled.
206+
assert!(watch.poll().unwrap().is_not_ready());
207+
assert_eq!(watch.future.poll_cnt, 3);
208+
assert!(watch.future.draining);
209+
210+
// Draining is not ready until watcher completes
211+
assert!(draining.poll().unwrap().is_not_ready());
212+
213+
// Finishing up the watch future
214+
watch.future.finished = true;
215+
assert!(watch.poll().unwrap().is_ready());
216+
assert_eq!(watch.future.poll_cnt, 4);
217+
drop(watch);
218+
219+
assert!(draining.poll().unwrap().is_ready());
220+
221+
Ok::<_, ()>(())
222+
}).wait().unwrap();
223+
}
224+
225+
#[test]
226+
fn watch_clones() {
227+
future::lazy(|| {
228+
let (tx, rx) = channel();
229+
230+
let fut1 = TestMe {
231+
draining: false,
232+
finished: false,
233+
poll_cnt: 0,
234+
};
235+
let fut2 = TestMe {
236+
draining: false,
237+
finished: false,
238+
poll_cnt: 0,
239+
};
240+
241+
let watch1 = rx.clone().watch(fut1, |fut| {
242+
fut.draining = true;
243+
});
244+
let watch2 = rx.watch(fut2, |fut| {
245+
fut.draining = true;
246+
});
247+
248+
let mut draining = tx.drain();
249+
250+
// Still 2 outstanding watchers
251+
assert!(draining.poll().unwrap().is_not_ready());
252+
253+
// drop 1 for whatever reason
254+
drop(watch1);
255+
256+
// Still not ready, 1 other watcher still pending
257+
assert!(draining.poll().unwrap().is_not_ready());
258+
259+
drop(watch2);
260+
261+
// Now all watchers are gone, draining is complete
262+
assert!(draining.poll().unwrap().is_ready());
263+
264+
Ok::<_, ()>(())
265+
}).wait().unwrap();
266+
}
267+
}

0 commit comments

Comments
 (0)