Skip to content

Commit deeca5d

Browse files
committed
auto merge of #10054 : alexcrichton/rust/basic-event-loop, r=brson
This is more progress towards #9128 and all its related tree of issues. This implements a new `BasicLoop` on top of pthreads synchronization primitives (wrapped in `LittleLock`). This also removes the wonky `callback_ms` function from the interface of the event loop. After #9901 is taking forever to land, I'm going to try to do all this runtime work in much smaller chunks than before. Right now this will not work unless #9901 lands first, but I'm close to landing it (hopefully), and I wanted to go ahead and get this reviewed before throwing it at bors later on down the road. This "pausible idle callback" is also a bit of a weird idea, but it wasn't as difficult to implement as callback_ms so I'm more semi-ok with it.
2 parents ac82d18 + 64a5c3b commit deeca5d

File tree

15 files changed

+494
-88
lines changed

15 files changed

+494
-88
lines changed

Diff for: src/libextra/comm.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
136136
#[cfg(test)]
137137
mod test {
138138
use comm::{DuplexStream, rendezvous};
139-
use std::rt::test::run_in_newsched_task;
139+
use std::rt::test::run_in_uv_task;
140140
use std::task::spawn_unlinked;
141141

142142

@@ -165,7 +165,7 @@ mod test {
165165
#[test]
166166
fn recv_a_lot() {
167167
// Rendezvous streams should be able to handle any number of messages being sent
168-
do run_in_newsched_task {
168+
do run_in_uv_task {
169169
let (port, chan) = rendezvous();
170170
do spawn {
171171
do 1000000.times { chan.send(()) }

Diff for: src/libstd/rt/basic.rs

+256
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
//! This is a basic event loop implementation not meant for any "real purposes"
12+
//! other than testing the scheduler and proving that it's possible to have a
13+
//! pluggable event loop.
14+
15+
use prelude::*;
16+
17+
use cast;
18+
use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback};
19+
use unstable::sync::Exclusive;
20+
use util;
21+
22+
/// This is the only exported function from this module.
23+
pub fn event_loop() -> ~EventLoop {
24+
~BasicLoop::new() as ~EventLoop
25+
}
26+
27+
struct BasicLoop {
28+
work: ~[~fn()], // pending work
29+
idle: Option<*BasicPausible>, // only one is allowed
30+
remotes: ~[(uint, ~fn())],
31+
next_remote: uint,
32+
messages: Exclusive<~[Message]>
33+
}
34+
35+
enum Message { RunRemote(uint), RemoveRemote(uint) }
36+
37+
struct Time {
38+
sec: u64,
39+
nsec: u64,
40+
}
41+
42+
impl Ord for Time {
43+
fn lt(&self, other: &Time) -> bool {
44+
self.sec < other.sec || self.nsec < other.nsec
45+
}
46+
}
47+
48+
impl BasicLoop {
49+
fn new() -> BasicLoop {
50+
BasicLoop {
51+
work: ~[],
52+
idle: None,
53+
next_remote: 0,
54+
remotes: ~[],
55+
messages: Exclusive::new(~[]),
56+
}
57+
}
58+
59+
/// Process everything in the work queue (continually)
60+
fn work(&mut self) {
61+
while self.work.len() > 0 {
62+
for work in util::replace(&mut self.work, ~[]).move_iter() {
63+
work();
64+
}
65+
}
66+
}
67+
68+
fn remote_work(&mut self) {
69+
let messages = unsafe {
70+
do self.messages.with |messages| {
71+
if messages.len() > 0 {
72+
Some(util::replace(messages, ~[]))
73+
} else {
74+
None
75+
}
76+
}
77+
};
78+
let messages = match messages {
79+
Some(m) => m, None => return
80+
};
81+
for message in messages.iter() {
82+
self.message(*message);
83+
}
84+
}
85+
86+
fn message(&mut self, message: Message) {
87+
match message {
88+
RunRemote(i) => {
89+
match self.remotes.iter().find(|& &(id, _)| id == i) {
90+
Some(&(_, ref f)) => (*f)(),
91+
None => unreachable!()
92+
}
93+
}
94+
RemoveRemote(i) => {
95+
match self.remotes.iter().position(|&(id, _)| id == i) {
96+
Some(i) => { self.remotes.remove(i); }
97+
None => unreachable!()
98+
}
99+
}
100+
}
101+
}
102+
103+
/// Run the idle callback if one is registered
104+
fn idle(&mut self) {
105+
unsafe {
106+
match self.idle {
107+
Some(idle) => {
108+
if (*idle).active {
109+
(*(*idle).work.get_ref())();
110+
}
111+
}
112+
None => {}
113+
}
114+
}
115+
}
116+
117+
fn has_idle(&self) -> bool {
118+
unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
119+
}
120+
}
121+
122+
impl EventLoop for BasicLoop {
123+
fn run(&mut self) {
124+
// Not exactly efficient, but it gets the job done.
125+
while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
126+
127+
self.work();
128+
self.remote_work();
129+
130+
if self.has_idle() {
131+
self.idle();
132+
continue
133+
}
134+
135+
unsafe {
136+
// We block here if we have no messages to process and we may
137+
// receive a message at a later date
138+
do self.messages.hold_and_wait |messages| {
139+
self.remotes.len() > 0 &&
140+
messages.len() == 0 &&
141+
self.work.len() == 0
142+
}
143+
}
144+
}
145+
}
146+
147+
fn callback(&mut self, f: ~fn()) {
148+
self.work.push(f);
149+
}
150+
151+
// XXX: Seems like a really weird requirement to have an event loop provide.
152+
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
153+
let callback = ~BasicPausible::new(self);
154+
rtassert!(self.idle.is_none());
155+
unsafe {
156+
let cb_ptr: &*BasicPausible = cast::transmute(&callback);
157+
self.idle = Some(*cb_ptr);
158+
}
159+
return callback as ~PausibleIdleCallback;
160+
}
161+
162+
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
163+
let id = self.next_remote;
164+
self.next_remote += 1;
165+
self.remotes.push((id, f));
166+
~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
167+
}
168+
169+
/// This has no bindings for local I/O
170+
fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {}
171+
}
172+
173+
struct BasicRemote {
174+
queue: Exclusive<~[Message]>,
175+
id: uint,
176+
}
177+
178+
impl BasicRemote {
179+
fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote {
180+
BasicRemote { queue: queue, id: id }
181+
}
182+
}
183+
184+
impl RemoteCallback for BasicRemote {
185+
fn fire(&mut self) {
186+
unsafe {
187+
do self.queue.hold_and_signal |queue| {
188+
queue.push(RunRemote(self.id));
189+
}
190+
}
191+
}
192+
}
193+
194+
impl Drop for BasicRemote {
195+
fn drop(&mut self) {
196+
unsafe {
197+
do self.queue.hold_and_signal |queue| {
198+
queue.push(RemoveRemote(self.id));
199+
}
200+
}
201+
}
202+
}
203+
204+
struct BasicPausible {
205+
eloop: *mut BasicLoop,
206+
work: Option<~fn()>,
207+
active: bool,
208+
}
209+
210+
impl BasicPausible {
211+
fn new(eloop: &mut BasicLoop) -> BasicPausible {
212+
BasicPausible {
213+
active: false,
214+
work: None,
215+
eloop: eloop,
216+
}
217+
}
218+
}
219+
220+
impl PausibleIdleCallback for BasicPausible {
221+
fn start(&mut self, f: ~fn()) {
222+
rtassert!(!self.active && self.work.is_none());
223+
self.active = true;
224+
self.work = Some(f);
225+
}
226+
fn pause(&mut self) {
227+
self.active = false;
228+
}
229+
fn resume(&mut self) {
230+
self.active = true;
231+
}
232+
fn close(&mut self) {
233+
self.active = false;
234+
self.work = None;
235+
}
236+
}
237+
238+
impl Drop for BasicPausible {
239+
fn drop(&mut self) {
240+
unsafe {
241+
(*self.eloop).idle = None;
242+
}
243+
}
244+
}
245+
246+
fn time() -> Time {
247+
#[fixed_stack_segment]; #[inline(never)];
248+
extern {
249+
fn get_time(sec: &mut i64, nsec: &mut i32);
250+
}
251+
let mut sec = 0;
252+
let mut nsec = 0;
253+
unsafe { get_time(&mut sec, &mut nsec) }
254+
255+
Time { sec: sec as u64, nsec: nsec as u64 }
256+
}

Diff for: src/libstd/rt/io/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,13 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
606606
detail: None
607607
}
608608
}
609+
IoUnavailable => {
610+
IoError {
611+
kind: IoUnavailable,
612+
desc: "I/O is unavailable",
613+
detail: None
614+
}
615+
}
609616
_ => fail!()
610617
}
611618
}

Diff for: src/libstd/rt/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ pub mod shouldnt_be_public {
102102
// Internal macros used by the runtime.
103103
mod macros;
104104

105+
/// Basic implementation of an EventLoop, provides no I/O interfaces
106+
mod basic;
107+
105108
/// The global (exchange) heap.
106109
pub mod global_heap;
107110

Diff for: src/libstd/rt/rtio.rs

-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ pub trait EventLoop {
2828
fn run(&mut self);
2929
fn callback(&mut self, ~fn());
3030
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
31-
fn callback_ms(&mut self, ms: u64, ~fn());
3231
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback;
3332

3433
/// The asynchronous I/O services. Not all event loops may provide one

0 commit comments

Comments
 (0)