Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to access hyper::Upgrade #2488

Closed
wants to merge 2 commits into from

Conversation

Mai-Lapyst
Copy link
Contributor

@Mai-Lapyst Mai-Lapyst commented Mar 10, 2023

This PR adds an API to access the hyper::Upgrade object from a rocket application. It's isolated from my combined PR with also adds websocket support in #2466 . This might also help to solve issues similar to the one in #90 .

The new API can be used like this:

struct TestSocket {}

#[crate::async_trait]
impl Upgrade<'static> for TestSocket {
  async fn start(&self, upgraded: crate::http::hyper::upgrade::Upgraded) {
    // can fully use the hyper::upgrade::Upgraded struct
  }
}

impl<'r, F> Responder<'r, 'r> for TestSocket {
  fn respond_to(self, req: &'r Request<'_>) -> response::Result<'r> {
    Response::build()
      .status(Status::SwitchingProtocols)
      .raw_header("Connection", "upgrade")
      .raw_header("Upgrade", "testsocket")
      .upgrade(Some(Box::new(self)))
      .ok()
  }
}

The main thing to do in order to use the API is to set the status to Status::SwitchingProtocols, and to call .upgrade on the response::Builder (or to set it via .set_upgrade when you have a mutable response::Response).

let mut response = rocket.dispatch(token, &req, data).await;

if response.status() == Status::SwitchingProtocols {
let may_upgrade = response.upgrade_mut().take();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Upgrade::start require a reference to self, why take() the value and not just use upgrade() to get a reference ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi and thanks for the feedback!

upgrade_mut().take() is needed here because inside the match send_response(response,tx) is called, which we need so the hyper::Upgrade struct is actually valid, but it dosnt take a reference for the reponse and as such moves the response out of scope. With take(), the Upgrade struct is taken out of the response and the code can own it as long as it needs it.

Sure, one possiblity might be here to change send_response() so it takes a ref. Tested it and it always errors with that the async block in hyper_service_fn() is not Send:

error: future cannot be sent between threads safely
   --> /home/mai/projects/github/Rocket/core/lib/src/server.rs:74:18
    |
74  |       tokio::spawn(async move {
    |  __________________^
75  | |         // Upgrade before do any other; we handle errors below
76  | |         let hyp_upgraded = hyper::upgrade::on(&mut hyp_req);
77  | |
...   |
132 | |         }
133 | |     });
    | |_____^ future created by async block is not `Send`
    |
    = help: the trait `Sync` is not implemented for `dyn tokio::io::AsyncRead + std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> /home/mai/projects/github/Rocket/core/lib/src/server.rs:154:49
    |
144 |         response: &Response<'_>,
    |         -------- has type `&response::response::Response<'_>` which is not `Send`
...
154 |         match self._send_response(&response, tx).await {
    |                                                 ^^^^^^ await occurs here, with `response` maybe used later
...
159 |     }
    |     - `response` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/mai/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a bit of refactoring, it's possible to don't use take nor having to expose a mutable getter upgrade_mut.

if response.upgrade().is_some() {
    // send the finishing response; needed so that hyper can upgrade the request
    rocket.send_response(response, tx).await;

    let upgrade = response.upgrade().unwrap();

    /* rest of the code */
} else {
    error_!("Status is 101 switching protocols, but response dosn't hold a upgrade");
    let response = rocket.handle_error(Status::InternalServerError, &req).await;
    rocket.send_response(response, tx).await;
}

Copy link
Contributor Author

@Mai-Lapyst Mai-Lapyst Mar 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly when adding your propasal, the same error but at different place arises:

Error log
error: future cannot be sent between threads safely
   --> /home/mai/projects/github/Rocket/core/lib/src/server.rs:74:18
    |
74  |       tokio::spawn(async move {
    |  __________________^
75  | |         // Upgrade before do any other; we handle errors below
76  | |         let hyp_upgraded = hyper::upgrade::on(&mut hyp_req);
77  | |
...   |
155 | |         }
156 | |     });
    | |_____^ future created by async block is not `Send`
    |
    = help: the trait `Sync` is not implemented for `dyn Upgrade<'_> + std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> /home/mai/projects/github/Rocket/core/lib/src/server.rs:95:43
    |
93  |                         let upgrade = response.upgrade().unwrap();
    |                             ------- has type `&Box<dyn Upgrade<'_> + std::marker::Send>` which is not `Send`
