Skip to content

Commit

Permalink
Initial http POST support
Browse files Browse the repository at this point in the history
  • Loading branch information
fabricedesre committed Nov 26, 2022
1 parent 35955d6 commit 4ee55b4
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 7 deletions.
2 changes: 1 addition & 1 deletion iroh-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ serde_qs = "0.10.1"
sha2 = { version = "0.10", default-features = false }
time = "0.3.9"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "process", "fs", "io-util"] }
tokio-util = { version = "0.7", features = ["io"] }
tokio-util = { version = "0.7", features = ["compat", "io"] }
toml = "0.5.9"
tower = { version = "0.4", features = ["util", "timeout", "load-shed", "limit"] }
tower-http = { version = "0.3", features = ["trace", "compression-full", "cors"] }
Expand Down
9 changes: 9 additions & 0 deletions iroh-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub struct Config {
/// set of user provided headers to attach to all responses
#[serde(with = "http_serde::header_map")]
pub headers: HeaderMap,
/// flag to toggle the capability to POST and PUT content to the http endpoint.
pub writeable: bool,
}

impl Config {
Expand All @@ -62,6 +64,7 @@ impl Config {
indexer_endpoint: None,
metrics: MetricsConfig::default(),
use_denylist: false,
writeable: false,
}
}

Expand Down Expand Up @@ -132,6 +135,7 @@ impl Default for Config {
indexer_endpoint: None,
metrics: MetricsConfig::default(),
use_denylist: false,
writeable: false,
};
t.set_default_headers();
t
Expand Down Expand Up @@ -162,6 +166,7 @@ impl Source for Config {
if let Some(indexer_endpoint) = &self.indexer_endpoint {
insert_into_config_map(&mut map, "indexer_endpoint", indexer_endpoint.clone());
}
insert_into_config_map(&mut map, "writeable", self.writeable);
Ok(map)
}
}
Expand All @@ -182,6 +187,10 @@ impl crate::handlers::StateConfig for Config {
fn user_headers(&self) -> &HeaderMap<HeaderValue> {
&self.headers
}

fn writeable_gateway(&self) -> bool {
self.writeable
}
}

