-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(lib): convert to futures 0.2.0-beta #1470
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow! Thanks for working on this big refactor!
Besides the comments I've left inline, one thing I wonder about is depending on the unstable-futures
feature of tokio. It's not clear to me that tokio plans to treat that feature with semver, so it might not be something to depend on directly.
I started on a futures-compat crate, to allow people to use libraries that haven't upgraded yet. I wonder if we make use of that until tokio's support is no longer unstable?
src/client/conn.rs
Outdated
@@ -186,7 +187,7 @@ where | |||
pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture { | |||
let inner = match self.dispatch.send(req) { | |||
Ok(rx) => { | |||
Either::A(rx.then(move |res| { | |||
Either::Left(rx.then(move |res| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be made slightly nicer in 0.2 with FutureExt::left
and FutureExt::right
, meaning we no longer need to import Either
, ya?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it, yeah!
- Use
.left()
and.right()
combinators
src/client/connect.rs
Outdated
} | ||
}, | ||
State::Resolving(ref mut future) => { | ||
match try!(future.poll()) { | ||
Async::NotReady => return Ok(Async::NotReady), | ||
match try!(future.poll(cx)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hadn't noticed before, this could just be try_ready!
, huh? Don't need to fix, but if you wanted 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries, I'll fix it up!
- use
try_ready!
instead of explicit match
src/client/mod.rs
Outdated
.and_then(move |(io, connected)| { | ||
conn::Builder::new() | ||
.h1_writev(h1_writev) | ||
.handshake_no_upgrades(io) | ||
.and_then(move |(tx, conn)| { | ||
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?; | ||
executor.execute(conn.then(|result| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's no FutureExt::recover
for this pattern, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, neat!
- use
recover
// XXX: should wait on the Checkout? Problem is | ||
// that if the connector is failing, it may be that we | ||
// never had a pooled stream at all | ||
ClientError::Normal(either.either(|(e, _)| e, |(e, _)| e)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern is kind of confusing, could be fixed later though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, Either
used to have this useful split
, that's gone away in futures 0.2 unfortunately.
Maybe just replace it with a match
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can change this to use Either::factor_first
once rayon-rs/either#19 is merged!
src/client/mod.rs
Outdated
// | ||
// If the executor doesn't have room, oh well. Things will likely | ||
// be blowing up soon, but this specific task isn't required. | ||
let _ = executor.execute(future::poll_fn(move |cx| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check of poll_ready
before spawning on the executor was to try to let the connection be available for reuse earlier, which was noticeable when using tokio's runtime, since the task might end up on another thread and thus insert back into the pool after the user got a finished response.
If the problem was calling poll_
inside a combinator, I've been meaning to add a pub(crate) fn is_ready
to SendRequest
, that would simply check want
s atomic state without parking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll look into that!
- look into adding
SendRequest::is_ready
method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Luckily my PR to add a Context
arg to future::lazy
just landed in time for futures 0.2 beta, so I fixed it using that instead!
src/proto/h1/io.rs
Outdated
// If there's more than one IoVec available, attempt to | ||
// determine the best buffering strategy based on whether | ||
// the underlying AsyncWrite object supports vectored I/O. | ||
if n == bufs[0].len() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, this is how I'd probably do this too. 👍
src/proto/h1/io.rs
Outdated
} else { | ||
loop { | ||
let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto())); | ||
let (n, strategy) = try_ready!(write_buf(&mut self.io, cx, &mut self.write_buf)); | ||
if self.write_buf.strategy == Strategy::Auto { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can still be done with WriteBufAuto
, that allows only doing the strategy checking as long as it's still Auto
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you go into more detail about that? Why would we need WriteBufAuto
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mostly was thinking of these reasons:
- Allows the strategy swapping code to be separate from this, which is just wanting to write until blocked.
- The checks to determine what strategy to switch to could probably only have to happen when strategy it still auto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, cool! I can definitely factor out the strategy swapping code, I guess WriteBufAuto
would now implement some sort of write_to
method that takes a AsyncWrite
?
- factor out strategy swapping code
src/server/mod.rs
Outdated
.map_err(move |err| error!("server connection error: ({}) {}", addr, err)); | ||
spawn(fut); | ||
Ok(()) | ||
let fut = protocol.serve_connection(socket, s).then(move |result| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be FutureExt::recover
also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do!
- use
recover
src/server/mod.rs
Outdated
let service = new_service.new_service()?; | ||
let service = match new_service.new_service() { | ||
Ok(service) => service, | ||
Err(err) => return future::Either::Left(future::err(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this return Either
now? It's not immediately clear to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It returns Either
because further down we return the Spawn
future from the same function. The Spawn
future is lazy and needs to be polled to spawn the future (since it's using the executor from the context to spawn).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, weird! (That spawn
API feels a little odd, but not your fault :))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- change to use
.left()
and.right()
@@ -18,7 +18,7 @@ | |||
|
|||
extern crate bytes; | |||
#[macro_use] extern crate futures; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a recommendation that libraries shouldn't depend on futures
directly, but rather on the individual crates (like futures-core
, futures-channel
, etc). The idea is that hyper could keep working if some extension needed a breaking change, and we only ever publicly claimed to depend on futures-core
...
It'd be good to take inventory of what public dependencies we may be exposing here, either directly or indirectly:
futures-core
since things likeFutureResponse
implementFuture
.futures-io
since we acceptAsyncRead
andAsyncWrite
...- I think all usage of
futures-channel
is no longer publicly exposed, so not a public dependency, right? - While we use
FutureExt
internally, any types returned from it aren't viewable from outside the crate, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points! We have the same situation with tokio
, where I was gonna fix that up in another PR.
Your call whether to do it as part of this PR, or whether I should follow up with a combined futures
and tokio
PR for it!
I'll take a look at On the other hand, I would expect I'll look into the build errors tomorrow if you don't beat me to it! |
The |
6f0c507
to
77aaa89
Compare
Addressed a bunch of stuff, as well as fixed everything to work with the futures 0.2 beta release! @seanmonstar I was fighting the |
It's not immediately apparent to me why the 2/4 AppVeyor builds failed. Any pointers? |
Hm, when it just exists like that with an error code, my assumption is that the process was aborted (either a double panic, or segfault). I'll restart and see poke around. |
So, searching for the exit code, its a segfault basically. Derefencing a bad pointer. This might be fixed in tokio-rs/tokio#243 |
Still seeing issues while using Carl's |
@seanmonstar Nice work finding that use-after-free bug in tokio! 👏 I've targeted your branch with the fix, and at least locally the tests are running reliably now. Pushed the changes to also check with Travis and AppVeyor. |
@seanmonstar Addressed your feedback so far, I think this should be ready for a second round of review! Tests run smooth on OS X and Linux, however the AppVeyor builds seem to hang occasionally. The last thread of discussion that's still open was around whether to split the futures and tokio dependencies into dependencies on their subcrates. I'm happy to do it in this PR if you want me to, otherwise I can follow up with another PR once tokio and futures stabilise a bit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
\o/
src/client/dns.rs
Outdated
.map(|i| IpAddrs { iter: i }); | ||
sender.send(result).ok(); | ||
Ok(()) | ||
}))).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the receiver will return an error if the sender was dropped, so we should be able to just ignore this spawn error, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point!
- ignore dns spawn error
src/client/mod.rs
Outdated
@@ -85,9 +86,9 @@ impl<C, B> Client<C, B> { | |||
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> { | |||
Client { | |||
connector: Arc::new(config.connector), | |||
executor: exec, | |||
executor: exec.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to completely remove the executor field of the client, and always use cx
.
(HttpConnector
should still own its own executor, but that's because it needs to spawn blocking tasks. It should be enough to allow configuring the HttpConnector
manually.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- remove custom executor
All done! I'm using my branch of futures momentarily until rust-lang/futures-rs#902 is merged in some form. |
@srijs Awesome work! I started a checklist in the original description to track what is blocking merging this. I realized I didn't know all of what that is, if you'd like to add to this list... |
@seanmonstar Good stuff! I added 2 PRs that we should get merged, one of them is seanmonstar/want#1. As for other merge blockers, would you consider anything that needs a release (and is sourced from git right now) a blocker, or is that okay to fix up later as the new versions get published? |
OK, so I'm thinking: screw it, we just merge this, and track that other stuff in new issues. Having it stagnate here doesn't help, and prevents working on newer things that need these changes. So, unless there's objections, I'll just merge as-is in a couple hours :D |
This reverts commit a12f7be. Much sadness 😢.
This reverts commit a12f7be. Much sadness 😢.
@seanmonstar @srijs why was this work reverted? |
Fixes #1448.
Noteworthy changes:
tokio-service
, instead copied over theService
andNewService
traits for now. I looked intotower
, but they're still on futures 0.1, and also made some changes to theService
trait.Pool::spawn_expired_interval
, but needed aSend
-able executor instead.proto::h1::io
to detect vectored I/O needed to adapt to changes in theAsyncWrite
trait, I removedWriteBufAuto
and am now relying on the result ofpoll_vectored_write
instead.Aside from that, relatively straight-forward changes. Oh, and of course...
Blockers to merging:
with_executor
combinator in futures