Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Current scheduler and I/O work #6577

Closed
wants to merge 58 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
34be071
core::rt: Remove Close trait
brson Apr 25, 2013
0b4d4ed
core::rt: Fix a warning about unnecessary mutable variable
brson Apr 25, 2013
b2fbd34
core::rt: Begin implementing TcpStream
brson Apr 25, 2013
93ca5eb
core::rt: Clean up the interface to rtio
brson Apr 27, 2013
23bf892
core::rt: Improve docs
brson Apr 27, 2013
ab284d4
core::rt Restructure some modules
brson Apr 27, 2013
01b7b7d
core::rt: Use unsafe pointers instead of transmuted regions
brson Apr 27, 2013
b771c99
core::rt: Fix the finalizer on UvTcpStream and UvTcpListener
brson Apr 27, 2013
cfd183d
core::rt: Fix some copies in uv
brson Apr 28, 2013
6ab02c0
core::rt: Convert some uv functions to extension methods
brson Apr 28, 2013
91ca3a9
core::rt: Reording code
brson Apr 28, 2013
9138fea
core::rt: Only use one mechanism for attaching custom data to uv handles
brson Apr 28, 2013
dbf8966
core::rt: Move the implementation of IdleWatcher to its own file
brson Apr 28, 2013
a134503
core::rt: Move all the uv callback definitions to one place
brson Apr 28, 2013
ad6719e
core::rt: Just a small fix to TcpStream
brson May 3, 2013
10355d7
core::rt Wire up logging to newsched tasks
brson Apr 28, 2013
272c3c2
Tidy
brson May 3, 2013
936fce5
Warnings
brson May 3, 2013
40a9de5
core::rt: Add a very simple ref counted pointer
brson May 3, 2013
414f3c7
core::rt: Add a simple channel type for passing buffered messages bet…
brson May 5, 2013
d234cf7
core::rt: Make TCP servers work
brson May 6, 2013
101aaa3
core::rt: 0 is a valid TLS key
brson May 7, 2013
4472a50
rtdebug off
brson May 7, 2013
52f015a
core: Cleanup warnings
brson May 7, 2013
329dfca
core: Move unstable::exchange_alloc to rt::global_heap
brson May 7, 2013
f934fa7
core::rt: Docs
brson May 7, 2013
204e3d8
core::rt: Register stacks with valgrind. #6428
brson May 12, 2013
ee0ce64
core::rt: Wait for handles to close
brson May 12, 2013
b04fce6
Merge remote-tracking branch 'brson/io-upstream' into incoming
brson May 14, 2013
6a6076a
core::rt: Ignore tcp test multiple_connect_interleaved_lazy_schedule
brson May 15, 2013
bfd9aa9
core:rt: A few micro-opts
brson May 8, 2013
36ad366
core::rt: Add a test of standalone use of the runtime
brson May 8, 2013
f6401ba
core: Use a global lock instead of runtime lock for os::getenv, etc. …
brson May 8, 2013
cc2897d
core: Replace use of libc::getenv with os::getenv
brson May 8, 2013
0a54bad
core::rt: Initialize logging
brson May 8, 2013
174ec1e
core::rt: Error handling for TcpStream.read
brson May 9, 2013
afcf4f2
core::rt: Don't abort when reporting an unknown uv error
brson May 9, 2013
013b776
core: Turn task::unkillable, etc. into no-ops in newsched. #6377
brson May 10, 2013
b764d4c
core::rt: Begin implementing Reader extension methods
brson May 10, 2013
76e0977
core::rt: `read` raises `read_error`
brson May 13, 2013
4724966
core::rt: Add uv timer bindings
brson May 11, 2013
c42b03d
core::rt: Fix scheduling logic for enqueued tasks
brson May 11, 2013
56c0b18
rt: Rename sched_key to rt_key
brson May 12, 2013
7f5746f
core::rt: Rename Sched.task_queue to work_queue
brson May 12, 2013
390dde5
core::rt: Rename Task to Coroutine
brson May 12, 2013
1c1f11e
core::rt: Warnings
brson May 13, 2013
28a13ec
core::rt: Make push_bytes raise read_error on EOF
brson May 13, 2013
d45dc8d
core::rt: More work on Reader extensions and error handling
brson May 13, 2013
2bc1e6b
core::rt: Copy many of the old io extensions to the new io
brson May 14, 2013
d951da8
core::rt: Fix TCP test on mac
brson May 15, 2013
0d1331f
Merge remote-tracking branch 'brson/io' into incoming
brson May 15, 2013
018dfaf
core::rt: Unignore a fixed TCP test
brson May 15, 2013
03a8e59
Merge remote-tracking branch 'brson/io' into incoming
brson May 18, 2013
f5987b0
core::rt: implement `oneshot` and `stream`.
brson May 16, 2013
26becc3
core: Wire up oneshot pipes to newsched
brson May 17, 2013
df9e412
core: Wire up `stream` to newsched
brson May 18, 2013
633af4c
Whitespace
brson May 18, 2013
8daa5ec
xfail-fast run-pass/core-rt-smoke
brson May 18, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 121 additions & 63 deletions src/libcore/rt/io/extensions.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@
// XXX: Iteration should probably be considered separately

