-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathchannel.rs
94 lines (80 loc) · 2.51 KB
/
channel.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::sync::mpsc::{Sender, Receiver, channel, TryRecvError};
use std::sync::{Arc, Mutex};
/// Errors reported by the channel wrapper in this module.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` and shown to users.
#[derive(Debug)]
pub enum ChannelError {
    /// A mutex guarding one end of the channel was poisoned; the payload is a
    /// textual description of the poison error.
    SyncError(String),
    /// The underlying `Sender::send` call failed.
    SendError,
    /// The underlying receive call failed for a reason other than the channel
    /// simply being empty.
    RecvError,
}

impl std::fmt::Display for ChannelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChannelError::SyncError(reason) => {
                write!(f, "channel synchronization error: {}", reason)
            }
            ChannelError::SendError => f.write_str("failed to send on channel"),
            ChannelError::RecvError => f.write_str("failed to receive from channel"),
        }
    }
}

impl std::error::Error for ChannelError {}
/// A handle that bundles both ends of a `std::sync::mpsc` channel.
///
/// Each end is stored behind its own `Arc<Mutex<..>>`, so sending and
/// receiving take separate locks.
pub struct SyncChannel<T> {
// Sending end of the channel, behind its own mutex.
_s: Arc<Mutex<Sender<T>>>,
// Receiving end of the channel, behind its own mutex.
_r: Arc<Mutex<Receiver<T>>>
}
impl<T> SyncChannel<T> {
    /// Creates a fresh channel and wraps each end in its own `Arc<Mutex<..>>`.
    pub fn new() -> Self {
        let (sender, receiver) = channel::<T>();
        let shared_sender = Arc::new(Mutex::new(sender));
        let shared_receiver = Arc::new(Mutex::new(receiver));
        SyncChannel { _s: shared_sender, _r: shared_receiver }
    }

    /// Maps a failed lock acquisition to `ChannelError::SyncError`, embedding
    /// the debug rendering of the underlying error in the message.
    fn poison_error<E: std::fmt::Debug>(err: E) -> ChannelError {
        ChannelError::SyncError(format!("Mutex poisoned: {:?}", err))
    }

    /// Sends `data` through the channel.
    ///
    /// # Errors
    ///
    /// Returns `ChannelError::SyncError` if the sender mutex is poisoned and
    /// `ChannelError::SendError` if the underlying send fails.
    pub fn send(&self, data: T) -> Result<(), ChannelError> {
        let sender = self._s.lock().map_err(Self::poison_error)?;
        match sender.send(data) {
            Ok(()) => Ok(()),
            Err(_) => Err(ChannelError::SendError),
        }
    }

    /// Receives the next value, blocking until one is available.
    ///
    /// The receiver mutex stays locked for the whole call.
    ///
    /// # Errors
    ///
    /// Returns `ChannelError::SyncError` if the receiver mutex is poisoned and
    /// `ChannelError::RecvError` if the underlying receive fails.
    pub fn recv(&self) -> Result<T, ChannelError> {
        let receiver = self._r.lock().map_err(Self::poison_error)?;
        match receiver.recv() {
            Ok(value) => Ok(value),
            Err(_) => Err(ChannelError::RecvError),
        }
    }

    /// Attempts to receive a value without waiting for one to arrive.
    ///
    /// Returns `Ok(Some(value))` when a value is ready and `Ok(None)` when the
    /// channel is currently empty.
    ///
    /// # Errors
    ///
    /// Returns `ChannelError::SyncError` if the receiver mutex is poisoned and
    /// `ChannelError::RecvError` for any receive failure other than "empty".
    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
        let receiver = self._r.lock().map_err(Self::poison_error)?;
        match receiver.try_recv() {
            Ok(value) => Ok(Some(value)),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(ChannelError::RecvError),
        }
    }
}
impl<T> Clone for SyncChannel<T> {
fn clone(&self) -> Self {
SyncChannel {
_r: self._r.clone(),
_s: self._s.clone(),
}
}
}
// SAFETY: `SyncChannel<T>` consists solely of `Arc<Mutex<Sender<T>>>` and
// `Arc<Mutex<Receiver<T>>>`. `Sender<T>` and `Receiver<T>` are `Send` when
// `T: Send`, `Mutex<X>` is `Send + Sync` when `X: Send`, and `Arc<M>` is
// `Send + Sync` when `M: Send + Sync`. The `T: Send` bound is therefore
// required: without it a `!Send` payload (e.g. `Rc<_>`) could be moved to
// another thread through the channel from safe code.
unsafe impl<T: Send> Send for SyncChannel<T> {}
unsafe impl<T: Send> Sync for SyncChannel<T> {}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A value sent through a cloned handle on another thread is delivered to
    /// a blocking `recv` on the original handle.
    #[test]
    fn test_channels() {
        let ch: SyncChannel<i32> = SyncChannel::new();
        let ch_clone = ch.clone();
        let producer = thread::spawn(move || ch_clone.send(3));
        assert_eq!(ch.recv().unwrap(), 3);
        // Surface a panic or a send failure in the producer instead of
        // silently dropping the thread's result.
        producer
            .join()
            .expect("producer thread panicked")
            .expect("send failed");
    }

    /// `try_recv` yields `None` on an empty channel and `Some(value)` once a
    /// send from another thread has completed.
    #[test]
    fn test_channels_try_recv() {
        let ch: SyncChannel<i32> = SyncChannel::new();
        let ch_clone = ch.clone();
        assert_eq!(ch.try_recv().unwrap(), None);
        let producer = thread::spawn(move || {
            ch_clone.send(3).unwrap();
        });
        // Joining waits for the send without spinning and establishes a
        // happens-before relationship with everything the producer did.
        producer.join().expect("producer thread panicked");
        assert_eq!(ch.try_recv().unwrap(), Some(3));
    }
}