Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement closeRead/closeWrite using TcpStream::shutdown #903

Merged
merged 5 commits into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions js/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ export type Network = "tcp";
// export type Network = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";

// TODO Support finding network from Addr, see https://golang.org/pkg/net/#Addr
export type Addr = string;
export interface Addr {
network: Network;
address: string;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please leave the Addr changes out of this PR? I'd like to discuss them separately from shutdown.


/** A Listener is a generic network listener for stream-oriented protocols. */
export interface Listener {
Expand All @@ -28,7 +31,11 @@ export interface Listener {
}

class ListenerImpl implements Listener {
constructor(readonly fd: number) {}
private addr_: Addr;

constructor(readonly fd: number, network: Network, address: string) {
this.addr_ = Object.freeze({ network, address });
}

async accept(): Promise<Conn> {
const builder = new flatbuffers.Builder();
Expand All @@ -48,13 +55,15 @@ class ListenerImpl implements Listener {
}

addr(): Addr {
return notImplemented();
return this.addr_;
}
}

export interface Conn extends Reader, Writer, Closer {
localAddr: string;
remoteAddr: string;
closeRead(): void;
closeWrite(): void;
}

class ConnImpl implements Conn {
Expand All @@ -81,18 +90,28 @@ class ConnImpl implements Conn {
*/
closeRead(): void {
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
return notImplemented();
shutdown(this.fd, false);
}

/** closeWrite shuts down (shutdown(2)) the writing side of the TCP
* connection. Most callers should just use close().
*/
closeWrite(): void {
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
kevinkassimo marked this conversation as resolved.
Show resolved Hide resolved
return notImplemented();
shutdown(this.fd, true);
}
}

function shutdown(fd: number, isWrite: boolean) {
const builder = new flatbuffers.Builder();
msg.Shutdown.startShutdown(builder);
msg.Shutdown.addRid(builder, fd);
msg.Shutdown.addIsWrite(builder, isWrite);
const inner = msg.Shutdown.endShutdown(builder);
const baseRes = dispatch.sendSync(builder, msg.Any.Shutdown, inner);
ry marked this conversation as resolved.
Show resolved Hide resolved
assert(baseRes == null);
}

/** Listen announces on the local network address.
*
* The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
Expand Down Expand Up @@ -121,7 +140,7 @@ export function listen(network: Network, address: string): Listener {
assert(msg.Any.ListenRes === baseRes!.innerType());
const res = new msg.ListenRes();
assert(baseRes!.inner(res) != null);
return new ListenerImpl(res.rid());
return new ListenerImpl(res.rid(), network, address);
}

/** Dial connects to the address on the named network.
Expand Down
58 changes: 57 additions & 1 deletion js/net_test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.

import * as deno from "deno";
import { testPerm, assert, assertEqual } from "./test_util.ts";
import { testPerm, assert, assertEqual, deferred } from "./test_util.ts";

testPerm({ net: true }, function netListenClose() {
const listener = deno.listen("tcp", "127.0.0.1:4500");
Expand Down Expand Up @@ -35,3 +35,59 @@ testPerm({ net: true }, async function netDialListen() {
listener.close();
conn.close();
});

testPerm({ net: true }, async function netConnCloseRead() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to netCloseReadSuccess()

const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(3, readResult.nread);
assertEqual(4, buf[0]);
assertEqual(5, buf[1]);
assertEqual(6, buf[2]);
conn.close();
closeDeferred.resolve();
});
const conn = await deno.dial("tcp", addr);
conn.closeRead(); // closing read
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(0, readResult.nread); // TODO: no error?
assertEqual(true, readResult.eof);
await conn.write(new Uint8Array([4, 5, 6]));
await closeDeferred.promise;
listener.close();
conn.close();
});

testPerm({ net: true }, async function netConnCloseWrite() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to netCloseWriteSuccess

const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeWrite(); // closing read
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(3, readResult.nread);
assertEqual(1, buf[0]);
assertEqual(2, buf[1]);
assertEqual(3, buf[2]);
let err;
try {
await conn.write(new Uint8Array([1, 2, 3]));
} catch (e) {
err = e;
}
assert(!!err); // TODO: Broken Pipe?
closeDeferred.resolve();
listener.close();
conn.close();
});
kevinkassimo marked this conversation as resolved.
Show resolved Hide resolved
20 changes: 20 additions & 0 deletions js/test_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ export function test(fn: testing.TestFunction) {
testPerm({ write: false, net: false, env: false }, fn);
}

interface Deferred {
promise: Promise<void>;
resolve: Function;
reject: Function;
}

export function deferred(): Deferred {
let resolve: Function | undefined;
let reject: Function | undefined;
const promise = new Promise<void>((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve: resolve!,
reject: reject!
};
}
kevinkassimo marked this conversation as resolved.
Show resolved Hide resolved

test(function permSerialization() {
for (const write of [true, false]) {
for (const net of [true, false]) {
Expand Down
9 changes: 4 additions & 5 deletions js/testing/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ const FG_RED = "\x1b[31m";
const FG_GREEN = "\x1b[32m";

function red_failed() {
return FG_RED + "FAILED" + RESET
return FG_RED + "FAILED" + RESET;
}


function green_ok() {
return FG_GREEN + "ok" + RESET
return FG_GREEN + "ok" + RESET;
}

async function runTests() {
Expand Down Expand Up @@ -96,8 +95,8 @@ async function runTests() {
const result = failed > 0 ? red_failed() : green_ok();
console.log(
`\ntest result: ${result}. ${passed} passed; ${failed} failed; ` +
`${ignored} ignored; ${measured} measured; ${filtered} filtered out\n`);

`${ignored} ignored; ${measured} measured; ${filtered} filtered out\n`
);

if (failed === 0) {
// All good.
Expand Down
6 changes: 6 additions & 0 deletions src/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ union Any {
Write,
WriteRes,
Close,
Shutdown,
Listen,
ListenRes,
Accept,
Expand Down Expand Up @@ -290,6 +291,11 @@ table Close {
rid: int;
}

table Shutdown {
rid: int;
is_write: bool;
}

table Listen {
network: string;
address: string;
Expand Down
29 changes: 28 additions & 1 deletion src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use remove_dir_all::remove_dir_all;
use resources;
use std;
use std::fs;
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
Expand Down Expand Up @@ -84,6 +84,7 @@ pub fn dispatch(
msg::Any::Read => op_read,
msg::Any::Write => op_write,
msg::Any::Close => op_close,
msg::Any::Shutdown => op_shutdown,
msg::Any::Remove => op_remove,
msg::Any::ReadFile => op_read_file,
msg::Any::ReadDir => op_read_dir,
Expand Down Expand Up @@ -614,6 +615,32 @@ fn op_close(
}
}

fn op_shutdown(
_state: Arc<IsolateState>,
base: &msg::Base,
data: &'static mut [u8],
) -> Box<Op> {
assert_eq!(data.len(), 0);
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let is_write = inner.is_write();
match resources::lookup(rid) {
None => odd_future(errors::new(
errors::ErrorKind::BadFileDescriptor,
String::from("Bad File Descriptor"),
)),
Some(mut resource) => {
let shutdown_mode = if is_write {
Shutdown::Write
} else {
Shutdown::Read
};
resource.shutdown_on(shutdown_mode);
ok_future(empty_buf())
}
}
}

fn op_read(
_state: Arc<IsolateState>,
base: &msg::Base,
Expand Down
15 changes: 14 additions & 1 deletion src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std;
use std::collections::HashMap;
use std::io::Error;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
Expand Down Expand Up @@ -79,6 +79,19 @@ impl Resource {
let r = table.remove(&self.rid);
assert!(r.is_some());
}

// no collision with unimplemented shutdown
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean?

pub fn shutdown_on(&mut self, how: Shutdown) {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut f) => TcpStream::shutdown(f, how).unwrap(),
_ => panic!("Cannot shutdown"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good - nice.

},
}
}
}

impl Read for Resource {
Expand Down