From f0ea81e3ed98e587fc449c4ab0feecb804bdf22e Mon Sep 17 00:00:00 2001 From: Rebekah Davis Date: Mon, 2 Dec 2024 13:47:47 -0500 Subject: [PATCH 1/5] C.41 compliance: GCS factory constructor. --- test/src/unit-vfs.cc | 21 +-- tiledb/common/exception/status.h | 4 - tiledb/sm/cpp_api/config.h | 7 +- tiledb/sm/filesystem/gcs.cc | 257 ++++++++++++------------------- tiledb/sm/filesystem/gcs.h | 162 +++++++++++++------ tiledb/sm/filesystem/vfs.cc | 39 +++-- tiledb/sm/filesystem/vfs.h | 41 ++++- 7 files changed, 275 insertions(+), 256 deletions(-) diff --git a/test/src/unit-vfs.cc b/test/src/unit-vfs.cc index d04837d9f6d..32ac2725bff 100644 --- a/test/src/unit-vfs.cc +++ b/test/src/unit-vfs.cc @@ -823,7 +823,6 @@ TEST_CASE( "[gcs][credentials][impersonation]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; std::string impersonate_service_account, target_service_account; std::vector delegates; @@ -848,9 +847,7 @@ TEST_CASE( require_tiledb_ok(cfg.set( "vfs.gcs.impersonate_service_account", impersonate_service_account)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. @@ -870,16 +867,13 @@ TEST_CASE( "[gcs][credentials][service-account]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; // The content of the credentials does not matter; it does not get parsed // until it is used in an API request, which we are not doing. std::string service_account_key = "{\"foo\": \"bar\"}"; require_tiledb_ok( cfg.set("vfs.gcs.service_account_key", service_account_key)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. 
@@ -896,7 +890,6 @@ TEST_CASE( "[gcs][credentials][service-account-and-impersonation]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; // The content of the credentials does not matter; it does not get parsed // until it is used in an API request, which we are not doing. std::string service_account_key = "{\"foo\": \"bar\"}"; @@ -906,9 +899,7 @@ TEST_CASE( cfg.set("vfs.gcs.service_account_key", service_account_key)); require_tiledb_ok(cfg.set( "vfs.gcs.impersonate_service_account", impersonate_service_account)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. @@ -934,17 +925,13 @@ TEST_CASE( "[gcs][credentials][external-account]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; // The content of the credentials does not matter; it does not get parsed // until it is used in an API request, which we are not doing. std::string workload_identity_configuration = "{\"foo\": \"bar\"}"; - require_tiledb_ok(cfg.set( "vfs.gcs.workload_identity_configuration", workload_identity_configuration)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. 
diff --git a/tiledb/common/exception/status.h b/tiledb/common/exception/status.h index 27a7d322976..2da3ac6c81f 100644 --- a/tiledb/common/exception/status.h +++ b/tiledb/common/exception/status.h @@ -314,10 +314,6 @@ inline Status Status_S3Error(const std::string& msg) { inline Status Status_AzureError(const std::string& msg) { return {"[TileDB::Azure] Error", msg}; } -/** Return a FS_GCS error class Status with a given message **/ -inline Status Status_GCSError(const std::string& msg) { - return {"[TileDB::GCS] Error", msg}; -} /** Return a FS_HDFS error class Status with a given message **/ inline Status Status_HDFSError(const std::string& msg) { return {"[TileDB::HDFS] Error", msg}; diff --git a/tiledb/sm/cpp_api/config.h b/tiledb/sm/cpp_api/config.h index 3d8bbc9afff..4860429d3d4 100644 --- a/tiledb/sm/cpp_api/config.h +++ b/tiledb/sm/cpp_api/config.h @@ -1,13 +1,11 @@ /** * @file config.h * - * @author Ravi Gaddipati - * * @section LICENSE * * The MIT License * - * @copyright Copyright (c) 2017-2023 TileDB, Inc. + * @copyright Copyright (c) 2017-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -627,6 +625,9 @@ class Config { * The maximum permissible delay between Azure netwwork request retry * attempts, in milliseconds. * **Default**: 60000 + * - `vfs.gcs.endpoint`
+ * The GCS endpoint.
+ * **Default**: "" * - `vfs.gcs.project_id`
* Set the GCS project ID to create new buckets to. Not required unless you * are going to use the VFS to create buckets.
diff --git a/tiledb/sm/filesystem/gcs.cc b/tiledb/sm/filesystem/gcs.cc index 2f002144c2b..fd05e1024d5 100644 --- a/tiledb/sm/filesystem/gcs.cc +++ b/tiledb/sm/filesystem/gcs.cc @@ -65,13 +65,18 @@ namespace tiledb::sm { /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ -GCS::GCS() - : state_(State::UNINITIALIZED) - , write_cache_max_size_(0) - , max_parallel_ops_(1) - , multi_part_part_size_(0) - , use_multi_part_upload_(true) - , request_timeout_ms_(0) { +GCS::GCS(ThreadPool* thread_pool, const Config& config) + : gcs_params_(GCSParameters(config)) + , state_(State::UNINITIALIZED) + , ssl_cfg_(SSLConfig(config)) + , thread_pool_(thread_pool) + , write_cache_max_size_(0) { + assert(thread_pool); + write_cache_max_size_ = + gcs_params_.use_multi_part_upload_ ? + gcs_params_.max_parallel_ops_ * gcs_params_.multi_part_size_ : + gcs_params_.max_direct_upload_size_; + state_ = State::INITIALIZED; } GCS::~GCS() { @@ -81,59 +86,6 @@ GCS::~GCS() { /* API */ /* ********************************* */ -Status GCS::init(const Config& config, ThreadPool* const thread_pool) { - if (thread_pool == nullptr) { - return LOG_STATUS( - Status_GCSError("Can't initialize with null thread pool.")); - } - - ssl_cfg_ = SSLConfig(config); - - assert(state_ == State::UNINITIALIZED); - - thread_pool_ = thread_pool; - - bool found; - endpoint_ = config.get("vfs.gcs.endpoint", &found); - assert(found); - if (endpoint_.empty() && getenv("TILEDB_TEST_GCS_ENDPOINT")) { - endpoint_ = getenv("TILEDB_TEST_GCS_ENDPOINT"); - } - project_id_ = config.get("vfs.gcs.project_id", &found); - assert(found); - service_account_key_ = config.get("vfs.gcs.service_account_key", &found); - assert(found); - workload_identity_configuration_ = - config.get("vfs.gcs.workload_identity_configuration", &found); - assert(found); - impersonate_service_account_ = - config.get("vfs.gcs.impersonate_service_account", &found); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.max_parallel_ops", 
&max_parallel_ops_, &found)); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.use_multi_part_upload", &use_multi_part_upload_, &found)); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.multi_part_size", &multi_part_part_size_, &found)); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.request_timeout_ms", &request_timeout_ms_, &found)); - assert(found); - uint64_t max_direct_upload_size; - RETURN_NOT_OK(config.get( - "vfs.gcs.max_direct_upload_size", &max_direct_upload_size, &found)); - assert(found); - - write_cache_max_size_ = use_multi_part_upload_ ? - max_parallel_ops_ * multi_part_part_size_ : - max_direct_upload_size; - - state_ = State::INITIALIZED; - return Status::Ok(); -} - /** * Builds a chain of service account impersonation credentials. * @@ -191,23 +143,26 @@ static shared_ptr apply_impersonation( std::shared_ptr GCS::make_credentials( const google::cloud::Options& options) const { shared_ptr creds = nullptr; - if (!service_account_key_.empty()) { - if (!workload_identity_configuration_.empty()) { + if (!gcs_params_.service_account_key_.empty()) { + if (!gcs_params_.workload_identity_configuration_.empty()) { LOG_WARN( "Both GCS service account key and workload identity configuration " "were specified; picking the former"); } creds = google::cloud::MakeServiceAccountCredentials( - service_account_key_, options); - } else if (!workload_identity_configuration_.empty()) { + gcs_params_.service_account_key_, options); + } else if (!gcs_params_.workload_identity_configuration_.empty()) { creds = google::cloud::MakeExternalAccountCredentials( - workload_identity_configuration_, options); - } else if (!endpoint_.empty() || getenv("CLOUD_STORAGE_EMULATOR_ENDPOINT")) { + gcs_params_.workload_identity_configuration_, options); + } else if ( + !gcs_params_.endpoint_.empty() || + getenv("CLOUD_STORAGE_EMULATOR_ENDPOINT")) { creds = google::cloud::MakeInsecureCredentials(); } else { creds = 
google::cloud::MakeGoogleDefaultCredentials(options); } - return apply_impersonation(creds, impersonate_service_account_, options); + return apply_impersonation( + creds, gcs_params_.impersonate_service_account_, options); } Status GCS::init_client() const { @@ -244,17 +199,18 @@ Status GCS::init_client() const { auto client_options = ca_options; client_options.set( make_credentials(ca_options)); - if (!endpoint_.empty()) { - client_options.set(endpoint_); + if (!gcs_params_.endpoint_.empty()) { + client_options.set( + gcs_params_.endpoint_); } client_options.set( make_shared( - HERE(), std::chrono::milliseconds(request_timeout_ms_))); + HERE(), + std::chrono::milliseconds(gcs_params_.request_timeout_ms_))); client_ = tdb_unique_ptr( tdb_new(google::cloud::storage::Client, client_options)); } catch (const std::exception& e) { - return LOG_STATUS( - Status_GCSError("Failed to initialize GCS: " + std::string(e.what()))); + throw GCSException("Failed to initialize GCS: " + std::string(e.what())); } return Status::Ok(); @@ -264,8 +220,7 @@ Status GCS::create_bucket(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -273,12 +228,14 @@ Status GCS::create_bucket(const URI& uri) const { google::cloud::StatusOr bucket_metadata = client_->CreateBucketForProject( - bucket_name, project_id_, google::cloud::storage::BucketMetadata()); + bucket_name, + gcs_params_.project_id_, + google::cloud::storage::BucketMetadata()); if (!bucket_metadata.ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Create bucket failed on: " + uri.to_string() + " (" + - bucket_metadata.status().message() + ")"))); + bucket_metadata.status().message() + ")"); } return wait_for_bucket_to_propagate(bucket_name); @@ -288,8 +245,7 @@ Status GCS::empty_bucket(const 
URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } return remove_dir(uri); @@ -300,8 +256,7 @@ Status GCS::is_empty_bucket(const URI& uri, bool* is_empty) const { assert(is_empty); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -313,9 +268,9 @@ Status GCS::is_empty_bucket(const URI& uri, bool* is_empty) const { for (const google::cloud::StatusOr& object_metadata : objects_reader) { if (!object_metadata) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "List bucket objects failed on: " + uri.to_string() + " (" + - object_metadata.status().message() + ")"))); + object_metadata.status().message() + ")"); } *is_empty = false; @@ -331,8 +286,7 @@ Status GCS::is_bucket(const URI& uri, bool* const is_bucket) const { assert(is_bucket); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -354,9 +308,9 @@ Status GCS::is_bucket( *is_bucket = false; return Status::Ok(); } else { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Get bucket failed on: " + bucket_name + " (" + status.message() + - ")"))); + ")"); } } @@ -369,8 +323,7 @@ Status GCS::is_dir(const URI& uri, bool* const exists) const { assert(exists); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::vector paths; @@ -383,8 +336,7 @@ Status GCS::remove_bucket(const URI& uri) const { RETURN_NOT_OK(init_client()); if 
(!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } // Empty bucket @@ -395,9 +347,9 @@ Status GCS::remove_bucket(const URI& uri) const { const google::cloud::Status status = client_->DeleteBucket(bucket_name); if (!status.ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Delete bucket failed on: " + uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_bucket_to_be_deleted(bucket_name); @@ -407,8 +359,7 @@ Status GCS::remove_object(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -418,9 +369,9 @@ Status GCS::remove_object(const URI& uri) const { const google::cloud::Status status = client_->DeleteObject(bucket_name, object_path); if (!status.ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Delete object failed on: " + uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_object_to_be_deleted(bucket_name, object_path); @@ -430,8 +381,7 @@ Status GCS::remove_dir(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::vector paths; @@ -633,13 +583,11 @@ Status GCS::copy_object(const URI& old_uri, const URI& new_uri) { RETURN_NOT_OK(init_client()); if (!old_uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + old_uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + old_uri.to_string()); } if (!new_uri.is_gcs()) { - return 
LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + new_uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + new_uri.to_string()); } std::string old_bucket_name; @@ -657,9 +605,9 @@ Status GCS::copy_object(const URI& old_uri, const URI& new_uri) { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Copy object failed on: " + old_uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_object_to_propagate(new_bucket_name, new_object_path); @@ -681,8 +629,8 @@ Status GCS::wait_for_object_to_propagate( std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError( - std::string("Timed out waiting on object to propogate: " + object_path))); + throw GCSException( + "Timed out waiting on object to propogate: " + object_path); } Status GCS::wait_for_object_to_be_deleted( @@ -701,8 +649,8 @@ Status GCS::wait_for_object_to_be_deleted( std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError(std::string( - "Timed out waiting on object to be deleted: " + object_path))); + throw GCSException( + "Timed out waiting on object to be deleted: " + object_path); } Status GCS::wait_for_bucket_to_propagate(const std::string& bucket_name) const { @@ -719,8 +667,8 @@ Status GCS::wait_for_bucket_to_propagate(const std::string& bucket_name) const { std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError( - std::string("Timed out waiting on bucket to propogate: " + bucket_name))); + throw GCSException( + "Timed out waiting on bucket to propogate: " + bucket_name); } Status GCS::wait_for_bucket_to_be_deleted( @@ -739,8 +687,8 @@ Status GCS::wait_for_bucket_to_be_deleted( std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError(std::string( - "Timed 
out waiting on bucket to be deleted: " + bucket_name))); + throw GCSException( + "Timed out waiting on bucket to be deleted: " + bucket_name); } Status GCS::move_dir(const URI& old_uri, const URI& new_uri) { @@ -760,8 +708,7 @@ Status GCS::touch(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -774,9 +721,9 @@ Status GCS::touch(const URI& uri) const { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Touch object failed on: " + uri.to_string() + " (" + status.message() + - ")"))); + ")"); } return Status::Ok(); @@ -787,8 +734,7 @@ Status GCS::is_object(const URI& uri, bool* const is_object) const { assert(is_object); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -815,9 +761,9 @@ Status GCS::is_object( *is_object = false; return Status::Ok(); } else { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Get object failed on: " + object_path + " (" + status.message() + - ")"))); + ")"); } } @@ -829,8 +775,7 @@ Status GCS::is_object( Status GCS::write( const URI& uri, const void* const buffer, const uint64_t length) { if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string()); @@ -839,13 +784,13 @@ Status GCS::write( RETURN_NOT_OK( fill_write_cache(write_cache_buffer, buffer, length, &nbytes_filled)); - if (!use_multi_part_upload_) { + if 
(!gcs_params_.use_multi_part_upload_) { if (nbytes_filled != length) { std::stringstream errmsg; errmsg << "Cannot write more than " << write_cache_max_size_ << " bytes without multi-part uploads. This limit can be " "configured with the 'vfs.gcs.max_direct_upload_size' option."; - return LOG_STATUS(Status_GCSError(errmsg.str())); + throw GCSException(errmsg.str()); } else { return Status::Ok(); } @@ -887,8 +832,7 @@ Status GCS::object_size(const URI& uri, uint64_t* const nbytes) const { assert(nbytes); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -901,9 +845,9 @@ Status GCS::object_size(const URI& uri, uint64_t* const nbytes) const { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Get object size failed on: " + object_path + " (" + status.message() + - ")"))); + ")"); } *nbytes = object_metadata->size(); @@ -959,21 +903,21 @@ Status GCS::write_parts( const uint64_t length, const bool last_part) { if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not an GCS URI: " + uri.to_string()))); + throw GCSException("URI is not an GCS URI: " + uri.to_string()); } - // Ensure that each thread is responsible for exactly multi_part_part_size_ + // Ensure that each thread is responsible for exactly multi_part_size_ // bytes (except if this is the last part, in which case the final // thread should write less). Cap the number of parallel operations at the // configured max number. Length must be evenly divisible by - // multi_part_part_size_ unless this is the last part. - uint64_t num_ops = last_part ? 
- utils::math::ceil(length, multi_part_part_size_) : - (length / multi_part_part_size_); - num_ops = std::min(std::max(num_ops, uint64_t(1)), max_parallel_ops_); - - if (!last_part && length % multi_part_part_size_ != 0) { + // multi_part_size_ unless this is the last part. + uint64_t num_ops = + last_part ? utils::math::ceil(length, gcs_params_.multi_part_size_) : + (length / gcs_params_.multi_part_size_); + num_ops = + std::min(std::max(num_ops, uint64_t(1)), gcs_params_.max_parallel_ops_); + + if (!last_part && length % gcs_params_.multi_part_size_ != 0) { return LOG_STATUS( Status_S3Error("Length not evenly divisible by part size")); } @@ -1045,9 +989,9 @@ Status GCS::write_parts( std::vector tasks; tasks.reserve(num_ops); for (uint64_t i = 0; i < num_ops; i++) { - const uint64_t begin = i * multi_part_part_size_; + const uint64_t begin = i * gcs_params_.multi_part_size_; const uint64_t end = - std::min((i + 1) * multi_part_part_size_ - 1, length - 1); + std::min((i + 1) * gcs_params_.multi_part_size_ - 1, length - 1); const char* const thread_buffer = reinterpret_cast(buffer) + begin; const uint64_t thread_buffer_len = end - begin + 1; @@ -1088,9 +1032,9 @@ Status GCS::upload_part( if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Upload part failed on: " + object_part_path + " (" + status.message() + - ")"))); + ")"); } return Status::Ok(); @@ -1100,11 +1044,10 @@ Status GCS::flush_object(const URI& uri) { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - if (!use_multi_part_upload_) { + if (!gcs_params_.use_multi_part_upload_) { return flush_object_direct(uri); } @@ -1187,9 +1130,9 @@ Status GCS::flush_object(const URI& uri) { if (!object_metadata.ok()) { const 
google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Compse object failed on: " + uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_object_to_propagate(bucket_name, object_path); @@ -1218,8 +1161,8 @@ Status GCS::delete_part( const google::cloud::Status status = client_->DeleteObject(bucket_name, part_path); if (!status.ok()) { - return Status_GCSError(std::string( - "Delete part failed on: " + part_path + " (" + status.message() + ")")); + throw GCSException( + "Delete part failed on: " + part_path + " (" + status.message() + ")"); } return Status::Ok(); @@ -1268,9 +1211,9 @@ Status GCS::flush_object_direct(const URI& uri) { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Write object failed on: " + uri.to_string() + " (" + status.message() + - ")"))); + ")"); } return wait_for_object_to_propagate(bucket_name, object_path); @@ -1286,8 +1229,7 @@ Status GCS::read( RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not an GCS URI: " + uri.to_string()))); + throw GCSException("URI is not an GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -1301,9 +1243,9 @@ Status GCS::read( offset, offset + length + read_ahead_length)); if (!stream.status().ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Read object failed on: " + uri.to_string() + " (" + - stream.status().message() + ")"))); + stream.status().message() + ")"); } stream.read(static_cast(buffer), length + read_ahead_length); @@ -1312,8 +1254,7 @@ Status GCS::read( stream.Close(); if (*length_returned < length) { - return LOG_STATUS(Status_GCSError( - std::string("Read operation read unexpected number of bytes."))); + throw GCSException("Read operation read unexpected number of 
bytes."); } return Status::Ok(); diff --git a/tiledb/sm/filesystem/gcs.h b/tiledb/sm/filesystem/gcs.h index 838b9fee6aa..82ed5330671 100644 --- a/tiledb/sm/filesystem/gcs.h +++ b/tiledb/sm/filesystem/gcs.h @@ -62,13 +62,11 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage } // namespace google::cloud -namespace tiledb { - -namespace common::filesystem { +namespace tiledb::common::filesystem { class directory_entry; -} +} // namespace tiledb::common::filesystem -namespace sm { +namespace tiledb::sm { /** Class for GCS status exceptions. */ class GCSException : public StatusException { @@ -78,31 +76,128 @@ class GCSException : public StatusException { } }; +/** + * The GCS-specific configuration parameters. + * + * @note The member variables' default declarations have not yet been moved + * from the Config declaration into this struct. + */ +struct GCSParameters { + GCSParameters() = delete; + + GCSParameters(const Config& config) + : endpoint_( + config.get("vfs.gcs.endpoint", Config::must_find)) + , project_id_( + config.get("vfs.gcs.project_id", Config::must_find)) + , service_account_key_(config.get( + "vfs.gcs.service_account_key", Config::must_find)) + , workload_identity_configuration_(config.get( + "vfs.gcs.workload_identity_configuration", Config::must_find)) + , impersonate_service_account_(config.get( + "vfs.gcs.impersonate_service_account", Config::must_find)) + , multi_part_size_( + config.get("vfs.gcs.multi_part_size", Config::must_find)) + , max_parallel_ops_( + config.get("vfs.gcs.max_parallel_ops", Config::must_find)) + , use_multi_part_upload_(config.get( + "vfs.gcs.use_multi_part_upload", Config::must_find)) + , request_timeout_ms_(config.get( + "vfs.gcs.request_timeout_ms", Config::must_find)) + , max_direct_upload_size_(config.get( + "vfs.gcs.max_direct_upload_size", Config::must_find)) { + if (endpoint_.empty() && getenv("TILEDB_TEST_GCS_ENDPOINT")) { + endpoint_ = getenv("TILEDB_TEST_GCS_ENDPOINT"); + } + }; + + ~GCSParameters() = 
default; + + /** The GCS endpoint. */ + std::string endpoint_; + + /** The project ID to create new buckets on using the VFS. */ + std::string project_id_; + + /** + * The GCS service account credentials JSON string. + * + * Set the JSON string with GCS service account key. Takes precedence + * over `vfs.gcs.workload_identity_configuration` if both are specified. If + * neither is specified, Application Default Credentials will be used. + * + * @note Experimental + */ + std::string service_account_key_; + + /** + * The GCS external account credentials JSON string. + * + * Set the JSON string with Workload Identity Federation configuration. + * `vfs.gcs.service_account_key` takes precedence over this if both are + * specified. If neither is specified, Application Default Credentials will + * be used. + * + * @note Experimental + */ + std::string workload_identity_configuration_; + + /** + * A comma-separated list with the GCS service accounts to impersonate. + * + * Set the GCS service account to impersonate. A chain of impersonated + * accounts can be formed by specifying many service accounts, separated by a + * comma. + * + * @note Experimental + */ + std::string impersonate_service_account_; + + /** + * The part size (in bytes) used in multi part writes. + * + * @note `vfs.gcs.multi_part_size` * `vfs.gcs.max_parallel_ops` bytes will + * be buffered before issuing part uploads in parallel. + */ + uint64_t multi_part_size_; + + /** The maximum number of parallel operations issued. */ + uint64_t max_parallel_ops_; + + /** Whether or not to use chunked part uploads. */ + bool use_multi_part_upload_; + + /** The maximum amount of time to retry network requests. */ + uint64_t request_timeout_ms_; + + /** + * The maximum size in bytes of a direct upload to GCS. + * Ignored if `vfs.gcs.use_multi_part_upload` is set to true. 
+ */ + uint64_t max_direct_upload_size_; +}; + class GCS { public: /* ********************************* */ /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ - /** Constructor. */ - GCS(); + /** + * Constructor. + * + * @param thread_pool The parent VFS thread pool. + * @param config Configuration parameters. + */ + GCS(ThreadPool* thread_pool, const Config& config); - /** Destructor. */ + /** Destructor. Must be explicitly defined. */ ~GCS(); /* ********************************* */ /* API */ /* ********************************* */ - /** - * Initializes and connects a GCS client. - * - * @param config Configuration parameters. - * @param thread_pool The parent VFS thread pool. - * @return Status - */ - Status init(const Config& config, ThreadPool* thread_pool); - /** * Creates a bucket. * @@ -449,6 +544,9 @@ class GCS { /* PRIVATE ATTRIBUTES */ /* ********************************* */ + /** The GCS configuration parameters. */ + GCSParameters gcs_params_; + /** * A libcurl initializer instance. This should remain * the first member variable to ensure that libcurl is @@ -471,21 +569,6 @@ class GCS { /** The VFS thread pool. */ ThreadPool* thread_pool_; - // The GCS endpoint. - std::string endpoint_; - - // The GCS project id. - std::string project_id_; - - // The GCS service account credentials JSON string. - std::string service_account_key_; - - // The GCS external account credentials JSON string. - std::string workload_identity_configuration_; - - // A comma-separated list with the GCS service accounts to impersonate. - std::string impersonate_service_account_; - // The GCS REST client. mutable tdb_unique_ptr client_; @@ -498,18 +581,6 @@ class GCS { /** The maximum size of each value-element in 'write_cache_map_'. */ uint64_t write_cache_max_size_; - /** The maximum number of parallel requests. 
*/ - uint64_t max_parallel_ops_; - - /** The target part size in a part list upload */ - uint64_t multi_part_part_size_; - - /** Whether or not to use part list upload. */ - bool use_multi_part_upload_; - - /** The timeout for network requests. */ - uint64_t request_timeout_ms_; - /** Maps a object URI to its part list upload state. */ std::unordered_map multi_part_upload_states_; @@ -763,8 +834,7 @@ class GCS { Status flush_object_direct(const URI& uri); }; -} // namespace sm -} // namespace tiledb +} // namespace tiledb::sm #endif // HAVE_GCS #endif // TILEDB_GCS_H diff --git a/tiledb/sm/filesystem/vfs.cc b/tiledb/sm/filesystem/vfs.cc index e32e036f23a..cff54296a9a 100644 --- a/tiledb/sm/filesystem/vfs.cc +++ b/tiledb/sm/filesystem/vfs.cc @@ -64,6 +64,7 @@ VFS::VFS( ThreadPool* const io_tp, const Config& config) : VFSBase(parent_stats) + , GCS_within_VFS(io_tp, config) , S3_within_VFS(stats_, io_tp, config) , config_(config) , logger_(logger) @@ -101,10 +102,6 @@ VFS::VFS( #ifdef HAVE_GCS supported_fs_.insert(Filesystem::GCS); - st = gcs_.init(config_, io_tp_); - if (!st.ok()) { - throw VFSException("Failed to initialize GCS backend."); - } #endif #ifdef _WIN32 @@ -271,7 +268,7 @@ Status VFS::touch(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.touch(uri); + return gcs().touch(uri); #else throw BuiltWithout("GCS"); #endif @@ -306,7 +303,7 @@ Status VFS::create_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.create_bucket(uri); + return gcs().create_bucket(uri); #else throw BuiltWithout("GCS"); #endif @@ -332,7 +329,7 @@ Status VFS::remove_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.remove_bucket(uri); + return gcs().remove_bucket(uri); #else throw BuiltWithout("GCS"); #endif @@ -359,7 +356,7 @@ Status VFS::empty_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.empty_bucket(uri); + return gcs().empty_bucket(uri); #else throw 
BuiltWithout("GCS"); #endif @@ -387,7 +384,7 @@ Status VFS::is_empty_bucket( } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.is_empty_bucket(uri, is_empty); + return gcs().is_empty_bucket(uri, is_empty); #else throw BuiltWithout("GCS"); #endif @@ -423,7 +420,7 @@ Status VFS::remove_dir(const URI& uri) const { #endif } else if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.remove_dir(uri); + return gcs().remove_dir(uri); #else throw BuiltWithout("GCS"); #endif @@ -491,7 +488,7 @@ Status VFS::remove_file(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.remove_object(uri); + return gcs().remove_object(uri); #else throw BuiltWithout("GCS"); #endif @@ -578,7 +575,7 @@ Status VFS::file_size(const URI& uri, uint64_t* size) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.object_size(uri, size); + return gcs().object_size(uri, size); #else throw BuiltWithout("GCS"); #endif @@ -625,7 +622,7 @@ Status VFS::is_dir(const URI& uri, bool* is_dir) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.is_dir(uri, is_dir); + return gcs().is_dir(uri, is_dir); #else *is_dir = false; throw BuiltWithout("GCS"); @@ -676,7 +673,7 @@ Status VFS::is_file(const URI& uri, bool* is_file) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.is_object(uri, is_file); + return gcs().is_object(uri, is_file); #else *is_file = false; throw BuiltWithout("GCS"); @@ -711,7 +708,7 @@ Status VFS::is_bucket(const URI& uri, bool* is_bucket) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - RETURN_NOT_OK(gcs_.is_bucket(uri, is_bucket)); + RETURN_NOT_OK(gcs().is_bucket(uri, is_bucket)); return Status::Ok(); #else *is_bucket = false; @@ -767,7 +764,7 @@ std::vector VFS::ls_with_sizes(const URI& parent) const { #endif } else if (parent.is_gcs()) { #ifdef HAVE_GCS - entries = gcs_.ls_with_sizes(parent); + entries = gcs().ls_with_sizes(parent); #else throw BuiltWithout("GCS"); #endif @@ -853,7 +850,7 @@ Status VFS::move_file(const URI& old_uri, const URI& 
new_uri) { if (old_uri.is_gcs()) { if (new_uri.is_gcs()) #ifdef HAVE_GCS - return gcs_.move_object(old_uri, new_uri); + return gcs().move_object(old_uri, new_uri); #else throw BuiltWithout("GCS"); #endif @@ -926,7 +923,7 @@ Status VFS::move_dir(const URI& old_uri, const URI& new_uri) { if (old_uri.is_gcs()) { if (new_uri.is_gcs()) #ifdef HAVE_GCS - return gcs_.move_dir(old_uri, new_uri); + return gcs().move_dir(old_uri, new_uri); #else throw BuiltWithout("GCS"); #endif @@ -1216,7 +1213,7 @@ Status VFS::read_impl( #ifdef HAVE_GCS const auto read_fn = std::bind( &GCS::read, - &gcs_, + &gcs(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, @@ -1441,7 +1438,7 @@ Status VFS::close_file(const URI& uri) { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.flush_object(uri); + return gcs().flush_object(uri); #else throw BuiltWithout("GCS"); #endif @@ -1505,7 +1502,7 @@ Status VFS::write( } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.write(uri, buffer, buffer_size); + return gcs().write(uri, buffer, buffer_size); #else throw BuiltWithout("GCS"); #endif diff --git a/tiledb/sm/filesystem/vfs.h b/tiledb/sm/filesystem/vfs.h index 3fc7d5590b5..2546ef17f1f 100644 --- a/tiledb/sm/filesystem/vfs.h +++ b/tiledb/sm/filesystem/vfs.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2023 TileDB, Inc. + * @copyright Copyright (c) 2017-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -235,6 +235,37 @@ struct VFSBase { stats::Stats* stats_; }; +/** The GCS filesystem. */ +#ifdef HAVE_GCS +class GCS_within_VFS { + /** Private member variable */ + GCS gcs_; + + protected: + template + GCS_within_VFS(Args&&... args) + : gcs_(std::forward(args)...) { + } + + /** Protected accessor for the GCS object. */ + inline GCS& gcs() { + return gcs_; + } + + /** Protected accessor for the const GCS object. 
*/ + inline const GCS& gcs() const { + return gcs_; + } +}; +#else +class GCS_within_VFS { + protected: + template + GCS_within_VFS(Args&&...) { + } // empty constructor +}; +#endif + /** The S3 filesystem. */ #ifdef HAVE_S3 class S3_within_VFS { @@ -270,7 +301,7 @@ class S3_within_VFS { * This class implements a virtual filesystem that directs filesystem-related * function execution to the appropriate backend based on the input URI. */ -class VFS : private VFSBase, protected S3_within_VFS { +class VFS : private VFSBase, protected GCS_within_VFS, S3_within_VFS { public: /* ********************************* */ /* TYPE DEFINITIONS */ @@ -566,7 +597,7 @@ class VFS : private VFSBase, protected S3_within_VFS { #endif } else if (parent.is_gcs()) { #ifdef HAVE_GCS - results = gcs_.ls_filtered(parent, f, d, recursive); + results = gcs().ls_filtered(parent, f, d, recursive); #else throw filesystem::VFSException("TileDB was built without GCS support"); #endif @@ -943,10 +974,6 @@ class VFS : private VFSBase, protected S3_within_VFS { Azure azure_; #endif -#ifdef HAVE_GCS - GCS gcs_; -#endif - #ifdef _WIN32 Win win_; #else From 3d42725abdd0e96927be50d9c8f20df3fac2c3e6 Mon Sep 17 00:00:00 2001 From: Rebekah Davis Date: Mon, 2 Dec 2024 15:10:18 -0500 Subject: [PATCH 2/5] Add missing config documentation. --- tiledb/api/c_api/config/config_api_external.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tiledb/api/c_api/config/config_api_external.h b/tiledb/api/c_api/config/config_api_external.h index 7ef312d085c..ed4b7f69c58 100644 --- a/tiledb/api/c_api/config/config_api_external.h +++ b/tiledb/api/c_api/config/config_api_external.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2023 TileDB, Inc. + * @copyright Copyright (c) 2023-2024 TileDB, Inc. 
* * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -455,6 +455,9 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config) TILEDB_NOEXCEPT; * The maximum permissible delay between Azure netwwork request retry * attempts, in milliseconds. * **Default**: 60000 + * - `vfs.gcs.endpoint`
+ * The GCS endpoint. If empty, the `TILEDB_TEST_GCS_ENDPOINT` environment variable is consulted as a fallback (intended for testing).
+ * **Default**: "" * - `vfs.gcs.project_id`
* Set the GCS project ID to create new buckets to. Not required unless you * are going to use the VFS to create buckets.
From e62052efcb9dbaf1d04bf8b91e43c83ee86e2efd Mon Sep 17 00:00:00 2001 From: Rebekah Davis Date: Mon, 9 Dec 2024 15:32:39 -0500 Subject: [PATCH 3/5] Migrate GCSParameters constructor to implementation file. --- tiledb/sm/filesystem/gcs.cc | 25 +++++++++++++++++++++++++ tiledb/sm/filesystem/gcs.h | 26 +------------------------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/tiledb/sm/filesystem/gcs.cc b/tiledb/sm/filesystem/gcs.cc index fd05e1024d5..02fccb4bed3 100644 --- a/tiledb/sm/filesystem/gcs.cc +++ b/tiledb/sm/filesystem/gcs.cc @@ -65,6 +65,31 @@ namespace tiledb::sm { /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ +GCSParameters::GCSParameters(const Config& config) + : endpoint_(config.get("vfs.gcs.endpoint", Config::must_find)) + , project_id_( + config.get("vfs.gcs.project_id", Config::must_find)) + , service_account_key_(config.get( + "vfs.gcs.service_account_key", Config::must_find)) + , workload_identity_configuration_(config.get( + "vfs.gcs.workload_identity_configuration", Config::must_find)) + , impersonate_service_account_(config.get( + "vfs.gcs.impersonate_service_account", Config::must_find)) + , multi_part_size_( + config.get("vfs.gcs.multi_part_size", Config::must_find)) + , max_parallel_ops_( + config.get("vfs.gcs.max_parallel_ops", Config::must_find)) + , use_multi_part_upload_( + config.get("vfs.gcs.use_multi_part_upload", Config::must_find)) + , request_timeout_ms_( + config.get("vfs.gcs.request_timeout_ms", Config::must_find)) + , max_direct_upload_size_(config.get( + "vfs.gcs.max_direct_upload_size", Config::must_find)) { + if (endpoint_.empty() && getenv("TILEDB_TEST_GCS_ENDPOINT")) { + endpoint_ = getenv("TILEDB_TEST_GCS_ENDPOINT"); + } +} + GCS::GCS(ThreadPool* thread_pool, const Config& config) : gcs_params_(GCSParameters(config)) , state_(State::UNINITIALIZED) diff --git a/tiledb/sm/filesystem/gcs.h b/tiledb/sm/filesystem/gcs.h index 82ed5330671..783ef7cc2cd 100644 --- 
a/tiledb/sm/filesystem/gcs.h +++ b/tiledb/sm/filesystem/gcs.h @@ -85,31 +85,7 @@ class GCSException : public StatusException { struct GCSParameters { GCSParameters() = delete; - GCSParameters(const Config& config) - : endpoint_( - config.get("vfs.gcs.endpoint", Config::must_find)) - , project_id_( - config.get("vfs.gcs.project_id", Config::must_find)) - , service_account_key_(config.get( - "vfs.gcs.service_account_key", Config::must_find)) - , workload_identity_configuration_(config.get( - "vfs.gcs.workload_identity_configuration", Config::must_find)) - , impersonate_service_account_(config.get( - "vfs.gcs.impersonate_service_account", Config::must_find)) - , multi_part_size_( - config.get("vfs.gcs.multi_part_size", Config::must_find)) - , max_parallel_ops_( - config.get("vfs.gcs.max_parallel_ops", Config::must_find)) - , use_multi_part_upload_(config.get( - "vfs.gcs.use_multi_part_upload", Config::must_find)) - , request_timeout_ms_(config.get( - "vfs.gcs.request_timeout_ms", Config::must_find)) - , max_direct_upload_size_(config.get( - "vfs.gcs.max_direct_upload_size", Config::must_find)) { - if (endpoint_.empty() && getenv("TILEDB_TEST_GCS_ENDPOINT")) { - endpoint_ = getenv("TILEDB_TEST_GCS_ENDPOINT"); - } - }; + GCSParameters(const Config& config); ~GCSParameters() = default; From f872e0fac4e29ee24816fd3a70dead7f294a08c8 Mon Sep 17 00:00:00 2001 From: Rebekah Davis Date: Mon, 9 Dec 2024 17:37:13 -0500 Subject: [PATCH 4/5] Un-Status (most of) class GCS --- tiledb/sm/filesystem/gcs.cc | 474 +++++++++++------------------------- tiledb/sm/filesystem/gcs.h | 132 +++------- tiledb/sm/filesystem/vfs.cc | 40 +-- 3 files changed, 197 insertions(+), 449 deletions(-) diff --git a/tiledb/sm/filesystem/gcs.cc b/tiledb/sm/filesystem/gcs.cc index 02fccb4bed3..60808d24a17 100644 --- a/tiledb/sm/filesystem/gcs.cc +++ b/tiledb/sm/filesystem/gcs.cc @@ -102,6 +102,7 @@ GCS::GCS(ThreadPool* thread_pool, const Config& config) gcs_params_.max_parallel_ops_ * 
gcs_params_.multi_part_size_ : gcs_params_.max_direct_upload_size_; state_ = State::INITIALIZED; + init_client(); } GCS::~GCS() { @@ -190,13 +191,11 @@ std::shared_ptr GCS::make_credentials( creds, gcs_params_.impersonate_service_account_, options); } -Status GCS::init_client() const { +void GCS::init_client() const { assert(state_ == State::INITIALIZED); - std::lock_guard lck(client_init_mtx_); - if (client_) { - return Status::Ok(); + return; } google::cloud::Options ca_options; @@ -237,20 +236,14 @@ Status GCS::init_client() const { } catch (const std::exception& e) { throw GCSException("Failed to initialize GCS: " + std::string(e.what())); } - - return Status::Ok(); } -Status GCS::create_bucket(const URI& uri) const { - RETURN_NOT_OK(init_client()); - +void GCS::create_bucket(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - std::string bucket_name; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, nullptr)); - + auto [bucket_name, object_path] = parse_gcs_uri(uri); google::cloud::StatusOr bucket_metadata = client_->CreateBucketForProject( bucket_name, @@ -263,33 +256,35 @@ Status GCS::create_bucket(const URI& uri) const { bucket_metadata.status().message() + ")"); } - return wait_for_bucket_to_propagate(bucket_name); + // Wait for bucket to propagate + unsigned attempts = 0; + while (attempts++ < constants::gcs_max_attempts) { + if (is_bucket(URI(bucket_name))) { + return; + } + std::this_thread::sleep_for( + std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); + } + throw GCSException( + "Timed out waiting on bucket to propogate: " + bucket_name); } -Status GCS::empty_bucket(const URI& uri) const { - RETURN_NOT_OK(init_client()); - +void GCS::empty_bucket(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - return remove_dir(uri); + remove_dir(uri); } -Status GCS::is_empty_bucket(const URI& uri, bool* is_empty) const { - 
RETURN_NOT_OK(init_client()); - assert(is_empty); - +bool GCS::is_empty_bucket(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - std::string bucket_name; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, nullptr)); - + auto [bucket_name, object_path] = parse_gcs_uri(uri); google::cloud::storage::ListObjectsReader objects_reader = client_->ListObjects(bucket_name); - for (const google::cloud::StatusOr& object_metadata : objects_reader) { if (!object_metadata) { @@ -297,79 +292,52 @@ Status GCS::is_empty_bucket(const URI& uri, bool* is_empty) const { "List bucket objects failed on: " + uri.to_string() + " (" + object_metadata.status().message() + ")"); } - - *is_empty = false; - return Status::Ok(); + return false; } - - *is_empty = true; - return Status::Ok(); + return true; } -Status GCS::is_bucket(const URI& uri, bool* const is_bucket) const { - RETURN_NOT_OK(init_client()); - assert(is_bucket); - +bool GCS::is_bucket(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } + auto [bucket_name, object_path] = parse_gcs_uri(uri); - std::string bucket_name; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, nullptr)); - - return this->is_bucket(bucket_name, is_bucket); -} - -Status GCS::is_bucket( - const std::string& bucket_name, bool* const is_bucket) const { google::cloud::StatusOr bucket_metadata = client_->GetBucketMetadata(bucket_name); if (!bucket_metadata.ok()) { const google::cloud::Status status = bucket_metadata.status(); const google::cloud::StatusCode code = status.code(); - if (code == google::cloud::StatusCode::kNotFound) { - *is_bucket = false; - return Status::Ok(); + return false; } else { throw GCSException( "Get bucket failed on: " + bucket_name + " (" + status.message() + ")"); } } - - *is_bucket = true; - return Status::Ok(); + return true; } -Status GCS::is_dir(const URI& uri, bool* const exists) const { - 
RETURN_NOT_OK(init_client()); - assert(exists); - +bool GCS::is_dir(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - - std::vector paths; - RETURN_NOT_OK(ls(uri, &paths, "/", 1)); - *exists = (bool)paths.size(); - return Status::Ok(); + std::vector paths = ls(uri, "/", 1); + return (bool)paths.size(); } -Status GCS::remove_bucket(const URI& uri) const { - RETURN_NOT_OK(init_client()); - +void GCS::remove_bucket(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } // Empty bucket - RETURN_NOT_OK(empty_bucket(uri)); - - std::string bucket_name; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, nullptr)); + empty_bucket(uri); + auto [bucket_name, object_path] = parse_gcs_uri(uri); + // Delete bucket const google::cloud::Status status = client_->DeleteBucket(bucket_name); if (!status.ok()) { throw GCSException( @@ -377,20 +345,25 @@ Status GCS::remove_bucket(const URI& uri) const { status.message() + ")"); } - return wait_for_bucket_to_be_deleted(bucket_name); + // Wait for bucket to be deleted + unsigned attempts = 0; + while (attempts++ < constants::gcs_max_attempts) { + if (!is_bucket(URI(bucket_name))) { + return; + } + std::this_thread::sleep_for( + std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); + } + throw GCSException( + "Timed out waiting on bucket to be deleted: " + bucket_name); } -Status GCS::remove_object(const URI& uri) const { - RETURN_NOT_OK(init_client()); - +void GCS::remove_object(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); - + auto [bucket_name, object_path] = parse_gcs_uri(uri); const google::cloud::Status status = client_->DeleteObject(bucket_name, object_path); if (!status.ok()) { @@ -399,25 +372,30 @@ Status 
GCS::remove_object(const URI& uri) const { status.message() + ")"); } - return wait_for_object_to_be_deleted(bucket_name, object_path); + // Wait for object to be deleted + unsigned attempts = 0; + while (attempts++ < constants::gcs_max_attempts) { + bool is_object; + throw_if_not_ok(this->is_object(bucket_name, object_path, &is_object)); + if (!is_object) { + return; + } + std::this_thread::sleep_for( + std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); + } + throw GCSException( + "Timed out waiting on object to be deleted: " + object_path); } -Status GCS::remove_dir(const URI& uri) const { - RETURN_NOT_OK(init_client()); - +void GCS::remove_dir(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - - std::vector paths; - RETURN_NOT_OK(ls(uri, &paths, "")); - auto status = parallel_for(thread_pool_, 0, paths.size(), [&](size_t i) { - throw_if_not_ok(remove_object(URI(paths[i]))); + std::vector paths = ls(uri, ""); + throw_if_not_ok(parallel_for(thread_pool_, 0, paths.size(), [&](size_t i) { + remove_object(URI(paths[i])); return Status::Ok(); - }); - RETURN_NOT_OK(status); - - return Status::Ok(); + })); } std::string GCS::remove_front_slash(const std::string& path) { @@ -444,33 +422,22 @@ std::string GCS::remove_trailing_slash(const std::string& path) { return path; } -Status GCS::ls( - const URI& uri, - std::vector* paths, - const std::string& delimiter, - const int max_paths) const { - assert(paths); - +std::vector GCS::ls( + const URI& uri, const std::string& delimiter, const int max_paths) const { + std::vector paths; for (auto& fs : ls_with_sizes(uri, delimiter, max_paths)) { - paths->emplace_back(fs.path().native()); + paths.emplace_back(fs.path().native()); } - - return Status::Ok(); + return paths; } std::vector GCS::ls_with_sizes( const URI& uri, const std::string& delimiter, int max_paths) const { - throw_if_not_ok(init_client()); - const URI uri_dir = uri.add_trailing_slash(); - if 
(!uri_dir.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri_dir.to_string()); } - - std::string bucket_name; - std::string object_path; - throw_if_not_ok(parse_gcs_uri(uri_dir, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); std::vector entries; google::cloud::storage::Prefix prefix_option(object_path); @@ -523,22 +490,14 @@ LsObjects GCS::ls_filtered_impl( const URI& uri, std::function file_filter, bool recursive) const { - throw_if_not_ok(init_client()); - const URI uri_dir = uri.add_trailing_slash(); - if (!uri_dir.is_gcs()) { - throw GCSException( - std::string("URI is not a GCS URI: " + uri_dir.to_string())); + throw GCSException("URI is not a GCS URI: " + uri_dir.to_string()); } std::string prefix = uri_dir.backend_name() + "://"; - std::string bucket_name; - std::string object_path; - throw_if_not_ok(parse_gcs_uri(uri_dir, &bucket_name, &object_path)); - + auto [bucket_name, object_path] = parse_gcs_uri(uri); LsObjects result; - auto to_directory_entry = [&bucket_name, &prefix](const google::cloud::storage::ObjectMetadata& obj) -> LsObjects::value_type { @@ -596,52 +555,35 @@ LsObjects GCS::ls_filtered_impl( return result; } -Status GCS::move_object(const URI& old_uri, const URI& new_uri) { - RETURN_NOT_OK(init_client()); - - RETURN_NOT_OK(copy_object(old_uri, new_uri)); - RETURN_NOT_OK(remove_object(old_uri)); - return Status::Ok(); -} - -Status GCS::copy_object(const URI& old_uri, const URI& new_uri) { - RETURN_NOT_OK(init_client()); - +void GCS::move_object(const URI& old_uri, const URI& new_uri) { + // Copy old object into new object if (!old_uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + old_uri.to_string()); } - if (!new_uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + new_uri.to_string()); } - - std::string old_bucket_name; - std::string old_object_path; - RETURN_NOT_OK(parse_gcs_uri(old_uri, &old_bucket_name, &old_object_path)); - - std::string new_bucket_name; - 
std::string new_object_path; - RETURN_NOT_OK(parse_gcs_uri(new_uri, &new_bucket_name, &new_object_path)); - + auto [old_bucket_name, old_object_path] = parse_gcs_uri(old_uri); + auto [new_bucket_name, new_object_path] = parse_gcs_uri(new_uri); google::cloud::StatusOr object_metadata = client_->CopyObject( old_bucket_name, old_object_path, new_bucket_name, new_object_path); if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - throw GCSException( "Copy object failed on: " + old_uri.to_string() + " (" + status.message() + ")"); } + throw_if_not_ok( + wait_for_object_to_propagate(new_bucket_name, new_object_path)); - return wait_for_object_to_propagate(new_bucket_name, new_object_path); + // Remove the old object + remove_object(old_uri); } Status GCS::wait_for_object_to_propagate( const std::string& bucket_name, const std::string& object_path) const { - RETURN_NOT_OK(init_client()); - unsigned attempts = 0; while (attempts++ < constants::gcs_max_attempts) { bool is_object; @@ -658,113 +600,37 @@ Status GCS::wait_for_object_to_propagate( "Timed out waiting on object to propogate: " + object_path); } -Status GCS::wait_for_object_to_be_deleted( - const std::string& bucket_name, const std::string& object_path) const { - RETURN_NOT_OK(init_client()); - - unsigned attempts = 0; - while (attempts++ < constants::gcs_max_attempts) { - bool is_object; - RETURN_NOT_OK(this->is_object(bucket_name, object_path, &is_object)); - if (!is_object) { - return Status::Ok(); - } - - std::this_thread::sleep_for( - std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); - } - - throw GCSException( - "Timed out waiting on object to be deleted: " + object_path); -} - -Status GCS::wait_for_bucket_to_propagate(const std::string& bucket_name) const { - unsigned attempts = 0; - while (attempts++ < constants::gcs_max_attempts) { - bool is_bucket; - RETURN_NOT_OK(this->is_bucket(bucket_name, &is_bucket)); - - if (is_bucket) { - return Status::Ok(); - } 
- - std::this_thread::sleep_for( - std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); - } - - throw GCSException( - "Timed out waiting on bucket to propogate: " + bucket_name); -} - -Status GCS::wait_for_bucket_to_be_deleted( - const std::string& bucket_name) const { - RETURN_NOT_OK(init_client()); - - unsigned attempts = 0; - while (attempts++ < constants::gcs_max_attempts) { - bool is_bucket; - RETURN_NOT_OK(this->is_bucket(bucket_name, &is_bucket)); - if (!is_bucket) { - return Status::Ok(); - } - - std::this_thread::sleep_for( - std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); - } - - throw GCSException( - "Timed out waiting on bucket to be deleted: " + bucket_name); -} - -Status GCS::move_dir(const URI& old_uri, const URI& new_uri) { - RETURN_NOT_OK(init_client()); - - std::vector paths; - RETURN_NOT_OK(ls(old_uri, &paths, "")); +void GCS::move_dir(const URI& old_uri, const URI& new_uri) { + std::vector paths = ls(old_uri, ""); for (const auto& path : paths) { const std::string suffix = path.substr(old_uri.to_string().size()); const URI new_path = new_uri.join_path(suffix); - RETURN_NOT_OK(move_object(URI(path), new_path)); + move_object(URI(path), new_path); } - return Status::Ok(); } -Status GCS::touch(const URI& uri) const { - RETURN_NOT_OK(init_client()); - +void GCS::touch(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); google::cloud::StatusOr object_metadata = client_->InsertObject(bucket_name, object_path, ""); - if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - throw GCSException( "Touch object failed on: " + uri.to_string() + " (" + status.message() + ")"); } - - return Status::Ok(); } Status GCS::is_object(const URI& uri, bool* const is_object) const 
{ - RETURN_NOT_OK(init_client()); assert(is_object); - if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); return this->is_object(bucket_name, object_path, is_object); } @@ -797,18 +663,14 @@ Status GCS::is_object( return Status::Ok(); } -Status GCS::write( +void GCS::write( const URI& uri, const void* const buffer, const uint64_t length) { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string()); - - uint64_t nbytes_filled; - RETURN_NOT_OK( - fill_write_cache(write_cache_buffer, buffer, length, &nbytes_filled)); - + uint64_t nbytes_filled = fill_write_cache(write_cache_buffer, buffer, length); if (!gcs_params_.use_multi_part_upload_) { if (nbytes_filled != length) { std::stringstream errmsg; @@ -817,67 +679,52 @@ Status GCS::write( "configured with the 'vfs.gcs.max_direct_upload_size' option."; throw GCSException(errmsg.str()); } else { - return Status::Ok(); + return; } } if (write_cache_buffer->size() == write_cache_max_size_) { - RETURN_NOT_OK(flush_write_cache(uri, write_cache_buffer, false)); + throw_if_not_ok(flush_write_cache(uri, write_cache_buffer, false)); } uint64_t new_length = length - nbytes_filled; uint64_t offset = nbytes_filled; while (new_length > 0) { if (new_length >= write_cache_max_size_) { - RETURN_NOT_OK(write_parts( + write_parts( uri, static_cast(buffer) + offset, write_cache_max_size_, - false)); + false); offset += write_cache_max_size_; new_length -= write_cache_max_size_; } else { - RETURN_NOT_OK(fill_write_cache( + nbytes_filled = fill_write_cache( write_cache_buffer, static_cast(buffer) + offset, - new_length, - &nbytes_filled)); + new_length); offset += nbytes_filled; new_length -= nbytes_filled; } } - assert(offset == 
length); - - return Status::Ok(); } -Status GCS::object_size(const URI& uri, uint64_t* const nbytes) const { - RETURN_NOT_OK(init_client()); - assert(nbytes); - +uint64_t GCS::object_size(const URI& uri) const { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); google::cloud::StatusOr object_metadata = client_->GetObjectMetadata(bucket_name, object_path); - if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - throw GCSException( "Get object size failed on: " + object_path + " (" + status.message() + ")"); } - - *nbytes = object_metadata->size(); - - return Status::Ok(); + return object_metadata->size(); } Buffer* GCS::get_write_cache_buffer(const std::string& uri) { @@ -889,40 +736,40 @@ Buffer* GCS::get_write_cache_buffer(const std::string& uri) { } } -Status GCS::fill_write_cache( +uint64_t GCS::fill_write_cache( Buffer* const write_cache_buffer, const void* const buffer, - const uint64_t length, - uint64_t* const nbytes_filled) { + const uint64_t length) { assert(write_cache_buffer); assert(buffer); - assert(nbytes_filled); - *nbytes_filled = + uint64_t nbytes_filled = std::min(write_cache_max_size_ - write_cache_buffer->size(), length); - - if (*nbytes_filled > 0) { - RETURN_NOT_OK(write_cache_buffer->write(buffer, *nbytes_filled)); + if (nbytes_filled > 0) { + throw_if_not_ok(write_cache_buffer->write(buffer, nbytes_filled)); } - - return Status::Ok(); + return nbytes_filled; } Status GCS::flush_write_cache( const URI& uri, Buffer* const write_cache_buffer, const bool last_part) { assert(write_cache_buffer); - if (write_cache_buffer->size() > 0) { - const Status st = write_parts( - uri, write_cache_buffer->data(), write_cache_buffer->size(), last_part); - write_cache_buffer->reset_size(); - RETURN_NOT_OK(st); 
+ try { + write_parts( + uri, + write_cache_buffer->data(), + write_cache_buffer->size(), + last_part); + write_cache_buffer->reset_size(); + } catch (std::exception& e) { + return Status("[TileDB::GCS]", e.what()); + } } - return Status::Ok(); } -Status GCS::write_parts( +void GCS::write_parts( const URI& uri, const void* const buffer, const uint64_t length, @@ -943,13 +790,9 @@ Status GCS::write_parts( std::min(std::max(num_ops, uint64_t(1)), gcs_params_.max_parallel_ops_); if (!last_part && length % gcs_params_.multi_part_size_ != 0) { - return LOG_STATUS( - Status_S3Error("Length not evenly divisible by part size")); + throw GCSException("Length not evenly divisible by part size"); } - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); MultiPartUploadState* state; std::unique_lock state_lck; @@ -981,9 +824,9 @@ Status GCS::write_parts( // Delete file if it exists (overwrite). 
bool exists; - RETURN_NOT_OK(is_object(uri, &exists)); + throw_if_not_ok(is_object(uri, &exists)); if (exists) { - RETURN_NOT_OK(remove_object(uri)); + remove_object(uri); } } else { // If another thread switched state, switch back to a read lock @@ -1009,7 +852,8 @@ Status GCS::write_parts( upload_part(bucket_name, object_part_path, buffer, length); state->update_st(st); state_lck.unlock(); - return st; + throw_if_not_ok(st); + return; } else { std::vector tasks; tasks.reserve(num_ops); @@ -1036,10 +880,8 @@ Status GCS::write_parts( const Status st = thread_pool_->wait_all(tasks); state->update_st(st); state_lck.unlock(); - return st; + throw_if_not_ok(st); } - - return Status::Ok(); } Status GCS::upload_part( @@ -1065,15 +907,14 @@ Status GCS::upload_part( return Status::Ok(); } -Status GCS::flush_object(const URI& uri) { - RETURN_NOT_OK(init_client()); - +void GCS::flush_object(const URI& uri) { if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } if (!gcs_params_.use_multi_part_upload_) { - return flush_object_direct(uri); + flush_object_direct(uri); + return; } Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string()); @@ -1085,7 +926,7 @@ Status GCS::flush_object(const URI& uri) { UniqueReadLock unique_rl(&multipart_upload_rwlock_); if (multi_part_upload_states_.count(uri.to_string()) == 0) { - return flush_write_cache_st; + throw_if_not_ok(flush_write_cache_st); } MultiPartUploadState* const state = @@ -1096,10 +937,7 @@ Status GCS::flush_object(const URI& uri) { unique_rl.unlock(); const std::vector part_paths = state->get_part_paths(); - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); // Wait for the last written part to propogate to ensure all parts // are available for composition into a single object. 
@@ -1115,8 +953,7 @@ Status GCS::flush_object(const URI& uri) { // Release all instance state associated with this part list // transactions. finish_multi_part_upload(uri); - - return Status::Ok(); + return; } // Build a list of objects to compose. @@ -1160,7 +997,7 @@ Status GCS::flush_object(const URI& uri) { status.message() + ")"); } - return wait_for_object_to_propagate(bucket_name, object_path); + throw_if_not_ok(wait_for_object_to_propagate(bucket_name, object_path)); } void GCS::delete_parts( @@ -1205,17 +1042,12 @@ void GCS::finish_multi_part_upload(const URI& uri) { cache_lock.unlock(); } -Status GCS::flush_object_direct(const URI& uri) { +void GCS::flush_object_direct(const URI& uri) { Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string()); - if (write_cache_buffer->size() == 0) { - return Status::Ok(); + return; } - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); - + auto [bucket_name, object_path] = parse_gcs_uri(uri); Buffer buffer_moved; // Protect 'write_cache_map_' from multiple writers. 
@@ -1241,7 +1073,7 @@ Status GCS::flush_object_direct(const URI& uri) { ")"); } - return wait_for_object_to_propagate(bucket_name, object_path); + throw_if_not_ok(wait_for_object_to_propagate(bucket_name, object_path)); } Status GCS::read( @@ -1251,15 +1083,10 @@ Status GCS::read( const uint64_t length, const uint64_t read_ahead_length, uint64_t* const length_returned) const { - RETURN_NOT_OK(init_client()); - if (!uri.is_gcs()) { throw GCSException("URI is not an GCS URI: " + uri.to_string()); } - - std::string bucket_name; - std::string object_path; - RETURN_NOT_OK(parse_gcs_uri(uri, &bucket_name, &object_path)); + auto [bucket_name, object_path] = parse_gcs_uri(uri); google::cloud::storage::ObjectReadStream stream = client_->ReadObject( bucket_name, @@ -1275,7 +1102,6 @@ Status GCS::read( stream.read(static_cast(buffer), length + read_ahead_length); *length_returned = stream.gcount(); - stream.Close(); if (*length_returned < length) { @@ -1285,10 +1111,7 @@ Status GCS::read( return Status::Ok(); } -Status GCS::parse_gcs_uri( - const URI& uri, - std::string* const bucket_name, - std::string* const object_path) const { +std::tuple GCS::parse_gcs_uri(const URI& uri) const { assert(uri.is_gcs()); const std::string uri_str = uri.to_string(); @@ -1297,11 +1120,7 @@ Status GCS::parse_gcs_uri( assert(uri_str.rfind(gcs_prefix, 0) == 0); if (uri_str.size() == gcs_prefix.size()) { - if (bucket_name) - *bucket_name = ""; - if (object_path) - *object_path = ""; - return Status::Ok(); + return std::make_tuple("", ""); } // Find the '/' after the bucket name. 
@@ -1311,11 +1130,8 @@ Status GCS::parse_gcs_uri( if (separator == std::string::npos) { const size_t c_pos_start = gcs_prefix.size(); const size_t c_pos_end = uri_str.size(); - if (bucket_name) - *bucket_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start); - if (object_path) - *object_path = ""; - return Status::Ok(); + return std::make_tuple( + uri_str.substr(c_pos_start, c_pos_end - c_pos_start), ""); } // There is only a bucket name if there aren't any characters past the @@ -1323,11 +1139,8 @@ Status GCS::parse_gcs_uri( if (uri_str.size() == separator) { const size_t c_pos_start = gcs_prefix.size(); const size_t c_pos_end = separator; - if (bucket_name) - *bucket_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start); - if (object_path) - *object_path = ""; - return Status::Ok(); + return std::make_tuple( + uri_str.substr(c_pos_start, c_pos_end - c_pos_start), ""); } const size_t c_pos_start = gcs_prefix.size(); @@ -1335,12 +1148,9 @@ Status GCS::parse_gcs_uri( const size_t b_pos_start = separator + 1; const size_t b_pos_end = uri_str.size(); - if (bucket_name) - *bucket_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start); - if (object_path) - *object_path = uri_str.substr(b_pos_start, b_pos_end - b_pos_start); - - return Status::Ok(); + return std::make_tuple( + uri_str.substr(c_pos_start, c_pos_end - c_pos_start), + uri_str.substr(b_pos_start, b_pos_end - b_pos_start)); } } // namespace tiledb::sm diff --git a/tiledb/sm/filesystem/gcs.h b/tiledb/sm/filesystem/gcs.h index 783ef7cc2cd..34d0995d614 100644 --- a/tiledb/sm/filesystem/gcs.h +++ b/tiledb/sm/filesystem/gcs.h @@ -178,30 +178,27 @@ class GCS { * Creates a bucket. * * @param uri The uri of the bucket to be created. - * @return Status */ - Status create_bucket(const URI& uri) const; + void create_bucket(const URI& uri) const; /** Removes the contents of a GCS bucket. 
 */ - Status empty_bucket(const URI& uri) const; + void empty_bucket(const URI& uri) const; /** * Check if a bucket is empty. * * @param bucket The name of the bucket. - * @param is_empty Mutates to `true` if the bucket is empty. - * @return Status + * @return `true` if the bucket is empty, `false` otherwise. */ - Status is_empty_bucket(const URI& uri, bool* is_empty) const; + bool is_empty_bucket(const URI& uri) const; /** * Check if a bucket exists. * * @param bucket The name of the bucket. - * @param is_bucket Mutates to `true` if `uri` is a bucket. - * @return Status + * @return `true` if `uri` is a bucket, `false` otherwise. */ - Status is_bucket(const URI& uri, bool* is_bucket) const; + bool is_bucket(const URI& uri) const; /** * Check if 'is_object' is a object on GCS. @@ -227,26 +224,23 @@ class GCS { * prefix `gcs://some_gcs/foo2/` (in this case there is not). * * @param uri The URI to check. - * @param exists Sets it to `true` if the above mentioned condition holds. - * @return Status + * @return `true` if the above mentioned condition holds, `false` otherwise. */ - Status is_dir(const URI& uri, bool* exists) const; + bool is_dir(const URI& uri) const; /** * Deletes a bucket. * * @param uri The URI of the bucket to be deleted. - * @return Status */ - Status remove_bucket(const URI& uri) const; + void remove_bucket(const URI& uri) const; /** * Deletes an object with a given URI. * * @param uri The URI of the object to be deleted. - * @return Status */ - Status remove_object(const URI& uri) const; + void remove_object(const URI& uri) const; /** * Deletes all objects with prefix `uri/` (if the ending `/` does not @@ -271,9 +265,8 @@ class GCS { * this example. * * @param uri The prefix uri of the objects to be deleted. - * @return Status */ - Status remove_dir(const URI& uri) const; + void remove_dir(const URI& uri) const; /** * Lists the objects that start with `uri`. 
Full URI paths are @@ -292,15 +285,13 @@ class GCS { * - `foo/bar` * * @param uri The prefix URI. - * @param paths Pointer of a vector of URIs to store the retrieved paths. * @param delimiter The delimiter that will * @param max_paths The maximum number of paths to be retrieved. The default * `-1` indicates that no upper bound is specified. - * @return Status + * @return A vector of the retrieved paths' URIs. */ - Status ls( + std::vector ls( const URI& uri, - std::vector* paths, const std::string& delimiter = "/", int max_paths = -1) const; @@ -349,9 +340,8 @@ class GCS { * * @param old_uri The URI of the old path. * @param new_uri The URI of the new path. - * @return Status */ - Status move_object(const URI& old_uri, const URI& new_uri); + void move_object(const URI& old_uri, const URI& new_uri); /** * Renames a directory. Note that this is an expensive operation. @@ -361,17 +351,15 @@ class GCS { * * @param old_uri The URI of the old path. * @param new_uri The URI of the new path. - * @return Status */ - Status move_dir(const URI& old_uri, const URI& new_uri); + void move_dir(const URI& old_uri, const URI& new_uri); /** * Creates an empty object. * * @param uri The URI of the object to be created. - * @return Status */ - Status touch(const URI& uri) const; + void touch(const URI& uri) const; /** * Writes the input buffer to an GCS object. Note that this is essentially @@ -380,9 +368,8 @@ class GCS { * @param uri The URI of the object to be written to. * @param buffer The input buffer. * @param length The size of the input buffer. - * @return Status */ - Status write(const URI& uri, const void* buffer, uint64_t length); + void write(const URI& uri, const void* buffer, uint64_t length); /** * Reads data from an object into a buffer. @@ -407,18 +394,16 @@ class GCS { * Returns the size of the input object with a given URI in bytes. * * @param uri The URI of the object. - * @param nbytes Pointer to `uint64_t` bytes to return. 
- * @return Status + * @return The size of the input object, in bytes. */ - Status object_size(const URI& uri, uint64_t* nbytes) const; + uint64_t object_size(const URI& uri) const; /** * Flushes an object to GCS, finalizing the upload. * * @param uri The URI of the object to be flushed. - * @return Status */ - Status flush_object(const URI& uri); + void flush_object(const URI& uri); /** * Creates a GCS credentials object. @@ -568,12 +553,8 @@ class GCS { /* PRIVATE METHODS */ /* ********************************* */ - /** - * Initializes the client, if it has not already been initialized. - * - * @return Status - */ - Status init_client() const; + /** Initializes the client, if it has not already been initialized. */ + void init_client() const; /** * Parses a URI into a bucket name and object path. For example, @@ -581,12 +562,9 @@ class GCS { * `*bucket_name == "my-bucket"` and `*object_path == "dir1/file1"`. * * @param uri The URI to parse. - * @param bucket_name Mutates to the bucket name. - * @param object_path Mutates to the object path. - * @return Status + * @return A tuple of the bucket name and object path. */ - Status parse_gcs_uri( - const URI& uri, std::string* bucket_name, std::string* object_path) const; + std::tuple parse_gcs_uri(const URI& uri) const; /** * Removes a leading slash from 'path' if it exists. @@ -609,15 +587,6 @@ class GCS { */ static std::string remove_trailing_slash(const std::string& path); - /** - * Copies the object at 'old_uri' to `new_uri`. - * - * @param old_uri The object's current URI. - * @param new_uri The object's URI to move to. - * @return Status - */ - Status copy_object(const URI& old_uri, const URI& new_uri); - /** * Waits for a object with `bucket_name` and `object_path` * to exist on GCS. @@ -629,35 +598,6 @@ class GCS { Status wait_for_object_to_propagate( const std::string& bucket_name, const std::string& object_path) const; - /** - * Waits for a object with `bucket_name` and `object_path` - * to not exist on GCS. 
- * - * @param bucket_name The object's bucket name. - * @param object_path The object's path - * @return Status - */ - Status wait_for_object_to_be_deleted( - const std::string& bucket_name, const std::string& object_path) const; - - /** - * Waits for a bucket with `bucket_name` - * to exist on GCS. - * - * @param bucket_name The bucket's name. - * @return Status - */ - Status wait_for_bucket_to_propagate(const std::string& bucket_name) const; - - /** - * Waits for a bucket with `bucket_name` - * to not exist on GCS. - * - * @param bucket_name The bucket's name. - * @return Status - */ - Status wait_for_bucket_to_be_deleted(const std::string& bucket_name) const; - /** * Check if 'is_object' is a object on GCS. * @@ -671,15 +611,6 @@ class GCS { const std::string& object_path, bool* const is_object) const; - /** - * Check if 'bucket_name' is a bucket on GCS. - * - * @param bucket_name The bucket's name. - * @param is_bucket Mutates to the output. - * @return Status - */ - Status is_bucket(const std::string& bucket_name, bool* const is_bucket) const; - /** * Contains the implementation of ls_filtered. * @@ -724,14 +655,10 @@ class GCS { * @param write_cache_buffer The destination write cache buffer to fill. * @param buffer The source binary buffer to fill the data from. * @param length The length of `buffer`. - * @param nbytes_filled The number of bytes filled into `write_cache_buffer`. - * @return Status + * @return The number of bytes filled into `write_cache_buffer`. */ - Status fill_write_cache( - Buffer* write_cache_buffer, - const void* buffer, - const uint64_t length, - uint64_t* nbytes_filled); + uint64_t fill_write_cache( + Buffer* write_cache_buffer, const void* buffer, const uint64_t length); /** * Writes the contents of the input buffer to the object given by @@ -756,9 +683,8 @@ class GCS { * @param length The size of the input buffer. * @param last_part Should be true only when this is the last part of a * object. 
- * @return Status */ - Status write_parts( + void write_parts( const URI& uri, const void* buffer, uint64_t length, bool last_part); /** @@ -807,7 +733,7 @@ class GCS { * Uploads the write cache buffer associated with 'uri' as an entire * object. */ - Status flush_object_direct(const URI& uri); + void flush_object_direct(const URI& uri); }; } // namespace tiledb::sm diff --git a/tiledb/sm/filesystem/vfs.cc b/tiledb/sm/filesystem/vfs.cc index cff54296a9a..3e2688eab5c 100644 --- a/tiledb/sm/filesystem/vfs.cc +++ b/tiledb/sm/filesystem/vfs.cc @@ -268,7 +268,8 @@ Status VFS::touch(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().touch(uri); + gcs().touch(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -303,7 +304,8 @@ Status VFS::create_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().create_bucket(uri); + gcs().create_bucket(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -329,7 +331,8 @@ Status VFS::remove_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().remove_bucket(uri); + gcs().remove_bucket(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -356,7 +359,8 @@ Status VFS::empty_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().empty_bucket(uri); + gcs().empty_bucket(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -384,7 +388,8 @@ Status VFS::is_empty_bucket( } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().is_empty_bucket(uri, is_empty); + *is_empty = gcs().is_empty_bucket(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -420,7 +425,7 @@ Status VFS::remove_dir(const URI& uri) const { #endif } else if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().remove_dir(uri); + gcs().remove_dir(uri); #else throw BuiltWithout("GCS"); #endif @@ -488,7 +493,8 @@ Status VFS::remove_file(const URI& uri) const { } if (uri.is_gcs()) { 
#ifdef HAVE_GCS - return gcs().remove_object(uri); + gcs().remove_object(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -575,7 +581,8 @@ Status VFS::file_size(const URI& uri, uint64_t* size) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().object_size(uri, size); + *size = gcs().object_size(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -622,7 +629,8 @@ Status VFS::is_dir(const URI& uri, bool* is_dir) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().is_dir(uri, is_dir); + *is_dir = gcs().is_dir(uri); + return Status::Ok(); #else *is_dir = false; throw BuiltWithout("GCS"); @@ -708,7 +716,7 @@ Status VFS::is_bucket(const URI& uri, bool* is_bucket) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - RETURN_NOT_OK(gcs().is_bucket(uri, is_bucket)); + *is_bucket = gcs().is_bucket(uri); return Status::Ok(); #else *is_bucket = false; @@ -850,7 +858,8 @@ Status VFS::move_file(const URI& old_uri, const URI& new_uri) { if (old_uri.is_gcs()) { if (new_uri.is_gcs()) #ifdef HAVE_GCS - return gcs().move_object(old_uri, new_uri); + gcs().move_object(old_uri, new_uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -923,7 +932,8 @@ Status VFS::move_dir(const URI& old_uri, const URI& new_uri) { if (old_uri.is_gcs()) { if (new_uri.is_gcs()) #ifdef HAVE_GCS - return gcs().move_dir(old_uri, new_uri); + gcs().move_dir(old_uri, new_uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -1438,7 +1448,8 @@ Status VFS::close_file(const URI& uri) { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().flush_object(uri); + gcs().flush_object(uri); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif @@ -1502,7 +1513,8 @@ Status VFS::write( } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs().write(uri, buffer, buffer_size); + gcs().write(uri, buffer, buffer_size); + return Status::Ok(); #else throw BuiltWithout("GCS"); #endif From d4016a6f0e21156f4cb7d128a37af057d88be66d Mon Sep 
17 00:00:00 2001 From: Rebekah Davis Date: Tue, 10 Dec 2024 16:30:27 -0500 Subject: [PATCH 5/5] Don't init_client() in constructor. --- tiledb/sm/filesystem/gcs.cc | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tiledb/sm/filesystem/gcs.cc b/tiledb/sm/filesystem/gcs.cc index 60808d24a17..6ce6f1b4ce0 100644 --- a/tiledb/sm/filesystem/gcs.cc +++ b/tiledb/sm/filesystem/gcs.cc @@ -102,7 +102,6 @@ GCS::GCS(ThreadPool* thread_pool, const Config& config) gcs_params_.max_parallel_ops_ * gcs_params_.multi_part_size_ : gcs_params_.max_direct_upload_size_; state_ = State::INITIALIZED; - init_client(); } GCS::~GCS() { @@ -239,6 +238,7 @@ void GCS::init_client() const { } void GCS::create_bucket(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -270,6 +270,7 @@ void GCS::create_bucket(const URI& uri) const { } void GCS::empty_bucket(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -278,6 +279,7 @@ void GCS::empty_bucket(const URI& uri) const { } bool GCS::is_empty_bucket(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -298,6 +300,7 @@ bool GCS::is_empty_bucket(const URI& uri) const { } bool GCS::is_bucket(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -321,6 +324,7 @@ bool GCS::is_bucket(const URI& uri) const { } bool GCS::is_dir(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -329,6 +333,7 @@ bool GCS::is_dir(const URI& uri) const { } void GCS::remove_bucket(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -359,6 +364,7 @@ void GCS::remove_bucket(const 
URI& uri) const { } void GCS::remove_object(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -388,6 +394,7 @@ void GCS::remove_object(const URI& uri) const { } void GCS::remove_dir(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -433,6 +440,7 @@ std::vector GCS::ls( std::vector GCS::ls_with_sizes( const URI& uri, const std::string& delimiter, int max_paths) const { + init_client(); const URI uri_dir = uri.add_trailing_slash(); if (!uri_dir.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri_dir.to_string()); @@ -490,6 +498,7 @@ LsObjects GCS::ls_filtered_impl( const URI& uri, std::function file_filter, bool recursive) const { + init_client(); const URI uri_dir = uri.add_trailing_slash(); if (!uri_dir.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri_dir.to_string()); @@ -556,6 +565,7 @@ LsObjects GCS::ls_filtered_impl( } void GCS::move_object(const URI& old_uri, const URI& new_uri) { + init_client(); // Copy old object into new object if (!old_uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + old_uri.to_string()); @@ -584,6 +594,7 @@ void GCS::move_object(const URI& old_uri, const URI& new_uri) { Status GCS::wait_for_object_to_propagate( const std::string& bucket_name, const std::string& object_path) const { + init_client(); unsigned attempts = 0; while (attempts++ < constants::gcs_max_attempts) { bool is_object; @@ -601,6 +612,7 @@ Status GCS::wait_for_object_to_propagate( } void GCS::move_dir(const URI& old_uri, const URI& new_uri) { + init_client(); std::vector paths = ls(old_uri, ""); for (const auto& path : paths) { const std::string suffix = path.substr(old_uri.to_string().size()); @@ -610,6 +622,7 @@ void GCS::move_dir(const URI& old_uri, const URI& new_uri) { } void GCS::touch(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI 
is not a GCS URI: " + uri.to_string()); } @@ -626,6 +639,7 @@ void GCS::touch(const URI& uri) const { } Status GCS::is_object(const URI& uri, bool* const is_object) const { + init_client(); assert(is_object); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); @@ -711,6 +725,7 @@ void GCS::write( } uint64_t GCS::object_size(const URI& uri) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -908,6 +923,7 @@ Status GCS::upload_part( } void GCS::flush_object(const URI& uri) { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not a GCS URI: " + uri.to_string()); } @@ -1083,6 +1099,7 @@ Status GCS::read( const uint64_t length, const uint64_t read_ahead_length, uint64_t* const length_returned) const { + init_client(); if (!uri.is_gcs()) { throw GCSException("URI is not an GCS URI: " + uri.to_string()); }