From 95fedf53573c09683b49de4801bcbc46345ff9f4 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Wed, 17 Mar 2021 21:24:28 +0100 Subject: [PATCH] Add back hyper-proxy --- Cargo.lock | 101 ++++++++++++++++++++------------- Cargo.toml | 1 - core/Cargo.toml | 6 +- core/src/apresolve.rs | 111 ++++++++++++++++++------------------- core/src/connection/mod.rs | 53 ++++++++++-------- core/src/lib.rs | 15 ++++- core/src/proxytunnel.rs | 62 ++------------------- core/src/session.rs | 6 +- 8 files changed, 168 insertions(+), 187 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65493a277..c654e7057 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.13" @@ -588,6 +603,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -607,6 +623,12 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-io" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59" + [[package]] name = "futures-macro" version = "0.3.13" @@ -637,10 +659,13 @@ version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "proc-macro-hack", @@ -851,30 +876,29 @@ dependencies = [ ] [[package]] -name = "h2" -version = "0.3.0" +name = "headers" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b67e66362108efccd8ac053abafc8b7a8d86a37e6e48fc4f6f7485eb5e9e6a5" +checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855" dependencies = [ + "base64", + "bitflags", "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", + "headers-core", "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", - "tracing-futures", + "mime", + "sha-1", + "time 0.1.43", ] [[package]] -name = "hashbrown" -version = "0.9.1" +name = "headers-core" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] [[package]] name = "heck" @@ -970,7 +994,6 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", "http", "http-body", "httparse", @@ -984,6 +1007,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-proxy" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" +dependencies = [ + "bytes", + "futures", + "headers", + "http", + "hyper", + "tokio", + "tower-service", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1022,16 +1060,6 @@ dependencies = [ "libc", ] -[[package]] -name = "indexmap" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb1fa934250de4de8aef298d81c729a7d33d8c239daa3a7575e6b92bfc7313b" -dependencies = [ - "autocfg", - "hashbrown", -] - [[package]] name = "instant" version = "0.1.9" @@ -1333,13 +1361,14 @@ dependencies = [ "base64", "byteorder", "bytes", - "cfg-if 1.0.0", "env_logger", "futures-core", "futures-util", "hmac", + "http", "httparse", "hyper", + "hyper-proxy", "librespot-protocol", "log", "num-bigint", @@ -1469,6 +1498,12 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "miniz_oxide" version = "0.4.3" @@ -2691,16 +2726,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index 6b8cbee50..8f3e44c28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,6 @@ sha-1 = "0.9" [features] apresolve = ["librespot-core/apresolve"] -apresolve-http2 = ["librespot-core/apresolve-http2"] alsa-backend = ["librespot-playback/alsa-backend"] portaudio-backend = ["librespot-playback/portaudio-backend"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 373e30885..ff4d2862c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,12 +17,13 @@ aes = "0.6" base64 = "0.13" byteorder = "1.4" bytes = "1.0" -cfg-if = "1" futures-core = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] } hmac = "0.10" httparse = "1.3" +http = "0.2" hyper = { version = "0.14", optional = true, features = ["client", "tcp", "http1"] } +hyper-proxy = { version = "0.9.1", optional = true, default-features = false } log = "0.4" num-bigint = "0.3" num-integer = "0.1" @@ -50,5 +51,4 @@ env_logger = "*" tokio = {version = "1.0", features = ["macros"] } [features] -apresolve = ["hyper"] -apresolve-http2 = ["apresolve", "hyper/http2"] +apresolve = ["hyper", "hyper-proxy"] diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index 531a3e074..c954dab5b 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,73 +1,68 @@ -const AP_FALLBACK: &str = "ap.spotify.com:443"; +use std::error::Error; +use hyper::client::HttpConnector; +use hyper::{Body, Client, Method, Request, Uri}; +use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use serde::Deserialize; use url::Url; -cfg_if! { - if #[cfg(feature = "apresolve")] { - const APRESOLVE_ENDPOINT: &str = "http://apresolve.spotify.com:80"; +use super::AP_FALLBACK; - use std::error::Error; +const APRESOLVE_ENDPOINT: &str = "http://apresolve.spotify.com:80"; - use hyper::{Body, Client, Method, Request, Uri}; - use serde::{Serialize, Deserialize}; - - use crate::proxytunnel::ProxyTunnel; - - #[derive(Clone, Debug, Serialize, Deserialize)] - pub struct APResolveData { - ap_list: Vec, - } +#[derive(Clone, Debug, Deserialize)] +struct APResolveData { + ap_list: Vec, +} - async fn apresolve(proxy: &Option, ap_port: &Option) -> Result> { - let port = ap_port.unwrap_or(443); +async fn try_apresolve( + proxy: Option<&Url>, + ap_port: Option, +) -> Result> { + let port = ap_port.unwrap_or(443); - let req = Request::builder() - .method(Method::GET) - .uri( - APRESOLVE_ENDPOINT - .parse::() - .expect("invalid AP resolve URL"), - ) - .body(Body::empty())?; + let mut req = Request::new(Body::empty()); + *req.method_mut() = Method::GET; + // panic safety: APRESOLVE_ENDPOINT above is valid url. + *req.uri_mut() = APRESOLVE_ENDPOINT.parse().expect("invalid AP resolve URL"); - let response = if let Some(url) = proxy { - Client::builder() - .build(ProxyTunnel::new(&url.socket_addrs(|| None)?[..])?) - .request(req) - .await? - } else { - Client::new().request(req).await? - }; + let response = if let Some(url) = proxy { + // Panic safety: all URLs are valid URIs + let uri = url.to_string().parse().unwrap(); + let proxy = Proxy::new(Intercept::All, uri); + let connector = HttpConnector::new(); + let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy); + Client::builder() + .build(proxy_connector) + .request(req) + .await? + } else { + Client::new().request(req).await? + }; - let body = hyper::body::to_bytes(response.into_body()).await?; - let data: APResolveData = serde_json::from_slice(body.as_ref())?; + let body = hyper::body::to_bytes(response.into_body()).await?; + let data: APResolveData = serde_json::from_slice(body.as_ref())?; - let ap = if ap_port.is_some() || proxy.is_some() { - data.ap_list.into_iter().find_map(|ap| { - if ap.parse::().ok()?.port()? == port { - Some(ap) - } else { - None - } - }) + let ap = if ap_port.is_some() || proxy.is_some() { + data.ap_list.into_iter().find_map(|ap| { + if ap.parse::().ok()?.port()? == port { + Some(ap) } else { - data.ap_list.into_iter().next() + None } - .ok_or("empty AP List")?; - - Ok(ap) - } - - pub async fn apresolve_or_fallback(proxy: &Option, ap_port: &Option) -> String { - apresolve(proxy, ap_port).await.unwrap_or_else(|e| { - warn!("Failed to resolve Access Point: {}", e); - warn!("Using fallback \"{}\"", AP_FALLBACK); - AP_FALLBACK.into() - }) - } + }) } else { - pub async fn apresolve_or_fallback(_: &Option, _: &Option) -> String { - AP_FALLBACK.to_string() - } + data.ap_list.into_iter().next() } + .ok_or("empty AP List")?; + + Ok(ap) +} + +pub async fn apresolve(proxy: Option<&Url>, ap_port: Option) -> String { + try_apresolve(proxy, ap_port).await.unwrap_or_else(|e| { + warn!("Failed to resolve Access Point: {}", e); + warn!("Using fallback \"{}\"", AP_FALLBACK); + AP_FALLBACK.into() + }) } diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index b715d3578..ab353669c 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -58,25 +58,11 @@ impl From for AuthenticationError { } } -pub async fn connect(addr: String, proxy: &Option) -> io::Result { - let socket = if let Some(proxy) = proxy { - info!("Using proxy \"{}\"", proxy); +pub async fn connect(addr: String, proxy: Option<&Url>) -> io::Result { + let socket = if let Some(proxy_url) = proxy { + info!("Using proxy \"{}\"", proxy_url); - let mut split = addr.rsplit(':'); - - let port = split - .next() - .unwrap() // will never panic, split iterator contains at least one element - .parse() - .map_err(|e| { - io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e)) - })?; - - let host = split - .next() - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?; - - let socket_addr = proxy.socket_addrs(|| None).and_then(|addrs| { + let socket_addr = proxy_url.socket_addrs(|| None).and_then(|addrs| { addrs.into_iter().next().ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, @@ -86,13 +72,34 @@ pub async fn connect(addr: String, proxy: &Option) -> io::Result })?; let socket = TcpStream::connect(&socket_addr).await?; - proxytunnel::connect(socket, host, port).await? + let uri = addr.parse::().map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidData, + "Can't parse access point address", + ) + })?; + let host = uri.host().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "The access point address contains no hostname", + ) + })?; + let port = uri.port().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "The access point address contains no port", + ) + })?; + + proxytunnel::proxy_connect(socket, host, port.as_str()).await? } else { - let socket_addr = addr.to_socket_addrs().and_then(|mut iter| { - iter.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::NotFound, "Can't resolve server address") - }) + let socket_addr = addr.to_socket_addrs()?.next().ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + "Can't resolve access point address", + ) })?; + TcpStream::connect(&socket_addr).await? }; diff --git a/core/src/lib.rs b/core/src/lib.rs index 4ebe85819..320967f70 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -2,15 +2,12 @@ #[macro_use] extern crate log; -#[macro_use] -extern crate cfg_if; use librespot_protocol as protocol; #[macro_use] mod component; -mod apresolve; pub mod audio_key; pub mod authentication; pub mod cache; @@ -25,3 +22,15 @@ pub mod session; pub mod spotify_id; pub mod util; pub mod version; + +const AP_FALLBACK: &str = "ap.spotify.com:443"; + +#[cfg(feature = "apresolve")] +mod apresolve; + +#[cfg(not(feature = "apresolve"))] +mod apresolve { + pub async fn apresolve(_: Option<&url::Url>, _: Option) -> String { + return super::AP_FALLBACK.into(); + } +} diff --git a/core/src/proxytunnel.rs b/core/src/proxytunnel.rs index 158d314f6..6f1587f06 100644 --- a/core/src/proxytunnel.rs +++ b/core/src/proxytunnel.rs @@ -2,16 +2,16 @@ use std::io; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -pub async fn connect( +pub async fn proxy_connect( mut proxy_connection: T, connect_host: &str, - connect_port: u16, + connect_port: &str, ) -> io::Result { let mut buffer = Vec::new(); buffer.extend_from_slice(b"CONNECT "); buffer.extend_from_slice(connect_host.as_bytes()); buffer.push(b':'); - buffer.extend_from_slice(connect_port.to_string().as_bytes()); + buffer.extend_from_slice(connect_port.as_bytes()); buffer.extend_from_slice(b" HTTP/1.1\r\n\r\n"); proxy_connection.write_all(buffer.as_ref()).await?; @@ -49,61 +49,7 @@ pub async fn connect( } if offset >= buffer.len() { - buffer.resize(buffer.len() * 2, 0); - } - } -} - -cfg_if! { - if #[cfg(feature = "apresolve")] { - use std::future::Future; - use std::net::{SocketAddr, ToSocketAddrs}; - use std::pin::Pin; - use std::task::Poll; - - use hyper::service::Service; - use hyper::Uri; - use tokio::net::TcpStream; - - #[derive(Clone)] - pub struct ProxyTunnel { - proxy_addr: SocketAddr, - } - - impl ProxyTunnel { - pub fn new(addr: T) -> io::Result { - let addr = addr.to_socket_addrs()?.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "No socket address given") - })?; - Ok(Self { proxy_addr: addr }) - } - } - - impl Service for ProxyTunnel { - type Response = TcpStream; - type Error = io::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, url: Uri) -> Self::Future { - let proxy_addr = self.proxy_addr; - let fut = async move { - let host = url - .host() - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?; - let port = url - .port() - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?; - - let conn = TcpStream::connect(proxy_addr).await?; - connect(conn, host, port.as_u16()).await - }; - - Box::pin(fut) - } + buffer.resize(buffer.len() + 100, 0); } } } diff --git a/core/src/session.rs b/core/src/session.rs index 53bbabd8b..d7e478fa9 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -16,7 +16,7 @@ use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; -use crate::apresolve::apresolve_or_fallback; +use crate::apresolve::apresolve; use crate::audio_key::AudioKeyManager; use crate::authentication::Credentials; use crate::cache::Cache; @@ -67,10 +67,10 @@ impl Session { credentials: Credentials, cache: Option, ) -> Result { - let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await; + let ap = apresolve(config.proxy.as_ref(), config.ap_port).await; info!("Connecting to AP \"{}\"", ap); - let mut conn = connection::connect(ap, &config.proxy).await?; + let mut conn = connection::connect(ap, config.proxy.as_ref()).await?; let reusable_credentials = connection::authenticate(&mut conn, credentials, &config.device_id).await?;