94  |
95  |                         match hyp_upgraded.await {
    |                                           ^^^^^^ await occurs here, with `upgrade` maybe used later
...
109 |                     } else {
    |                     - `upgrade` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

When commenting out the whole match hyp_upgraded.await it becomes clear, that an borrow of moved value error is the real culprit:

error[E0382]: borrow of moved value: `response`
  --> /home/mai/projects/github/Rocket/core/lib/src/server.rs:93:39
   |
85 |                 let mut response = rocket.dispatch(token, &req, data).await;
   |                     ------------ move occurs because `response` has type `response::response::Response<'_>`, which does not implement the `Copy` trait
...
91 |                         rocket.send_response(response, tx).await;
   |                                              -------- value moved here
92 |
93 |                         let upgrade = response.upgrade().unwrap();
   |                                       ^^^^^^^^^^^^^^^^^^ value borrowed here after move

Out of curiosity: Whats so bad with take() anyway? I mean send_response() certainly dosnt need the Upgrade and afterwards the code has the ownage about it so it's also get correctly dropped.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops I forgot the await. You could try response.upgrade().unwrap().start(hyp_upgraded).

Out of curiosity: Whats so bad with take() anyway? I mean send_response() certainly dosnt need the Upgrade and afterwards the code has the ownage about it so it's also get correctly dropped.

Using take() introduce API that alter the response were it could be avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using response.upgrade().unwrap().start(hyp_upgraded) which creates the following code, it still dosnt compile:

let mut response = rocket.dispatch(token, &req, data).await;

if response.status() == Status::SwitchingProtocols {

if response.upgrade().is_some() {
    // send the finishing response; needed so that hyper can upgrade the request
    rocket.send_response(response, tx).await;

    match hyp_upgraded.await {
        Ok(hyp_upgraded) => {
            // let the upgrade take the upgraded hyper request
            let fu = response.upgrade().unwrap().start(hyp_upgraded);
            fu.await;
        }
        Err(e) => {
            error_!("Failed to upgrade request: {e}");
            // NOTE: we *should* send a response here but since we send one earlier AND upgraded the request,
            //       this cannot be done easily at this point...
            // let response = rocket.handle_error(Status::InternalServerError, &req).await;
            // rocket.send_response(response, tx).await;
        }
    }
} else {
    error_!("Status is 101 switching protocols, but response dosn't hold a upgrade");
    let response = rocket.handle_error(Status::InternalServerError, &req).await;
    rocket.send_response(response, tx).await;
}

Error:

error[E0382]: borrow of moved value: `response`
  --> /home/mai/projects/github/Rocket/core/lib/src/server.rs:96:42
   |
85 |                 let response = rocket.dispatch(token, &req, data).await;
   |                     -------- move occurs because `response` has type `response::response::Response<'_>`, which does not implement the `Copy` trait
...
91 |                         rocket.send_response(response, tx).await;
   |                                              -------- value moved here
...
96 |                                 let fu = response.upgrade().unwrap().start(hyp_upgraded);
   |                                          ^^^^^^^^^^^^^^^^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.

I tink the only possibility is either to move / take out the upgrade out of the response or to rework send_response to borrow the response instead of moving it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the only solution is to use take 🤔 .
IMHO having take_upgrade as proposed in the other discussion with a note in the documentation that explains it take the ownership of the Upgrade value is better than exposing &mut Option<...> in terms of API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option of using take_upgrade also seems best to me. Added a commit.

Mai-Lapyst added a commit to Mai-Lapyst/Rocket that referenced this pull request Mar 27, 2023
@SergioBenitez
Copy link
Member

SergioBenitez commented Mar 28, 2023

I think it's a great idea to expose enough mechanism to allow external implementations of websocket support for Rocket while we iron out our own internal support.

There are two pieces to the puzzle:

  1. Recognize that the HTTP request as an upgrade request. Determine if we should upgrade based on headers and respond back with an upgrade response if so.
  2. Pull out the upgraded I/O stream and allow raw read/writes.

I would like any such support to be independent of types in hyper. This lets us upgrade hyper under the hood without a breaking change in rocket.

Here's what I imagine a successful implementation of support for upgrades could enable in an external library that implements websocket support for Rocket:

/// `WebSocket` is a request guard that checks the following:
///    * The `Connection` header is `Upgrade`.
///    * The `Upgrade` header is `websocket`
///    * Verifies the various `Sec-` headers.
///
/// `Channel` is a responder that:
///   * Accepts the upgrade request if `ws` is acceptable.
///   * Declines the upgrade request otherwise.
///   * Performs the relavant websocket I/O.
#[get("/echo")]
fn ws_echo(ws: WebSocket) -> Channel {
    // Borrowing from the streams API. This is all implemented externally.
    ws::channel! { ws =>
        for await message in ws {
            yield message;
        }
    })
}

Here's the WebSocket FromRequest impl:

#[rocket::async_trait]
impl<'r> FromRequest<'r> for WebSocket {
    type Error = std::convert::Infallible;

    async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        use rocket::http::uncased::eq;

        let headers = req.headers();
        match (headers.get_one("connection"), headers.get_one("upgrade")) {
            (Some(c), Some(u)) if eq(c, "upgrade") && eq(u, "websocket") => {
                // Check `Sec-` headers, determine if we can handle it.
                Outcome::Success(WebSocket)
            }
            _ => Outcome::Forward(())
        }
    }
}

