diff --git a/core/Cargo.toml b/core/Cargo.toml index aa7e475ecfe..90a02fef814 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -200,7 +200,7 @@ services-vercel-blob = [] services-webdav = [] services-webhdfs = [] services-yandex-disk = [] - +services-lakefs = [] [lib] bench = false diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs new file mode 100644 index 00000000000..3197ccc0841 --- /dev/null +++ b/core/src/services/lakefs/backend.rs @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use bytes::Buf; +use chrono::{TimeZone, Utc}; +use http::Response; +use http::StatusCode; +use log::debug; + +use super::core::LakefsCore; +use super::core::LakefsStatus; +use super::error::parse_error; +use crate::raw::*; +use crate::services::LakefsConfig; +use crate::*; + +impl Configurator for LakefsConfig { + type Builder = LakefsBuilder; + fn into_builder(self) -> Self::Builder { + LakefsBuilder { config: self } + } +} + +/// [Lakefs](https://docs.lakefs.io/reference/api.html#/)'s API support. 
+#[doc = include_str!("docs.md")]
+#[derive(Default, Clone)]
+pub struct LakefsBuilder {
+    config: LakefsConfig,
+}
+
+impl Debug for LakefsBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Builder");
+
+        ds.field("config", &self.config);
+        ds.finish()
+    }
+}
+
+impl LakefsBuilder {
+    /// Set the endpoint of this backend.
+    ///
+    /// The endpoint must be a full URI, including the scheme. An empty value is ignored.
+    ///
+    /// This is required. For example:
+    /// - `http://127.0.0.1:8000` (lakefs daemon in local)
+    /// - `https://my-lakefs.example.com` (lakefs server)
+    pub fn endpoint(mut self, endpoint: &str) -> Self {
+        if !endpoint.is_empty() {
+            self.config.endpoint = Some(endpoint.to_string());
+        }
+        self
+    }
+
+    /// Set username of this backend. This is required. An empty value is ignored.
+    pub fn username(mut self, username: &str) -> Self {
+        if !username.is_empty() {
+            self.config.username = Some(username.to_string());
+        }
+        self
+    }
+
+    /// Set password of this backend. This is required. An empty value is ignored.
+    pub fn password(mut self, password: &str) -> Self {
+        if !password.is_empty() {
+            self.config.password = Some(password.to_string());
+        }
+        self
+    }
+
+    /// Set the branch name or commit ID used by this backend. Default is main.
+    ///
+    /// An empty value is ignored, which keeps the default.
+    ///
+    /// For example, branch can be:
+    /// - main
+    /// - 1d0c4eb
+    pub fn branch(mut self, branch: &str) -> Self {
+        if !branch.is_empty() {
+            self.config.branch = Some(branch.to_string());
+        }
+        self
+    }
+
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root. An empty value is ignored.
+    pub fn root(mut self, root: &str) -> Self {
+        if !root.is_empty() {
+            self.config.root = Some(root.to_string());
+        }
+        self
+    }
+
+    /// Set the repository of this backend.
+    ///
+    /// This is required.
+    pub fn repository(mut self, repository: &str) -> Self {
+        if !repository.is_empty() {
+            self.config.repository = Some(repository.to_string());
+        }
+        self
+    }
+}
+
+impl Builder for LakefsBuilder {
+    const SCHEME: Scheme = Scheme::Lakefs;
+    type Config = LakefsConfig;
+
+    /// Build a LakefsBackend.
+    ///
+    /// `endpoint`, `repository`, `username` and `password` are required.
+    /// `branch` falls back to `main`, and an unset `root` is normalized
+    /// like an empty one.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `ErrorKind::ConfigInvalid` error naming the first required
+    /// option that has not been set. Also returns the error of
+    /// `HttpClient::new` if the HTTP client cannot be created.
+    fn build(self) -> Result<impl Access> {
+        debug!("backend build started: {:?}", &self);
+
+        // Every required option fails with the same error shape; only the
+        // message differs, so the error is built in one place.
+        let missing = |message: &'static str| {
+            Error::new(ErrorKind::ConfigInvalid, message)
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Lakefs)
+        };
+        // Take ownership of the options so no value has to be cloned.
+        let config = self.config;
+
+        let endpoint = config.endpoint.ok_or_else(|| missing("endpoint is empty"))?;
+        debug!("backend use endpoint: {:?}", &endpoint);
+
+        let repository = config.repository.ok_or_else(|| missing("repository is empty"))?;
+        debug!("backend use repository: {}", &repository);
+
+        // `branch` may hold a branch name or a commit ID.
+        let branch = config.branch.unwrap_or_else(|| "main".to_string());
+        debug!("backend use branch: {}", &branch);
+
+        let root = normalize_root(&config.root.unwrap_or_default());
+        debug!("backend use root: {}", &root);
+
+        // Credentials are checked last and, unlike the other options, are
+        // not written to the debug log.
+        let username = config.username.ok_or_else(|| missing("username is empty"))?;
+        let password = config.password.ok_or_else(|| missing("password is empty"))?;
+
+        let client = HttpClient::new()?;
+
+        Ok(LakefsBackend {
+            core: Arc::new(LakefsCore {
+                endpoint,
+                repository,
+                branch,
+                root,
+                username,
+                password,
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for Lakefs service
+#[derive(Debug,
Clone)]
+pub struct LakefsBackend {
+    core: Arc<LakefsCore>,
+}
+
+impl Access for LakefsBackend {
+    type Reader = HttpBody;
+    type Writer = ();
+    type Lister = ();
+    type BlockingReader = ();
+    type BlockingWriter = ();
+    type BlockingLister = ();
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Lakefs)
+            .set_native_capability(Capability {
+                stat: true,
+                read: true,
+                ..Default::default()
+            });
+        am.into()
+    }
+
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.core.get_object_metadata(path).await?;
+
+        match resp.status() {
+            StatusCode::OK => {
+                // Split the response so the headers stay usable once the body is consumed.
+                let (parts, body) = resp.into_parts();
+                let mut meta = parse_into_metadata(path, &parts.headers)?;
+
+                let decoded_response: LakefsStatus =
+                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
+
+                meta.set_content_length(decoded_response.size_bytes);
+                meta.set_mode(EntryMode::FILE);
+                if let Some(v) = parse_content_disposition(&parts.headers)? {
+                    meta.set_content_disposition(v);
+                }
+
+                // `mtime` is server-provided; an out-of-range value is an error, not a panic.
+                let last_modified = Utc
+                    .timestamp_opt(decoded_response.mtime, 0)
+                    .single()
+                    .ok_or_else(|| Error::new(ErrorKind::Unexpected, "mtime is out of range"))?;
+                meta.set_last_modified(last_modified);
+
+                Ok(RpStat::new(meta))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let resp = self
+            .core
+            .get_object_content(path, args.range(), &args)
+            .await?;
+
+        match resp.status() {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                Ok((RpRead::default(), resp.into_body()))
+            }
+            _ => {
+                let (part, mut body) = resp.into_parts();
+                let buf = body.to_buffer().await?;
+                Err(parse_error(Response::from_parts(part, buf)).await?)
+            }
+        }
+    }
+}
diff --git a/core/src/services/lakefs/config.rs b/core/src/services/lakefs/config.rs
new file mode 100644
index 00000000000..aa2826e93d0
--- /dev/null
+++ b/core/src/services/lakefs/config.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use serde::{Deserialize, Serialize};
+
+/// Configuration for Lakefs service support.
+#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct LakefsConfig {
+    /// Base URL of the Lakefs server.
+    ///
+    /// This is required.
+    pub endpoint: Option<String>,
+    /// Username for Lakefs basic authentication.
+    ///
+    /// This is required.
+    pub username: Option<String>,
+    /// Password for Lakefs basic authentication.
+    ///
+    /// This is required.
+    pub password: Option<String>,
+    /// Root of this backend. Can be "/path/to/dir".
+    ///
+    /// Default is "/".
+    pub root: Option<String>,
+
+    /// Name of the repository to access.
+    ///
+    /// This is required.
+    pub repository: Option<String>,
+    /// Name of the branch or a commit ID. Default is main.
+    ///
+    /// This is optional.
+    pub branch: Option<String>,
+}
+
+impl Debug for LakefsConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("LakefsConfig");
+        // Only options that are set are printed; credentials appear as a placeholder.
+        if let Some(endpoint) = &self.endpoint {
+            ds.field("endpoint", &endpoint);
+        }
+        if self.username.is_some() {
+            ds.field("username", &"<redacted>");
+        }
+        if self.password.is_some() {
+            ds.field("password", &"<redacted>");
+        }
+        if let Some(root) = &self.root {
+            ds.field("root", &root);
+        }
+        if let Some(repository) = &self.repository {
+            ds.field("repository", &repository);
+        }
+        if let Some(branch) = &self.branch {
+            ds.field("branch", &branch);
+        }
+
+        ds.finish()
+    }
+}
diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs
new file mode 100644
index 00000000000..c275bbc17fa
--- /dev/null
+++ b/core/src/services/lakefs/core.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use http::header;
+use http::Request;
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::*;
+
+/// Connection settings, credentials and HTTP client used to send Lakefs requests.
+pub struct LakefsCore {
+    pub endpoint: String,
+    pub repository: String,
+    pub branch: String,
+    pub root: String,
+    pub username: String,
+    pub password: String,
+    pub client: HttpClient,
+}
+
+impl Debug for LakefsCore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Credentials are replaced by a placeholder so they cannot leak through `{:?}`.
+        f.debug_struct("LakefsCore")
+            .field("endpoint", &self.endpoint)
+            .field("username", &"<redacted>")
+            .field("password", &"<redacted>")
+            .field("root", &self.root)
+            .field("repository", &self.repository)
+            .field("branch", &self.branch)
+            .finish_non_exhaustive()
+    }
+}
+
+impl LakefsCore {
+    pub async fn get_object_metadata(&self, path: &str) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/api/v1/repositories/{}/refs/{}/objects/stat?path={}",
+            self.endpoint,
+            self.repository,
+            self.branch,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::get(&url);
+        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub async fn get_object_content(
+        &self,
+        path: &str,
+        range: BytesRange,
+        _args: &OpRead,
+    ) -> Result<Response<HttpBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/api/v1/repositories/{}/refs/{}/objects?path={}",
+            self.endpoint,
+            self.repository,
+            self.branch,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::get(&url);
+        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        if !range.is_full() {
+            req = req.header(header::RANGE,
range.to_header());
+        }
+
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.client.fetch(req).await
+    }
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+#[allow(dead_code)]
+pub(super) struct LakefsStatus {
+    pub path: String,
+    pub path_type: String,
+    pub physical_address: String,
+    pub checksum: String,
+    pub size_bytes: u64,
+    pub mtime: i64,
+    pub content_type: String,
+}
diff --git a/core/src/services/lakefs/docs.md b/core/src/services/lakefs/docs.md
new file mode 100644
index 00000000000..2f71184512d
--- /dev/null
+++ b/core/src/services/lakefs/docs.md
@@ -0,0 +1,62 @@
+This service will visit the [Lakefs API](https://docs.lakefs.io/reference/api.html#/) to access objects stored in a Lakefs repository.
+Currently, operations are limited to stating and reading objects on a single branch or commit of one repository.
+
+Requests are authenticated with HTTP basic authentication, using the configured `username` and `password`.
+
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [ ] write
+- [ ] create_dir
+- [ ] delete
+- [ ] copy
+- [ ] rename
+- [ ] list
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configurations
+
+- `endpoint`: The endpoint of the Lakefs server.
+- `repository`: The name of the repository.
+- `branch`: The branch name or commit ID to access. Default is `main`.
+- `root`: Set the work directory for backend.
+- `username`: The username used for basic authentication.
+- `password`: The password used for basic authentication.
+
+Refer to [`LakefsBuilder`]'s public API docs for more information.
+
+## Examples
+
+### Via Builder
+
+```rust,no_run
+use opendal::Operator;
+use opendal::services::Lakefs;
+use anyhow::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Create Lakefs backend builder
+    let mut builder = Lakefs::default()
+        // set the endpoint of the Lakefs server
+        .endpoint("https://whole-llama-mh6mux.us-east-1.lakefscloud.io")
+        // set the name of the Lakefs repository
+        .repository("sample-repo")
+        // set the branch of the Lakefs repository
+        .branch("main")
+        // set the username for accessing the repository
+        .username("xxx")
+        // set the password for accessing the repository
+        .password("xxx");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    let stat = op.stat("README.md").await?;
+    println!("{:?}", stat);
+    Ok(())
+}
+```
diff --git a/core/src/services/lakefs/error.rs b/core/src/services/lakefs/error.rs
new file mode 100644
index 00000000000..38679926a89
--- /dev/null
+++ b/core/src/services/lakefs/error.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use bytes::Buf;
+use http::Response;
+use http::StatusCode;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::*;
+
+/// LakefsError is the error returned by Lakefs File System.
+#[derive(Default, Deserialize)]
+struct LakefsError {
+    // NOTE(review): lakeFS appears to report errors under `message`; accepted too - confirm.
+    #[serde(alias = "message")]
+    error: String,
+}
+
+impl Debug for LakefsError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LakefsError")
+            .field("message", &self.error.replace('\n', " "))
+            .finish()
+    }
+}
+
+/// Build an [`Error`] from an unsuccessful response: the status code selects the kind and
+/// whether it is temporary; the message is the parsed JSON error, else the raw body.
+pub async fn parse_error(resp: Response<Buffer>) -> Result<Error> {
+    let (parts, mut body) = resp.into_parts();
+    let bs = body.copy_to_bytes(body.remaining());
+
+    let (kind, retryable) = match parts.status {
+        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+        StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+        StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false),
+        StatusCode::INTERNAL_SERVER_ERROR
+        | StatusCode::BAD_GATEWAY
+        | StatusCode::SERVICE_UNAVAILABLE
+        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let message = match serde_json::from_slice::<LakefsError>(&bs) {
+        Ok(lakefs_error) => format!("{:?}", lakefs_error.error),
+        Err(_) => String::from_utf8_lossy(&bs).into_owned(),
+    };
+
+    let mut err = with_error_response_context(Error::new(kind, message), parts);
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::raw::new_json_deserialize_error;
+    use crate::types::Result;
+
+    #[test]
+    fn test_parse_error() -> Result<()> {
+        let resp = r#"
+        {
+            "error": "Invalid username or password."
+        }
+        "#;
+        let decoded_response = serde_json::from_slice::<LakefsError>(resp.as_bytes())
+            .map_err(new_json_deserialize_error)?;
+
+        assert_eq!(decoded_response.error, "Invalid username or password.");
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/mod.rs
new file mode 100644
index 00000000000..5f567503992
--- /dev/null
+++ b/core/src/services/lakefs/mod.rs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#[cfg(feature = "services-lakefs")] +mod core; +#[cfg(feature = "services-lakefs")] +mod error; + +#[cfg(feature = "services-lakefs")] +mod backend; +#[cfg(feature = "services-lakefs")] +pub use backend::LakefsBuilder as Lakefs; + +mod config; +pub use config::LakefsConfig; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 62ece326329..753838b3131 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -118,6 +118,9 @@ pub use ipmfs::*; mod koofr; pub use koofr::*; +mod lakefs; +pub use lakefs::*; + mod libsql; pub use libsql::*; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 94e0b87bb09..4691eda6b24 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -290,6 +290,8 @@ impl Operator { Scheme::Mongodb => Self::from_iter::(iter)?.finish(), #[cfg(feature = "services-hdfs-native")] Scheme::HdfsNative => Self::from_iter::(iter)?.finish(), + #[cfg(feature = "services-lakefs")] + Scheme::Lakefs => Self::from_iter::(iter)?.finish(), v => { return Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index a78fe5f6b22..4dd8acc69fb 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -163,6 +163,8 @@ pub enum Scheme { HdfsNative, /// [surrealdb](crate::services::Surrealdb): Surrealdb Services Surrealdb, + /// [lakefs](crate::services::Lakefs): LakeFS Services + Lakefs, /// Custom that allow users to implement services outside of OpenDAL. 
/// /// # NOTE @@ -311,6 +313,8 @@ impl Scheme { Scheme::HdfsNative, #[cfg(feature = "services-surrealdb")] Scheme::Surrealdb, + #[cfg(feature = "services-lakefs")] + Scheme::Lakefs, ]) } } @@ -401,6 +405,7 @@ impl FromStr for Scheme { "mongodb" => Ok(Scheme::Mongodb), "hdfs_native" => Ok(Scheme::HdfsNative), "surrealdb" => Ok(Scheme::Surrealdb), + "lakefs" => Ok(Scheme::Lakefs), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -474,6 +479,7 @@ impl From for &'static str { Scheme::Pcloud => "pcloud", Scheme::HdfsNative => "hdfs_native", Scheme::Surrealdb => "surrealdb", + Scheme::Lakefs => "lakefs", Scheme::Custom(v) => v, } }