From 5d55995cc5603631bfb03c216ad3a689f3cc582a Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Sun, 13 Jan 2019 19:20:50 -0800 Subject: [PATCH 1/3] Kill all pending accepts when TCP listener is closed --- js/net_test.ts | 16 +++++++++++++++- src/resources.rs | 49 ++++++++++++++++++++++++++++++++++++++++------- src/tokio_util.rs | 25 +++++++++++++++++++++++- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/js/net_test.ts b/js/net_test.ts index 6ba36547f2fb76..3e6d2be5f9e3b4 100644 --- a/js/net_test.ts +++ b/js/net_test.ts @@ -4,10 +4,24 @@ import { testPerm, assert, assertEqual } from "./test_util.ts"; import { deferred } from "deno"; testPerm({ net: true }, function netListenClose() { - const listener = deno.listen("tcp", "127.0.0.1:4500"); + const listener = deno.listen("tcp", "127.0.0.1:4502"); listener.close(); }); +testPerm({ net: true }, async function netCloseWhileAccept() { + const listener = deno.listen("tcp", "127.0.0.1:4501"); + const p = listener.accept(); + listener.close(); + let err: Error; + try { + await p; + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.message, "Listener has been closed"); +}); + testPerm({ net: true }, async function netDialListen() { const listener = deno.listen("tcp", ":4500"); listener.accept().then(async conn => { diff --git a/src/resources.rs b/src/resources.rs index 55e1a9f641bb13..5fe48ad1dbeee6 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -87,7 +87,11 @@ enum Repr { Stdout(tokio::fs::File), Stderr(tokio::io::Stderr), FsFile(tokio::fs::File), - TcpListener(tokio::net::TcpListener), + // Since TcpListener might be closed while there are pending accept tasks, + // we need to track these task so that when the listener is closed, + // pending tasks could be notified and die. + // The tasks are indexed, so they could be removed when accept completed. + TcpListener(tokio::net::TcpListener, HashMap), TcpStream(tokio::net::TcpStream), HttpBody(HttpBody), Repl(Repl), @@ -132,7 +136,7 @@ fn inspect_repr(repr: &Repr) -> String { Repr::Stdout(_) => "stdout", Repr::Stderr(_) => "stderr", Repr::FsFile(_) => "fsFile", - Repr::TcpListener(_) => "tcpListener", + Repr::TcpListener(_, _) => "tcpListener", Repr::TcpStream(_) => "tcpStream", Repr::HttpBody(_) => "httpBody", Repr::Repl(_) => "repl", @@ -148,7 +152,7 @@ fn inspect_repr(repr: &Repr) -> String { // Abstract async file interface. // Ideally in unix, if Resource represents an OS rid, it will be the same. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct Resource { pub rid: ResourceId, } @@ -159,20 +163,51 @@ impl Resource { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&self.rid); match maybe_repr { - None => panic!("bad rid"), + None => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Listener has been closed", + )), Some(repr) => match repr { - Repr::TcpListener(ref mut s) => s.poll_accept(), + Repr::TcpListener(ref mut s, _) => s.poll_accept(), _ => panic!("Cannot accept"), }, } } + /// Track pending tasks with task_id as identifier (for TcpListener resource) + /// Insert the task into map for tracking + pub fn track_task(&mut self, task_id: usize) { + let mut table = RESOURCE_TABLE.lock().unwrap(); + // Only register if is TcpListener + if let Some(Repr::TcpListener(_, m)) = table.get_mut(&self.rid) { + m.insert(task_id, futures::task::current()); + } + } + + /// Remove a task from tracking (for TcpListener resource) + /// Happens when the task is done and thus no further tracking is needed + pub fn untrack_task(&mut self, task_id: usize) { + let mut table = RESOURCE_TABLE.lock().unwrap(); + if let Some(Repr::TcpListener(_, m)) = table.get_mut(&self.rid) { + m.remove(&task_id); + } + } + // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&self) { let mut table = RESOURCE_TABLE.lock().unwrap(); let r = table.remove(&self.rid); assert!(r.is_some()); + // If TcpListener, we must kill all pending accepts! + if let Repr::TcpListener(l, m) = r.unwrap() { + // Drop first + std::mem::drop(l); + // Call notify on each task, so that they would error out + for (_, t) in m { + t.notify(); + } + } } pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> { @@ -264,7 +299,7 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource { pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource { let rid = new_rid(); let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::TcpListener(listener)); + let r = tg.insert(rid, Repr::TcpListener(listener, HashMap::new())); assert!(r.is_none()); Resource { rid } } @@ -515,7 +550,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept { match maybe_repr { None => panic!("bad rid"), Some(repr) => match repr { - Repr::TcpListener(ref mut tcp_listener) => { + Repr::TcpListener(ref mut tcp_listener, _) => { eager::tcp_accept(tcp_listener, resource) } _ => Either::A(tokio_util::accept(resource)), diff --git a/src/tokio_util.rs b/src/tokio_util.rs index 32542aa43a7a13..6ee5b2173d6f78 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -11,6 +11,13 @@ use tokio; use tokio::net::TcpStream; use tokio_executor; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +lazy_static! { + // Keep unique such that no collisions in TcpListener accept task map + static ref NEXT_ACCEPT_ID: AtomicUsize = AtomicUsize::new(0); +} + pub fn block_on(future: F) -> Result where F: Send + 'static + Future, @@ -45,6 +52,7 @@ enum AcceptState { pub fn accept(r: Resource) -> Accept { Accept { state: AcceptState::Pending(r), + task_id: NEXT_ACCEPT_ID.fetch_add(1, Ordering::SeqCst), } } @@ -55,6 +63,7 @@ pub fn accept(r: Resource) -> Accept { #[derive(Debug)] pub struct Accept { state: AcceptState, + task_id: usize, } impl Future for Accept { @@ -63,7 +72,21 @@ impl Future for Accept { fn poll(&mut self) -> Poll { let (stream, addr) = match self.state { - AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()), + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource + // In this way, when the listener is closed, the task could be + // notified to error out (instead of stuck forever) + AcceptState::Pending(ref mut r) => match r.poll_accept() { + Ok(futures::prelude::Async::Ready(t)) => { + r.untrack_task(self.task_id); + t + } + Ok(futures::prelude::Async::NotReady) => { + r.track_task(self.task_id); + return Ok(futures::prelude::Async::NotReady); + } + Err(e) => return Err(From::from(e)), + }, AcceptState::Empty => panic!("poll Accept after it's done"), }; From aa3b4ee4a24c7957c39c2e28a3a3c2e0d5114bed Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Sun, 13 Jan 2019 22:22:14 -0800 Subject: [PATCH 2/3] Nit 1 --- src/resources.rs | 4 +--- src/tokio_util.rs | 5 ++++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/resources.rs b/src/resources.rs index 5fe48ad1dbeee6..2ea351f2ab14b9 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -200,9 +200,7 @@ impl Resource { let r = table.remove(&self.rid); assert!(r.is_some()); // If TcpListener, we must kill all pending accepts! - if let Repr::TcpListener(l, m) = r.unwrap() { - // Drop first - std::mem::drop(l); + if let Repr::TcpListener(_, m) = r.unwrap() { // Call notify on each task, so that they would error out for (_, t) in m { t.notify(); diff --git a/src/tokio_util.rs b/src/tokio_util.rs index 6ee5b2173d6f78..c8bf2a0a6d3a14 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -85,7 +85,10 @@ impl Future for Accept { r.track_task(self.task_id); return Ok(futures::prelude::Async::NotReady); } - Err(e) => return Err(From::from(e)), + Err(e) => { + r.untrack_task(self.task_id); + return Err(From::from(e)); + } }, AcceptState::Empty => panic!("poll Accept after it's done"), }; From 9f405cc20712689ccba27e1476dad648b09d2919 Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Tue, 15 Jan 2019 10:17:30 -0800 Subject: [PATCH 3/3] Switch to Option --- js/net_test.ts | 28 ++++++++++++++++++++--- src/resources.rs | 58 ++++++++++++++++++++++++++++------------------- src/tokio_util.rs | 22 ++++++------------ 3 files changed, 67 insertions(+), 41 deletions(-) diff --git a/js/net_test.ts b/js/net_test.ts index 3e6d2be5f9e3b4..be415f19c75a16 100644 --- a/js/net_test.ts +++ b/js/net_test.ts @@ -4,24 +4,46 @@ import { testPerm, assert, assertEqual } from "./test_util.ts"; import { deferred } from "deno"; testPerm({ net: true }, function netListenClose() { - const listener = deno.listen("tcp", "127.0.0.1:4502"); + const listener = deno.listen("tcp", "127.0.0.1:4500"); listener.close(); }); testPerm({ net: true }, async function netCloseWhileAccept() { - const listener = deno.listen("tcp", "127.0.0.1:4501"); + const listener = deno.listen("tcp", ":4501"); const p = listener.accept(); listener.close(); - let err: Error; + let err; try { await p; } catch (e) { err = e; } assert(!!err); + assertEqual(err.kind, deno.ErrorKind.Other); assertEqual(err.message, "Listener has been closed"); }); +testPerm({ net: true }, async function netConcurrentAccept() { + const listener = deno.listen("tcp", ":4502"); + let err; + // Consume this accept error + // (since it would still be waiting when listener.close is called) + listener.accept().catch(e => { + assertEqual(e.kind, deno.ErrorKind.Other); + assertEqual(e.message, "Listener has been closed"); + }); + const p1 = listener.accept(); + try { + await p1; + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.kind, deno.ErrorKind.Other); + assertEqual(err.message, "Another accept task is ongoing"); + listener.close(); +}); + testPerm({ net: true }, async function netDialListen() { const listener = deno.listen("tcp", ":4500"); listener.accept().then(async conn => { diff --git a/src/resources.rs b/src/resources.rs index 2ea351f2ab14b9..1f5a121deaa5a1 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -87,11 +87,12 @@ enum Repr { Stdout(tokio::fs::File), Stderr(tokio::io::Stderr), FsFile(tokio::fs::File), - // Since TcpListener might be closed while there are pending accept tasks, - // we need to track these task so that when the listener is closed, - // pending tasks could be notified and die. - // The tasks are indexed, so they could be removed when accept completed. - TcpListener(tokio::net::TcpListener, HashMap), + // Since TcpListener might be closed while there is a pending accept task, + // we need to track the task so that when the listener is closed, + // this pending task could be notified and die. + // Currently TcpListener itself does not take care of this issue. + // See: https://github.com/tokio-rs/tokio/issues/846 + TcpListener(tokio::net::TcpListener, Option), TcpStream(tokio::net::TcpStream), HttpBody(HttpBody), Repl(Repl), @@ -152,7 +153,7 @@ fn inspect_repr(repr: &Repr) -> String { // Abstract async file interface. // Ideally in unix, if Resource represents an OS rid, it will be the same. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct Resource { pub rid: ResourceId, } @@ -174,22 +175,35 @@ impl Resource { } } - /// Track pending tasks with task_id as identifier (for TcpListener resource) - /// Insert the task into map for tracking - pub fn track_task(&mut self, task_id: usize) { + /// Track the current task (for TcpListener resource). + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), std::io::Error> { let mut table = RESOURCE_TABLE.lock().unwrap(); - // Only register if is TcpListener - if let Some(Repr::TcpListener(_, m)) = table.get_mut(&self.rid) { - m.insert(task_id, futures::task::current()); + // Only track if is TcpListener. + if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if t.is_some() { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + )); + } + t.replace(futures::task::current()); } + Ok(()) } - /// Remove a task from tracking (for TcpListener resource) - /// Happens when the task is done and thus no further tracking is needed - pub fn untrack_task(&mut self, task_id: usize) { + /// Stop tracking a task (for TcpListener resource). + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { let mut table = RESOURCE_TABLE.lock().unwrap(); - if let Some(Repr::TcpListener(_, m)) = table.get_mut(&self.rid) { - m.remove(&task_id); + // Only untrack if is TcpListener. + if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + assert!(t.is_some()); + t.take(); } } @@ -200,11 +214,9 @@ impl Resource { let r = table.remove(&self.rid); assert!(r.is_some()); // If TcpListener, we must kill all pending accepts! - if let Repr::TcpListener(_, m) = r.unwrap() { - // Call notify on each task, so that they would error out - for (_, t) in m { - t.notify(); - } + if let Repr::TcpListener(_, Some(t)) = r.unwrap() { + // Call notify on the tracked task, so that they would error out. + t.notify(); } } @@ -297,7 +309,7 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource { pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource { let rid = new_rid(); let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::TcpListener(listener, HashMap::new())); + let r = tg.insert(rid, Repr::TcpListener(listener, None)); assert!(r.is_none()); Resource { rid } } diff --git a/src/tokio_util.rs b/src/tokio_util.rs index c8bf2a0a6d3a14..2eb0211db94685 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -11,13 +11,6 @@ use tokio; use tokio::net::TcpStream; use tokio_executor; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -lazy_static! { - // Keep unique such that no collisions in TcpListener accept task map - static ref NEXT_ACCEPT_ID: AtomicUsize = AtomicUsize::new(0); -} - pub fn block_on(future: F) -> Result where F: Send + 'static + Future, @@ -52,7 +45,6 @@ enum AcceptState { pub fn accept(r: Resource) -> Accept { Accept { state: AcceptState::Pending(r), - task_id: NEXT_ACCEPT_ID.fetch_add(1, Ordering::SeqCst), } } @@ -63,7 +55,6 @@ pub fn accept(r: Resource) -> Accept { #[derive(Debug)] pub struct Accept { state: AcceptState, - task_id: usize, } impl Future for Accept { @@ -73,20 +64,21 @@ impl Future for Accept { fn poll(&mut self) -> Poll { let (stream, addr) = match self.state { // Similar to try_ready!, but also track/untrack accept task - // in TcpListener resource - // In this way, when the listener is closed, the task could be - // notified to error out (instead of stuck forever) + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). AcceptState::Pending(ref mut r) => match r.poll_accept() { Ok(futures::prelude::Async::Ready(t)) => { - r.untrack_task(self.task_id); + r.untrack_task(); t } Ok(futures::prelude::Async::NotReady) => { - r.track_task(self.task_id); + // Would error out if another accept task is being tracked. + r.track_task()?; return Ok(futures::prelude::Async::NotReady); } Err(e) => { - r.untrack_task(self.task_id); + r.untrack_task(); return Err(From::from(e)); } },