Skip to content

Commit

Permalink
feat: support for gcs storage (#520)
Browse files Browse the repository at this point in the history
* chore: include opendal/services-gcs

* feat: basic gcs scaffolding

* feat: populate config parse with basic details

* feat: include docker-compose integration tests

* feat: add extra iceberg properties

* feat: add tests for gcs read/write

These are currently conditional tests with a todo comment using the
test_with proc macro. More work needs to be done on
investigating/potentially expanding OpenDAL to allow unauthenticated
requests to fake-gcs-server. At the moment this always ends up reaching
the final VM metadata check.

* chore: minor cleanup for compose todo

* fix: do not introduce new properties

* feat: infer bucket from path

* chore: add user-project const

* feat: add allow_anonymous for test

* chore: remove test-with dep

* feat: update with allow_anonymous functionality

This requires the opendal allow_anonymous functionality with the GCS
service to work.

* ci: use cargo sort

* chore: undo storage-gcs default feature

* feat: include disable_ params for GCS_NO_AUTH

* ci: use storage-all for async-std tests

* revert: use opendal from workspace

Now that v0.49 has been released, this work does not need to pin to a
particular version!
  • Loading branch information
jdockerty authored Aug 14, 2024
1 parent 257cdbd commit cbd1844
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ jobs:
run: cargo test --no-fail-fast --all-targets --all-features --workspace

- name: Async-std Test
run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-fs" --workspace
run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-all" --workspace

- name: Doc Test
run: cargo test --no-fail-fast --doc --all-features --workspace
3 changes: 2 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ keywords = ["iceberg"]

[features]
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-memory", "storage-fs", "storage-s3"]
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]

storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]
storage-gcs = ["opendal/services-gcs"]

async-std = ["dep:async-std"]
tokio = ["dep:tokio"]
Expand Down
4 changes: 4 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,7 @@ pub use storage_s3::*;
mod storage_fs;
#[cfg(feature = "storage-fs")]
use storage_fs::*;
#[cfg(feature = "storage-gcs")]
mod storage_gcs;
#[cfg(feature = "storage-gcs")]
pub use storage_gcs::*;
28 changes: 27 additions & 1 deletion crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

use std::sync::Arc;

#[cfg(feature = "storage-gcs")]
use opendal::services::GcsConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};
Expand All @@ -38,6 +40,8 @@ pub(crate) enum Storage {
scheme_str: String,
config: Arc<S3Config>,
},
#[cfg(feature = "storage-gcs")]
Gcs { config: Arc<GcsConfig> },
}

impl Storage {
Expand All @@ -56,6 +60,10 @@ impl Storage {
scheme_str,
config: super::s3_config_parse(props)?.into(),
}),
#[cfg(feature = "storage-gcs")]
Scheme::Gcs => Ok(Self::Gcs {
config: super::gcs_config_parse(props)?.into(),
}),
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now",),
Expand Down Expand Up @@ -117,7 +125,24 @@ impl Storage {
))
}
}
#[cfg(all(not(feature = "storage-s3"), not(feature = "storage-fs")))]
#[cfg(feature = "storage-gcs")]
Storage::Gcs { config } => {
let operator = super::gcs_config_build(config, path)?;
let prefix = format!("gs://{}/", operator.info().name());
if path.starts_with(&prefix) {
Ok((operator, &path[prefix.len()..]))
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid gcs url: {}, should start with {}", path, prefix),
))
}
}
#[cfg(all(
not(feature = "storage-s3"),
not(feature = "storage-fs"),
not(feature = "storage-gcs")
))]
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
"No storage service has been enabled",
Expand All @@ -131,6 +156,7 @@ impl Storage {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
"gs" => Ok(Scheme::Gcs),
s => Ok(s.parse::<Scheme>()?),
}
}
Expand Down
68 changes: 68 additions & 0 deletions crates/iceberg/src/io/storage_gcs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Google Cloud Storage properties
use std::collections::HashMap;

use opendal::services::GcsConfig;
use opendal::Operator;
use url::Url;

