Skip to content

Commit 01ca0d1

Browse files
committed
Be more defensive in pipes (#3098)
1 parent 9e68966 commit 01ca0d1

File tree

3 files changed

+18
-8
lines changed

3 files changed

+18
-8
lines changed

src/libcore/option.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ fn swap_unwrap<T>(opt: &mut option<T>) -> T {
127127
unwrap(util::replace(opt, none))
128128
}
129129

130-
pure fn unwrap_expect<T>(-opt: option<T>, reason: ~str) -> T {
130+
pure fn unwrap_expect<T>(-opt: option<T>, reason: &str) -> T {
131131
//! As unwrap, but with a specified failure message.
132-
if opt.is_none() { fail reason; }
132+
if opt.is_none() { fail reason.to_unique(); }
133133
unwrap(opt)
134134
}
135135

src/libcore/pipes.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ Fails if the sender closes the connection.
343343
344344
*/
345345
fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
346-
option::unwrap(try_recv(p))
346+
option::unwrap_expect(try_recv(p), "connection closed")
347347
}
348348

349349
/** Attempts to receive a message from a pipe.
@@ -391,10 +391,13 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
391391
full {
392392
let mut payload = none;
393393
payload <-> p.payload;
394+
p.header.blocked_task = none;
394395
p.header.state = empty;
395396
return some(option::unwrap(payload))
396397
}
397398
terminated {
399+
// This assert detects when we've accidentally unsafely
400+
// casted too big of a number to a state.
398401
assert old_state == terminated;
399402
return none;
400403
}
@@ -428,10 +431,13 @@ fn sender_terminate<T: send>(p: *packet<T>) {
428431
}
429432
blocked {
430433
// wake up the target
431-
let target = p.header.blocked_task.get();
432-
rustrt::task_signal_event(target,
433-
ptr::addr_of(p.header) as *libc::c_void);
434-
434+
alt p.header.blocked_task {
435+
some(target) =>
436+
rustrt::task_signal_event(
437+
target,
438+
ptr::addr_of(p.header) as *libc::c_void),
439+
none => { debug!{"receiver is already shutting down"} }
440+
}
435441
// The receiver will eventually clean up.
436442
//unsafe { forget(p) }
437443
}
@@ -448,6 +454,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
448454
#[doc(hidden)]
449455
fn receiver_terminate<T: send>(p: *packet<T>) {
450456
let p = unsafe { &*p };
457+
assert p.header.blocked_task == none;
451458
alt swap_state_rel(p.header.state, terminated) {
452459
empty {
453460
// the sender will clean up
@@ -514,7 +521,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
514521

515522
for pkts.each |p| { unsafe{ (*p).unblock()} }
516523

517-
debug!{"%?, %?", ready_packet, pkts[ready_packet]};
524+
debug!("%?, %?", ready_packet, pkts[ready_packet]);
518525

519526
unsafe {
520527
assert (*pkts[ready_packet]).state == full

src/rt/rust_task.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,9 @@ void
680680
rust_task::signal_event(void *event) {
681681
scoped_lock with(lifecycle_lock);
682682

683+
assert(task_state_blocked == state ||
684+
task_state_running == state);
685+
683686
this->event = event;
684687
event_reject = true;
685688
if(task_state_blocked == state) {

0 commit comments

Comments
 (0)