@@ -19,7 +19,7 @@ use futures::channel::mpsc;
19
19
use futures:: task:: { Context , Poll } ;
20
20
use futures:: { Future , Stream , StreamExt } ;
21
21
use log:: { error, info, trace, warn} ;
22
- use nymsphinx:: Destination ;
22
+ use nymsphinx:: { Destination , DestinationAddressBytes } ;
23
23
use std:: pin:: Pin ;
24
24
use std:: time:: Duration ;
25
25
use tokio:: runtime:: Handle ;
@@ -96,31 +96,48 @@ impl<T: 'static + NymTopology> OutQueueControl<T> {
96
96
}
97
97
}
98
98
99
+ async fn get_route (
100
+ & self ,
101
+ client : Option < DestinationAddressBytes > ,
102
+ ) -> Option < Vec < nymsphinx:: Node > > {
103
+ let route = match client {
104
+ None => self . topology_access . random_route ( ) . await ,
105
+ Some ( client) => self . topology_access . random_route_to_client ( client) . await ,
106
+ } ;
107
+
108
+ route
109
+ }
110
+
99
111
async fn on_message ( & mut self , next_message : StreamMessage ) {
100
112
trace ! ( "created new message" ) ;
101
- let route = match self . topology_access . random_route ( ) . await {
102
- None => {
103
- warn ! ( "No valid topology detected - won't send any real or loop message this time" ) ;
104
- // TODO: this creates a potential problem: we can lose real messages if we were
105
- // unable to get topology, perhaps we should store them in some buffer?
106
- return ;
107
- }
108
- Some ( route) => route,
109
- } ;
110
113
111
114
let next_packet = match next_message {
112
- StreamMessage :: Cover => mix_client:: packet:: loop_cover_message_route (
113
- self . our_info . address . clone ( ) ,
114
- self . our_info . identifier ,
115
- route,
116
- self . average_packet_delay ,
117
- ) ,
118
- StreamMessage :: Real ( real_message) => mix_client:: packet:: encapsulate_message_route (
119
- real_message. 0 ,
120
- real_message. 1 ,
121
- route,
122
- self . average_packet_delay ,
123
- ) ,
115
+ StreamMessage :: Cover => {
116
+ let route = self . get_route ( None ) . await ;
117
+ if route. is_none ( ) {
118
+ warn ! ( "No valid topology detected - won't send any real or loop message this time" ) ;
119
+ }
120
+ let route = route. unwrap ( ) ;
121
+ mix_client:: packet:: loop_cover_message_route (
122
+ self . our_info . address . clone ( ) ,
123
+ self . our_info . identifier ,
124
+ route,
125
+ self . average_packet_delay ,
126
+ )
127
+ }
128
+ StreamMessage :: Real ( real_message) => {
129
+ let route = self . get_route ( Some ( real_message. 0 . address . clone ( ) ) ) . await ;
130
+ if route. is_none ( ) {
131
+ warn ! ( "No valid topology detected - won't send any real or loop message this time" ) ;
132
+ }
133
+ let route = route. unwrap ( ) ;
134
+ mix_client:: packet:: encapsulate_message_route (
135
+ real_message. 0 ,
136
+ real_message. 1 ,
137
+ route,
138
+ self . average_packet_delay ,
139
+ )
140
+ }
124
141
} ;
125
142
126
143
let next_packet = match next_packet {
@@ -143,6 +160,9 @@ impl<T: 'static + NymTopology> OutQueueControl<T> {
143
160
. unwrap ( ) ;
144
161
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
145
162
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
163
+ // JS2: Basically it was the case that with high enough rate, the stream had already a next value
164
+ // ready and hence was immediately re-scheduled causing other tasks to be starved;
165
+ // yield makes it go back the scheduling queue regardless of its value availability
146
166
tokio:: task:: yield_now ( ) . await ;
147
167
}
148
168
0 commit comments