use crate::{Error, ErrorKind, Result};

// Reference: https://github.com/apache/iceberg/blob/main/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java

/// Google Cloud Project ID
// NOTE(review): this key is declared but not read by `gcs_config_parse` in this file —
// confirm whether it is consumed elsewhere or still pending wiring.
pub const GCS_PROJECT_ID: &str = "gcs.project-id";
/// Google Cloud Storage endpoint
// `gcs_config_parse` copies the value of this key into the `endpoint` field of the config.
pub const GCS_SERVICE_PATH: &str = "gcs.service.path";
/// Google Cloud user project
// NOTE(review): this key is declared but not read by `gcs_config_parse` in this file —
// confirm whether it is consumed elsewhere or still pending wiring.
pub const GCS_USER_PROJECT: &str = "gcs.user-project";
/// Allow unauthenticated requests
// Consumed by `gcs_config_parse`, which uses it to switch the config to anonymous access.
pub const GCS_NO_AUTH: &str = "gcs.no-auth";

/// Parse iceberg properties to [`GcsConfig`].
///
/// Recognised keys:
///
/// * [`GCS_SERVICE_PATH`]: copied verbatim into the config's `endpoint`.
/// * [`GCS_NO_AUTH`]: when its value is truthy (`"true"`, `"t"`, `"1"` or `"on"`,
///   compared case-insensitively after trimming whitespace), anonymous access is
///   allowed and both VM-metadata lookup and config loading are disabled. Any other
///   value (for example `"false"`) leaves the authentication defaults untouched.
///
/// All other entries of `m` are ignored. The function currently has no failure path
/// and always returns `Ok`.
pub(crate) fn gcs_config_parse(mut m: HashMap<String, String>) -> Result<GcsConfig> {
    let mut cfg = GcsConfig::default();

    if let Some(endpoint) = m.remove(GCS_SERVICE_PATH) {
        cfg.endpoint = Some(endpoint);
    }

    // Inspect the value rather than the mere presence of the key, so that an explicit
    // `gcs.no-auth=false` does not silently turn authentication off.
    if let Some(no_auth) = m.remove(GCS_NO_AUTH) {
        let enabled = matches!(
            no_auth.trim().to_ascii_lowercase().as_str(),
            "true" | "t" | "1" | "on"
        );
        if enabled {
            cfg.allow_anonymous = true;
            cfg.disable_vm_metadata = true;
            cfg.disable_config_load = true;
        }
    }

    Ok(cfg)
}

/// Build a new OpenDAL [`Operator`] based on a provided [`GcsConfig`].
pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result<Operator> {
let url = Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid gcs url: {}, bucket is required", path),
)
})?;

let mut cfg = cfg.clone();
cfg.bucket = bucket.to_string();
Ok(Operator::from_config(cfg)?.finish())
}
23 changes: 23 additions & 0 deletions crates/iceberg/testdata/file_io_gcs/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Fake GCS backend used by the FileIO GCS integration tests.
services:
gcs-server:
# Pinned by digest so the emulator version used by the tests does not drift.
image: fsouza/fake-gcs-server@sha256:36b0116fae5236e8def76ccb07761a9ca323e476f366a5f4bf449cac19deaf2d
# The integration test connects to this port on the container's IP address.
expose:
- 4443
# The integration test talks to the server over http:// URLs.
command: --scheme http
125 changes: 125 additions & 0 deletions crates/iceberg/tests/file_io_gcs_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration tests for FileIO Google Cloud Storage (GCS).
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;

use bytes::Bytes;
use ctor::{ctor, dtor};
use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};

// Shared handle to the docker-compose environment: filled in by `before_all` and
// emptied again by `after_all`.
static DOCKER_COMPOSE_ENV: RwLock<Option<DockerCompose>> = RwLock::new(None);
// Port on the `gcs-server` container that the tests connect to.
static FAKE_GCS_PORT: u16 = 4443;
// Bucket that `get_file_io_gcs` creates and that every test path is rooted in.
static FAKE_GCS_BUCKET: &str = "test-bucket";