And finally the Channel impl:

impl From<WebSocket> for Channel {
    pub fn from(ws: WebSocket) -> Channel {
        // convert into a responder. if `FromRequest` is the only constructor,
        // then we know the request was a valid websocket upgrade
        Channel(self)
    }
}

impl<'r> Responder<'r, 'static> for Channel {
    fn respond_to(self, req: &'r Request<'_>) -> response::Result<'static> {
        // perform the upgrade to `websocket`, using `self` as the I/O handler
        Response::build()
            .header(self.ws.sec_foo) // pull out info from inner `WebSocket`
            // Asks Rocket to send over the 101 with `Upgrade: websocket`
            // and `Connection: upgrade`. Overrides any other `Status`,
            // `Upgrade`, or `Connection` headers that were set.
            .upgrade("websocket", self)
            .ok()
    }
}

#[crate::async_trait]
impl IoHandler for Channel {
    // `IoStream` is a Rocket type that is `AsyncRead` and `AsyncWrite`. It
    // probably just wraps `Upgraded`, but we don't expose that.
    async fn io(self, io: IoStream) -> io::Result<()> {
        // use inner `self.channel` to read messages from the Rocket app's
        // stream and perform I/O with the connected client.
        Ok(())
    }
}

This means that the only things we need to support in Rocket are:

  1. The ResponseBuilder::upgrade() method and accompanying Response::set_upgrade() method.
  2. The IoHandler trait.
  3. The IoStream type.

This is nearly identical, in spirit, to what you've proposed here, however the signatures need to change to be more usable, and the Upgraded type needs to be wrapped.

impl Response<'r> {
    pub fn set_upgrade<H: IoHandler + Send + 'r>(&mut self, handler: H);
}

impl Builder<'r> {
    pub fn upgrade<H: IoHandler + Send + 'r>(&mut self, handler: H);
}

Finally, the logic during response handling needs to be moved into a separate method so that we don't pollute that section of code.

@SergioBenitez
Copy link
Member

SergioBenitez commented Mar 29, 2023

@Mai-Lapyst Just wanted to check in and see if this is something you can take on in the next day or so. If not, I have some bandwidth tomorrow and can push it across the finish line.

@SergioBenitez SergioBenitez self-assigned this Mar 29, 2023
@SergioBenitez
Copy link
Member

I got excited and am taking this on now. I really like this idea. Hope to push the merged changes soon!

@SergioBenitez
Copy link
Member

Okay, I've finished the implementation I've detailed above, and it works! Pushing to this branch soon.

@SergioBenitez
Copy link
Member

SergioBenitez commented Mar 30, 2023

Landed in 19e7e82! Thank you so much for this! This is an excellent idea.

@initprism
Copy link

omg. I finally have a reason to migrate to Rocket. I look forward to releasing this version soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants