Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2 - Port HTTP/2 support. #3

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ readme = "README.md"
version = "0.4.0"
edition = "2021"

[features]
http2 = ["hyper/http2"]

[dependencies]
async-trait = "0.1"
axum-core = "0.4"
axum-core = "0.5.0"
base64 = "0.22"
bytes = "1"
futures-util = { version = "0.3", features = ["sink"] }
Expand Down
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ pub enum WebSocketError {
ConnectionNotUpgradeable,
Internal(tokio_websockets::Error),
InvalidConnectionHeader,
/// For WebSocket over HTTP/2+
InvalidProtocolPseudoheader,
InvalidUpgradeHeader,
InvalidWebSocketVersionHeader,
/// Invalid method for WebSocket over HTTP/1.x
MethodNotGet,
/// Invalid method for WebSocket over HTTP/2+
MethodNotConnect,
UpgradeFailed(hyper::Error),
}

Expand All @@ -38,6 +43,9 @@ impl Display for WebSocketError {
WebSocketError::InvalidConnectionHeader => {
write!(f, "invalid `Connection` header")
}
WebSocketError::InvalidProtocolPseudoheader => {
write!(f, "invalid `:protocol` pseudoheader")
}
WebSocketError::InvalidUpgradeHeader => {
write!(f, "invalid `Upgrade` header")
}
Expand All @@ -47,6 +55,9 @@ impl Display for WebSocketError {
WebSocketError::MethodNotGet => {
write!(f, "http request method must be `GET`")
}
WebSocketError::MethodNotConnect => {
write!(f, "http2 request method must be `CONNECT`")
}
WebSocketError::UpgradeFailed(e) => {
write!(f, "upgrade failed: {}", e)
}
Expand Down
106 changes: 70 additions & 36 deletions src/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::future::Future;

use async_trait::async_trait;
use axum_core::body::Body;
use axum_core::extract::FromRequestParts;
use axum_core::response::Response;
use http::request::Parts;
use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode};
use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
use hyper_util::rt::TokioIo;
use sha1::Digest;
use tokio_websockets::{Config, Limits};
Expand Down Expand Up @@ -38,7 +37,8 @@ pub struct WebSocketUpgrade<F = DefaultOnFailedUpgrade> {
config: Config,
limits: Limits,
protocol: Option<HeaderValue>,
sec_websocket_key: HeaderValue,
/// `None` if HTTP/2+ WebSockets are used.
sec_websocket_key: Option<HeaderValue>,
on_upgrade: hyper::upgrade::OnUpgrade,
on_failed_upgrade: F,
sec_websocket_protocol: Option<HeaderValue>,
Expand All @@ -55,36 +55,57 @@ impl<F> std::fmt::Debug for WebSocketUpgrade<F> {
}
}

#[async_trait]
impl<S> FromRequestParts<S> for WebSocketUpgrade<DefaultOnFailedUpgrade>
where
S: Send + Sync,
{
type Rejection = WebSocketError;

async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
if parts.method != Method::GET {
return Err(WebSocketError::MethodNotGet);
}

if !header_contains(&parts.headers, header::CONNECTION, "upgrade") {
return Err(WebSocketError::InvalidConnectionHeader);
}

if !header_eq(&parts.headers, header::UPGRADE, "websocket") {
return Err(WebSocketError::InvalidUpgradeHeader);
}
let sec_websocket_key = if parts.version <= Version::HTTP_11 || cfg!(not(feature = "http2"))
{
if parts.method != Method::GET {
return Err(WebSocketError::MethodNotGet);
}

if !header_contains(&parts.headers, header::CONNECTION, "upgrade") {
return Err(WebSocketError::InvalidConnectionHeader);
}

if !header_eq(&parts.headers, header::UPGRADE, "websocket") {
return Err(WebSocketError::InvalidUpgradeHeader);
}

let sec_websocket_key = parts
.headers
.get(header::SEC_WEBSOCKET_KEY)
.ok_or(WebSocketError::InvalidWebSocketVersionHeader)?
.clone();

Some(sec_websocket_key)
} else {
if parts.method != Method::CONNECT {
return Err(WebSocketError::MethodNotConnect);
}

// if this feature flag is disabled, we won’t be receiving an HTTP/2 request to begin
// with.
#[cfg(feature = "http2")]
if parts
.extensions
.get::<hyper::ext::Protocol>()
.map_or(true, |p| p.as_str() != "websocket")
{
return Err(WebSocketError::InvalidProtocolPseudoheader);
}

None
};

if !header_eq(&parts.headers, header::SEC_WEBSOCKET_VERSION, "13") {
return Err(WebSocketError::InvalidWebSocketVersionHeader);
}

let sec_websocket_key = parts
.headers
.get(header::SEC_WEBSOCKET_KEY)
.ok_or(WebSocketError::InvalidWebSocketVersionHeader)?
.clone();

let on_upgrade = parts
.extensions
.remove::<hyper::upgrade::OnUpgrade>()
Expand Down Expand Up @@ -163,25 +184,38 @@ impl<F> WebSocketUpgrade<F> {
callback(socket).await;
});

#[allow(clippy::declare_interior_mutable_const)]
const UPGRADE: HeaderValue = HeaderValue::from_static("upgrade");
#[allow(clippy::declare_interior_mutable_const)]
const WEBSOCKET: HeaderValue = HeaderValue::from_static("websocket");

let mut builder = Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header(header::CONNECTION, UPGRADE)
.header(header::UPGRADE, WEBSOCKET)
.header(
header::SEC_WEBSOCKET_ACCEPT,
sign(self.sec_websocket_key.as_bytes()),
);
let mut response = if let Some(sec_websocket_key) = &self.sec_websocket_key {
// If `sec_websocket_key` was `Some`, we are using HTTP/1.1.

#[allow(clippy::declare_interior_mutable_const)]
const UPGRADE: HeaderValue = HeaderValue::from_static("upgrade");
#[allow(clippy::declare_interior_mutable_const)]
const WEBSOCKET: HeaderValue = HeaderValue::from_static("websocket");

let builder = Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header(header::CONNECTION, UPGRADE)
.header(header::UPGRADE, WEBSOCKET)
.header(
header::SEC_WEBSOCKET_ACCEPT,
sign(sec_websocket_key.as_bytes()),
);

builder.body(Body::empty()).unwrap()
} else {
// Otherwise, we are HTTP/2+. As established in RFC 9113 section 8.5, we just respond
// with a 2XX with an empty body:
// <https://datatracker.ietf.org/doc/html/rfc9113#name-the-connect-method>.
Response::new(Body::empty())
};

if let Some(protocol) = self.protocol {
builder = builder.header(header::SEC_WEBSOCKET_PROTOCOL, protocol);
response
.headers_mut()
.insert(header::SEC_WEBSOCKET_PROTOCOL, protocol);
}

builder.body(Body::empty()).unwrap()
response
}
}

Expand Down