diff --git a/iroh-gateway/Cargo.toml b/iroh-gateway/Cargo.toml index 1457f85492..863b7b1fad 100644 --- a/iroh-gateway/Cargo.toml +++ b/iroh-gateway/Cargo.toml @@ -49,7 +49,7 @@ serde_qs = "0.10.1" sha2 = { version = "0.10", default-features = false } time = "0.3.9" tokio = { version = "1", features = ["macros", "rt-multi-thread", "process", "fs", "io-util"] } -tokio-util = { version = "0.7", features = ["io"] } +tokio-util = { version = "0.7", features = ["compat", "io"] } toml = "0.5.9" tower = { version = "0.4", features = ["util", "timeout", "load-shed", "limit"] } tower-http = { version = "0.3", features = ["trace", "compression-full", "cors"] } diff --git a/iroh-gateway/src/config.rs b/iroh-gateway/src/config.rs index 1f7ccafd56..4d2da96acf 100644 --- a/iroh-gateway/src/config.rs +++ b/iroh-gateway/src/config.rs @@ -48,6 +48,8 @@ pub struct Config { /// set of user provided headers to attach to all responses #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, + /// flag to toggle the capability to POST and PUT content to the http endpoint. + pub writeable: bool, } impl Config { @@ -62,6 +64,7 @@ impl Config { indexer_endpoint: None, metrics: MetricsConfig::default(), use_denylist: false, + writeable: false, } } @@ -132,6 +135,7 @@ impl Default for Config { indexer_endpoint: None, metrics: MetricsConfig::default(), use_denylist: false, + writeable: false, }; t.set_default_headers(); t @@ -162,6 +166,7 @@ impl Source for Config { if let Some(indexer_endpoint) = &self.indexer_endpoint { insert_into_config_map(&mut map, "indexer_endpoint", indexer_endpoint.clone()); } + insert_into_config_map(&mut map, "writeable", self.writeable); Ok(map) } } @@ -182,6 +187,10 @@ impl crate::handlers::StateConfig for Config { fn user_headers(&self) -> &HeaderMap { &self.headers } + + fn writeable_gateway(&self) -> bool { + self.writeable + } } fn collect_headers(headers: &HeaderMap) -> Result, ConfigError> { diff --git a/iroh-gateway/src/handlers.rs b/iroh-gateway/src/handlers.rs index f6ee850084..ddd2eec635 100644 --- a/iroh-gateway/src/handlers.rs +++ b/iroh-gateway/src/handlers.rs @@ -6,7 +6,7 @@ use axum::{ http::{header::*, Request as HttpRequest, StatusCode}, middleware, response::IntoResponse, - routing::{get, head}, + routing::{get, head, post}, BoxError, Router, }; use futures::TryStreamExt; @@ -28,6 +28,7 @@ use serde_qs; use std::{ collections::HashMap, fmt::Write, + io, ops::Range, sync::Arc, time::{self, Duration}, @@ -56,6 +57,7 @@ pub trait StateConfig: std::fmt::Debug + Sync + Send { fn public_url_base(&self) -> &str; fn port(&self) -> u16; fn user_headers(&self) -> &HeaderMap; + fn writeable_gateway(&self) -> bool; } pub fn get_app_routes(state: &Arc>) -> Router { @@ -70,6 +72,7 @@ pub fn get_app_routes(state: &Arc)) .layer(cors) .layer(Extension(Arc::clone(state))) .layer( @@ -297,6 +300,53 @@ pub async fn head_handler( } } +#[tracing::instrument(skip(state))] +pub async fn post_handler( + Extension(state): Extension>>, + // Path(params): Path>, + // Query(query_params): Query, + // method: http::Method, + http_req: HttpRequest, + // request_headers: HeaderMap, +) -> Result { + // If this gateway is not writable, return a 400 error. + if !state.config.writeable_gateway() { + return Err(GatewayError::new( + StatusCode::BAD_REQUEST, + "Not a writable gateway", + )); + } + + // TODO: check path & headers + + // Helper to convert a anyhow::Error into a http error response. + let into_gateway = + |err: anyhow::Error| GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string()); + + // Convert the http body into an AsyncRead + let futures_async_read = TryStreamExt::map_err(http_req.into_body(), |_err| { + io::Error::new(io::ErrorKind::Other, "Error!") + }) + .into_async_read(); + let reader = tokio_util::compat::FuturesAsyncReadCompatExt::compat(futures_async_read); + + let cid = state + .client + .resolver + .loader + .store_file(reader) + .await + .map_err(into_gateway)?; + let location = format!("ipfs://{}", cid); + + let mut headers = HeaderMap::new(); + headers.insert( + "IPFS-Hash", + HeaderValue::from_str(&cid.to_string()).unwrap(), + ); + Ok(GatewayResponse::created(&location, headers)) +} + #[tracing::instrument()] pub async fn health_check() -> String { "OK".to_string() diff --git a/iroh-gateway/src/response.rs b/iroh-gateway/src/response.rs index 2c07263895..5b4622f210 100644 --- a/iroh-gateway/src/response.rs +++ b/iroh-gateway/src/response.rs @@ -176,6 +176,12 @@ impl GatewayResponse { HeaderMap::new(), ) } + + // TODO: better type for url. + pub fn created(url: &str, mut headers: HeaderMap) -> Self { + headers.insert(http::header::LOCATION, HeaderValue::from_str(url).unwrap()); + Self::new(StatusCode::CREATED, BoxBody::default(), headers) + } } #[cfg(test)] diff --git a/iroh-one/src/config.rs b/iroh-one/src/config.rs index f28a9fc9fe..fe7e21ac21 100644 --- a/iroh-one/src/config.rs +++ b/iroh-one/src/config.rs @@ -173,4 +173,8 @@ impl iroh_gateway::handlers::StateConfig for Config { fn user_headers(&self) -> &HeaderMap { &self.gateway.headers } + + fn writeable_gateway(&self) -> bool { + self.gateway.writeable + } } diff --git a/iroh-resolver/src/chunker.rs b/iroh-resolver/src/chunker.rs index 011f9aa87b..63542781da 100644 --- a/iroh-resolver/src/chunker.rs +++ b/iroh-resolver/src/chunker.rs @@ -35,6 +35,8 @@ pub enum ChunkerStream<'a> { Rabin(LocalBoxStream<'a, io::Result>), } +unsafe impl<'a> Send for ChunkerStream<'a> {} + impl<'a> Stream for ChunkerStream<'a> { type Item = io::Result; @@ -57,7 +59,10 @@ impl<'a> Stream for ChunkerStream<'a> { } impl Chunker { - pub fn chunks<'a, R: AsyncRead + Unpin + 'a>(self, source: R) -> ChunkerStream<'a> { + pub fn chunks<'a, R: AsyncRead + Unpin + std::marker::Send + 'a>( + self, + source: R, + ) -> ChunkerStream<'a> { match self { Self::Fixed(chunker) => ChunkerStream::Fixed(chunker.chunks(source)), Self::Rabin(chunker) => ChunkerStream::Rabin(chunker.chunks(source)), diff --git a/iroh-resolver/src/content_loader.rs b/iroh-resolver/src/content_loader.rs index 17b245a953..01824b919c 100644 --- a/iroh-resolver/src/content_loader.rs +++ b/iroh-resolver/src/content_loader.rs @@ -24,6 +24,13 @@ pub trait ContentLoader: Sync + Send + std::fmt::Debug + Clone + 'static { async fn stop_session(&self, ctx: ContextId) -> Result<()>; /// Checks if the given cid is present in the local storage. async fn has_cid(&self, cid: &Cid) -> Result; + /// Store some content + async fn store_file( + &self, + _content: T, + ) -> Result { + unimplemented!() + } } #[async_trait] @@ -39,6 +46,13 @@ impl ContentLoader for Arc { async fn has_cid(&self, cid: &Cid) -> Result { self.as_ref().has_cid(cid).await } + + async fn store_file( + &self, + content: C, + ) -> Result { + self.as_ref().store_file(content).await + } } #[derive(Debug, Clone)] @@ -266,4 +280,35 @@ impl ContentLoader for FullLoader { async fn has_cid(&self, cid: &Cid) -> Result { self.client.try_store()?.has(*cid).await } + + async fn store_file( + &self, + content: T, + ) -> Result { + use crate::unixfs_builder::FileBuilder; + use futures::StreamExt; + + let store = self.client.try_store()?; + + let file_builder = FileBuilder::new() + .content_reader(content) + .name("_http_upload_"); + let file = file_builder.build().await?; + + let mut cids: Vec = vec![]; + let mut blocks = Box::pin(file.encode().await?); + while let Some(block) = blocks.next().await { + let (cid, bytes, links) = block.unwrap().into_parts(); + cids.push(cid); + store.put(cid, bytes, links).await?; + } + + match cids.last() { + Some(root_cid) => { + self.client.try_p2p()?.start_providing(&root_cid).await?; + Ok(*root_cid) + } + None => Err(anyhow!("no root cid!")), + } + } } diff --git a/iroh-resolver/src/resolver.rs b/iroh-resolver/src/resolver.rs index bbbfc2757b..79cf48e67e 100644 --- a/iroh-resolver/src/resolver.rs +++ b/iroh-resolver/src/resolver.rs @@ -659,7 +659,7 @@ pub enum Source { #[derive(Debug, Clone)] pub struct Resolver { - loader: T, + pub loader: T, dns_resolver: Arc, next_id: Arc, _worker: Arc>, diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index 28d8457a07..6321ce53f4 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -136,10 +136,12 @@ impl Directory { } enum Content { - Reader(Pin>), + Reader(Pin>), Path(PathBuf), } +unsafe impl Send for Content {} + impl Debug for Content { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -168,6 +170,8 @@ pub struct File { chunker: Chunker, } +unsafe impl Send for File {} + impl Debug for File { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("File") @@ -269,7 +273,7 @@ impl Symlink { pub struct FileBuilder { name: Option, path: Option, - reader: Option>>, + reader: Option>>, chunker: Chunker, degree: usize, } @@ -303,6 +307,8 @@ impl Debug for FileBuilder { } } +unsafe impl Send for FileBuilder {} + /// FileBuilder separates uses a reader or bytes to chunk the data into raw unixfs nodes impl FileBuilder { pub fn new() -> Self { @@ -347,7 +353,10 @@ impl FileBuilder { self } - pub fn content_reader(mut self, content: T) -> Self { + pub fn content_reader( + mut self, + content: T, + ) -> Self { self.reader = Some(Box::pin(content)); self }