Skip to content

Commit

Permalink
Merge pull request #4081 from Xuanwo/dal2
Browse files Browse the repository at this point in the history
query: Replace dal with dal2, let's rock!
  • Loading branch information
mergify[bot] authored Feb 11, 2022
2 parents c04dca4 + 1196718 commit 85d443b
Show file tree
Hide file tree
Showing 44 changed files with 462 additions and 215 deletions.
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/dal-context/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "dal_context"
name = "common-dal-context"
version = "0.1.0"
edition = "2021"

Expand Down
31 changes: 22 additions & 9 deletions common/dal-context/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_dal2::ops::OpDelete;
use common_dal2::ops::OpRead;
use common_dal2::ops::OpStat;
use common_dal2::ops::OpWrite;
use common_dal2::readers::CallbackReader;
use common_dal2::Accessor;
use common_dal2::Layer;
use common_dal2::Object;
Expand All @@ -27,16 +28,16 @@ use common_infallible::RwLock;

use crate::metrics::DalMetrics;

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct DalContext {
inner: Arc<dyn Accessor>,
inner: Option<Arc<dyn Accessor>>,
metrics: Arc<RwLock<DalMetrics>>,
}

impl DalContext {
pub fn new(inner: Arc<dyn Accessor>) -> Self {
DalContext {
inner,
inner: Some(inner),
metrics: Arc::new(Default::default()),
}
}
Expand Down Expand Up @@ -110,26 +111,38 @@ impl DalContext {

impl Layer for DalContext {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
Arc::new(DalContext::new(inner))
Arc::new(DalContext {
inner: Some(inner),
metrics: self.metrics.clone(),
})
}
}

#[async_trait]
impl Accessor for DalContext {
async fn read(&self, args: &OpRead) -> DalResult<Reader> {
// TODO(xuanwo): Implement context callback reader to collect metrics.
self.inner.read(args).await
let metrics = self.metrics.clone();

// TODO(xuanwo): Maybe it's better to move into metrics.
self.inner.as_ref().unwrap().read(args).await.map(|reader| {
let r = CallbackReader::new(reader, move |n| {
let mut metrics = metrics.write();
metrics.read_bytes += n;
});

Box::new(r) as Reader
})
}
async fn write(&self, r: Reader, args: &OpWrite) -> DalResult<usize> {
self.inner.write(r, args).await.map(|n| {
self.inner.as_ref().unwrap().write(r, args).await.map(|n| {
self.inc_write_bytes(n);
n
})
}
async fn stat(&self, args: &OpStat) -> DalResult<Object> {
self.inner.stat(args).await
self.inner.as_ref().unwrap().stat(args).await
}
async fn delete(&self, args: &OpDelete) -> DalResult<()> {
self.inner.delete(args).await
self.inner.as_ref().unwrap().delete(args).await
}
}
1 change: 1 addition & 0 deletions common/dal-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod metrics;
pub use metrics::DalMetrics;

mod context;
pub use context::DalContext;
2 changes: 1 addition & 1 deletion common/dal2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ bytes = "1.1.0"
futures = { version = "0.3.21", features = ["alloc"] }
tokio = { version = "1.16.1", features = ["full"] }
thiserror = "1.0.30"
aws-config = "0.6.0"
aws-types = { version = "0.6.0", features = ["hardcoded-credentials"] }
aws-sdk-s3 = "0.6.0"
aws-endpoint = "0.6.0"
http = "0.2.6"
aws-smithy-http = "0.36.0"
hyper = { version = "0.14.16", features = ["stream"] }
pin-project = "1.0.10"
aws-config = "0.6.0"

[dev-dependencies]
uuid = { version = "0.8.2", features = ["serde", "v4"] }
6 changes: 6 additions & 0 deletions common/dal2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ pub type Result<T> = std::result::Result<T, Error>;
/// The error will be named as `noun-adj`. For example, `ObjectNotExist` or `PermissionDenied`.
/// The error will be formatted as `description: (keyA valueA, keyB valueB, ...)`.
/// As an exception, `Error::Unexpected` is used for all unexpected errors.
///
/// ## TODO
///
/// Maybe it's better to include the operation name in the error message.
#[derive(Error, Debug)]
pub enum Error {
#[error("backend not supported: (type {0})")]
BackendNotSupported(String),
#[error("backend configuration invalid: (key {key}, value {value})")]
BackendConfigurationInvalid { key: String, value: String },

Expand Down
6 changes: 6 additions & 0 deletions common/dal2/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ use crate::Accessor;
pub trait Layer {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor>;
}

impl<T: Layer> Layer for Arc<T> {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
self.as_ref().layer(inner)
}
}
3 changes: 3 additions & 0 deletions common/dal2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub use operator::Operator;
mod object;
pub use object::Object;

mod scheme;
pub use scheme::Scheme;

pub mod credential;
pub mod error;
pub mod ops;
Expand Down
43 changes: 43 additions & 0 deletions common/dal2/src/scheme.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;

use super::error::Error;

#[derive(Clone, Debug, PartialEq)]
pub enum Scheme {
// TODO: Although we don't have azblob support for now, but we need to add it for compatibility. We will implement azblob support as soon as possible.
Azblob,
Fs,
S3,
}

impl FromStr for Scheme {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.to_lowercase();
match s.as_str() {
"azblob" => Ok(Scheme::Azblob),
"fs" => Ok(Scheme::Fs),
"s3" => Ok(Scheme::S3),

// TODO: it's used for compatibility with dal1, should be removed in the future
"local" | "disk" => Ok(Scheme::Fs),
"azurestorageblob" => Ok(Scheme::Azblob),

_ => Err(Error::BackendNotSupported(s)),
}
}
}
38 changes: 33 additions & 5 deletions common/dal2/src/services/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,20 @@ impl Builder {
self
}

pub fn finish(&mut self) -> Arc<dyn Accessor> {
Arc::new(Backend {
// Make `/` as the default of root.
root: self.root.clone().unwrap_or_else(|| "/".to_string()),
})
pub async fn finish(&mut self) -> Result<Arc<dyn Accessor>> {
// Make `/` as the default of root.
let root = self.root.clone().unwrap_or_else(|| "/".to_string());

// If root dir is not exist, we must create it.
if let Err(e) = fs::metadata(&root).await {
if e.kind() == std::io::ErrorKind::NotFound {
fs::create_dir_all(&root)
.await
.map_err(|e| parse_io_error(&e, PathBuf::from(&root).as_path()))?;
}
}

Ok(Arc::new(Backend { root }))
}
}

Expand Down Expand Up @@ -92,6 +101,19 @@ impl Accessor for Backend {
async fn write(&self, mut r: Reader, args: &OpWrite) -> Result<usize> {
let path = PathBuf::from(&self.root).join(&args.path);

// Create dir before write path.
//
// TODO(xuanwo): There are many works to do here:
// - Is it safe to create dir concurrently?
// - Do we need to extract this logic as new util functions?
// - Is it better to check the parent dir exists before call mkdir?
let parent = path
.parent()
.ok_or_else(|| Error::Unexpected(format!("malformed path: {:?}", path.to_str())))?;
fs::create_dir_all(parent)
.await
.map_err(|e| parse_io_error(&e, parent))?;

let mut f = fs::OpenOptions::new()
.create(true)
.write(true)
Expand All @@ -104,6 +126,12 @@ impl Accessor for Backend {
.await
.map_err(|e| parse_io_error(&e, &path))?;

// `std::fs::File`'s errors detected on closing are ignored by
// the implementation of Drop.
// So we need to call `sync_all` to make sure all internal metadata
// have been flushed to fs successfully.
f.sync_all().await.map_err(|e| parse_io_error(&e, &path))?;

Ok(s as usize)
}

Expand Down
20 changes: 14 additions & 6 deletions common/dal2/src/services/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,15 @@ impl Builder {
String::new()
};

// Load from runtime env as default.
let aws_cfg = aws_config::load_from_env().await;
// Load config from environment, including:
// - Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION
// - The default credentials files located in ~/.aws/config and ~/.aws/credentials (location can vary per platform)
// - Web Identity Token credentials from the environment or container (including EKS)
// - ECS Container Credentials (IAM roles for tasks)
// - EC2 Instance Metadata Service (IAM Roles attached to instance)
let cfg = aws_config::load_from_env().await;

let mut cfg = AwsS3::config::Builder::from(&aws_cfg);
let mut cfg = AwsS3::config::Builder::from(&cfg);

// TODO: Maybe we can
//
Expand Down Expand Up @@ -174,10 +179,13 @@ impl Backend {
/// If user input a relative path, we will calculate the absolute path with the root.
fn get_abs_path(&self, path: &str) -> String {
if path.starts_with('/') {
path.strip_prefix('/').unwrap().to_string()
} else {
format!("{}/{}", self.root, path)
return path.strip_prefix('/').unwrap().to_string();
}
if self.root.is_empty() {
return path.to_string();
}

format!("{}/{}", self.root, path)
}
}

Expand Down
10 changes: 6 additions & 4 deletions common/dal2/tests/it/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use futures::lock::Mutex;

struct Test {
#[allow(dead_code)]
inner: Arc<dyn Accessor>,
inner: Option<Arc<dyn Accessor>>,
deleted: Arc<Mutex<bool>>,
}

impl Layer for &Test {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
Arc::new(Test {
inner: inner.clone(),
inner: Some(inner.clone()),
deleted: self.deleted.clone(),
})
}
Expand All @@ -42,6 +42,8 @@ impl Accessor for Test {
let mut x = self.deleted.lock().await;
*x = true;

assert!(self.inner.is_some());

// We will not call anything here to test the layer.
Ok(())
}
Expand All @@ -50,11 +52,11 @@ impl Accessor for Test {
#[tokio::test]
async fn test_layer() {
let test = Test {
inner: Arc::new(fs::Backend::build().finish()),
inner: None,
deleted: Arc::new(Mutex::new(false)),
};

let op = Operator::new(fs::Backend::build().finish()).layer(&test);
let op = Operator::new(fs::Backend::build().finish().await.unwrap()).layer(&test);

op.delete("xxxxx").run().await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion common/dal2/tests/it/services/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::io::Cursor;

#[tokio::test]
async fn normal() {
let f = Operator::new(fs::Backend::build().finish());
let f = Operator::new(fs::Backend::build().finish().await.unwrap());

let path = format!("/tmp/{}", uuid::Uuid::new_v4());

Expand Down
Loading

0 comments on commit 85d443b

Please sign in to comment.