diff --git a/Cargo.toml b/Cargo.toml
index 701084200fd..0080644e321 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -82,6 +82,7 @@ quick-xml = { version = "0.23", features = ["serialize"] }
 radix_trie = { version = "0.2", optional = true }
 reqsign = "0.3"
 serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
 thiserror = "1.0"
 time = "0.3"
 tokio = { version = "1.20", features = ["fs"] }
diff --git a/src/operator.rs b/src/operator.rs
index b5bf7b1ed06..bd6d1b13693 100644
--- a/src/operator.rs
+++ b/src/operator.rs
@@ -128,6 +128,7 @@ impl Operator {
             #[cfg(feature = "services-http")]
             Scheme::Http => services::http::Backend::from_iter(it)?.into(),
             Scheme::Memory => services::memory::Builder::default().build()?.into(),
+            Scheme::Gcs => services::gcs::Backend::from_iter(it)?.into(),
             Scheme::S3 => services::s3::Backend::from_iter(it)?.into(),
         };

diff --git a/src/scheme.rs b/src/scheme.rs
index ac77ab7db90..856dcaeef81 100644
--- a/src/scheme.rs
+++ b/src/scheme.rs
@@ -22,7 +22,7 @@ use crate::error::other;
 use crate::error::BackendError;
 
 /// Backends that OpenDAL supports
-#[derive(Copy, Clone, Debug, PartialEq)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 pub enum Scheme {
     /// [azblob][crate::services::azblob]: Azure Storage Blob services.
     Azblob,
@@ -34,6 +34,8 @@
     /// [http][crate::services::http]: HTTP backend.
     #[cfg(feature = "services-http")]
     Http,
+    /// [gcs][crate::services::gcs]: Google Cloud Storage backend.
+    Gcs,
     /// [memory][crate::services::memory]: In memory backend support.
     Memory,
     /// [s3][crate::services::s3]: AWS S3 alike services.
@@ -60,6 +62,7 @@ impl Display for Scheme {
             Scheme::Fs => write!(f, "fs"),
             #[cfg(feature = "services-hdfs")]
             Scheme::Hdfs => write!(f, "hdfs"),
+            Scheme::Gcs => write!(f, "gcs"),
             #[cfg(feature = "services-http")]
             Scheme::Http => write!(f, "http"),
             Scheme::Memory => write!(f, "memory"),
@@ -80,6 +83,7 @@
             "hdfs" => Ok(Scheme::Hdfs),
             #[cfg(feature = "services-http")]
             "http" | "https" => Ok(Scheme::Http),
+            "gcs" => Ok(Scheme::Gcs),
             "memory" => Ok(Scheme::Memory),
             "s3" => Ok(Scheme::S3),
             v => Err(other(BackendError::new(
@@ -99,6 +103,7 @@ impl From<Scheme> for &'static str {
             Scheme::Hdfs => "hdfs",
             #[cfg(feature = "services-http")]
             Scheme::Http => "http",
+            Scheme::Gcs => "gcs",
             Scheme::Memory => "memory",
             Scheme::S3 => "s3",
         }
diff --git a/src/services/gcs/backend.rs b/src/services/gcs/backend.rs
new file mode 100644
index 00000000000..16bf761522b
--- /dev/null
+++ b/src/services/gcs/backend.rs
@@ -0,0 +1,680 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter, Write};
+use std::io::Result;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use http::StatusCode;
+use isahc::{AsyncBody, AsyncReadResponseExt};
+use log::{debug, error, info, warn};
+use reqsign::services::google::Signer;
+use serde::Deserialize;
+use serde_json::de;
+use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
+
+use crate::error::{other, BackendError, ObjectError};
+use crate::http_util::{
+    new_http_channel, new_request_build_error, new_request_send_error, new_request_sign_error,
+    parse_error_response, percent_encode_path, HttpBodyWriter, HttpClient,
+};
+use crate::ops::{BytesRange, OpCreate, OpDelete, OpList, OpRead, OpStat, OpWrite};
+use crate::services::gcs::dir_stream::DirStream;
+use crate::services::gcs::error::parse_error;
+use crate::AccessorMetadata;
+use crate::{Accessor, BytesReader, BytesWriter, DirStreamer, ObjectMetadata, ObjectMode, Scheme};
+
+const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
+const DEFAULT_GCS_AUTH: &str = "https://www.googleapis.com/auth/devstorage.read_write";
+
+// TODO: Server side encryption support
+
+/// GCS storage backend builder
+#[derive(Clone, Default)]
+pub struct Builder {
+    /// root URI, all operations happen under `root`
+    root: Option<String>,
+    /// bucket name
+    bucket: String,
+    /// endpoint URI of GCS service,
+    /// default is "https://storage.googleapis.com"
+    endpoint: Option<String>,
+
+    /// credential string for GCS service
+    credentials: Option<String>,
+}
+
+impl Builder {
+    /// set the working directory root of backend
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        if !root.is_empty() {
+            self.root = Some(root.to_string())
+        }
+
+        self
+    }
+
+    /// set the bucket's name
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.bucket = bucket.to_string();
+        self
+    }
+
+    /// set the endpoint GCS service uses
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        if !endpoint.is_empty() {
+            self.endpoint = Some(endpoint.to_string())
+        };
+        self
+    }
+
+    /// set the credentials string used for OAuth2
+    pub fn credentials(&mut self, credentials: &str) -> &mut Self {
+        if !credentials.is_empty() {
+            self.credentials = Some(credentials.to_string())
+        };
+        self
+    }
+
+    /// Establish connection to GCS and finish making GCS backend
+    pub fn build(&mut self) -> Result<Backend> {
+        info!("backend build started: {:?}", self);
+
+        let root = match &self.root {
+            None => "/".to_string(),
+            Some(v) => {
+                // remove successive '/'s
+                let mut v = v
+                    .split('/')
+                    .filter(|s| !s.is_empty())
+                    .collect::<Vec<&str>>()
+                    .join("/");
+                // path should start with '/'
+                v.insert(0, '/');
+
+                // path should end with '/'
+                if !v.ends_with('/') {
+                    v.push('/');
+                }
+                v
+            }
+        };
+
+        info!("backend use root: {}", &root);
+
+        // Handle endpoint and bucket name
+        let bucket = match self.bucket.is_empty() {
+            false => Ok(&self.bucket),
+            true => Err(other(BackendError::new(
+                HashMap::from([("bucket".to_string(), "".to_string())]),
+                anyhow!("bucket name is empty"),
+            ))),
+        }?;
+
+        // setup error context
+        let mut ctx = HashMap::from([("bucket".to_string(), bucket.to_string())]);
+
+        // TODO: server side encryption
+
+        // build http client
+        let client = HttpClient::new();
+        let endpoint = self
+            .endpoint
+            .clone()
+            .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
+        ctx.insert("endpoint".to_string(), endpoint.clone());
+
+        debug!("backend use endpoint: {endpoint}");
+
+        // build signer
+        let auth_url = DEFAULT_GCS_AUTH.to_string();
+        let mut signer_builder = Signer::builder();
+        signer_builder.scope(&auth_url);
+        if let Some(cred) = &self.credentials {
+            signer_builder.credential_from_content(cred);
+        }
+        let signer = signer_builder
+            .build()
+            .map_err(|e| other(BackendError::new(ctx, e)))?;
+        let signer = Arc::new(signer);
+
+        let backend = Backend {
+            root,
+            endpoint,
+            bucket: bucket.clone(),
+            signer,
+            client,
+        };
+
+        Ok(backend)
+    }
+}
+
+impl Debug for Builder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Builder");
+
+        ds.field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("endpoint", &self.endpoint);
+        if self.credentials.is_some() {
+            ds.field("credentials", &"<redacted>");
+        }
+        ds.finish()
+    }
+}
+
+/// GCS storage backend
+#[derive(Clone)]
+pub struct Backend {
+    endpoint: String,
+    bucket: String,
+    // root should end with "/"
+    root: String,
+
+    client: HttpClient,
+    signer: Arc<Signer>,
+}
+
+impl Debug for Backend {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut de = f.debug_struct("Backend");
+        de.field("endpoint", &self.endpoint)
+            .field("bucket", &self.bucket)
+            .field("root", &self.root)
+            .field("client", &self.client)
+            .field("signer", &"<signer>")
+            .finish()
+    }
+}
+
+impl Backend {
+    /// normalize paths, relative path -> absolute path
+    pub fn get_abs_path(&self, path: &str) -> String {
+        if path == "/" {
+            return self.root.trim_start_matches('/').to_string();
+        }
+
+        format!("{}{}", self.root, path)
+            .trim_start_matches('/')
+            .to_string()
+    }
+
+    /// convert paths, absolute path -> relative path
+    pub fn get_rel_path(&self, path: &str) -> String {
+        let path = format!("/{}", path);
+
+        match path.strip_prefix(&self.root) {
+            Some(p) => p.to_string(),
+            None => unreachable!(
+                "invalid path {} that does not start with backend root {}",
+                &path, &self.root
+            ),
+        }
+    }
+
+    pub(crate) fn from_iter(it: impl Iterator<Item = (String, String)>) -> Result<Backend> {
+        let mut builder = Builder::default();
+        for (k, v) in it {
+            let v = v.as_str();
+            match k.as_ref() {
+                "root" => builder.root(v),
+                "bucket" => builder.bucket(v),
+                "endpoint" => builder.endpoint(v),
+                "credentials" => builder.credentials(v),
+                _ => continue,
+            };
+        }
+        builder.build()
+    }
+}
+
+impl Backend {
+    pub(crate) fn get_object_request(
+        &self,
+        path: &str,
+        offset: Option<u64>,
+        size: Option<u64>,
+    ) -> Result<isahc::Request<AsyncBody>> {
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}?alt=media",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(path)
+        );
+
+        let mut req = isahc::Request::get(&url);
+
+        if offset.is_some() || size.is_some() {
+            req = req.header(
+                http::header::RANGE,
+                BytesRange::new(offset, size).to_string(),
+            );
+        }
+
+        let req = req.body(isahc::AsyncBody::empty()).map_err(|e| {
+            error!("object {path} get_object: {url} {e:?}");
+            new_request_build_error("read", path, e)
+        })?;
+
+        Ok(req)
+    }
+
+    pub(crate) async fn get_object(
+        &self,
+        path: &str,
+        offset: Option<u64>,
+        size: Option<u64>,
+    ) -> Result<isahc::Response<AsyncBody>> {
+        let mut req = self.get_object_request(path, offset, size)?;
+        let url = req.uri().to_string();
+
+        self.signer.sign(&mut req).map_err(|e| {
+            error!("object {path} get_object: {url} {e:?}");
+            new_request_sign_error("read", path, e)
+        })?;
+
+        self.client.send_async(req).await.map_err(|e| {
+            error!("object {path} get_object: {url} {e:?}");
+            new_request_send_error("read", path, e)
+        })
+    }
+
+    pub(crate) fn insert_object_request(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        body: AsyncBody,
+    ) -> Result<isahc::Request<AsyncBody>> {
+        let url = format!(
+            "{}/upload/storage/b/{}/o?name={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(path)
+        );
+
+        let mut req = isahc::Request::post(&url);
+
+        // Set content length.
+        if let Some(size) = size {
+            req = req.header(http::header::CONTENT_LENGTH, size.to_string());
+        }
+
+        // Set body
+        let req = req.body(body).map_err(|e| {
+            error!("object {path} put_object: {url} {e:?}");
+            new_request_build_error("write", path, e)
+        })?;
+
+        Ok(req)
+    }
+
+    pub(crate) async fn insert_object(
+        &self,
+        path: &str,
+        size: u64,
+        body: isahc::AsyncBody,
+    ) -> Result<isahc::Request<AsyncBody>> {
+        let mut req = self.insert_object_request(path, Some(size), body)?;
+        let url = req.uri().to_string();
+
+        self.signer.sign(&mut req).map_err(|e| {
+            error!("object {path} insert_object: {url} {e:?}");
+            new_request_sign_error("write", path, e)
+        })?;
+
+        Ok(req)
+    }
+
+    pub(crate) async fn get_object_metadata(
+        &self,
+        path: &str,
+    ) -> Result<isahc::Response<AsyncBody>> {
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(path)
+        );
+
+        let req = isahc::Request::get(&url);
+
+        let mut req = req.body(AsyncBody::empty()).map_err(|e| {
+            error!("object {path} get_object_metadata: {url} {e:?}");
+            new_request_build_error("stat", path, e)
+        })?;
+
+        self.signer.sign(&mut req).map_err(|e| {
+            error!("object {path} get_object_metadata: {url} {e:?}");
+            new_request_sign_error("stat", path, e)
+        })?;
+
+        self.client.send_async(req).await.map_err(|e| {
+            error!("object {path} get_object_metadata: {url} {e:?}");
+            new_request_send_error("stat", path, e)
+        })
+    }
+
+    pub(crate) async fn delete_object(&self, path: &str) -> Result<isahc::Response<AsyncBody>> {
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(path)
+        );
+
+        let mut req = isahc::Request::delete(&url)
+            .body(AsyncBody::empty())
+            .map_err(|e| {
+                error!("object {path} delete_object: {url} {e:?}");
+                new_request_build_error("delete", path, e)
+            })?;
+
+        self.signer.sign(&mut req).map_err(|e| {
+            error!("object {path} delete_object: {url} {e:?}");
+            new_request_sign_error("delete", path, e)
+        })?;
+
+        self.client.send_async(req).await.map_err(|e| {
+            error!("object {path} delete_object: {url} {e:?}");
+            new_request_send_error("delete", path, e)
+        })
+    }
+
+    pub(crate) async fn list_objects(
+        &self,
+        path: &str,
+        page_token: &str,
+    ) -> Result<isahc::Response<AsyncBody>> {
+        let mut url = format!(
+            "{}/storage/v1/b/{}/o?delimiter=/&prefix={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(path)
+        );
+        if !page_token.is_empty() {
+            // NOTE:
+            //
+            // GCS uses pageToken in requests and nextPageToken in responses.
+            //
+            // We don't know what these tokens look like, so this part is
+            // copied directly from the AWS S3 service.
+            write!(url, "&pageToken={}", percent_encode_path(page_token))
+                .expect("write into string must succeed");
+        }
+
+        let mut req = isahc::Request::get(&url)
+            .body(AsyncBody::empty())
+            .map_err(|e| {
+                error!("object {path} list_objects: {url} {e:?}");
+                new_request_build_error("list", path, e)
+            })?;
+
+        self.signer.sign(&mut req).map_err(|e| {
+            error!("object {path} list_objects: {url} {e:?}");
+            new_request_sign_error("list", path, e)
+        })?;
+
+        self.client.send_async(req).await.map_err(|e| {
+            error!("object {path} list_objects: {url} {e:?}");
+            new_request_send_error("list", path, e)
+        })
+    }
+}
+
+/// `RawMeta` is an intermediate type that deserializes
+/// directly from JSON data.
+///
+/// In OpenDAL, `ObjectMetadata`'s `last_modified` field's type is `time::OffsetDateTime`,
+/// which can only be represented as a string in JSON.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RawMeta {
+    pub size: String, // string formatted unsigned long
+    pub etag: String,
+    pub updated: String, // rfc3339 styled datetime string
+    pub md5_hash: String,
+}
+
+#[async_trait]
+impl Accessor for Backend {
+    fn metadata(&self) -> AccessorMetadata {
+        let mut am = AccessorMetadata::default();
+        am.set_scheme(Scheme::Gcs)
+            .set_root(&self.root)
+            .set_name(&self.bucket);
+        am
+    }
+
+    async fn create(&self, args: &OpCreate) -> Result<()> {
+        let p = self.get_abs_path(args.path());
+        let req = self
+            .insert_object(&p, 0, AsyncBody::from_bytes_static(b""))
+            .await?;
+        let resp = self.client.send_async(req).await.map_err(|e| {
+            error!("object {} insert_object: {:?}", p, e);
+            new_request_send_error("create", args.path(), e)
+        })?;
+
+        if resp.status().is_success() {
+            debug!("object {} create finished", args.path());
+            Ok(())
+        } else {
+            warn!(
+                "object {} create returned status code: {}",
+                args.path(),
+                resp.status()
+            );
+            let er = parse_error_response(resp).await?;
+            let e = parse_error("create", args.path(), er);
+            Err(e)
+        }
+    }
+
+    async fn read(&self, args: &OpRead) -> Result<BytesReader> {
+        let p = self.get_abs_path(args.path());
+        debug!(
+            "object {} read start: offset {:?}, size {:?}",
+            &p,
+            args.offset(),
+            args.size()
+        );
+
+        let resp = self
+            .get_object(&p, args.offset(), args.size())
+            .await
+            .map_err(|e| {
+                error!("object {} get_object: {:?}", p, e);
+                e
+            })?;
+
+        if resp.status().is_success() {
+            debug!(
+                "object {} reader created: offset {:?}, size {:?}",
+                &p,
+                args.offset(),
+                args.size()
+            );
+
+            Ok(Box::new(resp.into_body()))
+        } else {
+            warn!(
+                "object {} read with status code: {}",
+                args.path(),
+                resp.status()
+            );
+            let er = parse_error_response(resp).await?;
+            let e = parse_error("read", args.path(), er);
+            Err(e)
+        }
+    }
+
+    async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
+        let p = self.get_abs_path(args.path());
+        debug!("object {} write start: size {}", &p, args.size());
+
+        let (tx, body) = new_http_channel(args.size());
+
+        let req = self.insert_object(&p, args.size(), body).await?;
+
+        let bs = HttpBodyWriter::new(
+            args,
+            tx,
+            self.client.send_async(req),
+            |c| (200..300).contains(&c.as_u16()),
+            parse_error,
+        );
+
+        Ok(Box::new(bs))
+    }
+
+    async fn stat(&self, args: &OpStat) -> Result<ObjectMetadata> {
+        let p = self.get_abs_path(args.path());
+        debug!("object {} stat start", &p);
+
+        // Stat root always returns a DIR.
+        if self.get_rel_path(&p).is_empty() {
+            let mut m = ObjectMetadata::default();
+            m.set_mode(ObjectMode::DIR);
+
+            debug!("backend root object stat finished");
+            return Ok(m);
+        }
+
+        let mut resp = self.get_object_metadata(&p).await?;
+
+        if resp.status().is_success() {
+            let mut m = ObjectMetadata::default();
+            // read http response body
+            let slc = resp.bytes().await.map_err(|e| {
+                error!("GCS backend failed to read response body: {:?}", e);
+                e
+            })?;
+            let meta: RawMeta = de::from_slice(&slc[..]).map_err(|e| {
+                error!(
+                    "GCS backend failed to parse response body into JSON: {:?}",
+                    e
+                );
+                other(ObjectError::new("stat", &p, e))
+            })?;
+
+            m.set_etag(&meta.etag);
+            m.set_content_md5(&meta.md5_hash);
+
+            let size = meta.size.parse::<u64>().map_err(|e| {
+                error!("GCS backend failed to parse size of object: {:?}", e);
+                other(ObjectError::new("stat", &p, e))
+            })?;
+            m.set_content_length(size);
+
+            let datetime = OffsetDateTime::parse(&meta.updated, &Rfc3339).map_err(|e| {
+                error!("GCS backend failed to parse datetime in stat: {:?}", e);
+                other(ObjectError::new("stat", &p, e))
+            })?;
+            m.set_last_modified(datetime);
+
+            if p.ends_with('/') {
+                m.set_mode(ObjectMode::DIR);
+            } else {
+                m.set_mode(ObjectMode::FILE);
+            };
+
+            debug!("object {} stat finished: {:?}", &p, m);
+            Ok(m)
+        } else if resp.status() == StatusCode::NOT_FOUND && p.ends_with('/') {
+            let mut m = ObjectMetadata::default();
+            m.set_mode(ObjectMode::DIR);
+
+            debug!("object {} stat finished", &p);
+            Ok(m)
+        } else {
+            let er = parse_error_response(resp).await?;
+            let e = parse_error("stat", args.path(), er);
+            Err(e)
+        }
+    }
+
+    async fn delete(&self, args: &OpDelete) -> Result<()> {
+        let p = self.get_abs_path(args.path());
+        debug!("object {} delete start", &p);
+
+        let resp = self.delete_object(&p).await?;
+
+        if resp.status().is_success() {
+            debug!("object {} delete finished", &p);
+            Ok(())
+        } else {
+            let er = parse_error_response(resp).await?;
+            let err = parse_error("delete", args.path(), er);
+            Err(err)
+        }
+    }
+
+    async fn list(&self, args: &OpList) -> Result<DirStreamer> {
+        let mut path = self.get_abs_path(args.path());
+        // Make sure the list path ends with '/'
+        if !path.ends_with('/') && !path.is_empty() {
+            path.push('/')
+        }
+        debug!("object {} list start", &path);
+
+        Ok(Box::new(DirStream::new(Arc::new(self.clone()), &path)))
+    }
+
+    // inherits the default implementation of Accessor.
+}
+
+#[cfg(test)]
+mod backend_test {
+    use crate::services::gcs::backend::RawMeta;
+    use time::format_description::well_known::Rfc3339;
+    use time::OffsetDateTime;
+
+    #[test]
+    fn raw_meta_test() {
+        let raw_meta = serde_json::de::from_str::<RawMeta>(META_JSON);
+        assert!(raw_meta.is_ok());
+        let meta = raw_meta.unwrap();
+        assert_eq!(meta.size, "56535");
+        assert_eq!(meta.updated, "2022-08-15T11:33:34.866Z");
+        assert_eq!(meta.md5_hash, "fHcEH1vPwA6eTPqxuasXcg==");
+        assert_eq!(meta.etag, "CKWasoTgyPkCEAE=");
+        assert!(OffsetDateTime::parse(&meta.updated, &Rfc3339).is_ok());
+    }
+
+    const META_JSON: &str = r#"
+{
+  "kind": "storage#object",
+  "id": "example/1.png/1660563214863653",
+  "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
+  "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
+  "name": "1.png",
+  "bucket": "example",
+  "generation": "1660563214863653",
+  "metageneration": "1",
+  "contentType": "image/png",
+  "storageClass": "STANDARD",
+  "size": "56535",
+  "md5Hash": "fHcEH1vPwA6eTPqxuasXcg==",
+  "crc32c": "j/un9g==",
+  "etag": "CKWasoTgyPkCEAE=",
+  "timeCreated": "2022-08-15T11:33:34.866Z",
+  "updated": "2022-08-15T11:33:34.866Z",
+  "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z"
+}
+    "#;
+}
diff --git a/src/services/gcs/dir_stream.rs b/src/services/gcs/dir_stream.rs
new file mode 100644
index 00000000000..7e9ac77abc0
--- /dev/null
+++ b/src/services/gcs/dir_stream.rs
@@ -0,0 +1,339 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::Result;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use anyhow::anyhow;
+use bytes::Buf;
+use futures::{future::BoxFuture, ready, Future, Stream};
+use isahc::AsyncReadResponseExt;
+use serde::Deserialize;
+use serde_json::de;
+
+use crate::error::{other, ObjectError};
+use crate::http_util::parse_error_response;
+use crate::services::gcs::backend::Backend;
+use crate::services::gcs::error::parse_error;
+use crate::{DirEntry, ObjectMode};
+
+enum State {
+    Standby,
+    Pending(BoxFuture<'static, Result<Vec<u8>>>),
+    Walking((Output, usize, usize)),
+}
+
+/// DirStream takes over the task of listing objects and
+/// helps walk a directory
+pub struct DirStream {
+    backend: Arc<Backend>,
+    path: String,
+    page_token: String,
+
+    done: bool,
+    state: State,
+}
+
+impl DirStream {
+    /// Generate a new directory walker
+    pub fn new(backend: Arc<Backend>, path: &str) -> Self {
+        Self {
+            backend,
+            path: path.to_string(),
+            page_token: "".to_string(), // empty token means the first page; see list_objects
+
+            done: false,
+            state: State::Standby,
+        }
+    }
+}
+
+impl Stream for DirStream {
+    type Item = Result<DirEntry>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let backend = self.backend.clone();
+
+        match &mut self.state {
+            State::Standby => {
+                let path = self.path.clone();
+                let token = self.page_token.clone();
+
+                let fut = async move {
+                    let mut resp = backend.list_objects(&path, token.as_str()).await?;
+
+                    if !resp.status().is_success() {
+                        log::error!("GCS failed to list objects, status code: {}", resp.status());
+                        let er = parse_error_response(resp).await?;
+                        let err = parse_error("list", &path, er);
+                        return Err(err);
+                    }
+                    let bytes = resp.bytes().await.map_err(|e| {
+                        other(ObjectError::new(
+                            "list",
+                            &path,
+                            anyhow!("read body: {:?}", e),
+                        ))
+                    })?;
+
+                    Ok(bytes)
+                };
+
+                self.state = State::Pending(Box::pin(fut));
+                self.poll_next(cx)
+            }
+            State::Pending(fut) => {
+                let bytes = ready!(Pin::new(fut).poll(cx))?;
+                let output: Output = de::from_reader(bytes.reader()).map_err(|e| {
+                    other(ObjectError::new(
+                        "list",
+                        &self.path,
+                        anyhow!("deserialize list_bucket output: {:?}", e),
+                    ))
+                })?;
+
+                if let Some(token) = &output.next_page_token {
+                    self.page_token = token.clone();
+                } else {
+                    self.done = true;
+                }
+                self.state = State::Walking((output, 0, 0));
+                self.poll_next(cx)
+            }
+            State::Walking((output, common_prefixes_idx, objects_idx)) => {
+                let prefixes = &output.prefixes;
+                if *common_prefixes_idx < prefixes.len() {
+                    let prefix = &prefixes[*common_prefixes_idx];
+                    *common_prefixes_idx += 1;
+
+                    let de = DirEntry::new(
+                        backend.clone(),
+                        ObjectMode::DIR,
+                        &backend.get_rel_path(prefix),
+                    );
+
+                    log::debug!(
+                        "object {} got entry, mode: {}, path: {}",
+                        &self.path,
+                        de.mode(),
+                        de.path()
+                    );
+                    return Poll::Ready(Some(Ok(de)));
+                }
+                let objects = &output.items;
+                while *objects_idx < objects.len() {
+                    let object = &objects[*objects_idx];
+                    *objects_idx += 1;
+
+                    if object.name.ends_with('/') {
+                        continue;
+                    }
+
+                    let de = DirEntry::new(
+                        backend.clone(),
+                        ObjectMode::FILE,
+                        &backend.get_rel_path(&object.name),
+                    );
+
+                    log::debug!(
+                        "dir object {} got entry, mode: {}, path: {}",
+                        &self.path,
+                        de.mode(),
+                        de.path()
+                    );
+                    return Poll::Ready(Some(Ok(de)));
+                }
+
+                // end of asynchronous iteration
+                if self.done {
+                    log::debug!("object {} list done", &self.path);
+                    return Poll::Ready(None);
+                }
+
+                self.state = State::Standby;
+                self.poll_next(cx)
+            }
+        }
+    }
+}
+
+/// Response JSON from GCS list objects API.
+///
+/// refer to https://cloud.google.com/storage/docs/json_api/v1/objects/list for details
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "camelCase")]
+struct Output {
+    /// The kind of item this is.
+    /// For lists of objects, this is always "storage#objects".
+    kind: String,
+    /// The continuation token.
+    ///
+    /// If this is the last page of results, then no continuation token is returned.
+    next_page_token: Option<String>,
+    /// Object name prefixes for objects that matched the listing request
+    /// but were excluded from [items] because of a delimiter.
+    prefixes: Vec<String>,
+    /// The list of objects, ordered lexicographically by name.
+    items: Vec<OutputContent>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct OutputContent {
+    name: String,
+    size: String,
+}
+
+#[cfg(test)]
+mod ds_test {
+    use crate::services::gcs::dir_stream::Output;
+
+    #[test]
+    fn test_de_list() {
+        let names = vec!["1.png", "2.png"];
+        let sizes = vec![56535, 45506];
+
+        let list_continuous = serde_json::de::from_str::<Output>(LIST_CONTINUOUS);
+        assert!(list_continuous.is_ok());
+        let list_continuous = list_continuous.unwrap();
+        assert_eq!(
+            list_continuous.next_page_token,
+            Some("CgYxMC5wbmc=".to_string())
+        );
+        assert_eq!(list_continuous.items.len(), 2);
+        for (idx, item) in list_continuous.items.iter().enumerate() {
+            assert_eq!(item.name, names[idx]);
+            let size_res = item.size.as_str().parse();
+            assert!(size_res.is_ok());
+            let size: u64 = size_res.unwrap();
+            assert_eq!(size, sizes[idx]);
+        }
+
+        let list_discrete = serde_json::de::from_str::<Output>(LIST_DISCRETE);
+        assert!(list_discrete.is_ok());
+        let list_discrete = list_discrete.unwrap();
+        assert!(list_discrete.next_page_token.is_none());
+        assert_eq!(list_discrete.items.len(), 2);
+
+        for (idx, item) in list_discrete.items.iter().enumerate() {
+            assert_eq!(item.name, names[idx]);
+            let size_res = item.size.as_str().parse();
+            assert!(size_res.is_ok());
+            let size: u64 = size_res.unwrap();
+            assert_eq!(size, sizes[idx]);
+        }
+    }
+
+    const LIST_DISCRETE: &str = r#"
+{
+  "kind": "storage#objects",
+  "prefixes": [
+    "dir/",
+    "test/"
+  ],
+  "items": [
+    {
+      "kind": "storage#object",
+      "id": "example/1.png/1660563214863653",
+      "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
+      "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
+      "name": "1.png",
+      "bucket": "example",
+      "generation": "1660563214863653",
+      "metageneration": "1",
+      "contentType": "image/png",
+      "storageClass": "STANDARD",
+      "size": "56535",
+      "md5Hash": "fHcEH1vPwA6eTPqxuasXcg==",
+      "crc32c": "j/un9g==",
+      "etag": "CKWasoTgyPkCEAE=",
+      "timeCreated": "2022-08-15T11:33:34.866Z",
+      "updated": "2022-08-15T11:33:34.866Z",
+      "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z"
+    },
+    {
+      "kind": "storage#object",
+      "id": "example/2.png/1660563214883337",
+      "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/2.png",
+      "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/2.png?generation=1660563214883337&alt=media",
+      "name": "2.png",
+      "bucket": "example",
+      "generation": "1660563214883337",
+      "metageneration": "1",
+      "contentType": "image/png",
+      "storageClass": "STANDARD",
+      "size": "45506",
+      "md5Hash": "e6LsGusU7pFJZk+114NV1g==",
+      "crc32c": "L00QAg==",
+      "etag": "CIm0s4TgyPkCEAE=",
+      "timeCreated": "2022-08-15T11:33:34.886Z",
+      "updated": "2022-08-15T11:33:34.886Z",
+      "timeStorageClassUpdated": "2022-08-15T11:33:34.886Z"
+    }
+  ]
+}
+    "#;
+    const LIST_CONTINUOUS: &str = r#"
+{
+  "kind": "storage#objects",
+  "prefixes": [
+    "dir/",
+    "test/"
+  ],
+  "nextPageToken": "CgYxMC5wbmc=",
+  "items": [
+    {
+      "kind": "storage#object",
+      "id": "example/1.png/1660563214863653",
+      "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
+      "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
+      "name": "1.png",
+      "bucket": "example",
+      "generation": "1660563214863653",
+      "metageneration": "1",
+      "contentType": "image/png",
+      "storageClass": "STANDARD",
+      "size": "56535",
"md5Hash": "fHcEH1vPwA6eTPqxuasXcg==", + "crc32c": "j/un9g==", + "etag": "CKWasoTgyPkCEAE=", + "timeCreated": "2022-08-15T11:33:34.866Z", + "updated": "2022-08-15T11:33:34.866Z", + "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z" + }, + { + "kind": "storage#object", + "id": "example/2.png/1660563214883337", + "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/2.png", + "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/2.png?generation=1660563214883337&alt=media", + "name": "2.png", + "bucket": "example", + "generation": "1660563214883337", + "metageneration": "1", + "contentType": "image/png", + "storageClass": "STANDARD", + "size": "45506", + "md5Hash": "e6LsGusU7pFJZk+114NV1g==", + "crc32c": "L00QAg==", + "etag": "CIm0s4TgyPkCEAE=", + "timeCreated": "2022-08-15T11:33:34.886Z", + "updated": "2022-08-15T11:33:34.886Z", + "timeStorageClassUpdated": "2022-08-15T11:33:34.886Z" + } + ] +} + "#; +} diff --git a/src/services/gcs/error.rs b/src/services/gcs/error.rs new file mode 100644 index 00000000000..2e5aec9dc2d --- /dev/null +++ b/src/services/gcs/error.rs @@ -0,0 +1,41 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Error; +use std::io::ErrorKind; + +use anyhow::anyhow; +use http::StatusCode; + +use crate::error::ObjectError; +use crate::http_util::ErrorResponse; + +/// Parse error response into io::Error. +/// +/// # TODO +/// +/// Make our own error type :) +pub fn parse_error(op: &'static str, path: &str, er: ErrorResponse) -> Error { + let kind = match er.status_code() { + StatusCode::NOT_FOUND => ErrorKind::NotFound, + StatusCode::FORBIDDEN => ErrorKind::PermissionDenied, + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Interrupted, + _ => ErrorKind::Other, + }; + + Error::new(kind, ObjectError::new(op, path, anyhow!("{er}"))) +} diff --git a/src/services/gcs/mod.rs b/src/services/gcs/mod.rs new file mode 100644 index 00000000000..fb697f8a369 --- /dev/null +++ b/src/services/gcs/mod.rs @@ -0,0 +1,28 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Google Cloud Storage support for OpenDAL. +//! +//! # Configuration +//! +//! - `root`: Set the work directory for backend +//! - `bucket`: Set the container name for backend +//! - `endpoint`: Customizable endpoint setting +//! 
+//! - `credentials`: Credential string for GCS OAuth2
+mod backend;
+pub use backend::Backend;
+pub use backend::Builder;
+
+mod dir_stream;
+mod error;
diff --git a/src/services/mod.rs b/src/services/mod.rs
index 70fbc8ea37f..2e581bccb76 100644
--- a/src/services/mod.rs
+++ b/src/services/mod.rs
@@ -21,6 +21,7 @@ pub mod azblob;
 pub mod fs;
+pub mod gcs;
 #[cfg(feature = "services-hdfs")]
 pub mod hdfs;
 #[cfg(feature = "services-http")]
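
As a usage sketch of the backend added above (not part of the patch): it assumes `Operator::new` accepts any `Accessor` and that `Object::{create, metadata, delete}` dispatch to the `Accessor` methods implemented here; the bucket name and credential string are placeholders.

use opendal::services::gcs;
use opendal::Operator;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Configure the builder with the four keys wired up in from_iter:
    // root, bucket, endpoint and credentials. Values here are placeholders.
    let mut builder = gcs::Builder::default();
    builder
        .root("/workdir/")
        .bucket("example-bucket")
        .credentials("<service-account-json>");

    // build() normalizes root to carry leading and trailing '/' and
    // returns a Backend that implements Accessor.
    let op = Operator::new(builder.build()?);

    // Round-trip an object through the new Accessor implementation.
    let o = op.object("hello.txt");
    o.create().await?;
    println!("mode: {:?}", o.metadata().await?.mode());
    o.delete().await?;

    Ok(())
}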
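DirStream drives GCS pagination transparently: when a page is exhausted and a nextPageToken was saved, the state machine flips back to Standby and fetches the next page, so a consumer only sees a flat stream of DirEntry values. A minimal consumer sketch (assuming `Object::list` returns the `DirStreamer` produced by `Accessor::list`, and that it is `Unpin` so `TryStreamExt::try_next` applies):

use futures::TryStreamExt;
use opendal::{ObjectMode, Operator};

async fn walk(op: &Operator) -> std::io::Result<()> {
    // list() yields DirEntry items: directories come from `prefixes`,
    // files from `items`, exactly as DirStream::poll_next emits them.
    let mut entries = op.object("dir/").list().await?;
    while let Some(entry) = entries.try_next().await? {
        match entry.mode() {
            ObjectMode::DIR => println!("dir:  {}", entry.path()),
            _ => println!("file: {}", entry.path()),
        }
    }
    Ok(())
}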