diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 1cc9f7806..f5954b8d0 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -44,8 +44,8 @@ home = { version = "0.5.4", optional = true } serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" serde_yaml = { version = "0.9.19", optional = true } -http = "0.2.5" -http-body = { version = "0.4.2", optional = true } +http = "1.0.0" +http-body = { version = "1.0.0", optional = true } either = { version = "1.6.1", optional = true } thiserror = "1.0.29" futures = { version = "0.3.17", optional = true } @@ -58,19 +58,20 @@ tokio = { version = "1.14.0", features = ["time", "signal", "sync"], optional = kube-core = { path = "../kube-core", version = "=0.88.1" } jsonpath-rust = { version = "0.4.0", optional = true } tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] } -hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] } +hyper = { version = "1.0.1", optional = true, features = ["client", "http1"] } hyper-rustls = { version = "0.24.0", optional = true } hyper-socks2 = { version = "0.8.0", optional = true, default-features = false } -tokio-tungstenite = { version = "0.20.0", optional = true } +tokio-tungstenite = { version = "0.21.0", optional = true } tower = { version = "0.4.13", optional = true, features = ["buffer", "filter", "util"] } -tower-http = { version = "0.4.0", optional = true, features = ["auth", "map-response-body", "trace"] } -hyper-timeout = {version = "0.4.1", optional = true } +tower-http = { version = "0.5.0", optional = true, features = ["auth", "map-response-body", "trace"] } +hyper-timeout = {version = "0.5.0", optional = true } tame-oauth = { version = "0.9.1", features = ["gcp"], optional = true } pin-project = { version = "1.0.4", optional = true } rand = { version = "0.8.3", optional = true } secrecy = { version = "0.8.0", features = ["alloc", "serde"] } tracing = { version = "0.1.36", features = ["log"], optional = true } -hyper-openssl = { version = "0.9.2", optional = true } +hyper-openssl = { version = "0.10.1", optional = true } +http-body-util = {version = "0.1.0" } form_urlencoded = { version = "1.2.0", optional = true } [dependencies.k8s-openapi] diff --git a/kube-client/src/client/auth/oauth.rs b/kube-client/src/client/auth/oauth.rs index 5478cf338..86af09475 100644 --- a/kube-client/src/client/auth/oauth.rs +++ b/kube-client/src/client/auth/oauth.rs @@ -1,3 +1,5 @@ +use bytes::Bytes; +use http_body_util::Full; use tame_oauth::{ gcp::{TokenOrRequest, TokenProvider, TokenProviderWrapper}, Token, @@ -123,12 +125,12 @@ impl Gcp { let client = hyper::Client::builder().build::<_, hyper::Body>(https); let res = client - .request(request.map(hyper::Body::from)) + .request(request.map(Full::::new)) .await .map_err(Error::RequestToken)?; // Convert response body to `Vec` for parsing. let (parts, body) = res.into_parts(); - let bytes = hyper::body::to_bytes(body).await.map_err(Error::ConcatBuffers)?; + let bytes = body.await; // TODO figure this out after the client stuff is figured out let response = http::Response::from_parts(parts, bytes.to_vec()); match self.provider.parse_token_response(scope_hash, response) { Ok(token) => Ok(token), diff --git a/kube-client/src/client/auth/oidc.rs b/kube-client/src/client/auth/oidc.rs index 0c3d5cbd0..41b4fb4a2 100644 --- a/kube-client/src/client/auth/oidc.rs +++ b/kube-client/src/client/auth/oidc.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; +use bytes::Bytes; use chrono::{Duration, TimeZone, Utc}; use form_urlencoded::Serializer; use http::{ header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE}, Method, Version, }; +use http_body_util::Full; use hyper::{body, client::HttpConnector, http::Uri, Client, Request}; use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Deserializer}; @@ -328,7 +330,7 @@ impl Refresher { let response = self.https_client.get(discovery).await?; if response.status().is_success() { - let body = body::to_bytes(response.into_body()).await?; + let body = Full::::new(response.into_body()); let metadata = serde_json::from_slice::(body.as_ref()) .map_err(errors::RefreshError::InvalidMetadata)?; @@ -416,7 +418,7 @@ impl Refresher { return Err(errors::RefreshError::RequestFailed(response.status())); } - let body = body::to_bytes(response.into_body()).await?; + let body = Full::::new(response.into_body()); let token_response = serde_json::from_slice::(body.as_ref()) .map_err(errors::RefreshError::InvalidTokenResponse)?; diff --git a/kube-client/src/client/builder.rs b/kube-client/src/client/builder.rs index e1ae4871c..ee6c636c9 100644 --- a/kube-client/src/client/builder.rs +++ b/kube-client/src/client/builder.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use http::{header::HeaderMap, Request, Response}; +use http_body_util::Full; use hyper::{ self, client::{connect::Connection, HttpConnector}, @@ -18,7 +19,7 @@ use crate::{client::ConfigExt, Client, Config, Error, Result}; /// HTTP body of a dynamic backing type. /// -/// The suggested implementation type is [`hyper::Body`]. +/// The suggested implementation type is [`Full`]. pub type DynBody = dyn http_body::Body + Send + Unpin; /// Builder for [`Client`] instances with customized [tower](`Service`) middleware. @@ -34,7 +35,7 @@ impl ClientBuilder { /// which provides a default stack as a starting point. pub fn new(service: Svc, default_namespace: impl Into) -> Self where - Svc: Service>, + Svc: Service>>, { Self { service, @@ -57,7 +58,7 @@ impl ClientBuilder { /// Build a [`Client`] instance with the current [`Service`] stack. pub fn build(self) -> Client where - Svc: Service, Response = Response> + Send + 'static, + Svc: Service>, Response = Response> + Send + 'static, Svc::Future: Send + 'static, Svc::Error: Into, B: http_body::Body + Send + 'static, @@ -67,7 +68,7 @@ impl ClientBuilder { } } -pub type GenericService = BoxService, Response>, BoxError>; +pub type GenericService = BoxService>, Response>, BoxError>; impl TryFrom for ClientBuilder { type Error = Error; @@ -104,7 +105,7 @@ where let default_ns = config.default_namespace.clone(); let auth_layer = config.auth_layer()?; - let client: hyper::Client<_, hyper::Body> = { + let client: hyper::Client<_, Full> = { // Current TLS feature precedence when more than one are set: // 1. rustls-tls // 2. openssl-tls @@ -145,7 +146,7 @@ where // Attribute names follow [Semantic Conventions]. // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md TraceLayer::new_for_http() - .make_span_with(|req: &Request| { + .make_span_with(|req: &Request>| { tracing::debug_span!( "HTTP", http.method = %req.method(), @@ -156,10 +157,10 @@ where otel.status_code = tracing::field::Empty, ) }) - .on_request(|_req: &Request, _span: &Span| { + .on_request(|_req: &Request>, _span: &Span| { tracing::debug!("requesting"); }) - .on_response(|res: &Response, _latency: Duration, span: &Span| { + .on_response(|res: &Response>, _latency: Duration, span: &Span| { let status = res.status(); span.record("http.status_code", status.as_u16()); if status.is_client_error() || status.is_server_error() { diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index 83d87e74a..a307743b4 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -7,10 +7,11 @@ //! //! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically //! retrieve the resources served by the kubernetes API. +use bytes::Bytes; use either::{Either, Left, Right}; use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt}; use http::{self, Request, Response, StatusCode}; -use hyper::Body; +use http_body_util::{Full, StreamBody}; use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; pub use kube_core::response::Status; use serde::de::DeserializeOwned; @@ -65,7 +66,7 @@ pub use builder::{ClientBuilder, DynBody}; pub struct Client { // - `Buffer` for cheap clone // - `BoxService` for dynamic response future type - inner: Buffer, Response, BoxError>, Request>, + inner: Buffer>, Response>, BoxError>, Request>>, default_ns: String, } @@ -99,7 +100,7 @@ impl Client { /// ``` pub fn new(service: S, default_namespace: T) -> Self where - S: Service, Response = Response> + Send + 'static, + S: Service, Response = Response> + Send + 'static, S::Future: Send + 'static, S::Error: Into, B: http_body::Body + Send + 'static, @@ -107,7 +108,7 @@ impl Client { T: Into, { // Transform response body to `hyper::Body` and use type erased error to avoid type parameters. - let service = MapResponseBodyLayer::new(|b: B| Body::wrap_stream(b.into_stream())) + let service = MapResponseBodyLayer::new(|b: B| StreamBody::new(b.into_stream())) .layer(service) .map_err(|e| e.into()); Self { @@ -141,7 +142,7 @@ impl Client { /// Perform a raw HTTP request against the API and return the raw response back. /// This method can be used to get raw access to the API which may be used to, for example, /// create a proxy server or application-level gateway between localhost and the API server. - pub async fn send(&self, request: Request) -> Result> { + pub async fn send(&self, request: Request>) -> Result>> { let mut svc = self.inner.clone(); let res = svc .ready() @@ -195,7 +196,7 @@ impl Client { HeaderValue::from_static(upgrade::WS_PROTOCOL), ); - let res = self.send(Request::from_parts(parts, Body::from(body))).await?; + let res = self.send(Request::from_parts(parts, Full::new(body))).await?; upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?; match hyper::upgrade::on(res).await { Ok(upgraded) => { @@ -225,12 +226,10 @@ impl Client { /// Perform a raw HTTP request against the API and get back the response /// as a string pub async fn request_text(&self, request: Request>) -> Result { - let res = self.send(request.map(Body::from)).await?; + let res = self.send(request.map(Full::new)).await?; let status = res.status(); // trace!("Status = {:?} for {}", status, res.url()); - let body_bytes = hyper::body::to_bytes(res.into_body()) - .await - .map_err(Error::HyperError)?; + let body_bytes = Full::from(res.into_body()).await.map_err(Error::HyperError)?; let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?; handle_api_errors(&text, status)?; @@ -242,7 +241,7 @@ impl Client { /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt) /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt). pub async fn request_stream(&self, request: Request>) -> Result { - let res = self.send(request.map(Body::from)).await?; + let res = self.send(request.map(Full::new)).await?; // Map the error, since we want to convert this into an `AsyncBufReader` using // `into_async_read` which specifies `std::io::Error` as the stream's error type. let body = res @@ -282,7 +281,7 @@ impl Client { where T: Clone + DeserializeOwned, { - let res = self.send(request.map(Body::from)).await?; + let res = self.send(request.map(Full::new)).await?; // trace!("Streaming from {} -> {}", res.url(), res.status().as_str()); tracing::trace!("headers: {:?}", res.headers()); diff --git a/kube-client/src/client/upgrade.rs b/kube-client/src/client/upgrade.rs index e8fe67c5c..b1da5e9f8 100644 --- a/kube-client/src/client/upgrade.rs +++ b/kube-client/src/client/upgrade.rs @@ -1,5 +1,6 @@ +use bytes::Bytes; use http::{self, Response, StatusCode}; -use hyper::Body; +use http_body_util::StreamBody; use thiserror::Error; use tokio_tungstenite::tungstenite as ws; @@ -41,7 +42,7 @@ pub enum UpgradeConnectionError { // Verify upgrade response according to RFC6455. // Based on `tungstenite` and added subprotocol verification. -pub fn verify_response(res: &Response, key: &str) -> Result<(), UpgradeConnectionError> { +pub fn verify_response(res: &Response>, key: &str) -> Result<(), UpgradeConnectionError> { if res.status() != StatusCode::SWITCHING_PROTOCOLS { return Err(UpgradeConnectionError::ProtocolSwitch(res.status())); } diff --git a/kube-core/Cargo.toml b/kube-core/Cargo.toml index b45ec4f12..68945689e 100644 --- a/kube-core/Cargo.toml +++ b/kube-core/Cargo.toml @@ -29,7 +29,7 @@ serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" thiserror = "1.0.29" form_urlencoded = "1.0.1" -http = "0.2.5" +http = "1.0.0" json-patch = { version = "1.0.0", optional = true } once_cell = "1.8.0" chrono = { version = "0.4.19", default-features = false, features = ["clock"] } diff --git a/kube-derive/tests/ui/fail_with_suggestion.stderr b/kube-derive/tests/ui/fail_with_suggestion.stderr index d9f9984da..bce418e59 100644 --- a/kube-derive/tests/ui/fail_with_suggestion.stderr +++ b/kube-derive/tests/ui/fail_with_suggestion.stderr @@ -1,5 +1,5 @@ error: Unknown field: `shortnames`. Did you mean `shortname`? - --> $DIR/fail_with_suggestion.rs:6:58 + --> tests/ui/fail_with_suggestion.rs:6:58 | 6 | #[kube(group = "clux.dev", version = "v1", kind = "Foo", shortnames = "foo")] - | ^^^^^^^^^^ + | ^^^^^^^^^^^^^^^^^^ diff --git a/kube/Cargo.toml b/kube/Cargo.toml index f55c7feb4..c88531626 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -61,8 +61,8 @@ futures = "0.3.17" serde_json = "1.0.68" serde = { version = "1.0.130", features = ["derive"] } schemars = "0.8.6" -hyper = "0.14.27" -http = "0.2.9" +hyper = "1.0.1" +http = "1.0.0" tower-test = "0.4.0" anyhow = "1.0.71"