12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use futures:: stream:: { SplitSink , SplitStream } ;
15
16
use futures:: { channel:: mpsc, future:: BoxFuture , FutureExt , SinkExt , StreamExt } ;
16
17
use gateway_requests:: auth_token:: { AuthToken , AuthTokenConversionError } ;
17
18
use gateway_requests:: { BinaryRequest , ClientControlRequest , ServerResponse } ;
19
+ use log:: * ;
18
20
use nymsphinx:: DestinationAddressBytes ;
19
21
use std:: convert:: TryFrom ;
20
22
use std:: fmt:: { self , Error , Formatter } ;
@@ -113,18 +115,101 @@ impl fmt::Display for GatewayClientError {
113
115
}
114
116
}
115
117
118
+ // We have ownership over sink half of the connection, but the stream is owned
119
+ // by some other task, however, we can notify it to get the stream back.
120
+ struct PartiallyDelegated < ' a > {
121
+ sink_half : SplitSink < WsConn , Message > ,
122
+ delegated_stream : (
123
+ BoxFuture < ' a , Result < SplitStream < WsConn > , GatewayClientError > > ,
124
+ Arc < Notify > ,
125
+ ) ,
126
+ }
127
+
128
+ impl < ' a > PartiallyDelegated < ' a > {
129
+ // TODO: this can be potentially bad as we have no direct restrictions of ensuring it's called
130
+ // within tokio runtime. Perhaps we should use the "old" way of passing explicit
131
+ // runtime handle to the constructor and using that instead?
132
+ fn split_and_listen_for_sphinx_packets (
133
+ conn : WsConn ,
134
+ sphinx_packet_sender : SphinxPacketSender ,
135
+ ) -> Result < Self , GatewayClientError > {
136
+ // when called for, it NEEDS TO yield back the stream so that we could merge it and
137
+ // read control request responses.
138
+ let notify = Arc :: new ( Notify :: new ( ) ) ;
139
+ let notify_clone = Arc :: clone ( & notify) ;
140
+
141
+ let ( sink, mut stream) = conn. split ( ) ;
142
+
143
+ let sphinx_receiver_future = async move {
144
+ let mut should_return = false ;
145
+ while !should_return {
146
+ tokio:: select! {
147
+ _ = notify_clone. notified( ) => {
148
+ should_return = true ;
149
+ }
150
+ msg = stream. next( ) => {
151
+ if msg. is_none( ) {
152
+ return Err ( GatewayClientError :: ConnectionAbruptlyClosed ) ;
153
+ }
154
+ let msg = match msg. unwrap( ) {
155
+ Ok ( msg) => msg,
156
+ Err ( err) => {
157
+ return Err ( GatewayClientError :: NetworkError ( err) ) ;
158
+ }
159
+ } ;
160
+ match msg {
161
+ Message :: Binary ( bin_msg) => {
162
+ // TODO: some batching mechanism to allow reading and sending more than
163
+ // one packet at the time, because the receiver can easily handle it
164
+ sphinx_packet_sender. unbounded_send( vec![ bin_msg] ) . unwrap( )
165
+ } ,
166
+ // I think that in the future we should perhaps have some sequence number system, i.e.
167
+ // so each request/reponse pair can be easily identified, so that if messages are
168
+ // not ordered (for some peculiar reason) we wouldn't lose anything.
169
+ // This would also require NOT discarding any text responses here.
170
+ Message :: Text ( _) => debug!( "received a text message - probably a response to some previous query!" ) ,
171
+ _ => ( ) ,
172
+ } ;
173
+ }
174
+ } ;
175
+ }
176
+ Ok ( stream)
177
+ } ;
178
+
179
+ let spawned_boxed_task = tokio:: spawn ( sphinx_receiver_future)
180
+ . map ( |join_handle| {
181
+ join_handle. expect ( "task must have not failed to finish its execution!" )
182
+ } )
183
+ . boxed ( ) ;
184
+
185
+ Ok ( PartiallyDelegated {
186
+ sink_half : sink,
187
+ delegated_stream : ( spawned_boxed_task, notify) ,
188
+ } )
189
+ }
190
+
191
+ // if we want to send a message and don't care about response, we can don't need to reunite the split,
192
+ // the sink itself is enough
193
+ async fn send_without_response ( & mut self , msg : Message ) -> Result < ( ) , GatewayClientError > {
194
+ Ok ( self . sink_half . send ( msg) . await ?)
195
+ }
196
+
197
+ async fn merge ( self ) -> Result < WsConn , GatewayClientError > {
198
+ let ( stream_fut, notify) = self . delegated_stream ;
199
+ notify. notify ( ) ;
200
+ let stream = stream_fut. await ?;
201
+ // the error is thrown when trying to reunite sink and stream that did not originate
202
+ // from the same split which is impossible to happen here
203
+ Ok ( self . sink_half . reunite ( stream) . unwrap ( ) )
204
+ }
205
+ }
206
+
116
207
// we can either have the stream itself or an option to re-obtain it
117
208
// by notifying the future owning it to finish the execution and awaiting the result
118
209
// which should be almost immediate (or an invalid state which should never, ever happen)
119
- // TODO: perhaps restore the previous idea of Split(Stream, Sink) state to allow for
120
- // sending messages without waiting for any responses and having no effect on rate of
121
- // messages being pushed to us
122
210
enum SocketState < ' a > {
123
211
Available ( WsConn ) ,
124
- Delegated (
125
- BoxFuture < ' a , Result < WsConn , GatewayClientError > > ,
126
- Arc < Notify > ,
127
- ) ,
212
+ PartiallyDelegated ( PartiallyDelegated < ' a > ) ,
128
213
NotConnected ,
129
214
Invalid ,
130
215
}
@@ -137,16 +222,16 @@ impl<'a> SocketState<'a> {
137
222
}
138
223
}
139
224
140
- fn is_delegated ( & self ) -> bool {
225
+ fn is_partially_delegated ( & self ) -> bool {
141
226
match self {
142
- SocketState :: Delegated ( _ , _) => true ,
227
+ SocketState :: PartiallyDelegated ( _) => true ,
143
228
_ => false ,
144
229
}
145
230
}
146
231
147
232
fn is_established ( & self ) -> bool {
148
233
match self {
149
- SocketState :: Available ( _) | SocketState :: Delegated ( _ , _) => true ,
234
+ SocketState :: Available ( _) | SocketState :: PartiallyDelegated ( _) => true ,
150
235
_ => false ,
151
236
}
152
237
}
@@ -249,25 +334,22 @@ where
249
334
res. expect ( "response value should have been written in one of the branches!. If you see this error, please report a bug!" )
250
335
}
251
336
252
- // If we want to send a message, we need to have a full control over the socket,
337
+ // If we want to send a message (with response) , we need to have a full control over the socket,
253
338
// as we need to be able to write the request and read the subsequent response
254
339
async fn send_websocket_message (
255
340
& mut self ,
256
341
msg : Message ,
257
342
) -> Result < ServerResponse , GatewayClientError > {
258
343
let mut should_restart_sphinx_listener = false ;
259
- if self . connection . is_delegated ( ) {
344
+ if self . connection . is_partially_delegated ( ) {
260
345
self . recover_socket_connection ( ) . await ?;
261
346
should_restart_sphinx_listener = true ;
262
347
}
263
348
264
349
let conn = match self . connection {
265
350
SocketState :: Available ( ref mut conn) => conn,
266
- SocketState :: Delegated ( _, _) => {
267
- return Err ( GatewayClientError :: ConnectionInInvalidState )
268
- }
269
- SocketState :: Invalid => return Err ( GatewayClientError :: ConnectionInInvalidState ) ,
270
351
SocketState :: NotConnected => return Err ( GatewayClientError :: ConnectionNotEstablished ) ,
352
+ _ => return Err ( GatewayClientError :: ConnectionInInvalidState ) ,
271
353
} ;
272
354
conn. send ( msg) . await ?;
273
355
let response = self . read_control_response ( ) . await ;
@@ -278,13 +360,18 @@ where
278
360
response
279
361
}
280
362
281
- // next on TODO list:
282
- // so that presumably we could increase our sending/receiving rate
283
363
async fn send_websocket_message_without_response (
284
364
& mut self ,
285
365
msg : Message ,
286
366
) -> Result < ( ) , GatewayClientError > {
287
- unimplemented ! ( )
367
+ match self . connection {
368
+ SocketState :: Available ( ref mut conn) => Ok ( conn. send ( msg) . await ?) ,
369
+ SocketState :: PartiallyDelegated ( ref mut partially_delegated) => {
370
+ partially_delegated. send_without_response ( msg) . await
371
+ }
372
+ SocketState :: NotConnected => Err ( GatewayClientError :: ConnectionNotEstablished ) ,
373
+ _ => Err ( GatewayClientError :: ConnectionInInvalidState ) ,
374
+ }
288
375
}
289
376
290
377
pub async fn register ( & mut self ) -> Result < AuthToken , GatewayClientError > {
@@ -348,102 +435,59 @@ where
348
435
}
349
436
}
350
437
351
- // TODO: make it optionally use `send_websocket_message_without_response`
438
+ // TODO: possibly make responses optional
352
439
pub async fn send_sphinx_packet (
353
440
& mut self ,
354
441
address : SocketAddr ,
355
442
packet : Vec < u8 > ,
356
- ) -> Result < bool , GatewayClientError > {
443
+ ) -> Result < ( ) , GatewayClientError > {
357
444
if !self . authenticated {
358
445
return Err ( GatewayClientError :: NotAuthenticated ) ;
359
446
}
360
447
if !self . connection . is_established ( ) {
361
448
return Err ( GatewayClientError :: ConnectionNotEstablished ) ;
362
449
}
363
450
let msg = BinaryRequest :: new_forward_request ( address, packet) . into ( ) ;
364
- match self . send_websocket_message ( msg) . await ? {
365
- ServerResponse :: Send { status } => Ok ( status) ,
366
- ServerResponse :: Error { message } => Err ( GatewayClientError :: GatewayError ( message) ) ,
367
- _ => unreachable ! ( ) ,
368
- }
451
+ self . send_websocket_message_without_response ( msg) . await
369
452
}
370
453
371
454
async fn recover_socket_connection ( & mut self ) -> Result < ( ) , GatewayClientError > {
372
455
if self . connection . is_available ( ) {
373
456
return Ok ( ( ) ) ;
374
457
}
375
- if !self . connection . is_delegated ( ) {
458
+ if !self . connection . is_partially_delegated ( ) {
376
459
return Err ( GatewayClientError :: ConnectionInInvalidState ) ;
377
460
}
378
461
379
- let ( conn_fut, notify) = match std:: mem:: replace ( & mut self . connection , SocketState :: Invalid )
380
- {
381
- SocketState :: Delegated ( conn_fut, notify) => ( conn_fut, notify) ,
462
+ let conn = match std:: mem:: replace ( & mut self . connection , SocketState :: Invalid ) {
463
+ SocketState :: PartiallyDelegated ( delegated_conn) => delegated_conn. merge ( ) . await ?,
382
464
_ => unreachable ! ( ) ,
383
465
} ;
384
466
385
- // tell the future to wrap up whatever it's doing now
386
- notify. notify ( ) ;
387
- self . connection = SocketState :: Available ( conn_fut. await ?) ;
467
+ self . connection = SocketState :: Available ( conn) ;
388
468
Ok ( ( ) )
389
469
}
390
470
391
- // TODO: this can be potentially bad as we have no direct restrictions of ensuring it's called
392
- // within tokio runtime. Perhaps we should use the "old" way of passing explicit
393
- // runtime handle to the constructor and using that instead?
394
471
fn start_listening_for_sphinx_packets ( & mut self ) -> Result < ( ) , GatewayClientError > {
472
+ if self . connection . is_partially_delegated ( ) {
473
+ return Ok ( ( ) ) ;
474
+ }
395
475
if !self . connection . is_available ( ) {
396
476
return Err ( GatewayClientError :: ConnectionInInvalidState ) ;
397
477
}
398
478
399
- // when called for, it NEEDS TO yield back the stream so that we could merge it and
400
- // read control request responses.
401
- let notify = Arc :: new ( Notify :: new ( ) ) ;
402
- let notify_clone = Arc :: clone ( & notify) ;
403
-
404
- let mut extracted_connection =
479
+ let partially_delegated =
405
480
match std:: mem:: replace ( & mut self . connection , SocketState :: Invalid ) {
406
- SocketState :: Available ( conn) => conn,
407
- _ => unreachable ! ( ) , // impossible due to initial check
481
+ SocketState :: Available ( conn) => {
482
+ PartiallyDelegated :: split_and_listen_for_sphinx_packets (
483
+ conn,
484
+ self . sphinx_packet_sender . clone ( ) ,
485
+ ) ?
486
+ }
487
+ _ => unreachable ! ( ) ,
408
488
} ;
409
489
410
- let sphinx_packet_sender = self . sphinx_packet_sender . clone ( ) ;
411
- let sphinx_receiver_future = async move {
412
- let mut should_return = false ;
413
- while !should_return {
414
- tokio:: select! {
415
- _ = notify_clone. notified( ) => {
416
- should_return = true ;
417
- }
418
- msg = extracted_connection. next( ) => {
419
- if msg. is_none( ) {
420
- return Err ( GatewayClientError :: ConnectionAbruptlyClosed ) ;
421
- }
422
- let msg = match msg. unwrap( ) {
423
- Ok ( msg) => msg,
424
- Err ( err) => {
425
- return Err ( GatewayClientError :: NetworkError ( err) ) ;
426
- }
427
- } ;
428
- match msg {
429
- Message :: Binary ( bin_msg) => {
430
- sphinx_packet_sender. unbounded_send( vec![ bin_msg] ) . unwrap( )
431
- }
432
- _ => ( ) ,
433
- } ;
434
- }
435
- } ;
436
- }
437
- Ok ( extracted_connection)
438
- } ;
439
-
440
- let spawned_boxed_task = tokio:: spawn ( sphinx_receiver_future)
441
- . map ( |join_handle| {
442
- join_handle. expect ( "task must have not failed to finish its execution!" )
443
- } )
444
- . boxed ( ) ;
445
-
446
- self . connection = SocketState :: Delegated ( spawned_boxed_task, notify) ;
490
+ self . connection = SocketState :: PartiallyDelegated ( partially_delegated) ;
447
491
Ok ( ( ) )
448
492
}
449
493
}
0 commit comments