Skip to content

Commit 065e121

Browse files
committed
Relax an assertion in start_selection()
It asserted that the previous count was always nonnegative, but DISCONNECTED is a valid value for it to see. In order to continue to remember to store DISCONNECTED after DISCONNECTED was seen, I also added a helper method. Closes #12226
1 parent 411a01f commit 065e121

File tree

3 files changed

+122
-10
lines changed

3 files changed

+122
-10
lines changed

src/libstd/comm/select.rs

+94-2
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ impl Select {
151151
/// event could either be that data is available or the corresponding
152152
/// channel has been closed.
153153
pub fn wait(&self) -> uint {
154+
self.wait2(false)
155+
}
156+
157+
/// Helper method for skipping the preflight checks during testing
158+
fn wait2(&self, do_preflight_checks: bool) -> uint {
154159
// Note that this is currently an inefficient implementation. We in
155160
// theory have knowledge about all ports in the set ahead of time, so
156161
// this method shouldn't really have to iterate over all of them yet
@@ -175,7 +180,7 @@ impl Select {
175180
let mut amt = 0;
176181
for p in self.iter() {
177182
amt += 1;
178-
if (*p).packet.can_recv() {
183+
if do_preflight_checks && (*p).packet.can_recv() {
179184
return (*p).id;
180185
}
181186
}
@@ -507,7 +512,7 @@ mod test {
507512
let (p2, c2) = Chan::<()>::new();
508513
let (p, c) = Chan::new();
509514
spawn(proc() {
510-
let mut s = Select::new();
515+
let s = Select::new();
511516
let mut h1 = s.handle(&p1);
512517
let mut h2 = s.handle(&p2);
513518
unsafe { h2.add(); }
@@ -521,4 +526,91 @@ mod test {
521526
c2.send(());
522527
p.recv();
523528
})
529+
530+
test!(fn preflight1() {
531+
let (p, c) = Chan::new();
532+
c.send(());
533+
select!(
534+
() = p.recv() => {},
535+
)
536+
})
537+
538+
test!(fn preflight2() {
539+
let (p, c) = Chan::new();
540+
c.send(());
541+
c.send(());
542+
select!(
543+
() = p.recv() => {},
544+
)
545+
})
546+
547+
test!(fn preflight3() {
548+
let (p, c) = Chan::new();
549+
drop(c.clone());
550+
c.send(());
551+
select!(
552+
() = p.recv() => {},
553+
)
554+
})
555+
556+
test!(fn preflight4() {
557+
let (p, c) = Chan::new();
558+
c.send(());
559+
let s = Select::new();
560+
let mut h = s.handle(&p);
561+
unsafe { h.add(); }
562+
assert_eq!(s.wait2(false), h.id);
563+
})
564+
565+
test!(fn preflight5() {
566+
let (p, c) = Chan::new();
567+
c.send(());
568+
c.send(());
569+
let s = Select::new();
570+
let mut h = s.handle(&p);
571+
unsafe { h.add(); }
572+
assert_eq!(s.wait2(false), h.id);
573+
})
574+
575+
test!(fn preflight6() {
576+
let (p, c) = Chan::new();
577+
drop(c.clone());
578+
c.send(());
579+
let s = Select::new();
580+
let mut h = s.handle(&p);
581+
unsafe { h.add(); }
582+
assert_eq!(s.wait2(false), h.id);
583+
})
584+
585+
test!(fn preflight7() {
586+
let (p, c) = Chan::<()>::new();
587+
drop(c);
588+
let s = Select::new();
589+
let mut h = s.handle(&p);
590+
unsafe { h.add(); }
591+
assert_eq!(s.wait2(false), h.id);
592+
})
593+
594+
test!(fn preflight8() {
595+
let (p, c) = Chan::new();
596+
c.send(());
597+
drop(c);
598+
p.recv();
599+
let s = Select::new();
600+
let mut h = s.handle(&p);
601+
unsafe { h.add(); }
602+
assert_eq!(s.wait2(false), h.id);
603+
})
604+
605+
test!(fn preflight9() {
606+
let (p, c) = Chan::new();
607+
drop(c.clone());
608+
c.send(());
609+
drop(c);
610+
p.recv();
611+
let s = Select::new();
612+
let mut h = s.handle(&p);
613+
unsafe { h.add(); }
614+
assert_eq!(s.wait2(false), h.id);
615+
})
524616
}

src/libstd/comm/shared.rs

+14-4
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,17 @@ impl<T: Send> Packet<T> {
398398
cnt == DISCONNECTED || cnt - self.steals > 0
399399
}
400400

401+
// increment the count on the channel (used for selection)
402+
fn bump(&mut self, amt: int) -> int {
403+
match self.cnt.fetch_add(amt, atomics::SeqCst) {
404+
DISCONNECTED => {
405+
self.cnt.store(DISCONNECTED, atomics::SeqCst);
406+
DISCONNECTED
407+
}
408+
n => n
409+
}
410+
}
411+
401412
// Inserts the blocked task for selection on this port, returning it back if
402413
// the port already has data on it.
403414
//
@@ -408,8 +419,8 @@ impl<T: Send> Packet<T> {
408419
match self.decrement(task) {
409420
Ok(()) => Ok(()),
410421
Err(task) => {
411-
let prev = self.cnt.fetch_add(1, atomics::SeqCst);
412-
assert!(prev >= 0);
422+
let prev = self.bump(1);
423+
assert!(prev == DISCONNECTED || prev >= 0);
413424
return Err(task);
414425
}
415426
}
@@ -440,11 +451,10 @@ impl<T: Send> Packet<T> {
440451
let cnt = self.cnt.load(atomics::SeqCst);
441452
if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
442453
};
443-
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
454+
let prev = self.bump(steals + 1);
444455

445456
if prev == DISCONNECTED {
446457
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
447-
self.cnt.store(DISCONNECTED, atomics::SeqCst);
448458
true
449459
} else {
450460
let cur = prev + steals + 1;

src/libstd/comm/stream.rs

+14-4
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,17 @@ impl<T: Send> Packet<T> {
333333
}
334334
}
335335

336+
// increment the count on the channel (used for selection)
337+
fn bump(&mut self, amt: int) -> int {
338+
match self.cnt.fetch_add(amt, atomics::SeqCst) {
339+
DISCONNECTED => {
340+
self.cnt.store(DISCONNECTED, atomics::SeqCst);
341+
DISCONNECTED
342+
}
343+
n => n
344+
}
345+
}
346+
336347
// Attempts to start selecting on this port. Like a oneshot, this can fail
337348
// immediately because of an upgrade.
338349
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
@@ -351,8 +362,8 @@ impl<T: Send> Packet<T> {
351362
};
352363
// Undo our decrement above, and we should be guaranteed that the
353364
// previous value is positive because we're not going to sleep
354-
let prev = self.cnt.fetch_add(1, atomics::SeqCst);
355-
assert!(prev >= 0);
365+
let prev = self.bump(1);
366+
assert!(prev == DISCONNECTED || prev >= 0);
356367
return ret;
357368
}
358369
}
@@ -384,13 +395,12 @@ impl<T: Send> Packet<T> {
384395
// and in the stream case we can have at most one steal, so just assume
385396
// that we had one steal.
386397
let steals = 1;
387-
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
398+
let prev = self.bump(steals + 1);
388399

389400
// If we were previously disconnected, then we know for sure that there
390401
// is no task in to_wake, so just keep going
391402
let has_data = if prev == DISCONNECTED {
392403
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
393-
self.cnt.store(DISCONNECTED, atomics::SeqCst);
394404
true // there is data, that data is that we're disconnected
395405
} else {
396406
let cur = prev + steals + 1;

0 commit comments

Comments
 (0)