diff --git a/Makefile.in b/Makefile.in index 268a25d72fc19..dd2e6a95861bd 100644 --- a/Makefile.in +++ b/Makefile.in @@ -238,7 +238,7 @@ $(foreach target,$(CFG_TARGET_TRIPLES),\ CORELIB_CRATE := $(S)src/libcore/core.rc CORELIB_INPUTS := $(wildcard $(addprefix $(S)src/libcore/, \ - core.rc *.rs */*.rs)) + core.rc *.rs */*.rs */*/*rs)) ###################################################################### # Standard library variables diff --git a/mk/tests.mk b/mk/tests.mk index 33a828d6e6785..f96b7325f60d4 100644 --- a/mk/tests.mk +++ b/mk/tests.mk @@ -244,21 +244,29 @@ $(foreach host,$(CFG_HOST_TRIPLES), \ define TEST_RUNNER +# If NO_REBUILD is set then break the dependencies on std so we can +# test crates without rebuilding core and std first +ifeq ($(NO_REBUILD),) +STDTESTDEP_$(1)_$(2)_$(3) = $$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2)) +else +STDTESTDEP_$(1)_$(2)_$(3) = +endif + $(3)/test/coretest.stage$(1)-$(2)$$(X_$(2)): \ $$(CORELIB_CRATE) $$(CORELIB_INPUTS) \ - $$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2)) + $$(STDTESTDEP_$(1)_$(2)_$(3)) @$$(call E, compile_and_link: $$@) $$(STAGE$(1)_T_$(2)_H_$(3)) -o $$@ $$< --test $(3)/test/stdtest.stage$(1)-$(2)$$(X_$(2)): \ $$(STDLIB_CRATE) $$(STDLIB_INPUTS) \ - $$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2)) + $$(STDTESTDEP_$(1)_$(2)_$(3)) @$$(call E, compile_and_link: $$@) $$(STAGE$(1)_T_$(2)_H_$(3)) -o $$@ $$< --test $(3)/test/syntaxtest.stage$(1)-$(2)$$(X_$(2)): \ $$(LIBSYNTAX_CRATE) $$(LIBSYNTAX_INPUTS) \ - $$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2)) + $$(STDTESTDEP_$(1)_$(2)_$(3)) @$$(call E, compile_and_link: $$@) $$(STAGE$(1)_T_$(2)_H_$(3)) -o $$@ $$< --test diff --git a/src/compiletest/common.rs b/src/compiletest/common.rs index 55d9f467f79e2..f892e3a1e232a 100644 --- a/src/compiletest/common.rs +++ b/src/compiletest/common.rs @@ -63,6 +63,9 @@ pub struct config { // Run tests using the JIT jit: bool, + // Run tests using the new runtime + newrt: bool, + // Explain what's going on verbose: bool diff --git a/src/compiletest/compiletest.rc b/src/compiletest/compiletest.rc index 7d53b29e04029..0c1f328ad09fa 100644 --- a/src/compiletest/compiletest.rc +++ b/src/compiletest/compiletest.rc @@ -61,7 +61,8 @@ pub fn parse_config(args: ~[~str]) -> config { getopts::optopt(~"runtool"), getopts::optopt(~"rustcflags"), getopts::optflag(~"verbose"), getopts::optopt(~"logfile"), - getopts::optflag(~"jit")]; + getopts::optflag(~"jit"), + getopts::optflag(~"newrt")]; fail_unless!(!args.is_empty()); let args_ = vec::tail(args); @@ -95,6 +96,7 @@ pub fn parse_config(args: ~[~str]) -> config { runtool: getopts::opt_maybe_str(matches, ~"runtool"), rustcflags: getopts::opt_maybe_str(matches, ~"rustcflags"), jit: getopts::opt_present(matches, ~"jit"), + newrt: getopts::opt_present(matches, ~"newrt"), verbose: getopts::opt_present(matches, ~"verbose") } } @@ -114,6 +116,7 @@ pub fn log_config(config: config) { logv(c, fmt!("runtool: %s", opt_str(config.runtool))); logv(c, fmt!("rustcflags: %s", opt_str(config.rustcflags))); logv(c, fmt!("jit: %b", config.jit)); + logv(c, fmt!("newrt: %b", config.newrt)); logv(c, fmt!("verbose: %b", config.verbose)); logv(c, fmt!("\n")); } diff --git a/src/compiletest/runtest.rs b/src/compiletest/runtest.rs index 62dee541849f7..f17e9ffe548bd 100644 --- a/src/compiletest/runtest.rs +++ b/src/compiletest/runtest.rs @@ -484,9 +484,17 @@ fn compile_test_(config: config, props: TestProps, fn exec_compiled_test(config: config, props: TestProps, testfile: &Path) -> ProcRes { + + // If testing the new runtime then set the RUST_NEWRT env var + let env = if config.newrt { + props.exec_env + ~[(~"RUST_NEWRT", ~"1")] + } else { + props.exec_env + }; + compose_and_run(config, testfile, make_run_args(config, props, testfile), - props.exec_env, + env, config.run_lib_path, None) } diff --git a/src/libcore/rt/context.rs b/src/libcore/rt/context.rs index 4798399d5e948..527acd4d1b1ba 100644 --- a/src/libcore/rt/context.rs +++ b/src/libcore/rt/context.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use option::*; use super::stack::StackSegment; use libc::c_void; use cast::{transmute, transmute_mut_unsafe, @@ -16,17 +17,30 @@ use cast::{transmute, transmute_mut_unsafe, // XXX: Registers is boxed so that it is 16-byte aligned, for storing // SSE regs. It would be marginally better not to do this. In C++ we // use an attribute on a struct. -pub struct Context(~Registers); +// XXX: It would be nice to define regs as `~Option` since +// the registers are sometimes empty, but the discriminant would +// then misalign the regs again. +pub struct Context { + /// The context entry point, saved here for later destruction + start: Option<~~fn()>, + /// Hold the registers while the task or scheduler is suspended + regs: ~Registers +} pub impl Context { fn empty() -> Context { - Context(new_regs()) + Context { + start: None, + regs: new_regs() + } } /// Create a new context that will resume execution by running ~fn() - /// # Safety Note - /// The `start` closure must remain valid for the life of the Task - fn new(start: &~fn(), stack: &mut StackSegment) -> Context { + fn new(start: ~fn(), stack: &mut StackSegment) -> Context { + // XXX: Putting main into a ~ so it's a thin pointer and can + // be passed to the spawn function. Another unfortunate + // allocation + let start = ~start; // The C-ABI function that is the task entry point extern fn task_start_wrapper(f: &~fn()) { (*f)() } @@ -40,21 +54,29 @@ pub impl Context { // which we will then modify to call the given function when restored let mut regs = new_regs(); unsafe { - swap_registers(transmute_mut_region(&mut *regs), - transmute_region(&*regs)) + swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)) }; initialize_call_frame(&mut *regs, fp, argp, sp); - return Context(regs); + return Context { + start: Some(start), + regs: regs + } } + /* Switch contexts + + Suspend the current execution context and resume another by + saving the registers values of the executing thread to a Context + then loading the registers from a previously saved Context. + */ fn swap(out_context: &mut Context, in_context: &Context) { let out_regs: &mut Registers = match out_context { - &Context(~ref mut r) => r + &Context { regs: ~ref mut r, _ } => r }; let in_regs: &Registers = match in_context { - &Context(~ref r) => r + &Context { regs: ~ref r, _ } => r }; unsafe { swap_registers(out_regs, in_regs) }; @@ -84,11 +106,10 @@ fn new_regs() -> ~Registers { } #[cfg(target_arch = "x86")] -fn initialize_call_frame(regs: &mut Registers, - fptr: *c_void, arg: *c_void, sp: *mut uint) { +fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) { let sp = align_down(sp); - let sp = mut_offset(sp, -4); // XXX: -4 words? Needs this be done at all? + let sp = mut_offset(sp, -4); unsafe { *sp = arg as uint; } let sp = mut_offset(sp, -1); @@ -108,8 +129,7 @@ type Registers = [uint * 22]; fn new_regs() -> ~Registers { ~[0, .. 22] } #[cfg(target_arch = "x86_64")] -fn initialize_call_frame(regs: &mut Registers, - fptr: *c_void, arg: *c_void, sp: *mut uint) { +fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) { // Redefinitions from regs.h static RUSTRT_ARG0: uint = 3; @@ -143,8 +163,7 @@ type Registers = [uint * 32]; fn new_regs() -> ~Registers { ~[0, .. 32] } #[cfg(target_arch = "arm")] -fn initialize_call_frame(regs: &mut Registers, - fptr: *c_void, arg: *c_void, sp: *mut uint) { +fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) { let sp = mut_offset(sp, -1); // The final return address. 0 indicates the bottom of the stack @@ -162,8 +181,7 @@ type Registers = [uint * 32]; fn new_regs() -> ~Registers { ~[0, .. 32] } #[cfg(target_arch = "mips")] -fn initialize_call_frame(regs: &mut Registers, - fptr: *c_void, arg: *c_void, sp: *mut uint) { +fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) { let sp = mut_offset(sp, -1); // The final return address. 0 indicates the bottom of the stack diff --git a/src/libcore/rt/io/file.rs b/src/libcore/rt/io/file.rs new file mode 100644 index 0000000000000..9f1f200d8e466 --- /dev/null +++ b/src/libcore/rt/io/file.rs @@ -0,0 +1,45 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use super::super::sched::*; +use super::super::rtio::*; +use super::Stream; + +pub struct FileStream; + +pub impl FileStream { + fn new(_path: Path) -> FileStream { + fail!() + } +} + +impl Stream for FileStream { + fn read(&mut self, _buf: &mut [u8]) -> uint { + fail!() + } + + fn eof(&mut self) -> bool { + fail!() + } + + fn write(&mut self, _v: &const [u8]) { + fail!() + } +} + +#[test] +#[ignore] +fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() { + let message = "it's alright. have a good time"; + let filename = Path("test.txt"); + let mut outstream = FileStream::new(filename); + outstream.write(message.to_bytes()); +} diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs new file mode 100644 index 0000000000000..f82092b829c9b --- /dev/null +++ b/src/libcore/rt/io/mod.rs @@ -0,0 +1,45 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use option::*; +use comm::{GenericPort, GenericChan}; + +pub mod file; + +// FIXME #5370 Strongly want this to be StreamError(&mut Stream) +pub struct StreamError; + +// XXX: Can't put doc comments on macros +// Raised by `Stream` instances on error. Returning `true` from the handler +// indicates that the `Stream` should continue, `false` that it should fail. +condition! { + stream_error: super::StreamError -> bool; +} + +pub trait Stream { + /// Read bytes, up to the length of `buf` and place them in `buf`, + /// returning the number of bytes read or an `IoError`. Reads + /// 0 bytes on EOF. + /// + /// # Failure + /// + /// Raises the `reader_error` condition on error + fn read(&mut self, buf: &mut [u8]) -> uint; + + /// Return whether the Reader has reached the end of the stream + fn eof(&mut self) -> bool; + + /// Write the given buffer + /// + /// # Failure + /// + /// Raises the `writer_error` condition on error + fn write(&mut self, v: &const [u8]); +} diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index a1a9884aeca2c..04891a1673c0a 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc::c_char; // Some basic logging macro_rules! rtdebug_ ( @@ -15,16 +16,10 @@ macro_rules! rtdebug_ ( dumb_println(fmt!( $($arg),+ )); fn dumb_println(s: &str) { - use str::as_c_str; - use libc::c_char; - - extern { - fn printf(s: *c_char); - } - - do as_c_str(s.to_str() + "\n") |s| { - unsafe { printf(s); } - } + use io::WriterUtil; + let dbg = ::libc::STDERR_FILENO as ::io::fd_t; + dbg.write_str(s); + dbg.write_str("\n"); } } ) @@ -36,9 +31,13 @@ macro_rules! rtdebug ( ) mod sched; -mod io; +mod rtio; +pub mod uvll; mod uvio; +#[path = "uv/mod.rs"] mod uv; +#[path = "io/mod.rs"] +mod io; // FIXME #5248: The import in `sched` doesn't resolve unless this is pub! pub mod thread_local_storage; mod work_queue; @@ -46,3 +45,23 @@ mod stack; mod context; mod thread; pub mod env; + +pub fn start(main: *u8, _argc: int, _argv: *c_char, _crate_map: *u8) -> int { + use self::sched::{Scheduler, Task}; + use self::uvio::UvEventLoop; + + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_); + let main_task = ~do Task::new(&mut sched.stack_pool) { + // XXX: Can't call a C function pointer from Rust yet + unsafe { rust_call_nullary_fn(main) }; + }; + sched.task_queue.push_back(main_task); + sched.run(); + return 0; + + extern { + fn rust_call_nullary_fn(f: *u8); + } +} + diff --git a/src/libcore/rt/io.rs b/src/libcore/rt/rtio.rs similarity index 100% rename from src/libcore/rt/io.rs rename to src/libcore/rt/rtio.rs diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 0beadb30d42c6..5034ffa3c6045 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -16,7 +16,7 @@ use ptr::mut_null; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::io::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject}; use super::context::Context; use tls = super::thread_local_storage; @@ -70,7 +70,14 @@ enum CleanupJob { pub impl Scheduler { - pub fn new(event_loop: ~EventLoopObject) -> Scheduler { + fn new(event_loop: ~EventLoopObject) -> Scheduler { + + // Lazily initialize the global state, currently the scheduler TLS key + unsafe { rust_initialize_global_state(); } + extern { + fn rust_initialize_global_state(); + } + Scheduler { event_loop: event_loop, task_queue: WorkQueue::new(), @@ -183,8 +190,7 @@ pub impl Scheduler { let blocked_task = self.current_task.swap_unwrap(); let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f) + transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f) }; let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); @@ -233,8 +239,7 @@ pub impl Scheduler { Context::swap(task_context, scheduler_context); } - priv fn swap_in_task_from_running_task(&mut self, - running_task: &mut Task) { + priv fn swap_in_task_from_running_task(&mut self, running_task: &mut Task) { let running_task_context = &mut running_task.saved_context; let next_context = &self.current_task.get_ref().saved_context; Context::swap(running_task_context, next_context); @@ -285,8 +290,6 @@ pub impl Scheduler { static TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack pub struct Task { - /// The task entry point, saved here for later destruction - priv start: ~~fn(), /// The segment of stack on which the task is currently running or, /// if the task is blocked, on which the task will resume execution priv current_stack_segment: StackSegment, @@ -295,17 +298,13 @@ pub struct Task { priv saved_context: Context, } -impl Task { - pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task { - // XXX: Putting main into a ~ so it's a thin pointer and can - // be passed to the spawn function. Another unfortunate - // allocation - let start = ~Task::build_start_wrapper(start); +pub impl Task { + fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task { + let start = Task::build_start_wrapper(start); let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn - let initial_context = Context::new(&*start, &mut stack); + let initial_context = Context::new(start, &mut stack); return Task { - start: start, current_stack_segment: stack, saved_context: initial_context, }; @@ -350,8 +349,7 @@ impl ThreadLocalScheduler { fn put_scheduler(&mut self, scheduler: ~Scheduler) { unsafe { let key = match self { &ThreadLocalScheduler(key) => key }; - let value: *mut c_void = - transmute::<~Scheduler, *mut c_void>(scheduler); + let value: *mut c_void = transmute::<~Scheduler, *mut c_void>(scheduler); tls::set(key, value); } } @@ -363,8 +361,9 @@ impl ThreadLocalScheduler { fail_unless!(value.is_not_null()); { let value_ptr = &mut value; - let sched: &mut ~Scheduler = - transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr); + let sched: &mut ~Scheduler = { + transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr) + }; let sched: &mut Scheduler = &mut **sched; return sched; } diff --git a/src/libcore/rt/stack.rs b/src/libcore/rt/stack.rs index 9b164eb08fa7c..9eca3bda0473c 100644 --- a/src/libcore/rt/stack.rs +++ b/src/libcore/rt/stack.rs @@ -27,6 +27,7 @@ pub impl StackSegment { } } + /// Point one word beyond the high end of the allocated stack fn end(&self) -> *uint { unsafe { vec::raw::to_ptr(self.buf).offset(self.buf.len()) as *uint diff --git a/src/libcore/rt/thread.rs b/src/libcore/rt/thread.rs index c45e4295ab144..910e445f47b04 100644 --- a/src/libcore/rt/thread.rs +++ b/src/libcore/rt/thread.rs @@ -14,13 +14,13 @@ use ops::Drop; #[allow(non_camel_case_types)] // runtime type type raw_thread = libc::c_void; -struct Thread { +pub struct Thread { main: ~fn(), raw_thread: *raw_thread } -impl Thread { - pub fn start(main: ~fn()) -> Thread { +pub impl Thread { + fn start(main: ~fn()) -> Thread { fn substart(main: &fn()) -> *raw_thread { unsafe { rust_raw_thread_start(&main) } } diff --git a/src/libcore/rt/uv.rs b/src/libcore/rt/uv.rs deleted file mode 100644 index 19ce04bd66b5c..0000000000000 --- a/src/libcore/rt/uv.rs +++ /dev/null @@ -1,919 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/*! - -Bindings to libuv. - -UV types consist of the event loop (Loop), Watchers, Requests and -Callbacks. - -Watchers and Requests encapsulate pointers to uv *handles*, which have -subtyping relationships with each other. This subtyping is reflected -in the bindings with explicit or implicit coercions. For example, an -upcast from TcpWatcher to StreamWatcher is done with -`tcp_watcher.as_stream()`. In other cases a callback on a specific -type of watcher will be passed a watcher of a supertype. - -Currently all use of Request types (connect/write requests) are -encapsulated in the bindings and don't need to be dealt with by the -caller. - -# Safety note - -Due to the complex lifecycle of uv handles, as well as compiler bugs, -this module is not memory safe and requires explicit memory management, -via `close` and `delete` methods. - -*/ - -use option::*; -use str::raw::from_c_str; -use to_str::ToStr; -use vec; -use ptr; -use libc::{c_void, c_int, size_t, malloc, free, ssize_t}; -use cast::{transmute, transmute_mut_region}; -use ptr::null; -use sys::size_of; -use unstable::uvll; -use super::io::{IpAddr, Ipv4, Ipv6}; - -#[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use super::thread::Thread; -#[cfg(test)] use cell::Cell; - -fn ip4_to_uv_ip4(addr: IpAddr) -> uvll::sockaddr_in { - match addr { - Ipv4(a, b, c, d, p) => { - unsafe { - uvll::ip4_addr(fmt!("%u.%u.%u.%u", - a as uint, - b as uint, - c as uint, - d as uint), p as int) - } - } - Ipv6 => fail!() - } -} - -/// A trait for callbacks to implement. Provides a little extra type safety -/// for generic, unsafe interop functions like `set_watcher_callback`. -trait Callback { } - -type NullCallback = ~fn(); -impl Callback for NullCallback { } - -/// A type that wraps a native handle -trait NativeHandle { - pub fn from_native_handle(T) -> Self; - pub fn native_handle(&self) -> T; -} - -/// XXX: Loop(*handle) is buggy with destructors. Normal structs -/// with dtors may not be destructured, but tuple structs can, -/// but the results are not correct. -pub struct Loop { - handle: *uvll::uv_loop_t -} - -pub impl Loop { - fn new() -> Loop { - let handle = unsafe { uvll::loop_new() }; - fail_unless!(handle.is_not_null()); - NativeHandle::from_native_handle(handle) - } - - fn run(&mut self) { - unsafe { uvll::run(self.native_handle()) }; - } - - fn close(&mut self) { - unsafe { uvll::loop_delete(self.native_handle()) }; - } -} - -impl NativeHandle<*uvll::uv_loop_t> for Loop { - fn from_native_handle(handle: *uvll::uv_loop_t) -> Loop { - Loop { handle: handle } - } - fn native_handle(&self) -> *uvll::uv_loop_t { - self.handle - } -} - -/// The trait implemented by uv 'watchers' (handles). Watchers are -/// non-owning wrappers around the uv handles and are not completely -/// safe - there may be multiple instances for a single underlying -/// handle. Watchers are generally created, then `start`ed, `stop`ed -/// and `close`ed, but due to their complex life cycle may not be -/// entirely memory safe if used in unanticipated patterns. -trait Watcher { - fn event_loop(&self) -> Loop; -} - -pub struct IdleWatcher(*uvll::uv_idle_t); - -impl Watcher for IdleWatcher { - fn event_loop(&self) -> Loop { - loop_from_watcher(self) - } -} - -type IdleCallback = ~fn(IdleWatcher, Option); -impl Callback for IdleCallback { } - -pub impl IdleWatcher { - fn new(loop_: &mut Loop) -> IdleWatcher { - unsafe { - let handle = uvll::idle_new(); - fail_unless!(handle.is_not_null()); - fail_unless!(0 == uvll::idle_init(loop_.native_handle(), handle)); - uvll::set_data_for_uv_handle(handle, null::<()>()); - NativeHandle::from_native_handle(handle) - } - } - - fn start(&mut self, cb: IdleCallback) { - - set_watcher_callback(self, cb); - unsafe { - fail_unless!(0 == uvll::idle_start(self.native_handle(), idle_cb)) - }; - - extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - let idle_watcher: IdleWatcher = - NativeHandle::from_native_handle(handle); - let cb: &IdleCallback = - borrow_callback_from_watcher(&idle_watcher); - let status = status_to_maybe_uv_error(handle, status); - (*cb)(idle_watcher, status); - } - } - - fn stop(&mut self) { - unsafe { fail_unless!(0 == uvll::idle_stop(self.native_handle())); } - } - - fn close(self) { - unsafe { uvll::close(self.native_handle(), close_cb) }; - - extern fn close_cb(handle: *uvll::uv_idle_t) { - let mut idle_watcher = NativeHandle::from_native_handle(handle); - drop_watcher_callback::(&mut idle_watcher); - unsafe { uvll::idle_delete(handle) }; - } - } -} - -impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { - fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher { - IdleWatcher(handle) - } - fn native_handle(&self) -> *uvll::uv_idle_t { - match self { &IdleWatcher(ptr) => ptr } - } -} - -// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t -// and uv_file_t -pub struct StreamWatcher(*uvll::uv_stream_t); - -impl Watcher for StreamWatcher { - fn event_loop(&self) -> Loop { - loop_from_watcher(self) - } -} - -type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); -impl Callback for ReadCallback { } - -// XXX: The uv alloc callback also has a *uv_handle_t arg -pub type AllocCallback = ~fn(uint) -> Buf; -impl Callback for AllocCallback { } - -pub impl StreamWatcher { - - fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { - // XXX: Borrowchk problems - let data = get_watcher_data(unsafe { transmute_mut_region(self) }); - data.alloc_cb = Some(alloc); - data.read_cb = Some(cb); - - let handle = self.native_handle(); - unsafe { uvll::read_start(handle, alloc_cb, read_cb); } - - extern fn alloc_cb(stream: *uvll::uv_stream_t, - suggested_size: size_t) -> Buf { - let mut stream_watcher: StreamWatcher = - NativeHandle::from_native_handle(stream); - let data = get_watcher_data(&mut stream_watcher); - let alloc_cb = data.alloc_cb.get_ref(); - return (*alloc_cb)(suggested_size as uint); - } - - extern fn read_cb(stream: *uvll::uv_stream_t, - nread: ssize_t, ++buf: Buf) { - rtdebug!("buf addr: %x", buf.base as uint); - rtdebug!("buf len: %d", buf.len as int); - let mut stream_watcher: StreamWatcher = - NativeHandle::from_native_handle(stream); - let data = get_watcher_data(&mut stream_watcher); - let cb = data.read_cb.get_ref(); - let status = status_to_maybe_uv_error(stream, nread as c_int); - (*cb)(stream_watcher, nread as int, buf, status); - } - } - - fn read_stop(&mut self) { - // It would be nice to drop the alloc and read callbacks here, - // but read_stop may be called from inside one of them and we - // would end up freeing the in-use environment - let handle = self.native_handle(); - unsafe { uvll::read_stop(handle); } - } - - // XXX: Needs to take &[u8], not ~[u8] - fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) { - // XXX: Borrowck - let data = get_watcher_data(unsafe { transmute_mut_region(self) }); - fail_unless!(data.write_cb.is_none()); - data.write_cb = Some(cb); - - let req = WriteRequest::new(); - let buf = vec_to_uv_buf(msg); - // XXX: Allocation - let bufs = ~[buf]; - unsafe { - fail_unless!(0 == uvll::write(req.native_handle(), - self.native_handle(), - &bufs, write_cb)); - } - // XXX: Freeing immediately after write. Is this ok? - let _v = vec_from_uv_buf(buf); - - extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { - let write_request: WriteRequest = - NativeHandle::from_native_handle(req); - let mut stream_watcher = write_request.stream(); - write_request.delete(); - let cb = get_watcher_data(&mut stream_watcher) - .write_cb.swap_unwrap(); - let status = status_to_maybe_uv_error( - stream_watcher.native_handle(), status); - cb(stream_watcher, status); - } - } - - fn accept(&mut self, stream: StreamWatcher) { - let self_handle = self.native_handle() as *c_void; - let stream_handle = stream.native_handle() as *c_void; - unsafe { - fail_unless!(0 == uvll::accept(self_handle, stream_handle)); - } - } - - fn close(self, cb: NullCallback) { - { - let mut self = self; - let data = get_watcher_data(&mut self); - fail_unless!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_stream_t) { - let mut stream_watcher: StreamWatcher = - NativeHandle::from_native_handle(handle); - { - let mut data = get_watcher_data(&mut stream_watcher); - data.close_cb.swap_unwrap()(); - } - drop_watcher_data(&mut stream_watcher); - unsafe { free(handle as *c_void) } - } - } -} - -impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { - fn from_native_handle( - handle: *uvll::uv_stream_t) -> StreamWatcher { - StreamWatcher(handle) - } - fn native_handle(&self) -> *uvll::uv_stream_t { - match self { &StreamWatcher(ptr) => ptr } - } -} - -pub struct TcpWatcher(*uvll::uv_tcp_t); - -impl Watcher for TcpWatcher { - fn event_loop(&self) -> Loop { - loop_from_watcher(self) - } -} - -type ConnectionCallback = ~fn(StreamWatcher, Option); -impl Callback for ConnectionCallback { } - -pub impl TcpWatcher { - fn new(loop_: &mut Loop) -> TcpWatcher { - unsafe { - let size = size_of::() as size_t; - let handle = malloc(size) as *uvll::uv_tcp_t; - fail_unless!(handle.is_not_null()); - fail_unless!(0 == uvll::tcp_init(loop_.native_handle(), handle)); - let mut watcher = NativeHandle::from_native_handle(handle); - install_watcher_data(&mut watcher); - return watcher; - } - } - - fn bind(&mut self, address: IpAddr) { - match address { - Ipv4(*) => { - let addr = ip4_to_uv_ip4(address); - let result = unsafe { - uvll::tcp_bind(self.native_handle(), &addr) - }; - // XXX: bind is likely to fail. need real error handling - fail_unless!(result == 0); - } - _ => fail!() - } - } - - fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) { - unsafe { - fail_unless!(get_watcher_data(self).connect_cb.is_none()); - get_watcher_data(self).connect_cb = Some(cb); - - let mut connect_watcher = ConnectRequest::new(); - let connect_handle = connect_watcher.native_handle(); - match address { - Ipv4(*) => { - let addr = ip4_to_uv_ip4(address); - rtdebug!("connect_t: %x", connect_handle as uint); - fail_unless!(0 == uvll::tcp_connect(connect_handle, - self.native_handle(), - &addr, connect_cb)); - } - _ => fail!() - } - - extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { - rtdebug!("connect_t: %x", req as uint); - let connect_request: ConnectRequest = - NativeHandle::from_native_handle(req); - let mut stream_watcher = connect_request.stream(); - connect_request.delete(); - let cb: ConnectionCallback = { - let data = get_watcher_data(&mut stream_watcher); - data.connect_cb.swap_unwrap() - }; - let status = status_to_maybe_uv_error( - stream_watcher.native_handle(), status); - cb(stream_watcher, status); - } - } - } - - fn listen(&mut self, cb: ConnectionCallback) { - // XXX: Borrowck - let data = get_watcher_data(unsafe { transmute_mut_region(self) }); - fail_unless!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); - - unsafe { - static BACKLOG: c_int = 128; // XXX should be configurable - // XXX: This can probably fail - fail_unless!(0 == uvll::listen(self.native_handle(), - BACKLOG, connection_cb)); - } - - extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { - rtdebug!("connection_cb"); - let mut stream_watcher: StreamWatcher = - NativeHandle::from_native_handle(handle); - let cb = get_watcher_data(&mut stream_watcher) - .connect_cb.swap_unwrap(); - let status = status_to_maybe_uv_error( - stream_watcher.native_handle(), status); - cb(stream_watcher, status); - } - } - - fn as_stream(&self) -> StreamWatcher { - NativeHandle::from_native_handle( - self.native_handle() as *uvll::uv_stream_t) - } -} - -impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { - fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher { - TcpWatcher(handle) - } - fn native_handle(&self) -> *uvll::uv_tcp_t { - match self { &TcpWatcher(ptr) => ptr } - } -} - -trait Request { } - -type ConnectCallback = ~fn(ConnectRequest, Option); -impl Callback for ConnectCallback { } - -// uv_connect_t is a subclass of uv_req_t -struct ConnectRequest(*uvll::uv_connect_t); - -impl Request for ConnectRequest { } - -impl ConnectRequest { - - fn new() -> ConnectRequest { - let connect_handle = unsafe { - malloc(size_of::() as size_t) - }; - fail_unless!(connect_handle.is_not_null()); - let connect_handle = connect_handle as *uvll::uv_connect_t; - ConnectRequest(connect_handle) - } - - fn stream(&self) -> StreamWatcher { - unsafe { - let stream_handle = - uvll::get_stream_handle_from_connect_req( - self.native_handle()); - NativeHandle::from_native_handle(stream_handle) - } - } - - fn delete(self) { - unsafe { free(self.native_handle() as *c_void) } - } -} - -impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest { - fn from_native_handle( - handle: *uvll:: uv_connect_t) -> ConnectRequest { - ConnectRequest(handle) - } - fn native_handle(&self) -> *uvll::uv_connect_t { - match self { &ConnectRequest(ptr) => ptr } - } -} - -pub struct WriteRequest(*uvll::uv_write_t); - -impl Request for WriteRequest { } - -impl WriteRequest { - - fn new() -> WriteRequest { - let write_handle = unsafe { - malloc(size_of::() as size_t) - }; - fail_unless!(write_handle.is_not_null()); - let write_handle = write_handle as *uvll::uv_write_t; - WriteRequest(write_handle) - } - - fn stream(&self) -> StreamWatcher { - unsafe { - let stream_handle = - uvll::get_stream_handle_from_write_req(self.native_handle()); - NativeHandle::from_native_handle(stream_handle) - } - } - - fn delete(self) { - unsafe { free(self.native_handle() as *c_void) } - } -} - -impl NativeHandle<*uvll::uv_write_t> for WriteRequest { - fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest { - WriteRequest(handle) - } - fn native_handle(&self) -> *uvll::uv_write_t { - match self { &WriteRequest(ptr) => ptr } - } -} - -// XXX: Need to define the error constants like EOF so they can be -// compared to the UvError type - -struct UvError(uvll::uv_err_t); - -impl UvError { - - fn name(&self) -> ~str { - unsafe { - let inner = match self { &UvError(ref a) => a }; - let name_str = uvll::err_name(inner); - fail_unless!(name_str.is_not_null()); - from_c_str(name_str) - } - } - - fn desc(&self) -> ~str { - unsafe { - let inner = match self { &UvError(ref a) => a }; - let desc_str = uvll::strerror(inner); - fail_unless!(desc_str.is_not_null()); - from_c_str(desc_str) - } - } -} - -impl ToStr for UvError { - fn to_str(&self) -> ~str { - fmt!("%s: %s", self.name(), self.desc()) - } -} - -#[test] -fn error_smoke_test() { - let err = uvll::uv_err_t { code: 1, sys_errno_: 1 }; - let err: UvError = UvError(err); - fail_unless!(err.to_str() == ~"EOF: end of file"); -} - - -/// Given a uv handle, convert a callback status to a UvError -// XXX: Follow the pattern below by parameterizing over T: Watcher, not T -fn status_to_maybe_uv_error(handle: *T, status: c_int) -> Option { - if status != -1 { - None - } else { - unsafe { - rtdebug!("handle: %x", handle as uint); - let loop_ = uvll::get_loop_for_uv_handle(handle); - rtdebug!("loop: %x", loop_ as uint); - let err = uvll::last_error(loop_); - Some(UvError(err)) - } - } -} - -/// Get the uv event loop from a Watcher -pub fn loop_from_watcher>( - watcher: &W) -> Loop { - - let handle = watcher.native_handle(); - let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) }; - NativeHandle::from_native_handle(loop_) -} - -/// Set the custom data on a handle to a callback Note: This is only -/// suitable for watchers that make just one type of callback. For -/// others use WatcherData -fn set_watcher_callback, CB: Callback>( - watcher: &mut W, cb: CB) { - - drop_watcher_callback::(watcher); - // XXX: Boxing the callback so it fits into a - // pointer. Unfortunate extra allocation - let boxed_cb = ~cb; - let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) }; - unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) }; -} - -/// Delete a callback from a handle's custom data -fn drop_watcher_callback, CB: Callback>( - watcher: &mut W) { - - unsafe { - let handle = watcher.native_handle(); - let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); - if handle_data.is_not_null() { - // Take ownership of the callback and drop it - let _cb = transmute::<*c_void, ~CB>(handle_data); - // Make sure the pointer is zeroed - uvll::set_data_for_uv_handle( - watcher.native_handle(), null::<()>()); - } - } -} - -/// Take a pointer to the callback installed as custom data -fn borrow_callback_from_watcher, - CB: Callback>(watcher: &W) -> &CB { - - unsafe { - let handle = watcher.native_handle(); - let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); - fail_unless!(handle_data.is_not_null()); - let cb = transmute::<&*c_void, &~CB>(&handle_data); - return &**cb; - } -} - -/// Take ownership of the callback installed as custom data -fn take_callback_from_watcher, CB: Callback>( - watcher: &mut W) -> CB { - - unsafe { - let handle = watcher.native_handle(); - let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); - fail_unless!(handle_data.is_not_null()); - uvll::set_data_for_uv_handle(handle, null::<()>()); - let cb: ~CB = transmute::<*c_void, ~CB>(handle_data); - let cb = match cb { ~cb => cb }; - return cb; - } -} - -/// Callbacks used by StreamWatchers, set as custom data on the foreign handle -struct WatcherData { - read_cb: Option, - write_cb: Option, - connect_cb: Option, - close_cb: Option, - alloc_cb: Option -} - -fn install_watcher_data>(watcher: &mut W) { - unsafe { - let data = ~WatcherData { - read_cb: None, - write_cb: None, - connect_cb: None, - close_cb: None, - alloc_cb: None - }; - let data = transmute::<~WatcherData, *c_void>(data); - uvll::set_data_for_uv_handle(watcher.native_handle(), data); - } -} - -fn get_watcher_data>( - watcher: &'r mut W) -> &'r mut WatcherData { - - unsafe { - let data = uvll::get_data_for_uv_handle(watcher.native_handle()); - let data = transmute::<&*c_void, &mut ~WatcherData>(&data); - return &mut **data; - } -} - -fn drop_watcher_data>(watcher: &mut W) { - unsafe { - let data = uvll::get_data_for_uv_handle(watcher.native_handle()); - let _data = transmute::<*c_void, ~WatcherData>(data); - uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>()); - } -} - -#[test] -fn test_slice_to_uv_buf() { - let slice = [0, .. 20]; - let buf = slice_to_uv_buf(slice); - - fail_unless!(buf.len == 20); - - unsafe { - let base = transmute::<*u8, *mut u8>(buf.base); - (*base) = 1; - (*ptr::mut_offset(base, 1)) = 2; - } - - fail_unless!(slice[0] == 1); - fail_unless!(slice[1] == 2); -} - -/// The uv buffer type -pub type Buf = uvll::uv_buf_t; - -/// Borrow a slice to a Buf -pub fn slice_to_uv_buf(v: &[u8]) -> Buf { - let data = unsafe { vec::raw::to_ptr(v) }; - unsafe { uvll::buf_init(data, v.len()) } -} - -// XXX: Do these conversions without copying - -/// Transmute an owned vector to a Buf -fn vec_to_uv_buf(v: ~[u8]) -> Buf { - let data = unsafe { malloc(v.len() as size_t) } as *u8; - fail_unless!(data.is_not_null()); - do vec::as_imm_buf(v) |b, l| { - let data = data as *mut u8; - unsafe { ptr::copy_memory(data, b, l) } - } - let buf = unsafe { uvll::buf_init(data, v.len()) }; - return buf; -} - -/// Transmute a Buf that was once a ~[u8] back to ~[u8] -fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { - if !(buf.len == 0 && buf.base.is_null()) { - let v = unsafe { vec::from_buf(buf.base, buf.len as uint) }; - unsafe { free(buf.base as *c_void) }; - return Some(v); - } else { - // No buffer - return None; - } -} - -#[test] -fn loop_smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - loop_.run(); - loop_.close(); - } -} - -#[test] -#[ignore(reason = "valgrind - loop destroyed before watcher?")] -fn idle_new_then_close() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - idle_watcher.close(); - } -} - -#[test] -fn idle_smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - let mut count = 10; - let count_ptr: *mut int = &mut count; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - fail_unless!(status.is_none()); - if unsafe { *count_ptr == 10 } { - idle_watcher.stop(); - idle_watcher.close(); - } else { - unsafe { *count_ptr = *count_ptr + 1; } - } - } - loop_.run(); - loop_.close(); - fail_unless!(count == 10); - } -} - -#[test] -fn idle_start_stop_start() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - fail_unless!(status.is_none()); - idle_watcher.stop(); - do idle_watcher.start |idle_watcher, status| { - fail_unless!(status.is_none()); - let mut idle_watcher = idle_watcher; - idle_watcher.stop(); - idle_watcher.close(); - } - } - loop_.run(); - loop_.close(); - } -} - -#[test] -#[ignore(reason = "ffi struct issues")] -fn connect_close() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - // Connect to a port where nobody is listening - let addr = Ipv4(127, 0, 0, 1, 2923); - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("tcp_watcher.connect!"); - fail_unless!(status.is_some()); - fail_unless!(status.get().name() == ~"ECONNREFUSED"); - stream_watcher.close(||()); - } - loop_.run(); - loop_.close(); - } -} - -#[test] -#[ignore(reason = "need a server to connect to")] -fn connect_read() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - let addr = Ipv4(127, 0, 0, 1, 2924); - do tcp_watcher.connect(addr) |stream_watcher, status| { - let mut stream_watcher = stream_watcher; - rtdebug!("tcp_watcher.connect!"); - fail_unless!(status.is_none()); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0)) - }; - do stream_watcher.read_start(alloc) - |stream_watcher, nread, buf, status| { - - let buf = vec_from_uv_buf(buf); - rtdebug!("read cb!"); - if status.is_none() { - let bytes = buf.unwrap(); - rtdebug!("%s", bytes.slice(0, nread as uint).to_str()); - } else { - rtdebug!("status after read: %s", status.get().to_str()); - rtdebug!("closing"); - stream_watcher.close(||()); - } - } - } - loop_.run(); - loop_.close(); - } -} - -#[test] -#[ignore(reason = "ffi struct issues")] -fn listen() { - do run_in_bare_thread() { - static MAX: int = 10; - let mut loop_ = Loop::new(); - let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; - let addr = Ipv4(127, 0, 0, 1, 2925); - server_tcp_watcher.bind(addr); - let loop_ = loop_; - rtdebug!("listening"); - do server_tcp_watcher.listen |server_stream_watcher, status| { - rtdebug!("listened!"); - fail_unless!(status.is_none()); - let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = loop_; - let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - server_stream_watcher.accept(client_tcp_watcher); - let count_cell = Cell(0); - let server_stream_watcher = server_stream_watcher; - rtdebug!("starting read"); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0)) - }; - do client_tcp_watcher.read_start(alloc) - |stream_watcher, nread, buf, status| { - - rtdebug!("i'm reading!"); - let buf = vec_from_uv_buf(buf); - let mut count = count_cell.take(); - if status.is_none() { - rtdebug!("got %d bytes", nread); - let buf = buf.unwrap(); - for buf.slice(0, nread as uint).each |byte| { - fail_unless!(*byte == count as u8); - rtdebug!("%u", *byte as uint); - count += 1; - } - } else { - fail_unless!(count == MAX); - do stream_watcher.close { - server_stream_watcher.close(||()); - } - } - count_cell.put_back(count); - } - } - - let _client_thread = do Thread::start { - rtdebug!("starting client thread"); - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connecting"); - fail_unless!(status.is_none()); - let mut stream_watcher = stream_watcher; - let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; - do stream_watcher.write(msg) |stream_watcher, status| { - rtdebug!("writing"); - fail_unless!(status.is_none()); - stream_watcher.close(||()); - } - } - loop_.run(); - loop_.close(); - }; - - let mut loop_ = loop_; - loop_.run(); - loop_.close(); - } -} diff --git a/src/libcore/rt/uv/file.rs b/src/libcore/rt/uv/file.rs new file mode 100644 index 0000000000000..7df06f87dfe41 --- /dev/null +++ b/src/libcore/rt/uv/file.rs @@ -0,0 +1,52 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use ptr::null; +use libc::c_void; +use super::{UvError, Callback, Request, NativeHandle, Loop}; +use super::super::uvll; +use super::super::uvll::*; + +pub type FsCallback = ~fn(FsRequest, Option); +impl Callback for FsCallback { } + +pub struct FsRequest(*uvll::uv_fs_t); + +impl Request for FsRequest; + +impl FsRequest { + fn new() -> FsRequest { + let fs_req = unsafe { malloc_req(UV_FS) }; + fail_unless!(fs_req.is_not_null()); + let fs_req = fs_req as *uvll::uv_write_t; + unsafe { uvll::set_data_for_req(fs_req, null::<()>()); } + NativeHandle::from_native_handle(fs_req) + } + + fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } + + fn open(&mut self, _loop_: &Loop, _cb: FsCallback) { + } + + fn close(&mut self, _loop_: &Loop, _cb: FsCallback) { + } +} + +impl NativeHandle<*uvll::uv_fs_t> for FsRequest { + fn from_native_handle(handle: *uvll:: uv_fs_t) -> FsRequest { + FsRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_fs_t { + match self { &FsRequest(ptr) => ptr } + } +} diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs new file mode 100644 index 0000000000000..28d695273e7c7 --- /dev/null +++ b/src/libcore/rt/uv/mod.rs @@ -0,0 +1,456 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + +Bindings to libuv. + +UV types consist of the event loop (Loop), Watchers, Requests and +Callbacks. + +Watchers and Requests encapsulate pointers to uv *handles*, which have +subtyping relationships with each other. This subtyping is reflected +in the bindings with explicit or implicit coercions. For example, an +upcast from TcpWatcher to StreamWatcher is done with +`tcp_watcher.as_stream()`. In other cases a callback on a specific +type of watcher will be passed a watcher of a supertype. + +Currently all use of Request types (connect/write requests) are +encapsulated in the bindings and don't need to be dealt with by the +caller. + +# Safety note + +Due to the complex lifecycle of uv handles, as well as compiler bugs, +this module is not memory safe and requires explicit memory management, +via `close` and `delete` methods. + +*/ + +use option::*; +use str::raw::from_c_str; +use to_str::ToStr; +use vec; +use ptr; +use libc::{c_void, c_int, size_t, malloc, free, ssize_t}; +use cast::{transmute, transmute_mut_region}; +use ptr::null; +use sys::size_of; +use super::uvll; +use super::uvll::*; +use unstable::finally::Finally; + +#[cfg(test)] use unstable::run_in_bare_thread; +#[cfg(test)] use super::thread::Thread; +#[cfg(test)] use cell::Cell; + +pub use self::file::{FsRequest, FsCallback}; +pub use self::net::{StreamWatcher, TcpWatcher}; +pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback}; + +pub mod file; +pub mod net; + +/// A trait for callbacks to implement. Provides a little extra type safety +/// for generic, unsafe interop functions like `set_watcher_callback`. +pub trait Callback { } + +pub trait Request { } + +/// The trait implemented by uv 'watchers' (handles). Watchers are +/// non-owning wrappers around the uv handles and are not completely +/// safe - there may be multiple instances for a single underlying +/// handle. Watchers are generally created, then `start`ed, `stop`ed +/// and `close`ed, but due to their complex life cycle may not be +/// entirely memory safe if used in unanticipated patterns. +pub trait Watcher { + fn event_loop(&self) -> Loop; +} + +pub type NullCallback = ~fn(); +impl Callback for NullCallback { } + +/// A type that wraps a native handle +pub trait NativeHandle { + pub fn from_native_handle(T) -> Self; + pub fn native_handle(&self) -> T; +} + +/// XXX: Loop(*handle) is buggy with destructors. Normal structs +/// with dtors may not be destructured, but tuple structs can, +/// but the results are not correct. +pub struct Loop { + handle: *uvll::uv_loop_t +} + +pub impl Loop { + fn new() -> Loop { + let handle = unsafe { uvll::loop_new() }; + fail_unless!(handle.is_not_null()); + NativeHandle::from_native_handle(handle) + } + + fn run(&mut self) { + unsafe { uvll::run(self.native_handle()) }; + } + + fn close(&mut self) { + unsafe { uvll::loop_delete(self.native_handle()) }; + } +} + +impl NativeHandle<*uvll::uv_loop_t> for Loop { + fn from_native_handle(handle: *uvll::uv_loop_t) -> Loop { + Loop { handle: handle } + } + fn native_handle(&self) -> *uvll::uv_loop_t { + self.handle + } +} + +pub struct IdleWatcher(*uvll::uv_idle_t); + +impl Watcher for IdleWatcher { + fn event_loop(&self) -> Loop { + loop_from_watcher(self) + } +} + +pub type IdleCallback = ~fn(IdleWatcher, Option); +impl Callback for IdleCallback { } + +pub impl IdleWatcher { + fn new(loop_: &mut Loop) -> IdleWatcher { + unsafe { + let handle = uvll::idle_new(); + fail_unless!(handle.is_not_null()); + fail_unless!(0 == uvll::idle_init(loop_.native_handle(), handle)); + uvll::set_data_for_uv_handle(handle, null::<()>()); + NativeHandle::from_native_handle(handle) + } + } + + fn start(&mut self, cb: IdleCallback) { + + set_watcher_callback(self, cb); + unsafe { + fail_unless!(0 == uvll::idle_start(self.native_handle(), idle_cb)) + }; + + extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { + let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher); + let status = status_to_maybe_uv_error(handle, status); + (*cb)(idle_watcher, status); + } + } + + fn stop(&mut self) { + unsafe { fail_unless!(0 == uvll::idle_stop(self.native_handle())); } + } + + fn close(self) { + unsafe { uvll::close(self.native_handle(), close_cb) }; + + extern fn close_cb(handle: *uvll::uv_idle_t) { + let mut idle_watcher = NativeHandle::from_native_handle(handle); + drop_watcher_callback::(&mut idle_watcher); + unsafe { uvll::idle_delete(handle) }; + } + } +} + +impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { + fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher { + IdleWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_idle_t { + match self { &IdleWatcher(ptr) => ptr } + } +} + +// XXX: Need to define the error constants like EOF so they can be +// compared to the UvError type + +pub struct UvError(uvll::uv_err_t); + +pub impl UvError { + + fn name(&self) -> ~str { + unsafe { + let inner = match self { &UvError(ref a) => a }; + let name_str = uvll::err_name(inner); + fail_unless!(name_str.is_not_null()); + from_c_str(name_str) + } + } + + fn desc(&self) -> ~str { + unsafe { + let inner = match self { &UvError(ref a) => a }; + let desc_str = uvll::strerror(inner); + fail_unless!(desc_str.is_not_null()); + from_c_str(desc_str) + } + } +} + +impl ToStr for UvError { + fn to_str(&self) -> ~str { + fmt!("%s: %s", self.name(), self.desc()) + } +} + +#[test] +fn error_smoke_test() { + let err = uvll::uv_err_t { code: 1, sys_errno_: 1 }; + let err: UvError = UvError(err); + fail_unless!(err.to_str() == ~"EOF: end of file"); +} + + +/// Given a uv handle, convert a callback status to a UvError +// XXX: Follow the pattern below by parameterizing over T: Watcher, not T +pub fn status_to_maybe_uv_error(handle: *T, status: c_int) -> Option { + if status != -1 { + None + } else { + unsafe { + rtdebug!("handle: %x", handle as uint); + let loop_ = uvll::get_loop_for_uv_handle(handle); + rtdebug!("loop: %x", loop_ as uint); + let err = uvll::last_error(loop_); + Some(UvError(err)) + } + } +} + +/// Get the uv event loop from a Watcher +pub fn loop_from_watcher>( + watcher: &W) -> Loop { + + let handle = watcher.native_handle(); + let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) }; + NativeHandle::from_native_handle(loop_) +} + +/// Set the custom data on a handle to a callback Note: This is only +/// suitable for watchers that make just one type of callback. For +/// others use WatcherData +pub fn set_watcher_callback, CB: Callback>( + watcher: &mut W, cb: CB) { + + drop_watcher_callback::(watcher); + // XXX: Boxing the callback so it fits into a + // pointer. Unfortunate extra allocation + let boxed_cb = ~cb; + let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) }; + unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) }; +} + +/// Delete a callback from a handle's custom data +pub fn drop_watcher_callback, CB: Callback>( + watcher: &mut W) { + + unsafe { + let handle = watcher.native_handle(); + let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); + if handle_data.is_not_null() { + // Take ownership of the callback and drop it + let _cb = transmute::<*c_void, ~CB>(handle_data); + // Make sure the pointer is zeroed + uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>()); + } + } +} + +/// Take a pointer to the callback installed as custom data +pub fn borrow_callback_from_watcher, + CB: Callback>(watcher: &W) -> &CB { + + unsafe { + let handle = watcher.native_handle(); + let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); + fail_unless!(handle_data.is_not_null()); + let cb = transmute::<&*c_void, &~CB>(&handle_data); + return &**cb; + } +} + +/// Take ownership of the callback installed as custom data +pub fn take_callback_from_watcher, CB: Callback>( + watcher: &mut W) -> CB { + + unsafe { + let handle = watcher.native_handle(); + let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); + fail_unless!(handle_data.is_not_null()); + uvll::set_data_for_uv_handle(handle, null::<()>()); + let cb: ~CB = transmute::<*c_void, ~CB>(handle_data); + let cb = match cb { ~cb => cb }; + return cb; + } +} + +/// Callbacks used by StreamWatchers, set as custom data on the foreign handle +struct WatcherData { + read_cb: Option, + write_cb: Option, + connect_cb: Option, + close_cb: Option, + alloc_cb: Option +} + +pub fn install_watcher_data>(watcher: &mut W) { + unsafe { + let data = ~WatcherData { + read_cb: None, + write_cb: None, + connect_cb: None, + close_cb: None, + alloc_cb: None + }; + let data = transmute::<~WatcherData, *c_void>(data); + uvll::set_data_for_uv_handle(watcher.native_handle(), data); + } +} + +pub fn get_watcher_data>( + watcher: &'r mut W) -> &'r mut WatcherData { + + unsafe { + let data = uvll::get_data_for_uv_handle(watcher.native_handle()); + let data = transmute::<&*c_void, &mut ~WatcherData>(&data); + return &mut **data; + } +} + +pub fn drop_watcher_data>(watcher: &mut W) { + unsafe { + let data = uvll::get_data_for_uv_handle(watcher.native_handle()); + let _data = transmute::<*c_void, ~WatcherData>(data); + uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>()); + } +} + +#[test] +fn test_slice_to_uv_buf() { + let slice = [0, .. 20]; + let buf = slice_to_uv_buf(slice); + + fail_unless!(buf.len == 20); + + unsafe { + let base = transmute::<*u8, *mut u8>(buf.base); + (*base) = 1; + (*ptr::mut_offset(base, 1)) = 2; + } + + fail_unless!(slice[0] == 1); + fail_unless!(slice[1] == 2); +} + +/// The uv buffer type +pub type Buf = uvll::uv_buf_t; + +/// Borrow a slice to a Buf +pub fn slice_to_uv_buf(v: &[u8]) -> Buf { + let data = unsafe { vec::raw::to_ptr(v) }; + unsafe { uvll::buf_init(data, v.len()) } +} + +// XXX: Do these conversions without copying + +/// Transmute an owned vector to a Buf +pub fn vec_to_uv_buf(v: ~[u8]) -> Buf { + let data = unsafe { malloc(v.len() as size_t) } as *u8; + fail_unless!(data.is_not_null()); + do vec::as_imm_buf(v) |b, l| { + let data = data as *mut u8; + unsafe { ptr::copy_memory(data, b, l) } + } + let buf = unsafe { uvll::buf_init(data, v.len()) }; + return buf; +} + +/// Transmute a Buf that was once a ~[u8] back to ~[u8] +pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { + if !(buf.len == 0 && buf.base.is_null()) { + let v = unsafe { vec::from_buf(buf.base, buf.len as uint) }; + unsafe { free(buf.base as *c_void) }; + return Some(v); + } else { + // No buffer + return None; + } +} + +#[test] +fn loop_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "valgrind - loop destroyed before watcher?")] +fn idle_new_then_close() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + idle_watcher.close(); + } +} + +#[test] +fn idle_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + let mut count = 10; + let count_ptr: *mut int = &mut count; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + fail_unless!(status.is_none()); + if unsafe { *count_ptr == 10 } { + idle_watcher.stop(); + idle_watcher.close(); + } else { + unsafe { *count_ptr = *count_ptr + 1; } + } + } + loop_.run(); + loop_.close(); + fail_unless!(count == 10); + } +} + +#[test] +fn idle_start_stop_start() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + fail_unless!(status.is_none()); + idle_watcher.stop(); + do idle_watcher.start |idle_watcher, status| { + fail_unless!(status.is_none()); + let mut idle_watcher = idle_watcher; + idle_watcher.stop(); + idle_watcher.close(); + } + } + loop_.run(); + loop_.close(); + } +} diff --git a/src/libcore/rt/uv/net.rs b/src/libcore/rt/uv/net.rs new file mode 100644 index 0000000000000..8f0e8c3edd50d --- /dev/null +++ b/src/libcore/rt/uv/net.rs @@ -0,0 +1,483 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use libc::{size_t, ssize_t, c_int, c_void}; +use cast::{transmute, transmute_mut_region}; +use super::super::uvll; +use super::super::uvll::*; +use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback, + loop_from_watcher, status_to_maybe_uv_error, + install_watcher_data, get_watcher_data, drop_watcher_data, + vec_to_uv_buf, vec_from_uv_buf}; +use super::super::rtio::{IpAddr, Ipv4, Ipv6}; + +#[cfg(test)] +use unstable::run_in_bare_thread; +#[cfg(test)] +use super::super::thread::Thread; +#[cfg(test)] +use cell::Cell; + +fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) { + match addr { + Ipv4(a, b, c, d, p) => { + unsafe { + let addr = malloc_ip4_addr(fmt!("%u.%u.%u.%u", + a as uint, + b as uint, + c as uint, + d as uint), p as int); + do (|| { + f(addr); + }).finally { + free_ip4_addr(addr); + } + } + } + Ipv6 => fail!() + } +} + +// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t +// and uv_file_t +pub struct StreamWatcher(*uvll::uv_stream_t); + +impl Watcher for StreamWatcher { + fn event_loop(&self) -> Loop { + loop_from_watcher(self) + } +} + +pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); +impl Callback for ReadCallback { } + +// XXX: The uv alloc callback also has a *uv_handle_t arg +pub type AllocCallback = ~fn(uint) -> Buf; +impl Callback for AllocCallback { } + +pub impl StreamWatcher { + + fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { + // XXX: Borrowchk problems + let data = get_watcher_data(unsafe { transmute_mut_region(self) }); + data.alloc_cb = Some(alloc); + data.read_cb = Some(cb); + + let handle = self.native_handle(); + unsafe { uvll::read_start(handle, alloc_cb, read_cb); } + + extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf { + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); + let data = get_watcher_data(&mut stream_watcher); + let alloc_cb = data.alloc_cb.get_ref(); + return (*alloc_cb)(suggested_size as uint); + } + + extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, ++buf: Buf) { + rtdebug!("buf addr: %x", buf.base as uint); + rtdebug!("buf len: %d", buf.len as int); + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); + let data = get_watcher_data(&mut stream_watcher); + let cb = data.read_cb.get_ref(); + let status = status_to_maybe_uv_error(stream, nread as c_int); + (*cb)(stream_watcher, nread as int, buf, status); + } + } + + fn read_stop(&mut self) { + // It would be nice to drop the alloc and read callbacks here, + // but read_stop may be called from inside one of them and we + // would end up freeing the in-use environment + let handle = self.native_handle(); + unsafe { uvll::read_stop(handle); } + } + + // XXX: Needs to take &[u8], not ~[u8] + fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) { + // XXX: Borrowck + let data = get_watcher_data(unsafe { transmute_mut_region(self) }); + fail_unless!(data.write_cb.is_none()); + data.write_cb = Some(cb); + + let req = WriteRequest::new(); + let buf = vec_to_uv_buf(msg); + // XXX: Allocation + let bufs = ~[buf]; + unsafe { + fail_unless!(0 == uvll::write(req.native_handle(), + self.native_handle(), + &bufs, write_cb)); + } + // XXX: Freeing immediately after write. Is this ok? + let _v = vec_from_uv_buf(buf); + + extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { + let write_request: WriteRequest = NativeHandle::from_native_handle(req); + let mut stream_watcher = write_request.stream(); + write_request.delete(); + let cb = get_watcher_data(&mut stream_watcher).write_cb.swap_unwrap(); + let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status); + cb(stream_watcher, status); + } + } + + fn accept(&mut self, stream: StreamWatcher) { + let self_handle = self.native_handle() as *c_void; + let stream_handle = stream.native_handle() as *c_void; + unsafe { + fail_unless!(0 == uvll::accept(self_handle, stream_handle)); + } + } + + fn close(self, cb: NullCallback) { + { + let mut self = self; + let data = get_watcher_data(&mut self); + fail_unless!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_stream_t) { + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); + { + let mut data = get_watcher_data(&mut stream_watcher); + data.close_cb.swap_unwrap()(); + } + drop_watcher_data(&mut stream_watcher); + unsafe { free_handle(handle as *c_void) } + } + } +} + +impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { + fn from_native_handle( + handle: *uvll::uv_stream_t) -> StreamWatcher { + StreamWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_stream_t { + match self { &StreamWatcher(ptr) => ptr } + } +} + +pub struct TcpWatcher(*uvll::uv_tcp_t); + +impl Watcher for TcpWatcher { + fn event_loop(&self) -> Loop { + loop_from_watcher(self) + } +} + +pub type ConnectionCallback = ~fn(StreamWatcher, Option); +impl Callback for ConnectionCallback { } + +pub impl TcpWatcher { + fn new(loop_: &mut Loop) -> TcpWatcher { + unsafe { + let handle = malloc_handle(UV_TCP); + fail_unless!(handle.is_not_null()); + fail_unless!(0 == uvll::tcp_init(loop_.native_handle(), handle)); + let mut watcher = NativeHandle::from_native_handle(handle); + install_watcher_data(&mut watcher); + return watcher; + } + } + + fn bind(&mut self, address: IpAddr) { + match address { + Ipv4(*) => { + do ip4_as_uv_ip4(address) |addr| { + let result = unsafe { + uvll::tcp_bind(self.native_handle(), addr) + }; + // XXX: bind is likely to fail. need real error handling + fail_unless!(result == 0); + } + } + _ => fail!() + } + } + + fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) { + unsafe { + fail_unless!(get_watcher_data(self).connect_cb.is_none()); + get_watcher_data(self).connect_cb = Some(cb); + + let mut connect_watcher = ConnectRequest::new(); + let connect_handle = connect_watcher.native_handle(); + match address { + Ipv4(*) => { + do ip4_as_uv_ip4(address) |addr| { + rtdebug!("connect_t: %x", connect_handle as uint); + fail_unless!(0 == uvll::tcp_connect(connect_handle, + self.native_handle(), + addr, connect_cb)); + } + } + _ => fail!() + } + + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + rtdebug!("connect_t: %x", req as uint); + let connect_request: ConnectRequest = NativeHandle::from_native_handle(req); + let mut stream_watcher = connect_request.stream(); + connect_request.delete(); + let cb: ConnectionCallback = { + let data = get_watcher_data(&mut stream_watcher); + data.connect_cb.swap_unwrap() + }; + let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status); + cb(stream_watcher, status); + } + } + } + + fn listen(&mut self, cb: ConnectionCallback) { + // XXX: Borrowck + let data = get_watcher_data(unsafe { transmute_mut_region(self) }); + fail_unless!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); + + unsafe { + static BACKLOG: c_int = 128; // XXX should be configurable + // XXX: This can probably fail + fail_unless!(0 == uvll::listen(self.native_handle(), + BACKLOG, connection_cb)); + } + + extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { + rtdebug!("connection_cb"); + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); + let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap(); + let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status); + cb(stream_watcher, status); + } + } + + fn as_stream(&self) -> StreamWatcher { + NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t) + } +} + +impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { + fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher { + TcpWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_tcp_t { + match self { &TcpWatcher(ptr) => ptr } + } +} + +pub type ConnectCallback = ~fn(ConnectRequest, Option); +impl Callback for ConnectCallback { } + +// uv_connect_t is a subclass of uv_req_t +struct ConnectRequest(*uvll::uv_connect_t); + +impl Request for ConnectRequest { } + +impl ConnectRequest { + + fn new() -> ConnectRequest { + let connect_handle = unsafe { + malloc_req(UV_CONNECT) + }; + fail_unless!(connect_handle.is_not_null()); + let connect_handle = connect_handle as *uvll::uv_connect_t; + ConnectRequest(connect_handle) + } + + fn stream(&self) -> StreamWatcher { + unsafe { + let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle()); + NativeHandle::from_native_handle(stream_handle) + } + } + + fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest { + fn from_native_handle( + handle: *uvll:: uv_connect_t) -> ConnectRequest { + ConnectRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_connect_t { + match self { &ConnectRequest(ptr) => ptr } + } +} + +pub struct WriteRequest(*uvll::uv_write_t); + +impl Request for WriteRequest { } + +pub impl WriteRequest { + + fn new() -> WriteRequest { + let write_handle = unsafe { + malloc_req(UV_WRITE) + }; + fail_unless!(write_handle.is_not_null()); + let write_handle = write_handle as *uvll::uv_write_t; + WriteRequest(write_handle) + } + + fn stream(&self) -> StreamWatcher { + unsafe { + let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle()); + NativeHandle::from_native_handle(stream_handle) + } + } + + fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_write_t> for WriteRequest { + fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest { + WriteRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_write_t { + match self { &WriteRequest(ptr) => ptr } + } +} + + +#[test] +#[ignore(reason = "ffi struct issues")] +fn connect_close() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + // Connect to a port where nobody is listening + let addr = Ipv4(127, 0, 0, 1, 2923); + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("tcp_watcher.connect!"); + fail_unless!(status.is_some()); + fail_unless!(status.get().name() == ~"ECONNREFUSED"); + stream_watcher.close(||()); + } + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "need a server to connect to")] +fn connect_read() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = Ipv4(127, 0, 0, 1, 2924); + do tcp_watcher.connect(addr) |stream_watcher, status| { + let mut stream_watcher = stream_watcher; + rtdebug!("tcp_watcher.connect!"); + fail_unless!(status.is_none()); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + do stream_watcher.read_start(alloc) + |stream_watcher, nread, buf, status| { + + let buf = vec_from_uv_buf(buf); + rtdebug!("read cb!"); + if status.is_none() { + let bytes = buf.unwrap(); + rtdebug!("%s", bytes.slice(0, nread as uint).to_str()); + } else { + rtdebug!("status after read: %s", status.get().to_str()); + rtdebug!("closing"); + stream_watcher.close(||()); + } + } + } + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "ffi struct issues")] +fn listen() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = Ipv4(127, 0, 0, 1, 2925); + server_tcp_watcher.bind(addr); + let loop_ = loop_; + rtdebug!("listening"); + do server_tcp_watcher.listen |server_stream_watcher, status| { + rtdebug!("listened!"); + fail_unless!(status.is_none()); + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = loop_; + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + server_stream_watcher.accept(client_tcp_watcher); + let count_cell = Cell(0); + let server_stream_watcher = server_stream_watcher; + rtdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + do client_tcp_watcher.read_start(alloc) + |stream_watcher, nread, buf, status| { + + rtdebug!("i'm reading!"); + let buf = vec_from_uv_buf(buf); + let mut count = count_cell.take(); + if status.is_none() { + rtdebug!("got %d bytes", nread); + let buf = buf.unwrap(); + for buf.slice(0, nread as uint).each |byte| { + fail_unless!(*byte == count as u8); + rtdebug!("%u", *byte as uint); + count += 1; + } + } else { + fail_unless!(count == MAX); + do stream_watcher.close { + server_stream_watcher.close(||()); + } + } + count_cell.put_back(count); + } + } + + let _client_thread = do Thread::start { + rtdebug!("starting client thread"); + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connecting"); + fail_unless!(status.is_none()); + let mut stream_watcher = stream_watcher; + let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; + do stream_watcher.write(msg) |stream_watcher, status| { + rtdebug!("writing"); + fail_unless!(status.is_none()); + stream_watcher.close(||()); + } + } + loop_.run(); + loop_.close(); + }; + + let mut loop_ = loop_; + loop_.run(); + loop_.close(); + } +} diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 58a4a65ca90c4..bcad67c0c94ff 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -12,13 +12,11 @@ use option::*; use result::*; use super::uv::*; -use super::io::*; +use super::rtio::*; use ops::Drop; use cell::{Cell, empty_cell}; use cast::transmute; -use super::StreamObject; use super::sched::Scheduler; -use super::IoFactoryObject; #[cfg(test)] use super::sched::Task; #[cfg(test)] use unstable::run_in_bare_thread; @@ -189,12 +187,9 @@ impl TcpListener for UvTcpListener { do server_tcp_watcher.listen |server_stream_watcher, status| { let maybe_stream = if status.is_none() { let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = - loop_from_watcher(&server_stream_watcher); - let mut client_tcp_watcher = - TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = - client_tcp_watcher.as_stream(); + let mut loop_ = loop_from_watcher(&server_stream_watcher); + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); // XXX: Need's to be surfaced in interface server_stream_watcher.accept(client_tcp_watcher); Some(~UvStream::new(client_tcp_watcher)) @@ -425,8 +420,7 @@ fn test_read_and_block() { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.block_running_task_and_then - |scheduler, task| { + do scheduler.block_running_task_and_then |scheduler, task| { scheduler.task_queue.push_back(task); } } diff --git a/src/libcore/rt/uvll.rs b/src/libcore/rt/uvll.rs new file mode 100644 index 0000000000000..5111b2bdc1d22 --- /dev/null +++ b/src/libcore/rt/uvll.rs @@ -0,0 +1,442 @@ +// Copyright 2012 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + * Low-level bindings to the libuv library. + * + * This module contains a set of direct, 'bare-metal' wrappers around + * the libuv C-API. + * + * We're not bothering yet to redefine uv's structs as Rust structs + * because they are quite large and change often between versions. + * The maintenance burden is just too high. Instead we use the uv's + * `uv_handle_size` and `uv_req_size` to find the correct size of the + * structs and allocate them on the heap. This can be revisited later. + * + * There are also a collection of helper functions to ease interacting + * with the low-level API. + * + * As new functionality, existant in uv.h, is added to the rust stdlib, + * the mappings should be added in this module. + */ + +#[allow(non_camel_case_types)]; // C types + +use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t}; +use libc::{malloc, free}; +use prelude::*; +use ptr::to_unsafe_ptr; + +pub struct uv_err_t { + code: c_int, + sys_errno_: c_int +} + +pub struct uv_buf_t { + base: *u8, + len: libc::size_t, +} + +pub type uv_handle_t = c_void; +pub type uv_loop_t = c_void; +pub type uv_idle_t = c_void; +pub type uv_tcp_t = c_void; +pub type uv_connect_t = c_void; +pub type uv_write_t = c_void; +pub type uv_async_t = c_void; +pub type uv_timer_t = c_void; +pub type uv_stream_t = c_void; +pub type uv_fs_t = c_void; + +pub type uv_idle_cb = *u8; + +pub type sockaddr_in = c_void; +pub type sockaddr_in6 = c_void; + +#[deriving(Eq)] +pub enum uv_handle_type { + UV_UNKNOWN_HANDLE, + UV_ASYNC, + UV_CHECK, + UV_FS_EVENT, + UV_FS_POLL, + UV_HANDLE, + UV_IDLE, + UV_NAMED_PIPE, + UV_POLL, + UV_PREPARE, + UV_PROCESS, + UV_STREAM, + UV_TCP, + UV_TIMER, + UV_TTY, + UV_UDP, + UV_SIGNAL, + UV_FILE, + UV_HANDLE_TYPE_MAX +} + +#[deriving(Eq)] +pub enum uv_req_type { + UV_UNKNOWN_REQ, + UV_REQ, + UV_CONNECT, + UV_WRITE, + UV_SHUTDOWN, + UV_UDP_SEND, + UV_FS, + UV_WORK, + UV_GETADDRINFO, + UV_REQ_TYPE_MAX +} + +pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void { + fail_unless!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX); + let size = unsafe { rust_uv_handle_size(handle as uint) }; + let p = malloc(size); + fail_unless!(p.is_not_null()); + return p; +} + +pub unsafe fn free_handle(v: *c_void) { + free(v) +} + +pub unsafe fn malloc_req(req: uv_req_type) -> *c_void { + fail_unless!(req != UV_UNKNOWN_REQ && req != UV_REQ_TYPE_MAX); + let size = unsafe { rust_uv_req_size(req as uint) }; + let p = malloc(size); + fail_unless!(p.is_not_null()); + return p; +} + +pub unsafe fn free_req(v: *c_void) { + free(v) +} + +#[test] +fn handle_sanity_check() { + unsafe { + fail_unless!(UV_HANDLE_TYPE_MAX as uint == rust_uv_handle_type_max()); + } +} + +#[test] +fn request_sanity_check() { + unsafe { + fail_unless!(UV_REQ_TYPE_MAX as uint == rust_uv_req_type_max()); + } +} + +pub unsafe fn loop_new() -> *c_void { + return rust_uv_loop_new(); +} + +pub unsafe fn loop_delete(loop_handle: *c_void) { + rust_uv_loop_delete(loop_handle); +} + +pub unsafe fn run(loop_handle: *c_void) { + rust_uv_run(loop_handle); +} + +pub unsafe fn close(handle: *T, cb: *u8) { + rust_uv_close(handle as *c_void, cb); +} + +pub unsafe fn walk(loop_handle: *c_void, cb: *u8, arg: *c_void) { + rust_uv_walk(loop_handle, cb, arg); +} + +pub unsafe fn idle_new() -> *uv_idle_t { + rust_uv_idle_new() +} + +pub unsafe fn idle_delete(handle: *uv_idle_t) { + rust_uv_idle_delete(handle) +} + +pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int { + rust_uv_idle_init(loop_handle, handle) +} + +pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int { + rust_uv_idle_start(handle, cb) +} + +pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int { + rust_uv_idle_stop(handle) +} + +pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int { + return rust_uv_tcp_init(loop_handle, handle); +} + +// FIXME ref #2064 +pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t, + tcp_handle_ptr: *uv_tcp_t, + addr_ptr: *sockaddr_in, + after_connect_cb: *u8) -> c_int { + return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr, + after_connect_cb, addr_ptr); +} +// FIXME ref #2064 +pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t, + tcp_handle_ptr: *uv_tcp_t, + addr_ptr: *sockaddr_in6, + after_connect_cb: *u8) -> c_int { + return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr, + after_connect_cb, addr_ptr); +} +// FIXME ref #2064 +pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int { + return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr); +} +// FIXME ref #2064 +pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int { + return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr); +} + +pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int { + return rust_uv_tcp_getpeername(tcp_handle_ptr, name); +} + +pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int { + return rust_uv_tcp_getpeername6(tcp_handle_ptr, name); +} + +pub unsafe fn listen(stream: *T, backlog: c_int, cb: *u8) -> c_int { + return rust_uv_listen(stream as *c_void, backlog, cb); +} + +pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int { + return rust_uv_accept(server as *c_void, client as *c_void); +} + +pub unsafe fn write(req: *uv_write_t, stream: *T, buf_in: *~[uv_buf_t], cb: *u8) -> c_int { + let buf_ptr = vec::raw::to_ptr(*buf_in); + let buf_cnt = vec::len(*buf_in) as i32; + return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb); +} +pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int { + return rust_uv_read_start(stream as *c_void, on_alloc, on_read); +} + +pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int { + return rust_uv_read_stop(stream as *c_void); +} + +pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t { + return rust_uv_last_error(loop_handle); +} + +pub unsafe fn strerror(err: *uv_err_t) -> *c_char { + return rust_uv_strerror(err); +} +pub unsafe fn err_name(err: *uv_err_t) -> *c_char { + return rust_uv_err_name(err); +} + +pub unsafe fn async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int { + return rust_uv_async_init(loop_handle, async_handle, cb); +} + +pub unsafe fn async_send(async_handle: *uv_async_t) { + return rust_uv_async_send(async_handle); +} +pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t { + let out_buf = uv_buf_t { base: ptr::null(), len: 0 as size_t }; + let out_buf_ptr = ptr::addr_of(&out_buf); + rust_uv_buf_init(out_buf_ptr, input, len as size_t); + return out_buf; +} + +pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int { + return rust_uv_timer_init(loop_ptr, timer_ptr); +} +pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint, + repeat: uint) -> c_int { + return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint); +} +pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int { + return rust_uv_timer_stop(timer_ptr); +} + +pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in { + do str::as_c_str(ip) |ip_buf| { + rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int) + } +} +pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 { + do str::as_c_str(ip) |ip_buf| { + rust_uv_ip6_addrp(ip_buf as *u8, port as libc::c_int) + } +} + +pub unsafe fn free_ip4_addr(addr: *sockaddr_in) { + rust_uv_free_ip4_addr(addr); +} + +pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) { + rust_uv_free_ip6_addr(addr); +} + +// data access helpers +pub unsafe fn get_loop_for_uv_handle(handle: *T) -> *c_void { + return rust_uv_get_loop_for_uv_handle(handle as *c_void); +} +pub unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t { + return rust_uv_get_stream_handle_from_connect_req(connect); +} +pub unsafe fn get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t { + return rust_uv_get_stream_handle_from_write_req(write_req); +} +pub unsafe fn get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void { + rust_uv_get_data_for_uv_loop(loop_ptr) +} +pub unsafe fn set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void) { + rust_uv_set_data_for_uv_loop(loop_ptr, data); +} +pub unsafe fn get_data_for_uv_handle(handle: *T) -> *c_void { + return rust_uv_get_data_for_uv_handle(handle as *c_void); +} +pub unsafe fn set_data_for_uv_handle(handle: *T, data: *U) { + rust_uv_set_data_for_uv_handle(handle as *c_void, data as *c_void); +} +pub unsafe fn get_data_for_req(req: *T) -> *c_void { + return rust_uv_get_data_for_req(req as *c_void); +} +pub unsafe fn set_data_for_req(req: *T, data: *U) { + rust_uv_set_data_for_req(req as *c_void, data as *c_void); +} +pub unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 { + return rust_uv_get_base_from_buf(buf); +} +pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t { + return rust_uv_get_len_from_buf(buf); +} +pub unsafe fn malloc_buf_base_of(suggested_size: size_t) -> *u8 { + return rust_uv_malloc_buf_base_of(suggested_size); +} +pub unsafe fn free_base_of_buf(buf: uv_buf_t) { + rust_uv_free_base_of_buf(buf); +} + +pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str { + let err = last_error(uv_loop); + let err_ptr = ptr::addr_of(&err); + let err_name = str::raw::from_c_str(err_name(err_ptr)); + let err_msg = str::raw::from_c_str(strerror(err_ptr)); + return fmt!("LIBUV ERROR: name: %s msg: %s", + err_name, err_msg); +} + +pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data { + let err = last_error(uv_loop); + let err_ptr = ptr::addr_of(&err); + let err_name = str::raw::from_c_str(err_name(err_ptr)); + let err_msg = str::raw::from_c_str(strerror(err_ptr)); + uv_err_data { err_name: err_name, err_msg: err_msg } +} + +pub struct uv_err_data { + err_name: ~str, + err_msg: ~str, +} + +extern { + + fn rust_uv_handle_size(type_: uintptr_t) -> size_t; + fn rust_uv_req_size(type_: uintptr_t) -> size_t; + fn rust_uv_handle_type_max() -> uintptr_t; + fn rust_uv_req_type_max() -> uintptr_t; + + // libuv public API + fn rust_uv_loop_new() -> *c_void; + fn rust_uv_loop_delete(lp: *c_void); + fn rust_uv_run(loop_handle: *c_void); + fn rust_uv_close(handle: *c_void, cb: *u8); + fn rust_uv_walk(loop_handle: *c_void, cb: *u8, arg: *c_void); + + fn rust_uv_idle_new() -> *uv_idle_t; + fn rust_uv_idle_delete(handle: *uv_idle_t); + fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int; + fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int; + fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int; + + fn rust_uv_async_send(handle: *uv_async_t); + fn rust_uv_async_init(loop_handle: *c_void, + async_handle: *uv_async_t, + cb: *u8) -> c_int; + fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int; + // FIXME ref #2604 .. ? + fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t); + fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t; + // FIXME ref #2064 + fn rust_uv_strerror(err: *uv_err_t) -> *c_char; + // FIXME ref #2064 + fn rust_uv_err_name(err: *uv_err_t) -> *c_char; + fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in; + fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6; + fn rust_uv_free_ip4_addr(addr: *sockaddr_in); + fn rust_uv_free_ip6_addr(addr: *sockaddr_in6); + fn rust_uv_ip4_name(src: *sockaddr_in, dst: *u8, size: size_t) -> c_int; + fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int; + fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint; + fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint; + // FIXME ref #2064 + fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t, + tcp_handle_ptr: *uv_tcp_t, + ++after_cb: *u8, + ++addr: *sockaddr_in) -> c_int; + // FIXME ref #2064 + fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, ++addr: *sockaddr_in) -> c_int; + // FIXME ref #2064 + fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t, + tcp_handle_ptr: *uv_tcp_t, + ++after_cb: *u8, + ++addr: *sockaddr_in6) -> c_int; + // FIXME ref #2064 + fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, ++addr: *sockaddr_in6) -> c_int; + fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, ++name: *sockaddr_in) -> c_int; + fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, ++name: *sockaddr_in6) ->c_int; + fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int; + fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int; + fn rust_uv_write(req: *c_void, + stream: *c_void, + ++buf_in: *uv_buf_t, + buf_cnt: c_int, + cb: *u8) -> c_int; + fn rust_uv_read_start(stream: *c_void, + on_alloc: *u8, + on_read: *u8) -> c_int; + fn rust_uv_read_stop(stream: *c_void) -> c_int; + fn rust_uv_timer_init(loop_handle: *c_void, + timer_handle: *uv_timer_t) -> c_int; + fn rust_uv_timer_start(timer_handle: *uv_timer_t, + cb: *u8, + timeout: c_uint, + repeat: c_uint) -> c_int; + fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int; + + fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8; + fn rust_uv_free_base_of_buf(++buf: uv_buf_t); + fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t; + fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t; + fn rust_uv_get_loop_for_uv_handle(handle: *c_void) -> *c_void; + fn rust_uv_get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void; + fn rust_uv_set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void); + fn rust_uv_get_data_for_uv_handle(handle: *c_void) -> *c_void; + fn rust_uv_set_data_for_uv_handle(handle: *c_void, data: *c_void); + fn rust_uv_get_data_for_req(req: *c_void) -> *c_void; + fn rust_uv_set_data_for_req(req: *c_void, data: *c_void); + fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8; + fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> size_t; +} diff --git a/src/libcore/unstable.rs b/src/libcore/unstable.rs index 0ef736d519806..6f0c9ba23dfd5 100644 --- a/src/libcore/unstable.rs +++ b/src/libcore/unstable.rs @@ -35,8 +35,6 @@ pub mod extfmt; #[path = "unstable/lang.rs"] #[cfg(notest)] pub mod lang; -#[path = "unstable/uvll.rs"] -pub mod uvll; mod rustrt { use unstable::{raw_thread, rust_little_lock}; diff --git a/src/libcore/unstable/lang.rs b/src/libcore/unstable/lang.rs index ea5dfa0a530c3..5d7920ce820eb 100644 --- a/src/libcore/unstable/lang.rs +++ b/src/libcore/unstable/lang.rs @@ -120,16 +120,25 @@ pub unsafe fn strdup_uniq(ptr: *c_uchar, len: uint) -> ~str { #[lang="start"] pub fn start(main: *u8, argc: int, argv: *c_char, crate_map: *u8) -> int { + use libc::getenv; + use rt::start; + + unsafe { + let use_new_rt = do str::as_c_str("RUST_NEWRT") |s| { + getenv(s).is_null() + }; + if use_new_rt { + return rust_start(main as *c_void, argc as c_int, argv, + crate_map as *c_void) as int; + } else { + return start(main, argc, argv, crate_map); + } + } extern { fn rust_start(main: *c_void, argc: c_int, argv: *c_char, crate_map: *c_void) -> c_int; } - - unsafe { - return rust_start(main as *c_void, argc as c_int, argv, - crate_map as *c_void) as int; - } } // Local Variables: diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index 04283674d88be..15593571b43ca 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -21,20 +21,20 @@ use core::vec; use iotask = uv::iotask::IoTask; use interact = uv::iotask::interact; -use sockaddr_in = core::unstable::uvll::sockaddr_in; -use sockaddr_in6 = core::unstable::uvll::sockaddr_in6; -use addrinfo = core::unstable::uvll::addrinfo; -use uv_getaddrinfo_t = core::unstable::uvll::uv_getaddrinfo_t; -use uv_ip4_name = core::unstable::uvll::ip4_name; -use uv_ip4_port = core::unstable::uvll::ip4_port; -use uv_ip6_name = core::unstable::uvll::ip6_name; -use uv_ip6_port = core::unstable::uvll::ip6_port; -use uv_getaddrinfo = core::unstable::uvll::getaddrinfo; -use uv_freeaddrinfo = core::unstable::uvll::freeaddrinfo; -use create_uv_getaddrinfo_t = core::unstable::uvll::getaddrinfo_t; -use set_data_for_req = core::unstable::uvll::set_data_for_req; -use get_data_for_req = core::unstable::uvll::get_data_for_req; -use ll = core::unstable::uvll; +use sockaddr_in = super::uv_ll::sockaddr_in; +use sockaddr_in6 = super::uv_ll::sockaddr_in6; +use addrinfo = super::uv_ll::addrinfo; +use uv_getaddrinfo_t = super::uv_ll::uv_getaddrinfo_t; +use uv_ip4_name = super::uv_ll::ip4_name; +use uv_ip4_port = super::uv_ll::ip4_port; +use uv_ip6_name = super::uv_ll::ip6_name; +use uv_ip6_port = super::uv_ll::ip6_port; +use uv_getaddrinfo = super::uv_ll::getaddrinfo; +use uv_freeaddrinfo = super::uv_ll::freeaddrinfo; +use create_uv_getaddrinfo_t = super::uv_ll::getaddrinfo_t; +use set_data_for_req = super::uv_ll::set_data_for_req; +use get_data_for_req = super::uv_ll::get_data_for_req; +use ll = super::uv_ll; /// An IP address pub enum IpAddr { diff --git a/src/libstd/std.rc b/src/libstd/std.rc index 85e914a60a140..6a7576645b8f8 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -36,7 +36,7 @@ not required in or otherwise suitable for the core library. extern mod core(vers = "0.6"); use core::*; -pub use uv_ll = core::unstable::uvll; +pub mod uv_ll; // General io and system-services modules diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index aaddc9b6836f3..e055b40705773 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -33,6 +33,6 @@ * facilities. */ -pub use ll = core::unstable::uvll; +pub use ll = super::uv_ll; pub use iotask = uv_iotask; pub use global_loop = uv_global_loop; diff --git a/src/libcore/unstable/uvll.rs b/src/libstd/uv_ll.rs similarity index 96% rename from src/libcore/unstable/uvll.rs rename to src/libstd/uv_ll.rs index 80f04cf4ac0c1..57d769d707e67 100644 --- a/src/libcore/unstable/uvll.rs +++ b/src/libstd/uv_ll.rs @@ -32,10 +32,10 @@ #[allow(non_camel_case_types)]; // C types -use libc::size_t; -use libc::c_void; -use prelude::*; -use ptr::to_unsafe_ptr; +use core::libc::size_t; +use core::libc::c_void; +use core::prelude::*; +use core::ptr::to_unsafe_ptr; pub type uv_handle_t = c_void; pub type uv_loop_t = c_void; @@ -357,7 +357,7 @@ pub struct uv_getaddrinfo_t { pub mod uv_ll_struct_stubgen { - use ptr; + use core::ptr; use super::{ uv_async_t, @@ -930,8 +930,6 @@ pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t, addr_ptr: *sockaddr_in, after_connect_cb: *u8) -> libc::c_int { - debug!("b4 foreign tcp_connect--addr port: %u cb: %u", - (*addr_ptr).sin_port as uint, after_connect_cb as uint); return rustrt::rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr, after_connect_cb, addr_ptr); } @@ -1021,22 +1019,8 @@ pub unsafe fn async_send(async_handle: *uv_async_t) { pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t { let out_buf = uv_buf_t { base: ptr::null(), len: 0 as libc::size_t }; let out_buf_ptr = ptr::addr_of(&out_buf); - debug!("buf_init - input %u len %u out_buf: %u", - input as uint, - len as uint, - out_buf_ptr as uint); - // yuck :/ rustrt::rust_uv_buf_init(out_buf_ptr, input, len as size_t); - //let result = rustrt::rust_uv_buf_init_2(input, len as size_t); - debug!("after rust_uv_buf_init"); - let res_base = get_base_from_buf(out_buf); - let res_len = get_len_from_buf(out_buf); - //let res_base = get_base_from_buf(result); - debug!("buf_init - result %u len %u", - res_base as uint, - res_len as uint); return out_buf; - //return result; } pub unsafe fn ip4_addr(ip: &str, port: int) -> sockaddr_in { @@ -1078,8 +1062,6 @@ pub unsafe fn ip6_name(src: &sockaddr_in6) -> ~str { 0u8,0u8,0u8,0u8,0u8,0u8]; do vec::as_imm_buf(dst) |dst_buf, size| { let src_unsafe_ptr = to_unsafe_ptr(src); - debug!("val of src *sockaddr_in6: %? sockaddr_in6: %?", - src_unsafe_ptr, src); let result = rustrt::rust_uv_ip6_name(src_unsafe_ptr, dst_buf, size as libc::size_t); match result { @@ -1240,9 +1222,9 @@ pub unsafe fn addrinfo_as_sockaddr_in6(input: *addrinfo) -> *sockaddr_in6 { #[cfg(test)] pub mod test { - use prelude::*; + use core::prelude::*; + use core::comm::{SharedChan, stream, GenericChan, GenericPort}; use super::*; - use comm::{SharedChan, stream, GenericChan, GenericPort}; enum tcp_read_data { tcp_read_eof, @@ -1265,7 +1247,7 @@ pub mod test { suggested_size: libc::size_t) -> uv_buf_t { unsafe { - debug!("on_alloc_cb!"); + debug!(~"on_alloc_cb!"); let char_ptr = malloc_buf_base_of(suggested_size); debug!("on_alloc_cb h: %? char_ptr: %u sugsize: %u", handle, @@ -1298,15 +1280,15 @@ pub mod test { } else if (nread == -1) { // err .. possibly EOF - debug!("read: eof!"); + debug!(~"read: eof!"); } else { // nread == 0 .. do nothing, just free buf as below - debug!("read: do nothing!"); + debug!(~"read: do nothing!"); } // when we're done free_base_of_buf(buf); - debug!("CLIENT exiting on_read_cb"); + debug!(~"CLIENT exiting on_read_cb"); } } @@ -1321,8 +1303,7 @@ pub mod test { "CLIENT on_write_complete_cb: tcp:%d write_handle:%d", stream as int, write_req as int); let result = read_start(stream, on_alloc_cb, on_read_cb); - debug!( - "CLIENT ending on_write_complete_cb .. status: %d", + debug!("CLIENT ending on_write_complete_cb .. status: %d", result as int); } } @@ -1335,7 +1316,7 @@ pub mod test { let stream = get_stream_handle_from_connect_req(connect_req_ptr); if (status == 0i32) { - debug!("on_connect_cb: in status=0 if.."); + debug!(~"on_connect_cb: in status=0 if.."); let client_data = get_data_for_req( connect_req_ptr as *libc::c_void) as *request_wrapper; @@ -1353,10 +1334,10 @@ pub mod test { let test_loop = get_loop_for_uv_handle( stream as *libc::c_void); let err_msg = get_last_err_info(test_loop); - debug!("%?", err_msg); + debug!(err_msg); fail_unless!(false); } - debug!("finishing on_connect_cb"); + debug!(~"finishing on_connect_cb"); } } @@ -1396,9 +1377,9 @@ pub mod test { let tcp_init_result = tcp_init( test_loop as *libc::c_void, tcp_handle_ptr); if (tcp_init_result == 0i32) { - debug!("sucessful tcp_init_result"); + debug!(~"sucessful tcp_init_result"); - debug!("building addr..."); + debug!(~"building addr..."); let addr = ip4_addr(ip, port); // FIXME ref #2064 let addr_ptr = ptr::addr_of(&addr); @@ -1420,17 +1401,17 @@ pub mod test { set_data_for_uv_handle( tcp_handle_ptr as *libc::c_void, ptr::addr_of(&client_data) as *libc::c_void); - debug!("before run tcp req loop"); + debug!(~"before run tcp req loop"); run(test_loop); - debug!("after run tcp req loop"); + debug!(~"after run tcp req loop"); } else { - debug!("tcp_connect() failure"); + debug!(~"tcp_connect() failure"); fail_unless!(false); } } else { - debug!("tcp_init() failure"); + debug!(~"tcp_init() failure"); fail_unless!(false); } loop_delete(test_loop); @@ -1446,8 +1427,7 @@ pub mod test { extern fn client_stream_after_close_cb(handle: *libc::c_void) { unsafe { - debug!( - "SERVER: closed client stream, now closing server stream"); + debug!(~"SERVER: closed client stream, now closing server stream"); let client_data = get_data_for_uv_handle( handle) as *tcp_server_data; @@ -1460,7 +1440,7 @@ pub mod test { unsafe { let client_stream_ptr = get_stream_handle_from_write_req(req); - debug!("SERVER: resp sent... closing client stream"); + debug!(~"SERVER: resp sent... closing client stream"); close(client_stream_ptr as *libc::c_void, client_stream_after_close_cb) } @@ -1491,8 +1471,8 @@ pub mod test { let server_kill_msg = copy (*client_data).server_kill_msg; let write_req = (*client_data).server_write_req; if str::contains(request_str, server_kill_msg) { - debug!("SERVER: client req contains kill_msg!"); - debug!("SERVER: sending response to client"); + debug!(~"SERVER: client req contains kill_msg!"); + debug!(~"SERVER: sending response to client"); read_stop(client_stream_ptr); let server_chan = (*client_data).server_chan.clone(); server_chan.send(request_str); @@ -1504,28 +1484,28 @@ pub mod test { debug!("SERVER: resp write result: %d", write_result as int); if (write_result != 0i32) { - debug!("bad result for server resp write()"); - debug!("%s", get_last_err_info( + debug!(~"bad result for server resp write()"); + debug!(get_last_err_info( get_loop_for_uv_handle(client_stream_ptr as *libc::c_void))); fail_unless!(false); } } else { - debug!("SERVER: client req !contain kill_msg!"); + debug!(~"SERVER: client req !contain kill_msg!"); } } else if (nread == -1) { // err .. possibly EOF - debug!("read: eof!"); + debug!(~"read: eof!"); } else { // nread == 0 .. do nothing, just free buf as below - debug!("read: do nothing!"); + debug!(~"read: do nothing!"); } // when we're done free_base_of_buf(buf); - debug!("SERVER exiting on_read_cb"); + debug!(~"SERVER exiting on_read_cb"); } } @@ -1533,7 +1513,7 @@ pub mod test { *uv_stream_t, status: libc::c_int) { unsafe { - debug!("client connecting!"); + debug!(~"client connecting!"); let test_loop = get_loop_for_uv_handle( server_stream_ptr as *libc::c_void); if status != 0i32 { @@ -1551,7 +1531,7 @@ pub mod test { client_stream_ptr as *libc::c_void, server_data as *libc::c_void); if (client_init_result == 0i32) { - debug!("successfully initialized client stream"); + debug!(~"successfully initialized client stream"); let accept_result = accept(server_stream_ptr as *libc::c_void, client_stream_ptr as @@ -1563,7 +1543,7 @@ pub mod test { on_alloc_cb, on_server_read_cb); if (read_result == 0i32) { - debug!("successful server read start"); + debug!(~"successful server read start"); } else { debug!("server_connection_cb: bad read:%d", @@ -1674,7 +1654,7 @@ pub mod test { let bind_result = tcp_bind(tcp_server_ptr, server_addr_ptr); if (bind_result == 0i32) { - debug!("successful uv_tcp_bind, listening"); + debug!(~"successful uv_tcp_bind, listening"); // uv_listen() let listen_result = listen(tcp_server_ptr as @@ -1694,7 +1674,7 @@ pub mod test { async_send(continue_async_handle_ptr); // uv_run() run(test_loop); - debug!("server uv::run() has returned"); + debug!(~"server uv::run() has returned"); } else { debug!("uv_async_init failure: %d", @@ -1751,9 +1731,9 @@ pub mod test { }; // block until the server up is.. possibly a race? - debug!("before receiving on server continue_port"); + debug!(~"before receiving on server continue_port"); continue_port.recv(); - debug!("received on continue port, set up tcp client"); + debug!(~"received on continue port, set up tcp client"); let kill_server_msg_copy = copy kill_server_msg; do task::spawn_sched(task::ManualThreads(1u)) { @@ -1808,7 +1788,7 @@ pub mod test { let output = fmt!( "STRUCT_SIZE FAILURE: %s -- actual: %u expected: %u", t_name, rust_size, foreign_size as uint); - debug!("%s", output); + debug!(output); } fail_unless!(sizes_match); } diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index d9ef6a52dbef6..803da32cbc8ac 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -21,17 +21,6 @@ void* global_crate_map = NULL; -#ifndef _WIN32 -pthread_key_t sched_key; -#else -DWORD sched_key; -#endif - -extern "C" void* -rust_get_sched_tls_key() { - return &sched_key; -} - /** The runtime entrypoint. The (C ABI) main function generated by rustc calls `rust_start`, providing the address of the Rust ABI main function, the @@ -41,10 +30,6 @@ rust_get_sched_tls_key() { extern "C" CDECL int rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { -#ifndef _WIN32 - pthread_key_create(&sched_key, NULL); -#endif - // Load runtime configuration options from the environment. // FIXME #1497: Should provide a way to get these from the command // line as well. diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a2053c115bbbf..f586e05772b76 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -882,6 +882,46 @@ rust_get_rt_env() { return task->kernel->env; } +typedef void *(*nullary_fn)(); + +extern "C" CDECL void +rust_call_nullary_fn(nullary_fn f) { + f(); +} + +#ifndef _WIN32 +pthread_key_t sched_key; +#else +DWORD sched_key; +#endif + +extern "C" void* +rust_get_sched_tls_key() { + return &sched_key; +} + +// Initialize the global state required by the new scheduler +extern "C" CDECL void +rust_initialize_global_state() { + + static lock_and_signal init_lock; + static bool initialized = false; + + scoped_lock with(init_lock); + + if (!initialized) { + +#ifndef _WIN32 + assert(!pthread_key_create(&sched_key, NULL)); +#else + sched_key = TlsAlloc(); + assert(sched_key != TLS_OUT_OF_INDEXES); +#endif + + initialized = true; + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 5159434873733..325b10b92df6b 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -479,6 +479,34 @@ extern "C" struct sockaddr_in6 rust_uv_ip6_addr(const char* ip, int port) { return uv_ip6_addr(ip, port); } + +extern "C" struct sockaddr_in* +rust_uv_ip4_addrp(const char* ip, int port) { + struct sockaddr_in addr = uv_ip4_addr(ip, port); + struct sockaddr_in *addrp = (sockaddr_in*)malloc(sizeof(struct sockaddr_in)); + assert(addrp); + memcpy(addrp, &addr, sizeof(struct sockaddr_in)); + return addrp; +} +extern "C" struct sockaddr_in6* +rust_uv_ip6_addrp(const char* ip, int port) { + struct sockaddr_in6 addr = uv_ip6_addr(ip, port); + struct sockaddr_in6 *addrp = (sockaddr_in6*)malloc(sizeof(struct sockaddr_in6)); + assert(addrp); + memcpy(addrp, &addr, sizeof(struct sockaddr_in6)); + return addrp; +} + +extern "C" void +rust_uv_free_ip4_addr(sockaddr_in *addrp) { + free(addrp); +} + +extern "C" void +rust_uv_free_ip6_addr(sockaddr_in6 *addrp) { + free(addrp); +} + extern "C" int rust_uv_ip4_name(struct sockaddr_in* src, char* dst, size_t size) { return uv_ip4_name(src, dst, size); @@ -563,3 +591,23 @@ extern "C" int rust_uv_idle_stop(uv_idle_t* idle) { return uv_idle_stop(idle); } + +extern "C" size_t +rust_uv_handle_size(uintptr_t type) { + return uv_handle_size((uv_handle_type)type); +} + +extern "C" size_t +rust_uv_req_size(uintptr_t type) { + return uv_req_size((uv_req_type)type); +} + +extern "C" uintptr_t +rust_uv_handle_type_max() { + return UV_HANDLE_TYPE_MAX; +} + +extern "C" uintptr_t +rust_uv_req_type_max() { + return UV_REQ_TYPE_MAX; +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 5a8868f33f9f9..59fd8991622c6 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -202,3 +202,15 @@ rust_dbg_extern_identity_TwoU64s rust_dbg_extern_identity_double rust_dbg_extern_identity_u8 rust_get_rt_env +rust_uv_handle_size +rust_uv_req_size +rust_uv_handle_type_max +rust_uv_req_type_max +rust_uv_ip4_addrp +rust_uv_ip6_addrp +rust_uv_free_ip4_addr +rust_uv_free_ip6_addr +rust_call_nullary_fn +rust_initialize_global_state + +