// Runs once before any test in this binary: starts the docker-compose environment in
// `testdata/file_io_gcs` and stores its handle in `DOCKER_COMPOSE_ENV`.
#[ctor]
fn before_all() {
    let mut env_slot = DOCKER_COMPOSE_ENV.write().unwrap();

    let project_name = normalize_test_name(module_path!());
    let compose_dir = format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR"));

    let compose = DockerCompose::new(project_name, compose_dir);
    compose.run();

    *env_slot = Some(compose);
}

// Runs once when the test binary exits: removes the docker-compose handle from
// `DOCKER_COMPOSE_ENV` and drops it.
// NOTE(review): presumably dropping `DockerCompose` tears the environment down —
// confirm in `iceberg_test_utils`.
#[dtor]
fn after_all() {
    let mut env_slot = DOCKER_COMPOSE_ENV.write().unwrap();
    drop(env_slot.take());
}

// Builds a `FileIO` pointed at the fake GCS server started by `before_all`.
//
// Looks up the `gcs-server` container IP, makes sure the test bucket exists, and then
// configures the builder with the server's http endpoint and the no-auth property.
async fn get_file_io_gcs() -> FileIO {
    set_up();

    let container_ip = DOCKER_COMPOSE_ENV
        .read()
        .unwrap()
        .as_ref()
        .unwrap()
        .get_container_ip("gcs-server");
    let server_addr = SocketAddr::new(container_ip, FAKE_GCS_PORT);

    // A bucket must exist for FileIO
    let bucket_created = create_bucket(FAKE_GCS_BUCKET, server_addr.to_string()).await;
    bucket_created.unwrap();

    let props = vec![
        (GCS_SERVICE_PATH, format!("http://{}", server_addr)),
        (GCS_NO_AUTH, "true".to_string()),
    ];
    let builder = FileIOBuilder::new("gcs").with_props(props);
    builder.build().unwrap()
}

// Create a bucket against the emulated GCS storage server.
//
// Sends `{"name": <name>}` as JSON in a POST to `http://<server_addr>/storage/v1/b`.
// Only an error from sending the request is propagated; the response itself
// (including its status code) is discarded.
async fn create_bucket(name: &str, server_addr: String) -> anyhow::Result<()> {
    let payload = HashMap::from([("name", name)]);
    let url = format!("http://{}/storage/v1/b", server_addr);

    let http = reqwest::Client::new();
    let _response = http.post(url).json(&payload).send().await?;
    Ok(())
}

// Returns the `gs://` URL of the test bucket, without a trailing slash.
fn get_gs_path() -> String {
    ["gs://", FAKE_GCS_BUCKET].concat()
}

// The bucket root must be reported as existing once the bucket has been created.
#[tokio::test]
async fn gcs_exists() {
    let file_io = get_file_io_gcs().await;

    let bucket_root = format!("{}/", get_gs_path());
    let found = file_io.is_exist(bucket_root).await.unwrap();

    assert!(found);
}

#[tokio::test]
async fn gcs_write() {
let gs_file = format!("{}/write-file", get_gs_path());
let file_io = get_file_io_gcs().await;
let output = file_io.new_output(&gs_file).unwrap();
output
.write(bytes::Bytes::from_static(b"iceberg-gcs!"))
.await
.expect("Write to test output file");
assert!(file_io.is_exist(gs_file).await.unwrap())
}

// An object written through FileIO must read back with identical contents.
#[tokio::test]
async fn gcs_read() {
    let gs_file = format!("{}/read-gcs", get_gs_path());
    let file_io = get_file_io_gcs().await;

    let output = file_io.new_output(&gs_file).unwrap();
    output
        .write(Bytes::from_static(b"iceberg!"))
        .await
        .expect("Write to test output file");
    let found = file_io.is_exist(&gs_file).await.unwrap();
    assert!(found);

    let input = file_io.new_input(gs_file).unwrap();
    let contents = input.read().await.unwrap();
    assert_eq!(contents, Bytes::from_static(b"iceberg!"));
}

0 comments on commit cbd1844

Please sign in to comment.