Skip to content

Commit 940bf4c

Browse files
committed
feat: range request support
1 parent 7bcafc6 commit 940bf4c

File tree

8 files changed

+400
-82
lines changed

8 files changed

+400
-82
lines changed

iroh-gateway/src/client.rs

+31-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::ops::Range;
12
use std::pin::Pin;
23
use std::task::Poll;
34

@@ -13,9 +14,10 @@ use iroh_metrics::{
1314
observe, record,
1415
};
1516
use iroh_resolver::resolver::{
16-
CidOrDomain, ContentLoader, Metadata, Out, OutMetrics, OutPrettyReader, Resolver, Source,
17+
CidOrDomain, ContentLoader, Metadata, Out, OutMetrics, OutPrettyReader, Resolver, ResponseClip,
18+
Source,
1719
};
18-
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWrite};
20+
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWrite};
1921
use tokio_util::io::ReaderStream;
2022
use tracing::{info, warn};
2123

@@ -43,6 +45,10 @@ impl<T: ContentLoader> PrettyStreamBody<T> {
4345
pub fn get_sample(&self) -> Vec<u8> {
4446
self.2.clone()
4547
}
48+
49+
pub fn get_size(&self) -> Option<u64> {
50+
self.1
51+
}
4652
}
4753

4854
impl<T: ContentLoader + std::marker::Unpin> http_body::Body for PrettyStreamBody<T> {
@@ -90,6 +96,7 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
9096
&self,
9197
path: iroh_resolver::resolver::Path,
9298
start_time: std::time::Instant,
99+
range: Option<Range<u64>>,
93100
) -> Result<(FileResult<T>, Metadata), String> {
94101
info!("get file {}", path);
95102
let res = self
@@ -104,8 +111,16 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
104111
let body = FileResult::Directory(res);
105112
Ok((body, metadata))
106113
} else {
114+
let mut clip = 0;
115+
if let Some(range) = &range {
116+
clip = range.end as usize;
117+
}
107118
let reader = res
108-
.pretty(self.resolver.clone(), OutMetrics { start: start_time })
119+
.pretty(
120+
self.resolver.clone(),
121+
OutMetrics { start: start_time },
122+
ResponseClip::from(clip),
123+
)
109124
.map_err(|e| e.to_string())?;
110125

111126
let mut buf_reader = tokio::io::BufReader::with_capacity(1024 * 1024, reader);
@@ -114,6 +129,14 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
114129
.await
115130
.map_err(|e| e.to_string())?
116131
.to_vec();
132+
133+
if let Some(range) = range {
134+
buf_reader
135+
.seek(tokio::io::SeekFrom::Start(range.start))
136+
.await
137+
.map_err(|e| e.to_string())?;
138+
}
139+
117140
let stream = ReaderStream::new(buf_reader);
118141

119142
let body = PrettyStreamBody(stream, metadata.size, body_sample);
@@ -160,8 +183,11 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
160183
Ok(res) => {
161184
let metadata = res.metadata().clone();
162185
record_ttfb_metrics(start_time, &metadata.source);
163-
let reader =
164-
res.pretty(self.resolver.clone(), OutMetrics { start: start_time });
186+
let reader = res.pretty(
187+
self.resolver.clone(),
188+
OutMetrics { start: start_time },
189+
ResponseClip::NoClip,
190+
);
165191
match reader {
166192
Ok(mut reader) => {
167193
let mut bytes = Vec::new();

iroh-gateway/src/handlers.rs

+50-11
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use axum::{
33
body::{self, Body, HttpBody},
44
error_handling::HandleErrorLayer,
55
extract::{Extension, Path, Query},
6-
http::{header::*, StatusCode},
6+
http::{header::*, Request as HttpRequest, StatusCode},
77
response::IntoResponse,
88
routing::get,
99
BoxError, Router,
@@ -27,6 +27,7 @@ use std::{
2727
collections::HashMap,
2828
error::Error,
2929
fmt::Write,
30+
ops::Range,
3031
sync::Arc,
3132
time::{self, Duration},
3233
};
@@ -117,6 +118,7 @@ pub async fn get_handler<T: ContentLoader + std::marker::Unpin>(
117118
Path(params): Path<HashMap<String, String>>,
118119
Query(query_params): Query<GetParams>,
119120
method: http::Method,
121+
http_req: HttpRequest<Body>,
120122
request_headers: HeaderMap,
121123
) -> Result<GatewayResponse, GatewayError> {
122124
inc!(GatewayMetrics::Requests);
@@ -155,6 +157,7 @@ pub async fn get_handler<T: ContentLoader + std::marker::Unpin>(
155157
.parse()
156158
.map_err(|e: anyhow::Error| e.to_string())
157159
.map_err(|e| error(StatusCode::BAD_REQUEST, &e, &state))?;
160+
// TODO: handle 404 or error
158161
let resolved_cid = resolved_path.root();
159162

160163
if handle_only_if_cached(&request_headers, &state, resolved_cid).await? {
@@ -216,9 +219,9 @@ pub async fn get_handler<T: ContentLoader + std::marker::Unpin>(
216219
serve_car_recursive(&req, state, headers, start_time).await
217220
} else {
218221
match req.format {
219-
ResponseFormat::Raw => serve_raw(&req, state, headers, start_time).await,
222+
ResponseFormat::Raw => serve_raw(&req, state, headers, &http_req, start_time).await,
220223
ResponseFormat::Car => serve_car(&req, state, headers, start_time).await,
221-
ResponseFormat::Fs(_) => serve_fs(&req, state, headers, start_time).await,
224+
ResponseFormat::Fs(_) => serve_fs(&req, state, headers, &http_req, start_time).await,
222225
}
223226
}
224227
}
@@ -382,12 +385,18 @@ async fn serve_raw<T: ContentLoader + std::marker::Unpin>(
382385
req: &Request,
383386
state: Arc<State<T>>,
384387
mut headers: HeaderMap,
388+
http_req: &HttpRequest<Body>,
385389
start_time: std::time::Instant,
386390
) -> Result<GatewayResponse, GatewayError> {
391+
let range: Option<Range<u64>> = if http_req.headers().contains_key(RANGE) {
392+
parse_range_header(http_req.headers().get(RANGE).unwrap())
393+
} else {
394+
None
395+
};
387396
// FIXME: we currently only retrieve full cids
388397
let (body, metadata) = state
389398
.client
390-
.get_file(req.resolved_path.clone(), start_time)
399+
.get_file(req.resolved_path.clone(), start_time, None)
391400
.await
392401
.map_err(|e| error(StatusCode::INTERNAL_SERVER_ERROR, &e, &state))?;
393402

@@ -401,8 +410,18 @@ async fn serve_raw<T: ContentLoader + std::marker::Unpin>(
401410
set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT);
402411
set_etag_headers(&mut headers, get_etag(&req.cid, Some(req.format.clone())));
403412
add_cache_control_headers(&mut headers, metadata.clone());
404-
add_ipfs_roots_headers(&mut headers, metadata);
405-
response(StatusCode::OK, body, headers)
413+
add_ipfs_roots_headers(&mut headers, metadata.clone());
414+
415+
if let Some(mut capped_range) = range {
416+
if let Some(size) = metadata.size {
417+
capped_range.end = std::cmp::min(capped_range.end, size);
418+
}
419+
add_etag_range(&mut headers, capped_range.clone());
420+
add_content_range_headers(&mut headers, capped_range, metadata.size);
421+
response(StatusCode::PARTIAL_CONTENT, body, headers)
422+
} else {
423+
response(StatusCode::OK, body, headers)
424+
}
406425
}
407426
FileResult::Directory(_) => Err(error(
408427
StatusCode::INTERNAL_SERVER_ERROR,
@@ -423,7 +442,7 @@ async fn serve_car<T: ContentLoader + std::marker::Unpin>(
423442
// FIXME: we currently only retrieve full cids
424443
let (body, metadata) = state
425444
.client
426-
.get_file(req.resolved_path.clone(), start_time)
445+
.get_file(req.resolved_path.clone(), start_time, None)
427446
.await
428447
.map_err(|e| error(StatusCode::INTERNAL_SERVER_ERROR, &e, &state))?;
429448

@@ -487,12 +506,19 @@ async fn serve_fs<T: ContentLoader + std::marker::Unpin>(
487506
req: &Request,
488507
state: Arc<State<T>>,
489508
mut headers: HeaderMap,
509+
http_req: &HttpRequest<Body>,
490510
start_time: std::time::Instant,
491511
) -> Result<GatewayResponse, GatewayError> {
512+
let range: Option<Range<u64>> = if http_req.headers().contains_key(RANGE) {
513+
parse_range_header(http_req.headers().get(RANGE).unwrap())
514+
} else {
515+
None
516+
};
517+
492518
// FIXME: we currently only retrieve full cids
493519
let (body, metadata) = state
494520
.client
495-
.get_file(req.resolved_path.clone(), start_time)
521+
.get_file(req.resolved_path.clone(), start_time, range.clone())
496522
.await
497523
.map_err(|e| error(StatusCode::INTERNAL_SERVER_ERROR, &e, &state))?;
498524

@@ -506,7 +532,9 @@ async fn serve_fs<T: ContentLoader + std::marker::Unpin>(
506532
.try_collect()
507533
.await;
508534
match dir_list {
509-
Ok(dir_list) => serve_fs_dir(&dir_list, req, state, headers, start_time).await,
535+
Ok(dir_list) => {
536+
serve_fs_dir(&dir_list, req, state, headers, http_req, start_time).await
537+
}
510538
Err(e) => {
511539
tracing::warn!("failed to read dir: {:?}", e);
512540
Err(error(
@@ -539,7 +567,17 @@ async fn serve_fs<T: ContentLoader + std::marker::Unpin>(
539567
let body_sample = body.get_sample();
540568
add_content_type_headers(&mut headers, &name, body_sample.as_slice());
541569
}
542-
response(StatusCode::OK, body, headers)
570+
571+
if let Some(mut capped_range) = range {
572+
if let Some(size) = metadata.size {
573+
capped_range.end = std::cmp::min(capped_range.end, size);
574+
}
575+
add_etag_range(&mut headers, capped_range.clone());
576+
add_content_range_headers(&mut headers, capped_range, metadata.size);
577+
response(StatusCode::PARTIAL_CONTENT, body, headers)
578+
} else {
579+
response(StatusCode::OK, body, headers)
580+
}
543581
}
544582
None => Err(error(
545583
StatusCode::BAD_REQUEST,
@@ -557,6 +595,7 @@ async fn serve_fs_dir<T: ContentLoader + std::marker::Unpin>(
557595
req: &Request,
558596
state: Arc<State<T>>,
559597
mut headers: HeaderMap,
598+
http_req: &HttpRequest<Body>,
560599
start_time: std::time::Instant,
561600
) -> Result<GatewayResponse, GatewayError> {
562601
let force_dir = req.query_params.force_dir.unwrap_or(false);
@@ -577,7 +616,7 @@ async fn serve_fs_dir<T: ContentLoader + std::marker::Unpin>(
577616
}
578617
let mut new_req = req.clone();
579618
new_req.resolved_path.push("index.html");
580-
return serve_fs(&new_req, state, headers, start_time).await;
619+
return serve_fs(&new_req, state, headers, http_req, start_time).await;
581620
}
582621

583622
headers.insert(CONTENT_TYPE, HeaderValue::from_str("text/html").unwrap());

iroh-gateway/src/headers.rs

+39-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{constants::*, response::ResponseFormat};
22
use ::time::OffsetDateTime;
33
use axum::http::header::*;
44
use iroh_resolver::resolver::{CidOrDomain, Metadata, PathType};
5-
use std::{fmt::Write, time};
5+
use std::{fmt::Write, ops::Range, time};
66

77
#[tracing::instrument()]
88
pub fn add_user_headers(headers: &mut HeaderMap, user_headers: HeaderMap) {
@@ -74,12 +74,41 @@ pub fn add_content_disposition_headers(
7474

7575
#[tracing::instrument()]
7676
pub fn set_content_disposition_headers(headers: &mut HeaderMap, filename: &str, disposition: &str) {
77+
// TODO: handle non-ascii filenames https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md#content-disposition-response-header
7778
headers.insert(
7879
CONTENT_DISPOSITION,
7980
HeaderValue::from_str(&format!("{}; filename={}", disposition, filename)).unwrap(),
8081
);
8182
}
8283

84+
#[tracing::instrument()]
85+
pub fn add_content_range_headers(headers: &mut HeaderMap, range: Range<u64>, size: Option<u64>) {
86+
let content_range = if let Some(size) = size {
87+
format!("bytes {}-{}/{}", range.start, range.end - 1, size)
88+
} else {
89+
format!("bytes {}-{}/{}", range.start, range.end - 1, "*")
90+
};
91+
headers.insert(
92+
CONTENT_RANGE,
93+
HeaderValue::from_str(&content_range).unwrap(),
94+
);
95+
}
96+
97+
pub fn parse_range_header(range: &HeaderValue) -> Option<Range<u64>> {
98+
let range = range.to_str().ok()?;
99+
let mut parts = range.splitn(2, '=');
100+
if parts.next() != Some("bytes") {
101+
return None;
102+
}
103+
let mut range = parts.next()?.splitn(2, '-');
104+
let start = range.next()?.parse().ok()?;
105+
let end = range.next()?.parse().ok()?;
106+
if start >= end || end == 0 {
107+
return None;
108+
}
109+
Some(Range { start, end })
110+
}
111+
83112
#[tracing::instrument()]
84113
pub fn add_cache_control_headers(headers: &mut HeaderMap, metadata: Metadata) {
85114
if metadata.path.typ() == PathType::Ipns {
@@ -109,6 +138,15 @@ pub fn set_etag_headers(headers: &mut HeaderMap, etag: String) {
109138
headers.insert(ETAG, HeaderValue::from_str(&etag).unwrap());
110139
}
111140

141+
#[tracing::instrument()]
142+
pub fn add_etag_range(headers: &mut HeaderMap, range: Range<u64>) {
143+
if headers.contains_key(ETAG) {
144+
let etag = headers.get(ETAG).unwrap().to_str().unwrap();
145+
let etag = format!("{}.{}-{}", etag, range.start, range.end - 1);
146+
headers.insert(ETAG, HeaderValue::from_str(&etag).unwrap());
147+
}
148+
}
149+
112150
#[tracing::instrument()]
113151
pub fn get_etag(cid: &CidOrDomain, response_format: Option<ResponseFormat>) -> String {
114152
match cid {

0 commit comments

Comments
 (0)