use vec;
use rt::io::{Reader, read_error, standard_error, EndOfFile};
use rt::io::{Reader, read_error, standard_error, EndOfFile, DEFAULT_BUF_SIZE};
use option::{Option, Some, None};
use unstable::finally::Finally;
use util;
@@ -37,18 +37,17 @@ pub trait ReaderUtil {
/// # Failure
///
/// Raises the same conditions as `read`. Additionally raises `read_error`
/// on EOF. If `read_error` is handled then `push_bytes` returns without
/// pushing any bytes onto `buf` - that is, `buf` has the same length
/// upon exit as it did on entry.
/// on EOF. If `read_error` is handled then `push_bytes` may push less
/// than the requested number of bytes.
fn push_bytes(&mut self, buf: &mut ~[u8], len: uint);

/// Reads `len` bytes and gives you back a new vector of length `len`
///
/// # Failure
///
/// Raises the same conditions as `read`. Additionally raises `read_error`
/// on EOF. If `read_error` is handled then the returned vector has
/// length 0.
/// on EOF. If `read_error` is handled then the returned vector may
/// contain less than the requested number of bytes.
fn read_bytes(&mut self, len: uint) -> ~[u8];

/// Reads all remaining bytes from the stream.
@@ -60,60 +59,6 @@ pub trait ReaderUtil {

}

impl<T: Reader> ReaderUtil for T {
fn read_byte(&mut self) -> Option<u8> {
let mut buf = [0];
match self.read(buf) {
Some(0) => {
debug!("read 0 bytes. trying again");
self.read_byte()
}
Some(1) => Some(buf[0]),
Some(_) => util::unreachable(),
None => None
}
}

fn push_bytes(&mut self, buf: &mut ~[u8], len: uint) {
unsafe {
let start_len = buf.len();
let mut total_read = 0;

vec::reserve_at_least(buf, start_len + len);
vec::raw::set_len(buf, start_len + len);

do (|| {
while total_read < len {
let slice = vec::mut_slice(*buf, start_len + total_read, buf.len());
match self.read(slice) {
Some(nread) => {
total_read += nread;
}
None => {
read_error::cond.raise(standard_error(EndOfFile));
// Reset the vector length as though we didn't read anything
total_read = 0;
break;
}
}
}
}).finally {
vec::raw::set_len(buf, start_len + total_read);
}
}
}

fn read_bytes(&mut self, len: uint) -> ~[u8] {
let mut buf = vec::with_capacity(len);
self.push_bytes(&mut buf, len);
return buf;
}

fn read_to_end(&mut self) -> ~[u8] {
fail!()
}
}

pub trait ReaderByteConversions {
/// Reads `n` little-endian unsigned integer bytes.
///
@@ -323,6 +268,71 @@ pub trait WriterByteConversions {
fn write_i8(&mut self, n: i8);
}

impl<T: Reader> ReaderUtil for T {
fn read_byte(&mut self) -> Option<u8> {
let mut buf = [0];
match self.read(buf) {
Some(0) => {
debug!("read 0 bytes. trying again");
self.read_byte()
}
Some(1) => Some(buf[0]),
Some(_) => util::unreachable(),
None => None
}
}

fn push_bytes(&mut self, buf: &mut ~[u8], len: uint) {
unsafe {
let start_len = buf.len();
let mut total_read = 0;

vec::reserve_at_least(buf, start_len + len);
vec::raw::set_len(buf, start_len + len);

do (|| {
while total_read < len {
let slice = vec::mut_slice(*buf, start_len + total_read, buf.len());
match self.read(slice) {
Some(nread) => {
total_read += nread;
}
None => {
read_error::cond.raise(standard_error(EndOfFile));
break;
}
}
}
}).finally {
vec::raw::set_len(buf, start_len + total_read);
}
}
}

fn read_bytes(&mut self, len: uint) -> ~[u8] {
let mut buf = vec::with_capacity(len);
self.push_bytes(&mut buf, len);
return buf;
}

fn read_to_end(&mut self) -> ~[u8] {
let mut buf = vec::with_capacity(DEFAULT_BUF_SIZE);
let mut keep_reading = true;
do read_error::cond.trap(|e| {
if e.kind == EndOfFile {
keep_reading = false;
} else {
read_error::cond.raise(e)
}
}).in {
while keep_reading {
self.push_bytes(&mut buf, DEFAULT_BUF_SIZE)
}
}
return buf;
}
}

#[cfg(test)]
mod test {
use super::*;
@@ -414,7 +424,7 @@ mod test {
let mut reader = MemReader::new(~[10, 11]);
do read_error::cond.trap(|_| {
}).in {
assert!(reader.read_bytes(4) == ~[]);
assert!(reader.read_bytes(4) == ~[10, 11]);
}
}

@@ -456,7 +466,7 @@ mod test {
do read_error::cond.trap(|_| {
}).in {
reader.push_bytes(&mut buf, 4);
assert!(buf == ~[8, 9]);
assert!(buf == ~[8, 9, 10, 11]);
}
}

@@ -480,7 +490,7 @@ mod test {
do read_error::cond.trap(|_| { } ).in {
reader.push_bytes(&mut buf, 4);
}
assert!(buf == ~[8, 9]);
assert!(buf == ~[8, 9, 10]);
}

#[test]
@@ -514,4 +524,52 @@ mod test {
}
}

#[test]
fn read_to_end() {
let mut reader = MockReader::new();
let count = Cell(0);
reader.read = |buf| {
do count.with_mut_ref |count| {
if *count == 0 {
*count = 1;
buf[0] = 10;
buf[1] = 11;
Some(2)
} else if *count == 1 {
*count = 2;
buf[0] = 12;
buf[1] = 13;
Some(2)
} else {
None
}
}
};
let buf = reader.read_to_end();
assert!(buf == ~[10, 11, 12, 13]);
}

#[test]
#[should_fail]
#[ignore(cfg(windows))]
fn read_to_end_error() {
let mut reader = MockReader::new();
let count = Cell(0);
reader.read = |buf| {
do count.with_mut_ref |count| {
if *count == 0 {
*count = 1;
buf[0] = 10;
buf[1] = 11;
Some(2)
} else {
read_error::cond.raise(placeholder_error());
None
}
}
};
let buf = reader.read_to_end();
assert!(buf == ~[10, 11]);
}

}
8 changes: 6 additions & 2 deletions src/libcore/rt/io/mod.rs
Original file line number Diff line number Diff line change
@@ -187,7 +187,7 @@ In particular code written to ignore errors and expect conditions to be unhandle
will start passing around null or zero objects when wrapped in a condition handler.

* XXX: How should we use condition handlers that return values?

* XXX: Should EOF raise default conditions when EOF is not an error?

# Issues withi/o scheduler affinity, work stealing, task pinning

@@ -323,6 +323,10 @@ pub mod native {
/// Mock implementations for testing
mod mock;

/// The default buffer size for various I/O operations
/// XXX: Not pub
pub static DEFAULT_BUF_SIZE: uint = 1024 * 64;

/// The type passed to I/O condition handlers to indicate error
///
/// # XXX
@@ -375,7 +379,7 @@ pub trait Reader {
///
/// # XXX
///
/// * Should raise error on eof
/// * Should raise_default error on eof?
/// * If the condition is handled it should still return the bytes read,
/// in which case there's no need to return Option - but then you *have*
/// to install a handler to detect eof.