Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill all pending accepts when TCP listener is closed #1517

Merged
merged 3 commits into from
Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions js/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,42 @@ testPerm({ net: true }, function netListenClose() {
listener.close();
});

testPerm({ net: true }, async function netCloseWhileAccept() {
const listener = deno.listen("tcp", ":4501");
const p = listener.accept();
listener.close();
let err;
try {
await p;
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.Other);
assertEqual(err.message, "Listener has been closed");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool - good test case

});

testPerm({ net: true }, async function netConcurrentAccept() {
const listener = deno.listen("tcp", ":4502");
let err;
// Consume this accept error
// (since it would still be waiting when listener.close is called)
listener.accept().catch(e => {
assertEqual(e.kind, deno.ErrorKind.Other);
assertEqual(e.message, "Listener has been closed");
});
const p1 = listener.accept();
try {
await p1;
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.Other);
assertEqual(err.message, "Another accept task is ongoing");
kevinkassimo marked this conversation as resolved.
Show resolved Hide resolved
listener.close();
});

testPerm({ net: true }, async function netDialListen() {
const listener = deno.listen("tcp", ":4500");
listener.accept().then(async conn => {
Expand Down
57 changes: 51 additions & 6 deletions src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ enum Repr {
Stdout(tokio::fs::File),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
TcpListener(tokio::net::TcpListener),
// Since TcpListener might be closed while there is a pending accept task,
// we need to track the task so that when the listener is closed,
// this pending task could be notified and die.
// Currently TcpListener itself does not take care of this issue.
// See: https://github.com/tokio-rs/tokio/issues/846
TcpListener(tokio::net::TcpListener, Option<futures::task::Task>),
TcpStream(tokio::net::TcpStream),
HttpBody(HttpBody),
Repl(Repl),
Expand Down Expand Up @@ -132,7 +137,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::Stdout(_) => "stdout",
Repr::Stderr(_) => "stderr",
Repr::FsFile(_) => "fsFile",
Repr::TcpListener(_) => "tcpListener",
Repr::TcpListener(_, _) => "tcpListener",
Repr::TcpStream(_) => "tcpStream",
Repr::HttpBody(_) => "httpBody",
Repr::Repl(_) => "repl",
Expand All @@ -159,20 +164,60 @@ impl Resource {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
None => panic!("bad rid"),
None => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Listener has been closed",
)),
Some(repr) => match repr {
Repr::TcpListener(ref mut s) => s.poll_accept(),
Repr::TcpListener(ref mut s, _) => s.poll_accept(),
_ => panic!("Cannot accept"),
},
}
}

/// Track the current task (for TcpListener resource).
/// Throws an error if another task is already tracked.
pub fn track_task(&mut self) -> Result<(), std::io::Error> {
let mut table = RESOURCE_TABLE.lock().unwrap();
// Only track if is TcpListener.
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
if t.is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Another accept task is ongoing",
));
}
t.replace(futures::task::current());
}
Ok(())
}

/// Stop tracking a task (for TcpListener resource).
/// Happens when the task is done and thus no further tracking is needed.
pub fn untrack_task(&mut self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
// Only untrack if is TcpListener.
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
assert!(t.is_some());
t.take();
kevinkassimo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
pub fn close(&self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&self.rid);
assert!(r.is_some());
// If TcpListener, we must kill all pending accepts!
if let Repr::TcpListener(_, Some(t)) = r.unwrap() {
// Call notify on the tracked task, so that they would error out.
t.notify();
}
}

pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
Expand Down Expand Up @@ -264,7 +309,7 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource {
pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
let r = tg.insert(rid, Repr::TcpListener(listener));
let r = tg.insert(rid, Repr::TcpListener(listener, None));
assert!(r.is_none());
Resource { rid }
}
Expand Down Expand Up @@ -515,7 +560,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpListener(ref mut tcp_listener) => {
Repr::TcpListener(ref mut tcp_listener, _) => {
eager::tcp_accept(tcp_listener, resource)
}
_ => Either::A(tokio_util::accept(resource)),
Expand Down
20 changes: 19 additions & 1 deletion src/tokio_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,25 @@ impl Future for Accept {

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, addr) = match self.state {
AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
AcceptState::Pending(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => {
r.untrack_task();
t
}
Ok(futures::prelude::Async::NotReady) => {
// Would error out if another accept task is being tracked.
r.track_task()?;
return Ok(futures::prelude::Async::NotReady);
}
Err(e) => {
r.untrack_task();
return Err(From::from(e));
}
},
AcceptState::Empty => panic!("poll Accept after it's done"),
};

Expand Down