-
Notifications
You must be signed in to change notification settings - Fork 227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update Tokio to 1.0, Hyper to 0.14, Prost to 0.7 and Bytes to 1.0 #783
Conversation
… to 0.12 and bytes to 1.0
loop { | ||
let next_msg = tokio::time::timeout(RECV_TIMEOUT, self.stream.next()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thanethomson This uses RECV_TIMEOUT
as the timeout for the next message, instead of PING_INTERVAL
. Can you please confirm that this is correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thanethomson I've also had to implement the timeout in a different way, because Sleep::reset
now requires the self-type Pin<&mut Sleep>
instead of just &mut Sleep
, and thus takes ownership of the pinned Sleep
value, which makes it impossible to .await
it later in the loop.
You can see the error for yourself if you replace the whole run
method with
/// Executes the WebSocket driver, which manages the underlying WebSocket
/// transport.
pub async fn run(mut self) -> Result<()> {
let mut ping_interval =
tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
let recv_timeout = tokio::time::sleep(RECV_TIMEOUT);
tokio::pin!(recv_timeout);
loop {
tokio::select! {
Some(res) = self.stream.next() => match res {
Ok(msg) => {
// Reset the receive timeout every time we successfully
// receive a message from the remote endpoint.
recv_timeout.reset(Instant::now().add(RECV_TIMEOUT));
self.handle_incoming_msg(msg).await?
},
Err(e) => return Err(
Error::websocket_error(
format!("failed to read from WebSocket connection: {}", e),
),
),
},
Some(cmd) = self.cmd_rx.recv() => match cmd {
DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?,
DriverCommand::Unsubscribe(unsubs_cmd) => self.unsubscribe(unsubs_cmd).await?,
DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?,
DriverCommand::Terminate => return self.close().await,
},
_ = ping_interval.tick() => self.ping().await?,
_ = &mut recv_timeout => {
return Err(Error::websocket_error(format!(
"reading from WebSocket connection timed out after {} seconds",
RECV_TIMEOUT.as_secs()
)));
}
}
}
}
error[E0382]: borrow of moved value: `recv_timeout`
--> rpc/src/client/transport/websocket.rs:330:21
|
306 | tokio::pin!(recv_timeout);
| -------------------------- move occurs because `recv_timeout` has type `std::pin::Pin<&mut tokio::time::Sleep>`, which does not implement the `Copy` trait
...
314 | recv_timeout.reset(Instant::now().add(RECV_TIMEOUT));
| ------------ value moved here, in previous iteration of loop
...
330 | _ = &mut recv_timeout => {
| ^^^^^^^^^^^^^^^^^ value borrowed here after move
error[E0382]: borrow of moved value: `recv_timeout`
--> rpc/src/client/transport/websocket.rs:330:21
|
306 | tokio::pin!(recv_timeout);
| -------------------------- move occurs because `recv_timeout` has type `std::pin::Pin<&mut tokio::time::Sleep>`, which does not implement the `Copy` trait
...
314 | recv_timeout.reset(Instant::now().add(RECV_TIMEOUT));
| ------------ value moved here, in previous iteration of loop
...
330 | _ = &mut recv_timeout => {
| ^^^^^^^^^^^^^^^^^ value borrowed here after move
Please let me know what you think of the new solution with the timeout
combinator :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thanethomson What do you think about introducing an enum to improve the readability of the client loop (ie. use explicit names instead of nested patterns like Ok(Some(Ok(_)))
):
pub async fn run(mut self) -> Result<()> {
let mut ping_interval =
tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
enum NextMsg {
Ok(Message),
StreamEnded,
Timeout,
WebSocketError(async_tungstenite::tungstenite::Error),
}
loop {
let next_msg =
tokio::time::timeout(RECV_TIMEOUT, self.stream.next()).map(|res| match res {
Ok(Some(Ok(msg))) => NextMsg::Ok(msg),
Ok(Some(Err(e))) => NextMsg::WebSocketError(e),
Ok(None) => NextMsg::StreamEnded,
Err(_) => NextMsg::Timeout,
});
tokio::select! {
res = next_msg => match res {
NextMsg::Ok(msg) => {
self.handle_incoming_msg(msg).await?
},
NextMsg::StreamEnded => {
// Websocket stream is over, let's continue in case
// we still receive commands via the `cmd_rx` channel.
continue;
},
NextMsg::Timeout => return Err(Error::websocket_error(format!(
"reading from WebSocket connection timed out after {} seconds",
RECV_TIMEOUT.as_secs()
))),
NextMsg::WebSocketError(e) => return Err(Error::websocket_error(
format!("failed to read from WebSocket connection: {}", e),
)),
},
Some(cmd) = self.cmd_rx.recv() => match cmd {
DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?,
DriverCommand::Unsubscribe(unsubs_cmd) => self.unsubscribe(unsubs_cmd).await?,
DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?,
DriverCommand::Terminate => return self.close().await,
},
_ = ping_interval.tick() => self.ping().await?,
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@romac you're right, the recv_timeout
should've been reset with RECV_TIMEOUT
and not PING_INTERVAL
.
But with regard to the timeout combinator, correct me if I'm wrong but it looks like it changes the behaviour of the loop. The expected behaviour is that the recv_timeout
must only be reset when we receive a message.
The new behaviour seems to reset the timeout on each iteration of the loop, which would also then happen when we (1) receive any command, and (2) send out a ping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But with regard to the timeout combinator, correct me if I'm wrong but it looks like it changes the behaviour of the loop. The expected behaviour is that the recv_timeout must only be reset when we receive a message.
Yeah absolutely, I understood the original semantics but somehow didn't realize that this would change the behavior.
Thankfully, I managed to restore the original behavior in 4778810. They key was to use Pin::as_mut
to reborrow the pinned &mut Sleep
in order to call reset
on it.
@@ -586,7 +586,7 @@ mod test { | |||
loop { | |||
tokio::select! { | |||
Some(ev) = self.event_rx.recv() => self.publish_event(ev), | |||
Some(res) = self.listener.next() => self.handle_incoming(res.unwrap()).await, | |||
Ok((stream, _)) = self.listener.accept() => self.handle_incoming(stream).await, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thanethomson This is imho cleaner as it avoids an unwrap
but will silently discard errors when accepting a new connection whereas the previous solution would panic. Does this work for you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In testing code I'd generally prefer .unwrap()
(personal preference). Absolutely nothing should go wrong here, and the test should break clearly and definitively if something fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense! Fixed in f8a37a6.
Some(Ok(msg)) = self.conn.next() => { | ||
if let Some(ret) = self.handle_incoming_msg(msg).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thanethomson Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restored panicking behavior in f84e6aa
Kind::OutOfRange.context(format!("time before EPOCH by {} seconds", e.as_secs())) | ||
})? | ||
.into()) | ||
Ok(SystemTime::from(prost_value).into()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The conversion from a prost Timestamp
to SystemTime
is now infallible because prost_types::Timestamp
now provides a Into<SystemTime>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
Codecov Report
@@ Coverage Diff @@
## master #783 +/- ##
========================================
- Coverage 50.6% 50.4% -0.2%
========================================
Files 197 197
Lines 13592 13607 +15
Branches 3259 3266 +7
========================================
- Hits 6879 6870 -9
- Misses 6452 6475 +23
- Partials 261 262 +1
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exciting times! 🕺🎉
Close: #764
This PR updates the following dependencies:
tokio
to1.0
hyper
to0.14
prost
to0.7
bytes
to1.0
async-tungstenite
to0.12