fn collect_headers(headers: &HeaderMap) -> Result<Map<String, Value>, ConfigError> {
Expand Down
52 changes: 51 additions & 1 deletion iroh-gateway/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
http::{header::*, Request as HttpRequest, StatusCode},
middleware,
response::IntoResponse,
routing::{get, head},
routing::{get, head, post},
BoxError, Router,
};
use futures::TryStreamExt;
Expand All @@ -28,6 +28,7 @@ use serde_qs;
use std::{
collections::HashMap,
fmt::Write,
io,
ops::Range,
sync::Arc,
time::{self, Duration},
Expand Down Expand Up @@ -56,6 +57,7 @@ pub trait StateConfig: std::fmt::Debug + Sync + Send {
fn public_url_base(&self) -> &str;
fn port(&self) -> u16;
fn user_headers(&self) -> &HeaderMap<HeaderValue>;
fn writeable_gateway(&self) -> bool;
}

pub fn get_app_routes<T: ContentLoader + std::marker::Unpin>(state: &Arc<State<T>>) -> Router {
Expand All @@ -70,6 +72,7 @@ pub fn get_app_routes<T: ContentLoader + std::marker::Unpin>(state: &Arc<State<T
.route("/icons.css", get(stylesheet_icons))
.route("/style.css", get(stylesheet_main))
.route("/info", get(info))
.route("/:scheme/", post(post_handler::<T>))
.layer(cors)
.layer(Extension(Arc::clone(state)))
.layer(
Expand Down Expand Up @@ -297,6 +300,53 @@ pub async fn head_handler<T: ContentLoader + std::marker::Unpin>(
}
}

#[tracing::instrument(skip(state))]
pub async fn post_handler<T: ContentLoader + std::marker::Unpin>(
Extension(state): Extension<Arc<State<T>>>,
// Path(params): Path<HashMap<String, String>>,
// Query(query_params): Query<GetParams>,
// method: http::Method,
http_req: HttpRequest<Body>,
// request_headers: HeaderMap,
) -> Result<GatewayResponse, GatewayError> {
// If this gateway is not writable, return a 400 error.
if !state.config.writeable_gateway() {
return Err(GatewayError::new(
StatusCode::BAD_REQUEST,
"Not a writable gateway",
));
}

// TODO: check path & headers

// Helper to convert a anyhow::Error into a http error response.
let into_gateway =
|err: anyhow::Error| GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string());

// Convert the http body into an AsyncRead
let futures_async_read = TryStreamExt::map_err(http_req.into_body(), |_err| {
io::Error::new(io::ErrorKind::Other, "Error!")
})
.into_async_read();
let reader = tokio_util::compat::FuturesAsyncReadCompatExt::compat(futures_async_read);

let cid = state
.client
.resolver
.loader
.store_file(reader)
.await
.map_err(into_gateway)?;
let location = format!("ipfs://{}", cid);

let mut headers = HeaderMap::new();
headers.insert(
"IPFS-Hash",
HeaderValue::from_str(&cid.to_string()).unwrap(),
);
Ok(GatewayResponse::created(&location, headers))
}

#[tracing::instrument()]
pub async fn health_check() -> String {
"OK".to_string()
Expand Down
6 changes: 6 additions & 0 deletions iroh-gateway/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ impl GatewayResponse {
HeaderMap::new(),
)
}

// TODO: better type for url.
pub fn created(url: &str, mut headers: HeaderMap) -> Self {
headers.insert(http::header::LOCATION, HeaderValue::from_str(url).unwrap());
Self::new(StatusCode::CREATED, BoxBody::default(), headers)
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions iroh-one/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,8 @@ impl iroh_gateway::handlers::StateConfig for Config {
fn user_headers(&self) -> &HeaderMap<HeaderValue> {
&self.gateway.headers
}

fn writeable_gateway(&self) -> bool {
self.gateway.writeable
}
}
2 changes: 1 addition & 1 deletion iroh-resolver/src/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl Chunker {
Chunker::FixedSize { chunk_size: size }
}

pub fn chunks<'a, R: AsyncRead + Unpin + 'a>(
pub fn chunks<'a, R: AsyncRead + Unpin + std::marker::Send + 'a>(
&self,
mut source: R,
) -> impl Stream<Item = io::Result<BytesMut>> + 'a {
Expand Down
45 changes: 45 additions & 0 deletions iroh-resolver/src/content_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ pub trait ContentLoader: Sync + Send + std::fmt::Debug + Clone + 'static {
async fn stop_session(&self, ctx: ContextId) -> Result<()>;
/// Checks if the given cid is present in the local storage.
async fn has_cid(&self, cid: &Cid) -> Result<bool>;
/// Store some content
async fn store_file<T: tokio::io::AsyncRead + 'static + std::marker::Send>(
&self,
_content: T,
) -> Result<cid::Cid, anyhow::Error> {
unimplemented!()
}
}

#[async_trait]
Expand All @@ -39,6 +46,13 @@ impl<T: ContentLoader> ContentLoader for Arc<T> {
async fn has_cid(&self, cid: &Cid) -> Result<bool> {
self.as_ref().has_cid(cid).await
}

async fn store_file<C: tokio::io::AsyncRead + 'static + std::marker::Send>(
&self,
content: C,
) -> Result<cid::Cid, anyhow::Error> {
self.as_ref().store_file(content).await
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -266,4 +280,35 @@ impl ContentLoader for FullLoader {
async fn has_cid(&self, cid: &Cid) -> Result<bool> {
self.client.try_store()?.has(*cid).await
}

async fn store_file<T: tokio::io::AsyncRead + 'static + std::marker::Send>(
&self,
content: T,
) -> Result<cid::Cid, anyhow::Error> {
use crate::unixfs_builder::FileBuilder;
use futures::StreamExt;

let store = self.client.try_store()?;

let mut file_builder = FileBuilder::new();
file_builder.content_reader(content).name("_http_upload_");
let file = file_builder.build().await?;

let mut cids: Vec<cid::Cid> = vec![];
let mut blocks = Box::pin(file.encode().await?);
while let Some(block) = blocks.next().await {
let (cid, bytes, links) = block.unwrap().into_parts();
cids.push(cid);
store.put(cid, bytes, links).await?;
}

let maybe_root_cid = cids.last();
if let Some(root_cid) = maybe_root_cid {
self.client.try_p2p()?.start_providing(&root_cid).await?;
}

maybe_root_cid
.cloned()
.ok_or_else(|| anyhow!("no root cid!"))
}
}
2 changes: 1 addition & 1 deletion iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ pub enum Source {

#[derive(Debug, Clone)]
pub struct Resolver<T: ContentLoader> {
loader: T,
pub loader: T,
dns_resolver: Arc<DnsResolver>,
next_id: Arc<AtomicU64>,
_worker: Arc<JoinHandle<()>>,
Expand Down
15 changes: 12 additions & 3 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,12 @@ impl Directory {
}

enum Content {
Reader(Pin<Box<dyn AsyncRead>>),
Reader(Pin<Box<dyn AsyncRead + std::marker::Send>>),
Path(PathBuf),
}

unsafe impl Send for Content {}

impl Debug for Content {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -167,6 +169,8 @@ pub struct File {
chunker: Chunker,
}

unsafe impl Send for File {}

impl Debug for File {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("File")
Expand Down Expand Up @@ -269,7 +273,7 @@ impl Symlink {
pub struct FileBuilder {
name: Option<String>,
path: Option<PathBuf>,
reader: Option<Pin<Box<dyn AsyncRead>>>,
reader: Option<Pin<Box<dyn AsyncRead + std::marker::Send>>>,
chunk_size: Option<usize>,
degree: Option<usize>,
}
Expand All @@ -291,6 +295,8 @@ impl Debug for FileBuilder {
}
}

unsafe impl Send for FileBuilder {}

/// FileBuilder separates uses a reader or bytes to chunk the data into raw unixfs nodes
impl FileBuilder {
pub fn new() -> Self {
Expand Down Expand Up @@ -323,7 +329,10 @@ impl FileBuilder {
self
}

pub fn content_reader<T: tokio::io::AsyncRead + 'static>(&mut self, content: T) -> &mut Self {
pub fn content_reader<T: tokio::io::AsyncRead + std::marker::Send + 'static>(
&mut self,
content: T,
) -> &mut Self {
self.reader = Some(Box::pin(content));
self
}
Expand Down

0 comments on commit 4ee55b4

Please sign in to comment.