-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathmini-tokio-two.rs
93 lines (76 loc) · 2.34 KB
/
mini-tokio-two.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use crossbeam::channel;
use std::sync::{Arc, Mutex};
use std::pin::Pin;
use futures::{task, task::ArcWake, Future};
use std::task::{Context};
use std::time::{Instant, Duration};
mod common;
use common::delay::Delay;
/// Entry point: builds a `MiniTokio` executor, spawns one async task on it,
/// and then runs the executor.
///
/// The task prints a line, awaits a `Delay` whose deadline is 10 ms from now,
/// and prints the value the delay resolves to.
fn main() {
    let mut executor = MiniTokio::new();

    executor.spawn(async {
        // Runs before the delay is awaited.
        println!("这一句先打印出来!");

        let deadline = Instant::now() + Duration::from_millis(10);
        let out = Delay { when: deadline }.await;
        println!("out result: {}", out);
    });

    // Process the queued task(s).
    executor.run();
}
/// A schedulable unit of work: a boxed future together with the channel
/// handle used to put the task back on the executor's queue.
struct Task {
    /// The future driven by this task. It is pinned and boxed so that any
    /// `Future<Output = ()> + Send` can be stored, and wrapped in a `Mutex`
    /// so it can be polled mutably through a shared `Arc<Task>`.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// Sending half of the task queue; the task sends itself through it
    /// whenever it needs to be polled again.
    executor: channel::Sender<Arc<Task>>,
}
impl Task {
    /// Queues this task for another poll by sending a new `Arc` handle to it
    /// through the executor channel.
    ///
    /// A failed send is ignored on purpose: the result is discarded, so the
    /// handle is simply dropped in that case.
    fn schedule(self: &Arc<Self>) {
        let handle = Arc::clone(self);
        let _ = self.executor.send(handle);
    }

    /// Polls the task's future once.
    ///
    /// # Panics
    ///
    /// Panics if the future's mutex cannot be acquired immediately.
    fn poll(self: Arc<Self>) {
        // Build a waker from this task; `ArcWake` supplies the wake logic.
        let waker = task::waker(Arc::clone(&self));
        let mut cx = Context::from_waker(&waker);

        // No other thread is expected to be holding the lock on the future,
        // so `try_lock` is used instead of a blocking `lock`.
        let mut guard = self.future.try_lock().unwrap();

        // Poll the future; the returned `Poll` value is intentionally ignored.
        let _ = guard.as_mut().poll(&mut cx);
    }

    /// Creates a new task from `future` and pushes it onto the queue.
    ///
    /// The task keeps its own clone of `sender` so that it can re-queue
    /// itself later; the receiving half of the channel is where the queued
    /// tasks are picked up.
    fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let boxed = Box::pin(future);
        let new_task = Arc::new(Task {
            future: Mutex::new(boxed),
            executor: sender.clone(),
        });

        let _ = sender.send(new_task);
    }
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
/// A minimal executor built around a channel of tasks.
struct MiniTokio {
    /// Receiving half of the task queue; tasks waiting to be polled arrive
    /// here.
    scheduled: channel::Receiver<Arc<Task>>,
    /// Sending half of the task queue, handed to each spawned task.
    sender: channel::Sender<Arc<Task>>,
}
impl MiniTokio {
    /// Runs the executor loop: receives scheduled tasks and polls each one.
    ///
    /// This method runs forever. `MiniTokio` keeps its own `sender`, so the
    /// channel is never disconnected and `recv` blocks whenever the queue is
    /// empty instead of returning an error.
    fn run(&self) {
        while let Ok(task) = self.scheduled.recv() {
            task.poll();
        }
    }

    /// Creates an executor with an empty task queue.
    fn new() -> Self {
        // The queue is unbounded on purpose. Tasks are pushed by `spawn` and
        // by wakers, both of which may run on the very thread that drains the
        // queue in `run`. With a bounded channel a full queue would make
        // `send` block with nobody left to receive (for example when many
        // tasks are spawned before `run` is called), deadlocking the executor.
        let (sender, scheduled) = channel::unbounded();
        MiniTokio { sender, scheduled }
    }

    /// Spawns `future` onto the executor as a new task.
    ///
    /// The task is only queued here; it is polled once `run` picks it up.
    fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender)
    }
}