-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client's main select loop sometimes loops without selecting any of the branches #153
Comments
Macro expansion of the loop: loop {
let mut read_buf: [u8; 1024] = [0; 1024];
{
enum ProcMacroHack {
Nested = ( "futures_crate_path (:: futures) cmd = rcv_cmd . next () =>\n{\n match cmd\n {\n None =>\n {\n debug !\n (\"main loop: command channel terminated from the other end\") ;\n } Some (Cmd :: Msg (irc_msg)) =>\n { snd_msg . try_send (irc_msg) . unwrap () ; } Some\n (Cmd :: Reconnect (mb_port)) =>\n {\n if let Some (new_port) = mb_port { port = new_port ; } wait =\n false ; continue \'connect ;\n } Some (Cmd :: Quit (reason)) =>\n { snd_msg . try_send (wire :: quit (reason)) . unwrap () ; return ; }\n }\n} bytes = read_half . read (& mut read_buf) . fuse () =>\n{\n match bytes\n {\n Err (io_err) =>\n {\n debug !\n (\"main loop: error when reading from socket: {:?}\", io_err) ;\n snd_ev . send (Event :: IoErr (io_err)) . await . unwrap () ;\n snd_ev . send (Event :: Disconnected) . await . unwrap () ; wait =\n true ; continue \'connect ;\n } Ok (bytes) =>\n {\n parse_buf . extend_from_slice (& read_buf [0 .. bytes]) ; while\n let Some (mut msg) = wire :: parse_irc_msg (& mut parse_buf)\n {\n debug ! (\"parsed msg: {:?}\", msg) ; pinger . reset () ;\n irc_state . update (& mut msg, & mut snd_ev, & mut snd_msg) ;\n snd_ev . send (Event :: Msg (msg)) . await . unwrap () ;\n }\n }\n }\n} ping_ev = rcv_ping_evs_fused . next () =>\n{\n match ping_ev\n {\n None => { debug ! (\"Ping thread terminated unexpectedly???\") ; } Some\n (pinger :: Event :: SendPing) =>\n { irc_state . send_ping (& mut snd_msg) ; } Some\n (pinger :: Event :: Disconnect) =>\n {\n snd_ev . send (Event :: Disconnected) . await . unwrap () ; wait =\n true ; continue \'connect ;\n }\n }\n}" , 0 ) . 1 , }
{
enum __PrivResult<_0, _1, _2> {
_0(_0),
_1(_1),
_2(_2),
}
let mut _0 = rcv_cmd.next();
let mut _1 = read_half.read(&mut read_buf).fuse();
let mut _2 = rcv_ping_evs_fused.next();
let mut __poll_fn = |__cx: &mut ::futures::task::Context<'_>| {
let mut __any_polled = false;
let mut _0 = |__cx: &mut ::futures::task::Context<'_>| {
if ::futures::future::FusedFuture::is_terminated(&_0) {
None
} else {
Some(
::futures::future::FutureExt::poll_unpin(&mut _0, __cx)
.map(__PrivResult::_0),
)
}
};
let _0: &mut dyn FnMut(
&mut ::futures::task::Context<'_>,
)
-> Option<::futures::task::Poll<_>> = &mut _0;
let mut _1 = |__cx: &mut ::futures::task::Context<'_>| {
if ::futures::future::FusedFuture::is_terminated(&_1) {
None
} else {
Some(
::futures::future::FutureExt::poll_unpin(&mut _1, __cx)
.map(__PrivResult::_1),
)
}
};
let _1: &mut dyn FnMut(
&mut ::futures::task::Context<'_>,
)
-> Option<::futures::task::Poll<_>> = &mut _1;
let mut _2 = |__cx: &mut ::futures::task::Context<'_>| {
if ::futures::future::FusedFuture::is_terminated(&_2) {
None
} else {
Some(
::futures::future::FutureExt::poll_unpin(&mut _2, __cx)
.map(__PrivResult::_2),
)
}
};
let _2: &mut dyn FnMut(
&mut ::futures::task::Context<'_>,
)
-> Option<::futures::task::Poll<_>> = &mut _2;
let mut __select_arr = [_0, _1, _2];
::futures::async_await::shuffle(&mut __select_arr);
for poller in &mut __select_arr {
let poller: &mut &mut dyn FnMut(
&mut ::futures::task::Context<'_>,
)
-> Option<::futures::task::Poll<_>> = poller;
match poller(__cx) {
Some(x @ ::futures::task::Poll::Ready(_)) => return x,
Some(::futures::task::Poll::Pending) => {
__any_polled = true;
}
None => {}
}
}
if !__any_polled {
{
::std::rt::begin_panic(
"all futures in select! were completed,\
but no `complete =>` handler was provided",
&("libtiny_client/src/lib.rs", 471u32, 13u32),
)
}
} else {
::futures::task::Poll::Pending
}
};
match ::futures::future::poll_fn(__poll_fn).await {
__PrivResult::_0(cmd) => match cmd {
None => {
{
let lvl = ::log::Level::Debug;
if lvl <= ::log::STATIC_MAX_LEVEL && lvl <= ::log::max_level() {
:: log :: __private_api_log ( :: core :: fmt :: Arguments :: new_v1 ( & [ "main loop: command channel terminated from the other end" ] , & match ( ) { ( ) => [ ] , } ) , lvl , & ( "libtiny_client" , "libtiny_client" , "libtiny_client/src/lib.rs" , 471u32 ) ) ;
}
};
}
Some(Cmd::Msg(irc_msg)) => {
snd_msg.try_send(irc_msg).unwrap();
}
Some(Cmd::Reconnect(mb_port)) => {
if let Some(new_port) = mb_port {
port = new_port;
}
wait = false;
continue 'connect;
}
Some(Cmd::Quit(reason)) => {
snd_msg.try_send(wire::quit(reason)).unwrap();
return;
}
},
__PrivResult::_1(bytes) => {
match bytes {
Err(io_err) => {
{
let lvl = ::log::Level::Debug;
if lvl <= ::log::STATIC_MAX_LEVEL
&& lvl <= ::log::max_level()
{
:: log :: __private_api_log ( :: core :: fmt :: Arguments :: new_v1 ( & [ "main loop: error when reading from socket: " ] , & match ( & io_err , ) { ( arg0 , ) => [ :: core :: fmt :: ArgumentV1 :: new ( arg0 , :: core :: fmt :: Debug :: fmt ) ] , } ) , lvl , & ( "libtiny_client" , "libtiny_client" , "libtiny_client/src/lib.rs" , 471u32 ) ) ;
}
};
snd_ev.send(Event::IoErr(io_err)).await.unwrap();
snd_ev.send(Event::Disconnected).await.unwrap();
wait = true;
continue 'connect;
}
Ok(bytes) => {
parse_buf.extend_from_slice(&read_buf[0..bytes]);
while let Some(mut msg) = wire::parse_irc_msg(&mut parse_buf) {
{
let lvl = ::log::Level::Debug;
if lvl <= ::log::STATIC_MAX_LEVEL
&& lvl <= ::log::max_level()
{
::log::__private_api_log(
::core::fmt::Arguments::new_v1(
&["parsed msg: "],
&match (&msg,) {
(arg0,) => {
[::core::fmt::ArgumentV1::new(
arg0,
::core::fmt::Debug::fmt,
)]
}
},
),
lvl,
&(
"libtiny_client",
"libtiny_client",
"libtiny_client/src/lib.rs",
471u32,
),
);
}
};
pinger.reset();
irc_state.update(&mut msg, &mut snd_ev, &mut snd_msg);
snd_ev.send(Event::Msg(msg)).await.unwrap();
}
}
}
}
__PrivResult::_2(ping_ev) => match ping_ev {
None => {
{
let lvl = ::log::Level::Debug;
if lvl <= ::log::STATIC_MAX_LEVEL && lvl <= ::log::max_level() {
::log::__private_api_log(
::core::fmt::Arguments::new_v1(
&["Ping thread terminated unexpectedly???"],
&match () {
() => [],
},
),
lvl,
&(
"libtiny_client",
"libtiny_client",
"libtiny_client/src/lib.rs",
471u32,
),
);
}
};
}
Some(pinger::Event::SendPing) => {
irc_state.send_ping(&mut snd_msg);
}
Some(pinger::Event::Disconnect) => {
snd_ev.send(Event::Disconnected).await.unwrap();
wait = true;
continue 'connect;
}
},
}
}
}
} |
I don't know if this is the cause, but I think there's a problem in this line: the fusing should happen outside the loop, as also described in |
I asked about this in https://users.rust-lang.org/t/tokio-fusing-an-asyncreadext-read-future/33695 |
On my laptop, after waking up from a suspend, this loop in libtiny_client sometimes loops forever without selecting any of the branches:
Basically the `select!` doesn't really select any of the branches, and doesn't panic either. In the next iteration it does the same, causing this to loop forever without doing anything.

Not sure if I'm misusing `select!` here, or this is a `select!` bug.

It's also a bit hard to reproduce. For example, if I hibernate my desktop and run it again, this does not happen. I was only able to reproduce this on my laptop.
The text was updated successfully, but these errors were encountered: