Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simultaneous HTTP downloads #121

Merged
merged 57 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
69a5f05
two clients
podusowski Jan 13, 2024
a005d9d
draft of the future handling structure
podusowski Jan 14, 2024
28208e7
handle all cases
podusowski Jan 14, 2024
7845326
do not loose ongoing downloads
podusowski Jan 14, 2024
5a8c149
put some login into a function
podusowski Jan 14, 2024
f6fa01d
reuse client
podusowski Jan 14, 2024
8fa933f
warning removed
podusowski Jan 14, 2024
303253b
comment removed
podusowski Jan 14, 2024
ed2d3b7
minor change
podusowski Jan 14, 2024
aaded03
extract fn
podusowski Jan 14, 2024
277902d
draft
podusowski Jan 14, 2024
443c744
http mock skeleton
podusowski Jan 14, 2024
cf5e9c1
Mock struct
podusowski Jan 14, 2024
dcba219
more work on server
podusowski Jan 14, 2024
11dca02
use MockRequest
podusowski Jan 14, 2024
5634049
pass state around
podusowski Jan 14, 2024
780e679
super basic case works
podusowski Jan 14, 2024
8d08338
doc
podusowski Jan 14, 2024
bfd1f41
handle swapped cases
podusowski Jan 14, 2024
5579c4b
random port
podusowski Jan 14, 2024
317d392
add logger
podusowski Jan 14, 2024
d045dba
deadlock fixed
podusowski Jan 14, 2024
5bd66fc
bind fn
podusowski Jan 14, 2024
a6a1a9e
int test
podusowski Jan 14, 2024
16bac7d
logo - DallE powered
podusowski Jan 14, 2024
6b691a0
warnings removed
podusowski Jan 14, 2024
219ab42
simplify
podusowski Jan 14, 2024
833611e
record unexpected
podusowski Jan 14, 2024
50aebcf
dont return Result
podusowski Jan 14, 2024
592fa62
minor
podusowski Jan 14, 2024
cfc62e8
doc
podusowski Jan 14, 2024
1cc4c86
reexport hyper Bytes
podusowski Jan 14, 2024
67c7839
api rework
podusowski Jan 15, 2024
e8fccf2
test for simultaneous downloads
podusowski Jan 15, 2024
a419af7
test renamed
podusowski Jan 15, 2024
07c2d24
couple renames
podusowski Jan 15, 2024
a2d981c
couple renames
podusowski Jan 15, 2024
911c9cb
rename
podusowski Jan 15, 2024
a061af9
port is now fn
podusowski Jan 15, 2024
aa84687
test renamed
podusowski Jan 15, 2024
6322911
can not anticipate twice
podusowski Jan 15, 2024
9bdd23f
Merge branch 'main' into multi-download
podusowski Jan 15, 2024
3afb416
HttpOptions does not need to be Clone
podusowski Jan 16, 2024
bfb0f51
increase limit
podusowski Jan 16, 2024
5a2dde9
test renamed
podusowski Jan 16, 2024
0e1e00b
warnings removed
podusowski Jan 16, 2024
35f156b
changelog updated
podusowski Jan 18, 2024
1c1cdcb
Downloads enum
podusowski Jan 18, 2024
8c068f5
cleanup
podusowski Jan 18, 2024
533584e
single assign
podusowski Jan 18, 2024
fdfb660
misc
podusowski Jan 18, 2024
d038966
rename
podusowski Jan 18, 2024
803a5c0
move logic to new fn
podusowski Jan 18, 2024
8979920
inline
podusowski Jan 18, 2024
3b3ee0a
rename
podusowski Jan 19, 2024
7bfe171
another rename
podusowski Jan 19, 2024
94937ef
inline + comments
podusowski Jan 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## Unreleased

* Tiles are now downloaded in parallel.

## 0.17.0

* `egui` updated to 0.25.
Expand Down
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
[workspace]
members = ["walkers", "demo", "demo_native", "demo_web", "demo_android/rust"]
members = [
"walkers",
"demo",
"demo_native",
"demo_web",
"demo_android/rust",
"hypermocker",
]
resolver = "2"

[workspace.package]
Expand Down
17 changes: 17 additions & 0 deletions hypermocker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to get rid of Mockito, and use this crate instead. Once it can handle all Walkers' use cases, I will create a new repo for it.

name = "hypermocker"
edition = "2021"
version = "0.1.0"
publish = false

[dependencies]
hyper = { version = "1.1.0", features = ["full"] }
tokio = { version = "1.28", features = ["macros"] }
hyper-util = { version = "0.1", features = ["full"] }
http-body-util = "0.1"
log = "0.4"

[dev-dependencies]
reqwest = "0.11"
futures = "0.3.28"
env_logger = "0.10"
Binary file added hypermocker/logo.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
189 changes: 189 additions & 0 deletions hypermocker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use http_body_util::Full;
use hyper::{server::conn::http1, Response};
use hyper_util::rt::TokioIo;
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
pin::Pin,
sync::{Arc, Mutex},
};
use tokio::net::TcpListener;
use tokio::sync::oneshot;

pub use hyper::body::Bytes;

struct Expectation {
payload_rx: oneshot::Receiver<Bytes>,
happened_tx: oneshot::Sender<()>,
}

#[derive(Default)]
struct State {
/// Anticipations made by [`Mock::anticipate`].
expectations: HashMap<String, Expectation>,

/// Requests that were unexpected.
unexpected: Vec<String>,
}

pub struct Server {
port: u16,
state: Arc<Mutex<State>>,
}

