diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c7902c0..1b8b3c6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. +## Unreleased + + * Tiles are now downloaded in parallel. + ## 0.17.0 * `egui` updated to 0.25. diff --git a/Cargo.toml b/Cargo.toml index 2de053b9..fb4a4791 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,12 @@ [workspace] -members = ["walkers", "demo", "demo_native", "demo_web", "demo_android/rust"] +members = [ + "walkers", + "demo", + "demo_native", + "demo_web", + "demo_android/rust", + "hypermocker", +] resolver = "2" [workspace.package] diff --git a/hypermocker/Cargo.toml b/hypermocker/Cargo.toml new file mode 100644 index 00000000..605fbfff --- /dev/null +++ b/hypermocker/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "hypermocker" +edition = "2021" +version = "0.1.0" +publish = false + +[dependencies] +hyper = { version = "1.1.0", features = ["full"] } +tokio = { version = "1.28", features = ["macros"] } +hyper-util = { version = "0.1", features = ["full"] } +http-body-util = "0.1" +log = "0.4" + +[dev-dependencies] +reqwest = "0.11" +futures = "0.3.28" +env_logger = "0.10" diff --git a/hypermocker/logo.jpeg b/hypermocker/logo.jpeg new file mode 100644 index 00000000..b6664537 Binary files /dev/null and b/hypermocker/logo.jpeg differ diff --git a/hypermocker/src/lib.rs b/hypermocker/src/lib.rs new file mode 100644 index 00000000..13a4ce6d --- /dev/null +++ b/hypermocker/src/lib.rs @@ -0,0 +1,189 @@ +use http_body_util::Full; +use hyper::{server::conn::http1, Response}; +use hyper_util::rt::TokioIo; +use std::{ + collections::HashMap, + future::Future, + net::SocketAddr, + pin::Pin, + sync::{Arc, Mutex}, +}; +use tokio::net::TcpListener; +use tokio::sync::oneshot; + +pub use hyper::body::Bytes; + +struct Expectation { + payload_rx: oneshot::Receiver, + happened_tx: oneshot::Sender<()>, +} + +#[derive(Default)] +struct State { + /// Anticipations made by [`Mock::anticipate`]. + expectations: HashMap, + + /// Requests that were unexpected. + unexpected: Vec, +} + +pub struct Server { + port: u16, + state: Arc>, +} + +impl Server { + /// Create new [`Mock`], and bind it to a random port. + pub async fn bind() -> Server { + let state = Arc::new(Mutex::new(State::default())); + + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); + let listener = TcpListener::bind(addr).await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let state_clone = state.clone(); + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + let io = TokioIo::new(stream); + + let state = state_clone.clone(); + tokio::task::spawn(async move { + http1::Builder::new() + .serve_connection(io, Service { state }) + .await + .unwrap(); + }); + } + }); + + Server { port, state } + } + + /// Port, which this server listens on. + pub fn port(&self) -> u16 { + self.port + } + + /// Anticipate a HTTP request, but do not respond to it yet. + pub async fn anticipate(&self, url: String) -> AnticipatedRequest { + log::info!("Anticipating '{}'.", url); + let (payload_tx, payload_rx) = oneshot::channel(); + let (happened_tx, happened_rx) = oneshot::channel(); + if self + .state + .lock() + .unwrap() + .expectations + .insert( + url.to_owned(), + Expectation { + payload_rx, + happened_tx, + }, + ) + .is_some() + { + panic!("already anticipating"); + }; + AnticipatedRequest { + url, + payload_tx, + happened_rx: Some(happened_rx), + } + } +} + +impl Drop for Server { + fn drop(&mut self) { + if !self.state.lock().unwrap().unexpected.is_empty() { + panic!("there are unexpected requests"); + } + } +} + +/// HTTP request that was anticipated to arrive. +pub struct AnticipatedRequest { + url: String, + payload_tx: tokio::sync::oneshot::Sender, + happened_rx: Option>, +} + +impl AnticipatedRequest { + /// Respond to this request with the given body. + pub async fn respond(self, payload: Bytes) { + log::info!("Responding to '{}'.", self.url); + self.payload_tx.send(payload).unwrap(); + } + + /// Expect the request to come, but still do not respond to it yet. + pub async fn expect(&mut self) { + log::info!("Expecting '{}'.", self.url); + if let Some(happened_tx) = self.happened_rx.take() { + happened_tx.await.unwrap(); + } else { + panic!("this request was already expected"); + } + } +} + +struct Service { + state: Arc>, +} + +impl hyper::service::Service> for Service { + type Response = Response>; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn call(&self, request: hyper::Request) -> Self::Future { + log::info!("Incoming request '{}'.", request.uri()); + let state = self.state.clone(); + Box::pin(async move { + let expectation = state + .lock() + .unwrap() + .expectations + .remove(&request.uri().path().to_string()); + + if let Some(expectation) = expectation { + // [`AnticipatedRequest`] might be dropped by now, and there is no one to receive it, + // but that is OK. + let _ = expectation.happened_tx.send(()); + + match expectation.payload_rx.await { + Ok(payload) => { + log::debug!( + "Proper responding to '{}' with {} bytes.", + request.uri(), + payload.len() + ); + Ok(Response::new(Full::new(payload))) + } + Err(_) => { + log::error!( + "AnticipatedRequest for '{}' was dropped before responding.", + request.uri() + ); + // TODO: This panic will be ignored by hyper/tokio stack. + panic!( + "AnticipatedRequest for '{}' was dropped before responding.", + request.uri() + ); + } + } + } else { + log::warn!("Unexpected '{}'.", request.uri()); + state + .lock() + .unwrap() + .unexpected + .push(request.uri().to_string()); + Ok(Response::builder() + .status(418) + .body(Full::new(Bytes::from_static(b"unexpected"))) + .unwrap()) + } + }) + } +} diff --git a/hypermocker/tests/integration_test.rs b/hypermocker/tests/integration_test.rs new file mode 100644 index 00000000..b2358342 --- /dev/null +++ b/hypermocker/tests/integration_test.rs @@ -0,0 +1,76 @@ +use hyper::body::Bytes; +use hypermocker::Server; +use std::time::Duration; + +#[tokio::test] +async fn anticipate_then_request() { + let _ = env_logger::try_init(); + + let mock = Server::bind().await; + let url = format!("http://localhost:{}/foo", mock.port()); + let request = mock.anticipate("/foo".to_string()).await; + + // Make sure that mock's internals kick in. + tokio::time::sleep(Duration::from_secs(1)).await; + + futures::future::join( + async { + let response = reqwest::get(url).await.unwrap(); + let bytes = response.bytes().await.unwrap(); + assert_eq!(&bytes[..], b"hello"); + }, + async { + request.respond(Bytes::from_static(b"hello")).await; + }, + ) + .await; +} + +#[tokio::test] +async fn anticipate_expect_then_request() { + let _ = env_logger::try_init(); + + let mock = Server::bind().await; + let url = format!("http://localhost:{}/foo", mock.port()); + let mut request = mock.anticipate("/foo".to_string()).await; + + // Make sure that mock's internals kick in. + tokio::time::sleep(Duration::from_secs(1)).await; + + futures::future::join( + async { + let response = reqwest::get(url).await.unwrap(); + let bytes = response.bytes().await.unwrap(); + assert_eq!(&bytes[..], b"hello"); + }, + async { + request.expect().await; + request.respond(Bytes::from_static(b"hello")).await; + }, + ) + .await; +} + +#[tokio::test] +#[should_panic(expected = "there are unexpected requests")] +async fn unanticipated_request() { + let _ = env_logger::try_init(); + + let mock = Server::bind().await; + let url = format!("http://localhost:{}/foo", mock.port()); + + let response = reqwest::get(url).await.unwrap(); + let bytes = response.bytes().await.unwrap(); + assert_eq!(&bytes[..], b"unexpected"); +} + +#[tokio::test] +#[should_panic(expected = "already anticipating")] +async fn can_not_anticipate_twice() { + let _ = env_logger::try_init(); + + let mock = Server::bind().await; + + mock.anticipate("/foo".to_string()).await; + mock.anticipate("/foo".to_string()).await; +} diff --git a/walkers/Cargo.toml b/walkers/Cargo.toml index a72675d4..3ec7566a 100644 --- a/walkers/Cargo.toml +++ b/walkers/Cargo.toml @@ -34,3 +34,4 @@ eframe.workspace = true env_logger = "0.10" approx = "0.5" mockito = "1.1" +hypermocker = { path = "../hypermocker" } diff --git a/walkers/src/download.rs b/walkers/src/download.rs index 81377788..9193676d 100644 --- a/walkers/src/download.rs +++ b/walkers/src/download.rs @@ -1,7 +1,10 @@ -use std::path::PathBuf; +use std::{path::PathBuf, pin::Pin}; use egui::Context; -use futures::{SinkExt, StreamExt}; +use futures::{ + future::{select, select_all, Either}, + SinkExt, StreamExt, +}; use image::ImageError; use reqwest::header::USER_AGENT; use reqwest_middleware::ClientWithMiddleware; @@ -34,20 +37,38 @@ enum Error { Image(ImageError), } +struct Download { + tile_id: TileId, + result: Result, +} + /// Download and decode the tile. async fn download_and_decode( client: &ClientWithMiddleware, - url: &str, + tile_id: TileId, + url: String, + egui_ctx: &Context, +) -> Download { + log::debug!("Downloading '{}'.", url); + Download { + tile_id, + result: download_and_decode_impl(client, url, egui_ctx).await, + } +} + +async fn download_and_decode_impl( + client: &ClientWithMiddleware, + url: String, egui_ctx: &Context, ) -> Result { let image = client - .get(url) + .get(&url) .header(USER_AGENT, "Walkers") .send() .await .map_err(Error::HttpMiddleware)?; - log::debug!("Downloaded {:?}.", image.status()); + log::debug!("Downloaded '{}': {:?}.", url, image.status()); let image = image .error_for_status() @@ -59,11 +80,48 @@ async fn download_and_decode( Texture::new(&image, egui_ctx).map_err(Error::Image) } +async fn download_complete( + mut tile_tx: futures::channel::mpsc::Sender<(TileId, Texture)>, + egui_ctx: Context, + tile_id: TileId, + result: Result, +) -> Result<(), ()> { + match result { + Ok(tile) => { + tile_tx.send((tile_id, tile)).await.map_err(|_| ())?; + egui_ctx.request_repaint(); + } + Err(e) => { + log::warn!("{}", e); + } + }; + + Ok(()) +} + +enum Downloads { + None, + Ongoing(Vec>>), + OngoingSaturated(Vec>>), +} + +impl Downloads { + fn new(downloads: Vec>>) -> Self { + if downloads.is_empty() { + Self::None + } else if downloads.len() < 6 { + Self::Ongoing(downloads) + } else { + Self::OngoingSaturated(downloads) + } + } +} + async fn download_continuously_impl( source: S, http_options: HttpOptions, mut request_rx: futures::channel::mpsc::Receiver, - mut tile_tx: futures::channel::mpsc::Sender<(TileId, Texture)>, + tile_tx: futures::channel::mpsc::Sender<(TileId, Texture)>, egui_ctx: Context, ) -> Result<(), ()> where @@ -71,20 +129,51 @@ where { // Keep outside the loop to reuse it as much as possible. let client = http_client(http_options); + let mut downloads = Downloads::None; loop { - let request = request_rx.next().await.ok_or(())?; - let url = source.tile_url(request); - - log::debug!("Getting {:?} from {}.", request, url); - - match download_and_decode(&client, &url, &egui_ctx).await { - Ok(tile) => { - tile_tx.send((request, tile)).await.map_err(|_| ())?; - egui_ctx.request_repaint(); + downloads = match downloads { + Downloads::None => { + let request = request_rx.next().await.ok_or(())?; + let url = source.tile_url(request); + let download = download_and_decode(&client, request, url, &egui_ctx); + Downloads::Ongoing(vec![Box::pin(download)]) + } + Downloads::Ongoing(ref mut downloads) => { + let download = select_all(downloads.drain(..)); + match select(request_rx.next(), download).await { + // New download was requested. + Either::Left((request, downloads)) => { + let request = request.ok_or(())?; + let url = source.tile_url(request); + let download = download_and_decode(&client, request, url, &egui_ctx); + let mut downloads = downloads.into_inner(); + downloads.push(Box::pin(download)); + Downloads::new(downloads) + } + // Ongoing download was completed. + Either::Right(((result, _, downloads), _)) => { + download_complete( + tile_tx.to_owned(), + egui_ctx.to_owned(), + result.tile_id, + result.result, + ) + .await?; + Downloads::new(downloads) + } + } } - Err(e) => { - log::warn!("Could not download '{}': {}", &url, e); + Downloads::OngoingSaturated(ref mut downloads) => { + let (result, _, downloads) = select_all(downloads.drain(..)).await; + download_complete( + tile_tx.to_owned(), + egui_ctx.to_owned(), + result.tile_id, + result.result, + ) + .await?; + Downloads::Ongoing(downloads) } } } diff --git a/walkers/src/tiles.rs b/walkers/src/tiles.rs index f1d2b9e7..32d48391 100644 --- a/walkers/src/tiles.rs +++ b/walkers/src/tiles.rs @@ -161,9 +161,9 @@ impl TilesManager for Tiles { #[cfg(test)] mod tests { - use std::time::Duration; - use super::*; + use hypermocker::Bytes; + use std::time::Duration; static TILE_ID: TileId = TileId { x: 1, @@ -207,6 +207,13 @@ mod tests { (server, TestSource::new(url)) } + /// Creates [`hypermocker::Mock`], and function mapping `TileId` to its URL. + async fn hypermocker_mock() -> (hypermocker::Server, TestSource) { + let server = hypermocker::Server::bind().await; + let url = format!("http://localhost:{}", server.port()); + (server, TestSource::new(url)) + } + #[test] fn download_single_tile() { let _ = env_logger::try_init(); @@ -228,6 +235,53 @@ mod tests { tile_mock.assert(); } + #[tokio::test] + async fn there_can_be_6_simultaneous_downloads_at_most() { + let _ = env_logger::try_init(); + + let (server, source) = hypermocker_mock().await; + let mut tiles = Tiles::new(source, Context::default()); + + // First download is started immediately. + let mut first_outstanding_request = server.anticipate(format!("/3/1/2.png")).await; + assert!(tiles.at(TILE_ID).is_none()); + first_outstanding_request.expect().await; + + let tile_ids: Vec<_> = (2..7).map(|x| TileId { x, y: 1, zoom: 1 }).collect(); + + // Rest of the downloads are started right away too, but they remain active. + let mut requests = Vec::new(); + for tile_id in tile_ids { + let mut request = server.anticipate(format!("/1/{}/1.png", tile_id.x)).await; + assert!(tiles.at(tile_id).is_none()); + request.expect().await; + requests.push(request); + } + + // Last download is NOT started, because we are at the limit of concurrent downloads. + assert!(tiles + .at(TileId { + x: 99, + y: 99, + zoom: 1 + }) + .is_none()); + + // Make sure it does not come. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Last download will start as soon as one of the previous ones are responded to. + let mut awaiting_request = server.anticipate("/1/99/99.png".to_string()).await; + + first_outstanding_request + .respond(Bytes::from_static(include_bytes!( + "../assets/blank-255-tile.png" + ))) + .await; + + awaiting_request.expect().await; + } + fn assert_tile_is_empty_forever(tiles: &mut Tiles) { // Should be None now, and forever. assert!(tiles.at(TILE_ID).is_none());