diff --git a/js/net.ts b/js/net.ts index ccbd331ddc00f2..1258a0ff195985 100644 --- a/js/net.ts +++ b/js/net.ts @@ -55,6 +55,8 @@ class ListenerImpl implements Listener { export interface Conn extends Reader, Writer, Closer { localAddr: string; remoteAddr: string; + closeRead(): void; + closeWrite(): void; } class ConnImpl implements Conn { @@ -80,19 +82,35 @@ class ConnImpl implements Conn { * Most callers should just use close(). */ closeRead(): void { - // TODO(ry) Connect to AsyncWrite::shutdown in resources.rs - return notImplemented(); + shutdown(this.fd, ShutdownMode.Read); } /** closeWrite shuts down (shutdown(2)) the writing side of the TCP * connection. Most callers should just use close(). */ closeWrite(): void { - // TODO(ry) Connect to AsyncWrite::shutdown in resources.rs - return notImplemented(); + shutdown(this.fd, ShutdownMode.Write); } } +enum ShutdownMode { + // See http://man7.org/linux/man-pages/man2/shutdown.2.html + // Corresponding to SHUT_RD, SHUT_WR, SHUT_RDWR + Read = 0, + Write, + ReadWrite // unused +} + +function shutdown(fd: number, how: ShutdownMode) { + const builder = new flatbuffers.Builder(); + msg.Shutdown.startShutdown(builder); + msg.Shutdown.addRid(builder, fd); + msg.Shutdown.addHow(builder, how); + const inner = msg.Shutdown.endShutdown(builder); + const baseRes = dispatch.sendSync(builder, msg.Any.Shutdown, inner); + assert(baseRes == null); +} + /** Listen announces on the local network address. * * The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket". diff --git a/js/net_test.ts b/js/net_test.ts index 0b6db7afa72c48..c7c8dbb59c5ca7 100644 --- a/js/net_test.ts +++ b/js/net_test.ts @@ -2,6 +2,7 @@ import * as deno from "deno"; import { testPerm, assert, assertEqual } from "./test_util.ts"; +import { deferred } from "./util.ts"; testPerm({ net: true }, function netListenClose() { const listener = deno.listen("tcp", "127.0.0.1:4500"); @@ -9,7 +10,7 @@ testPerm({ net: true }, function netListenClose() { }); testPerm({ net: true }, async function netDialListen() { - let addr = "127.0.0.1:4500"; + const addr = "127.0.0.1:4500"; const listener = deno.listen("tcp", addr); listener.accept().then(async conn => { await conn.write(new Uint8Array([1, 2, 3])); @@ -35,3 +36,115 @@ testPerm({ net: true }, async function netDialListen() { listener.close(); conn.close(); }); + +testPerm({ net: true }, async function netCloseReadSuccess() { + const addr = "127.0.0.1:4500"; + const listener = deno.listen("tcp", addr); + const closeDeferred = deferred(); + listener.accept().then(async conn => { + await conn.write(new Uint8Array([1, 2, 3])); + const buf = new Uint8Array(1024); + const readResult = await conn.read(buf); + assertEqual(3, readResult.nread); + assertEqual(4, buf[0]); + assertEqual(5, buf[1]); + assertEqual(6, buf[2]); + conn.close(); + closeDeferred.resolve(); + }); + const conn = await deno.dial("tcp", addr); + conn.closeRead(); // closing read + const buf = new Uint8Array(1024); + const readResult = await conn.read(buf); + assertEqual(0, readResult.nread); // No error, read nothing + assertEqual(true, readResult.eof); // with immediate EOF + // Ensure closeRead does not impact write + await conn.write(new Uint8Array([4, 5, 6])); + await closeDeferred.promise; + listener.close(); + conn.close(); +}); + +testPerm({ net: true }, async function netDoubleCloseRead() { + const addr = "127.0.0.1:4500"; + const listener = deno.listen("tcp", addr); + const closeDeferred = deferred(); + listener.accept().then(async conn => { + await conn.write(new Uint8Array([1, 2, 3])); + await closeDeferred.promise; + conn.close(); + }); + const conn = await deno.dial("tcp", addr); + conn.closeRead(); // closing read + let err; + try { + // Duplicated close should throw error + conn.closeRead(); + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.kind, deno.ErrorKind.NotConnected); + assertEqual(err.name, "NotConnected"); + closeDeferred.resolve(); + listener.close(); + conn.close(); +}); + +testPerm({ net: true }, async function netCloseWriteSuccess() { + const addr = "127.0.0.1:4500"; + const listener = deno.listen("tcp", addr); + const closeDeferred = deferred(); + listener.accept().then(async conn => { + await conn.write(new Uint8Array([1, 2, 3])); + await closeDeferred.promise; + conn.close(); + }); + const conn = await deno.dial("tcp", addr); + conn.closeWrite(); // closing write + const buf = new Uint8Array(1024); + // Check read not impacted + const readResult = await conn.read(buf); + assertEqual(3, readResult.nread); + assertEqual(1, buf[0]); + assertEqual(2, buf[1]); + assertEqual(3, buf[2]); + // Check write should be closed + let err; + try { + await conn.write(new Uint8Array([1, 2, 3])); + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.kind, deno.ErrorKind.BrokenPipe); + assertEqual(err.name, "BrokenPipe"); + closeDeferred.resolve(); + listener.close(); + conn.close(); +}); + +testPerm({ net: true }, async function netDoubleCloseWrite() { + const addr = "127.0.0.1:4500"; + const listener = deno.listen("tcp", addr); + const closeDeferred = deferred(); + listener.accept().then(async conn => { + await closeDeferred.promise; + conn.close(); + }); + const conn = await deno.dial("tcp", addr); + conn.closeWrite(); // closing write + let err; + try { + // Duplicated close should throw error + conn.closeWrite(); + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.kind, deno.ErrorKind.NotConnected); + assertEqual(err.name, "NotConnected"); + closeDeferred.resolve(); + listener.close(); + conn.close(); +}); diff --git a/js/testing/testing.ts b/js/testing/testing.ts index 64ecbb6bc1138b..ee2ab1456aeead 100644 --- a/js/testing/testing.ts +++ b/js/testing/testing.ts @@ -57,12 +57,11 @@ const FG_RED = "\x1b[31m"; const FG_GREEN = "\x1b[32m"; function red_failed() { - return FG_RED + "FAILED" + RESET + return FG_RED + "FAILED" + RESET; } - function green_ok() { - return FG_GREEN + "ok" + RESET + return FG_GREEN + "ok" + RESET; } async function runTests() { @@ -96,8 +95,8 @@ async function runTests() { const result = failed > 0 ? red_failed() : green_ok(); console.log( `\ntest result: ${result}. ${passed} passed; ${failed} failed; ` + - `${ignored} ignored; ${measured} measured; ${filtered} filtered out\n`); - + `${ignored} ignored; ${measured} measured; ${filtered} filtered out\n` + ); if (failed === 0) { // All good. diff --git a/js/util.ts b/js/util.ts index bfde0190849f09..de6a078bbe9ce8 100644 --- a/js/util.ts +++ b/js/util.ts @@ -101,3 +101,29 @@ export function containsOnlyASCII(str: string): boolean { } return /^[\x00-\x7F]*$/.test(str); } + +// @internal +export interface Deferred { + promise: Promise; + resolve: Function; + reject: Function; +} + +/** + * Create a wrapper around a promise that could be + * resolved externally. + * @internal + */ +export function deferred(): Deferred { + let resolve: Function | undefined; + let reject: Function | undefined; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve: resolve!, + reject: reject! + }; +} diff --git a/src/msg.fbs b/src/msg.fbs index 9479d789280916..16d10cdff0b638 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -35,6 +35,7 @@ union Any { Write, WriteRes, Close, + Shutdown, Listen, ListenRes, Accept, @@ -290,6 +291,11 @@ table Close { rid: int; } +table Shutdown { + rid: int; + how: uint; +} + table Listen { network: string; address: string; diff --git a/src/ops.rs b/src/ops.rs index 53163dfd491117..fb67d4befb9885 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -9,6 +9,8 @@ use isolate::Isolate; use isolate::IsolateState; use isolate::Op; use msg; +use resources; +use resources::Resource; use tokio_util; use flatbuffers::FlatBufferBuilder; @@ -19,10 +21,9 @@ use hyper; use hyper::rt::{Future, Stream}; use hyper::Client; use remove_dir_all::remove_dir_all; -use resources; use std; use std::fs; -use std::net::SocketAddr; +use std::net::{Shutdown, SocketAddr}; #[cfg(any(unix))] use std::os::unix::fs::PermissionsExt; use std::path::Path; @@ -84,6 +85,7 @@ pub fn dispatch( msg::Any::Read => op_read, msg::Any::Write => op_write, msg::Any::Close => op_close, + msg::Any::Shutdown => op_shutdown, msg::Any::Remove => op_remove, msg::Any::ReadFile => op_read_file, msg::Any::ReadDir => op_read_dir, @@ -614,6 +616,35 @@ fn op_close( } } +fn op_shutdown( + _state: Arc, + base: &msg::Base, + data: &'static mut [u8], +) -> Box { + assert_eq!(data.len(), 0); + let inner = base.inner_as_shutdown().unwrap(); + let rid = inner.rid(); + let how = inner.how(); + match resources::lookup(rid) { + None => odd_future(errors::new( + errors::ErrorKind::BadFileDescriptor, + String::from("Bad File Descriptor"), + )), + Some(mut resource) => { + let shutdown_mode = match how { + 0 => Shutdown::Read, + 1 => Shutdown::Write, + _ => unimplemented!(), + }; + blocking!(base.sync(), || { + // Use UFCS for disambiguation + Resource::shutdown(&mut resource, shutdown_mode)?; + Ok(empty_buf()) + }) + } + } +} + fn op_read( _state: Arc, base: &msg::Base, diff --git a/src/resources.rs b/src/resources.rs index 75bad04b7b3ee2..5a13e6cbfdbdc4 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -8,13 +8,15 @@ // descriptors". This module implements a global resource table. Ops (AKA // handlers) look up resources by their integer id here. +use errors::DenoError; + use futures; use futures::Poll; use std; use std::collections::HashMap; use std::io::Error; use std::io::{Read, Write}; -use std::net::SocketAddr; +use std::net::{Shutdown, SocketAddr}; use std::sync::atomic::AtomicIsize; use std::sync::atomic::Ordering; use std::sync::Mutex; @@ -79,6 +81,20 @@ impl Resource { let r = table.remove(&self.rid); assert!(r.is_some()); } + + pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&self.rid); + match maybe_repr { + None => panic!("bad rid"), + Some(repr) => match repr { + Repr::TcpStream(ref mut f) => { + TcpStream::shutdown(f, how).map_err(|err| DenoError::from(err)) + } + _ => panic!("Cannot shutdown"), + }, + } + } } impl Read for Resource {