impl Server {
/// Create new [`Mock`], and bind it to a random port.
pub async fn bind() -> Server {
let state = Arc::new(Mutex::new(State::default()));

let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr).await.unwrap();
let port = listener.local_addr().unwrap().port();

let state_clone = state.clone();
tokio::spawn(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);

let state = state_clone.clone();
tokio::task::spawn(async move {
http1::Builder::new()
.serve_connection(io, Service { state })
.await
.unwrap();
});
}
});

Server { port, state }
}

/// Port, which this server listens on.
pub fn port(&self) -> u16 {
self.port
}

/// Anticipate a HTTP request, but do not respond to it yet.
pub async fn anticipate(&self, url: String) -> AnticipatedRequest {
log::info!("Anticipating '{}'.", url);
let (payload_tx, payload_rx) = oneshot::channel();
let (happened_tx, happened_rx) = oneshot::channel();
if self
.state
.lock()
.unwrap()
.expectations
.insert(
url.to_owned(),
Expectation {
payload_rx,
happened_tx,
},
)
.is_some()
{
panic!("already anticipating");
};
AnticipatedRequest {
url,
payload_tx,
happened_rx: Some(happened_rx),
}
}
}

impl Drop for Server {
fn drop(&mut self) {
if !self.state.lock().unwrap().unexpected.is_empty() {
panic!("there are unexpected requests");
}
}
}

/// HTTP request that was anticipated to arrive.
pub struct AnticipatedRequest {
url: String,
payload_tx: tokio::sync::oneshot::Sender<Bytes>,
happened_rx: Option<oneshot::Receiver<()>>,
}

impl AnticipatedRequest {
/// Respond to this request with the given body.
pub async fn respond(self, payload: Bytes) {
log::info!("Responding to '{}'.", self.url);
self.payload_tx.send(payload).unwrap();
}

/// Expect the request to come, but still do not respond to it yet.
pub async fn expect(&mut self) {
log::info!("Expecting '{}'.", self.url);
if let Some(happened_tx) = self.happened_rx.take() {
happened_tx.await.unwrap();
} else {
panic!("this request was already expected");
}
}
}

struct Service {
state: Arc<Mutex<State>>,
}

impl hyper::service::Service<hyper::Request<hyper::body::Incoming>> for Service {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&self, request: hyper::Request<hyper::body::Incoming>) -> Self::Future {
log::info!("Incoming request '{}'.", request.uri());
let state = self.state.clone();
Box::pin(async move {
let expectation = state
.lock()
.unwrap()
.expectations
.remove(&request.uri().path().to_string());

if let Some(expectation) = expectation {
// [`AnticipatedRequest`] might be dropped by now, and there is no one to receive it,
// but that is OK.
let _ = expectation.happened_tx.send(());

match expectation.payload_rx.await {
Ok(payload) => {
log::debug!(
"Proper responding to '{}' with {} bytes.",
request.uri(),
payload.len()
);
Ok(Response::new(Full::new(payload)))
}
Err(_) => {
log::error!(
"AnticipatedRequest for '{}' was dropped before responding.",
request.uri()
);
// TODO: This panic will be ignored by hyper/tokio stack.
panic!(
"AnticipatedRequest for '{}' was dropped before responding.",
request.uri()
);
}
}
} else {
log::warn!("Unexpected '{}'.", request.uri());
state
.lock()
.unwrap()
.unexpected
.push(request.uri().to_string());
Ok(Response::builder()
.status(418)
.body(Full::new(Bytes::from_static(b"unexpected")))
.unwrap())
}
})
}
}
76 changes: 76 additions & 0 deletions hypermocker/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use hyper::body::Bytes;
use hypermocker::Server;
use std::time::Duration;

#[tokio::test]
async fn anticipate_then_request() {
let _ = env_logger::try_init();

let mock = Server::bind().await;
let url = format!("http://localhost:{}/foo", mock.port());
let request = mock.anticipate("/foo".to_string()).await;

// Make sure that mock's internals kick in.
tokio::time::sleep(Duration::from_secs(1)).await;

futures::future::join(
async {
let response = reqwest::get(url).await.unwrap();
let bytes = response.bytes().await.unwrap();
assert_eq!(&bytes[..], b"hello");
},
async {
request.respond(Bytes::from_static(b"hello")).await;
},
)
.await;
}

#[tokio::test]
async fn anticipate_expect_then_request() {
let _ = env_logger::try_init();

let mock = Server::bind().await;
let url = format!("http://localhost:{}/foo", mock.port());
let mut request = mock.anticipate("/foo".to_string()).await;

// Make sure that mock's internals kick in.
tokio::time::sleep(Duration::from_secs(1)).await;

futures::future::join(
async {
let response = reqwest::get(url).await.unwrap();
let bytes = response.bytes().await.unwrap();
assert_eq!(&bytes[..], b"hello");
},
async {
request.expect().await;
request.respond(Bytes::from_static(b"hello")).await;
},
)
.await;
}

#[tokio::test]
#[should_panic(expected = "there are unexpected requests")]
async fn unanticipated_request() {
let _ = env_logger::try_init();

let mock = Server::bind().await;
let url = format!("http://localhost:{}/foo", mock.port());

let response = reqwest::get(url).await.unwrap();
let bytes = response.bytes().await.unwrap();
assert_eq!(&bytes[..], b"unexpected");
}

#[tokio::test]
#[should_panic(expected = "already anticipating")]
async fn can_not_anticipate_twice() {
let _ = env_logger::try_init();

let mock = Server::bind().await;

mock.anticipate("/foo".to_string()).await;
mock.anticipate("/foo".to_string()).await;
}
1 change: 1 addition & 0 deletions walkers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ eframe.workspace = true
env_logger = "0.10"
approx = "0.5"
mockito = "1.1"
hypermocker = { path = "../hypermocker" }
Loading
Loading