@@ -3,18 +3,16 @@ use std::{
3
3
sync:: Arc ,
4
4
} ;
5
5
6
- use futures:: { channel:: mpsc, select, FutureExt , SinkExt } ;
7
-
8
6
use async_std:: {
9
7
io:: BufReader ,
10
8
net:: { TcpListener , TcpStream , ToSocketAddrs } ,
11
9
prelude:: * ,
12
10
task,
11
+ sync:: { channel, Sender , Receiver } ,
12
+ stream,
13
13
} ;
14
14
15
15
type Result < T > = std:: result:: Result < T , Box < dyn std:: error:: Error + Send + Sync > > ;
16
- type Sender < T > = mpsc:: UnboundedSender < T > ;
17
- type Receiver < T > = mpsc:: UnboundedReceiver < T > ;
18
16
19
17
#[ derive( Debug ) ]
20
18
enum Void { }
@@ -26,7 +24,7 @@ pub(crate) fn main() -> Result<()> {
26
24
async fn accept_loop ( addr : impl ToSocketAddrs ) -> Result < ( ) > {
27
25
let listener = TcpListener :: bind ( addr) . await ?;
28
26
29
- let ( broker_sender, broker_receiver) = mpsc :: unbounded ( ) ;
27
+ let ( broker_sender, broker_receiver) = channel ( 10 ) ;
30
28
let broker = task:: spawn ( broker_loop ( broker_receiver) ) ;
31
29
let mut incoming = listener. incoming ( ) ;
32
30
while let Some ( stream) = incoming. next ( ) . await {
@@ -39,7 +37,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
39
37
Ok ( ( ) )
40
38
}
41
39
42
- async fn connection_loop ( mut broker : Sender < Event > , stream : TcpStream ) -> Result < ( ) > {
40
+ async fn connection_loop ( broker : Sender < Event > , stream : TcpStream ) -> Result < ( ) > {
43
41
let stream = Arc :: new ( stream) ;
44
42
let reader = BufReader :: new ( & * stream) ;
45
43
let mut lines = reader. lines ( ) ;
@@ -48,15 +46,14 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
48
46
None => return Err ( "peer disconnected immediately" . into ( ) ) ,
49
47
Some ( line) => line?,
50
48
} ;
51
- let ( _shutdown_sender, shutdown_receiver) = mpsc :: unbounded :: < Void > ( ) ;
49
+ let ( _shutdown_sender, shutdown_receiver) = channel :: < Void > ( 0 ) ;
52
50
broker
53
51
. send ( Event :: NewPeer {
54
52
name : name. clone ( ) ,
55
53
stream : Arc :: clone ( & stream) ,
56
54
shutdown : shutdown_receiver,
57
55
} )
58
- . await
59
- . unwrap ( ) ;
56
+ . await ;
60
57
61
58
while let Some ( line) = lines. next ( ) . await {
62
59
let line = line?;
@@ -76,28 +73,36 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
76
73
to : dest,
77
74
msg,
78
75
} )
79
- . await
80
- . unwrap ( ) ;
76
+ . await ;
81
77
}
82
78
83
79
Ok ( ( ) )
84
80
}
85
81
82
+ #[ derive( Debug ) ]
83
+ enum ConnectionWriterEvent {
84
+ Message ( String ) ,
85
+ Shutdown
86
+ }
87
+
86
88
async fn connection_writer_loop (
87
89
messages : & mut Receiver < String > ,
88
90
stream : Arc < TcpStream > ,
89
- mut shutdown : Receiver < Void > ,
91
+ shutdown : Receiver < Void > ,
90
92
) -> Result < ( ) > {
91
93
let mut stream = & * stream;
92
- loop {
93
- select ! {
94
- msg = messages. next( ) . fuse( ) => match msg {
95
- Some ( msg) => stream. write_all( msg. as_bytes( ) ) . await ?,
96
- None => break ,
97
- } ,
98
- void = shutdown. next( ) . fuse( ) => match void {
99
- Some ( void) => match void { } ,
100
- None => break ,
94
+ let messages = messages. map ( ConnectionWriterEvent :: Message ) ;
95
+ let shutdown = shutdown. map ( |_| ConnectionWriterEvent :: Shutdown ) . chain ( stream:: once ( ConnectionWriterEvent :: Shutdown ) ) ;
96
+
97
+ let mut events = shutdown. merge ( messages) ;
98
+
99
+ while let Some ( event) = events. next ( ) . await {
100
+ match event {
101
+ ConnectionWriterEvent :: Message ( msg) => {
102
+ stream. write_all ( msg. as_bytes ( ) ) . await ?;
103
+ }
104
+ ConnectionWriterEvent :: Shutdown => {
105
+ break
101
106
}
102
107
}
103
108
}
@@ -118,58 +123,61 @@ enum Event {
118
123
} ,
119
124
}
120
125
121
- async fn broker_loop ( mut events : Receiver < Event > ) {
122
- let ( disconnect_sender, mut disconnect_receiver) =
123
- mpsc:: unbounded :: < ( String , Receiver < String > ) > ( ) ;
126
+ #[ derive( Debug ) ]
127
+ enum BrokerEvent {
128
+ ClientEvent ( Event ) ,
129
+ Disconnection ( ( String , Receiver < String > ) ) ,
130
+ Shutdown ,
131
+ }
132
+
133
+ async fn broker_loop ( events : Receiver < Event > ) {
134
+ let ( disconnect_sender, disconnect_receiver) = channel ( 10 ) ;
135
+
124
136
let mut peers: HashMap < String , Sender < String > > = HashMap :: new ( ) ;
137
+ let disconnect_receiver = disconnect_receiver. map ( BrokerEvent :: Disconnection ) ;
138
+ let events = events. map ( BrokerEvent :: ClientEvent ) . chain ( stream:: once ( BrokerEvent :: Shutdown ) ) ;
125
139
126
- loop {
127
- let event = select ! {
128
- event = events. next( ) . fuse( ) => match event {
129
- None => break ,
130
- Some ( event) => event,
131
- } ,
132
- disconnect = disconnect_receiver. next( ) . fuse( ) => {
133
- let ( name, _pending_messages) = disconnect. unwrap( ) ;
134
- assert!( peers. remove( & name) . is_some( ) ) ;
135
- continue ;
136
- } ,
137
- } ;
140
+ let mut stream = disconnect_receiver. merge ( events) ;
141
+
142
+ while let Some ( event) = stream. next ( ) . await {
138
143
match event {
139
- Event :: Message { from, to, msg } => {
144
+ BrokerEvent :: ClientEvent ( Event :: Message { from, to, msg } ) => {
140
145
for addr in to {
141
146
if let Some ( peer) = peers. get_mut ( & addr) {
142
147
let msg = format ! ( "from {}: {}\n " , from, msg) ;
143
- peer. send ( msg) . await . unwrap ( ) ;
148
+ peer. send ( msg) . await ;
144
149
}
145
150
}
146
151
}
147
- Event :: NewPeer {
152
+ BrokerEvent :: ClientEvent ( Event :: NewPeer {
148
153
name,
149
154
stream,
150
155
shutdown,
151
- } => match peers. entry ( name. clone ( ) ) {
156
+ } ) => match peers. entry ( name. clone ( ) ) {
152
157
Entry :: Occupied ( ..) => ( ) ,
153
158
Entry :: Vacant ( entry) => {
154
- let ( client_sender, mut client_receiver) = mpsc :: unbounded ( ) ;
159
+ let ( client_sender, mut client_receiver) = channel ( 10 ) ;
155
160
entry. insert ( client_sender) ;
156
- let mut disconnect_sender = disconnect_sender. clone ( ) ;
161
+ let disconnect_sender = disconnect_sender. clone ( ) ;
157
162
spawn_and_log_error ( async move {
158
163
let res =
159
164
connection_writer_loop ( & mut client_receiver, stream, shutdown) . await ;
160
165
disconnect_sender
161
166
. send ( ( name, client_receiver) )
162
- . await
163
- . unwrap ( ) ;
167
+ . await ;
164
168
res
165
169
} ) ;
166
170
}
167
- } ,
171
+ }
172
+ BrokerEvent :: Disconnection ( ( name, _pending_messages) ) => {
173
+ assert ! ( peers. remove( & name) . is_some( ) ) ;
174
+ }
175
+ BrokerEvent :: Shutdown => break ,
168
176
}
169
177
}
170
178
drop ( peers) ;
171
179
drop ( disconnect_sender) ;
172
- while let Some ( ( _name, _pending_messages) ) = disconnect_receiver . next ( ) . await { }
180
+ while let Some ( BrokerEvent :: Disconnection ( ( _name, _pending_messages) ) ) = stream . next ( ) . await { }
173
181
}
174
182
175
183
fn spawn_and_log_error < F > ( fut : F ) -> task:: JoinHandle < ( ) >
0 commit comments