Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Hyper 1.0 Upgrade #1377

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ home = { version = "0.5.4", optional = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
serde_yaml = { version = "0.9.19", optional = true }
http = "0.2.5"
http-body = { version = "0.4.2", optional = true }
http = "1.0.0"
http-body = { version = "1.0.0", optional = true }
either = { version = "1.6.1", optional = true }
thiserror = "1.0.29"
futures = { version = "0.3.17", optional = true }
Expand All @@ -58,19 +58,20 @@ tokio = { version = "1.14.0", features = ["time", "signal", "sync"], optional =
kube-core = { path = "../kube-core", version = "=0.88.1" }
jsonpath-rust = { version = "0.4.0", optional = true }
tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper = { version = "1.0.1", optional = true, features = ["client", "http1"] }
hyper-rustls = { version = "0.24.0", optional = true }
hyper-socks2 = { version = "0.8.0", optional = true, default-features = false }
tokio-tungstenite = { version = "0.20.0", optional = true }
tokio-tungstenite = { version = "0.21.0", optional = true }
tower = { version = "0.4.13", optional = true, features = ["buffer", "filter", "util"] }
tower-http = { version = "0.4.0", optional = true, features = ["auth", "map-response-body", "trace"] }
hyper-timeout = {version = "0.4.1", optional = true }
tower-http = { version = "0.5.0", optional = true, features = ["auth", "map-response-body", "trace"] }
hyper-timeout = {version = "0.5.0", optional = true }
tame-oauth = { version = "0.9.1", features = ["gcp"], optional = true }
pin-project = { version = "1.0.4", optional = true }
rand = { version = "0.8.3", optional = true }
secrecy = { version = "0.8.0", features = ["alloc", "serde"] }
tracing = { version = "0.1.36", features = ["log"], optional = true }
hyper-openssl = { version = "0.9.2", optional = true }
hyper-openssl = { version = "0.10.1", optional = true }
http-body-util = {version = "0.1.0" }
form_urlencoded = { version = "1.2.0", optional = true }

[dependencies.k8s-openapi]
Expand Down
6 changes: 4 additions & 2 deletions kube-client/src/client/auth/oauth.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use bytes::Bytes;
use http_body_util::Full;
use tame_oauth::{
gcp::{TokenOrRequest, TokenProvider, TokenProviderWrapper},
Token,
Expand Down Expand Up @@ -120,15 +122,15 @@
let https =
hyper_openssl::HttpsConnector::new().map_err(Error::CreateOpensslHttpsConnector)?;

let client = hyper::Client::builder().build::<_, hyper::Body>(https);

Check failure on line 125 in kube-client/src/client/auth/oauth.rs

View workflow job for this annotation

GitHub Actions / msrv

failed to resolve: could not find `Client` in `hyper`

Check failure on line 125 in kube-client/src/client/auth/oauth.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find type `Body` in crate `hyper`

let res = client
.request(request.map(hyper::Body::from))
.request(request.map(Full::<Bytes>::new))
.await
.map_err(Error::RequestToken)?;
// Convert response body to `Vec<u8>` for parsing.
let (parts, body) = res.into_parts();
let bytes = hyper::body::to_bytes(body).await.map_err(Error::ConcatBuffers)?;
let bytes = body.await; // TODO figure this out after the client stuff is figured out
let response = http::Response::from_parts(parts, bytes.to_vec());
match self.provider.parse_token_response(scope_hash, response) {
Ok(token) => Ok(token),
Expand Down
6 changes: 4 additions & 2 deletions kube-client/src/client/auth/oidc.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::collections::HashMap;

use bytes::Bytes;
use chrono::{Duration, TimeZone, Utc};
use form_urlencoded::Serializer;
use http::{
header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE},
Method, Version,
};
use http_body_util::Full;
use hyper::{body, client::HttpConnector, http::Uri, Client, Request};

Check failure on line 11 in kube-client/src/client/auth/oidc.rs

View workflow job for this annotation

GitHub Actions / msrv

unresolved imports `hyper::client::HttpConnector`, `hyper::Client`

Check warning on line 11 in kube-client/src/client/auth/oidc.rs

View workflow job for this annotation

GitHub Actions / msrv

unused import: `body`
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Deserializer};
use serde_json::Number;
Expand Down Expand Up @@ -304,7 +306,7 @@
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let https = hyper_openssl::HttpsConnector::new()?;

let https_client = hyper::Client::builder().build(https);

Check failure on line 309 in kube-client/src/client/auth/oidc.rs

View workflow job for this annotation

GitHub Actions / msrv

failed to resolve: could not find `Client` in `hyper`

Ok(Self {
issuer,
Expand All @@ -328,7 +330,7 @@
let response = self.https_client.get(discovery).await?;

if response.status().is_success() {
let body = body::to_bytes(response.into_body()).await?;
let body = Full::<Bytes>::new(response.into_body());
let metadata = serde_json::from_slice::<Metadata>(body.as_ref())
.map_err(errors::RefreshError::InvalidMetadata)?;

Expand Down Expand Up @@ -416,7 +418,7 @@
return Err(errors::RefreshError::RequestFailed(response.status()));
}

let body = body::to_bytes(response.into_body()).await?;
let body = Full::<Bytes>::new(response.into_body());
let token_response = serde_json::from_slice::<TokenResponse>(body.as_ref())
.map_err(errors::RefreshError::InvalidTokenResponse)?;

Expand Down
17 changes: 9 additions & 8 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use bytes::Bytes;
use http::{header::HeaderMap, Request, Response};
use http_body_util::Full;
use hyper::{
self,
client::{connect::Connection, HttpConnector},

Check failure on line 6 in kube-client/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / msrv

unresolved imports `hyper::client::connect`, `hyper::client::HttpConnector`
};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
Expand All @@ -18,7 +19,7 @@

/// HTTP body of a dynamic backing type.
///
/// The suggested implementation type is [`hyper::Body`].
/// The suggested implementation type is [`Full<Bytes>`].
pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;

/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
Expand All @@ -34,7 +35,7 @@
/// which provides a default stack as a starting point.
pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
where
Svc: Service<Request<hyper::Body>>,
Svc: Service<Request<Full<Bytes>>>,
{
Self {
service,
Expand All @@ -57,7 +58,7 @@
/// Build a [`Client`] instance with the current [`Service`] stack.
pub fn build<B>(self) -> Client
where
Svc: Service<Request<hyper::Body>, Response = Response<B>> + Send + 'static,
Svc: Service<Request<Full<Bytes>>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
Expand All @@ -67,7 +68,7 @@
}
}

pub type GenericService = BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>;
pub type GenericService = BoxService<Request<Full<Bytes>>, Response<Box<DynBody>>, BoxError>;

impl TryFrom<Config> for ClientBuilder<GenericService> {
type Error = Error;
Expand Down Expand Up @@ -104,7 +105,7 @@
let default_ns = config.default_namespace.clone();
let auth_layer = config.auth_layer()?;

let client: hyper::Client<_, hyper::Body> = {
let client: hyper::Client<_, Full<Bytes>> = {

Check failure on line 108 in kube-client/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find type `Client` in crate `hyper`
// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
Expand All @@ -127,7 +128,7 @@
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

hyper::Client::builder().build(connector)

Check failure on line 131 in kube-client/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / msrv

failed to resolve: could not find `Client` in `hyper`
};

let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
Expand All @@ -145,7 +146,7 @@
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
.make_span_with(|req: &Request<Full<Bytes>>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
Expand All @@ -156,10 +157,10 @@
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
.on_request(|_req: &Request<Full<Bytes>>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
.on_response(|res: &Response<Full<Bytes>>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
Expand Down
23 changes: 11 additions & 12 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the kubernetes API.
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use hyper::Body;
use http_body_util::{Full, StreamBody};
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
pub use kube_core::response::Status;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -65,7 +66,7 @@ pub use builder::{ClientBuilder, DynBody};
pub struct Client {
// - `Buffer` for cheap clone
// - `BoxService` for dynamic response future type
inner: Buffer<BoxService<Request<Body>, Response<Body>, BoxError>, Request<Body>>,
inner: Buffer<BoxService<Request<Full<Bytes>>, Response<Full<Bytes>>, BoxError>, Request<Full<Bytes>>>,
default_ns: String,
}

Expand Down Expand Up @@ -99,15 +100,15 @@ impl Client {
/// ```
pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
where
S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
S: Service<Request<B>, Response = Response<B>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
T: Into<String>,
{
// Transform response body to `hyper::Body` and use type erased error to avoid type parameters.
let service = MapResponseBodyLayer::new(|b: B| Body::wrap_stream(b.into_stream()))
let service = MapResponseBodyLayer::new(|b: B| StreamBody::new(b.into_stream()))
.layer(service)
.map_err(|e| e.into());
Self {
Expand Down Expand Up @@ -141,7 +142,7 @@ impl Client {
/// Perform a raw HTTP request against the API and return the raw response back.
/// This method can be used to get raw access to the API which may be used to, for example,
/// create a proxy server or application-level gateway between localhost and the API server.
pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
pub async fn send(&self, request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>> {
let mut svc = self.inner.clone();
let res = svc
.ready()
Expand Down Expand Up @@ -195,7 +196,7 @@ impl Client {
HeaderValue::from_static(upgrade::WS_PROTOCOL),
);

let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
let res = self.send(Request::from_parts(parts, Full::new(body))).await?;
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
match hyper::upgrade::on(res).await {
Ok(upgraded) => {
Expand Down Expand Up @@ -225,12 +226,10 @@ impl Client {
/// Perform a raw HTTP request against the API and get back the response
/// as a string
pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request.map(Full::new)).await?;
let status = res.status();
// trace!("Status = {:?} for {}", status, res.url());
let body_bytes = hyper::body::to_bytes(res.into_body())
.await
.map_err(Error::HyperError)?;
let body_bytes = Full::from(res.into_body()).await.map_err(Error::HyperError)?;
let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
handle_api_errors(&text, status)?;

Expand All @@ -242,7 +241,7 @@ impl Client {
/// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
/// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request.map(Full::new)).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = res
Expand Down Expand Up @@ -282,7 +281,7 @@ impl Client {
where
T: Clone + DeserializeOwned,
{
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request.map(Full::new)).await?;
// trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
tracing::trace!("headers: {:?}", res.headers());

Expand Down
5 changes: 3 additions & 2 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::Bytes;
use http::{self, Response, StatusCode};
use hyper::Body;
use http_body_util::StreamBody;
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

Expand Down Expand Up @@ -41,7 +42,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<StreamBody<Bytes>>, key: &str) -> Result<(), UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down
2 changes: 1 addition & 1 deletion kube-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
thiserror = "1.0.29"
form_urlencoded = "1.0.1"
http = "0.2.5"
http = "1.0.0"
json-patch = { version = "1.0.0", optional = true }
once_cell = "1.8.0"
chrono = { version = "0.4.19", default-features = false, features = ["clock"] }
Expand Down
4 changes: 2 additions & 2 deletions kube-derive/tests/ui/fail_with_suggestion.stderr
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
error: Unknown field: `shortnames`. Did you mean `shortname`?
--> $DIR/fail_with_suggestion.rs:6:58
--> tests/ui/fail_with_suggestion.rs:6:58
|
6 | #[kube(group = "clux.dev", version = "v1", kind = "Foo", shortnames = "foo")]
| ^^^^^^^^^^
| ^^^^^^^^^^^^^^^^^^
4 changes: 2 additions & 2 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ futures = "0.3.17"
serde_json = "1.0.68"
serde = { version = "1.0.130", features = ["derive"] }
schemars = "0.8.6"
hyper = "0.14.27"
http = "0.2.9"
hyper = "1.0.1"
http = "1.0.0"
tower-test = "0.4.0"
anyhow = "1.0.71"

Expand Down
Loading