From f776d91b35b9a9dd5325c22ecf24b491048cb4ca Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Fri, 7 Jun 2024 16:15:52 +0300 Subject: [PATCH] Support remote arrays for fragment_list consolidation --- test/src/unit-capi-consolidation.cc | 60 +++++++- test/support/src/serialization_wrappers.cc | 28 ++++ test/support/src/serialization_wrappers.h | 7 + tiledb/sm/c_api/tiledb.cc | 58 +++++++- tiledb/sm/consolidator/consolidator.cc | 106 +++++++------- tiledb/sm/rest/rest_client.cc | 11 +- tiledb/sm/rest/rest_client.h | 7 +- tiledb/sm/serialization/consolidation.cc | 160 +++++++++++++-------- tiledb/sm/serialization/consolidation.h | 33 +++-- 9 files changed, 336 insertions(+), 134 deletions(-) diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index 3cb1459e5cfb..1c7020875bcb 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -32,6 +32,7 @@ #include #include "test/support/src/helpers.h" +#include "test/support/src/serialization_wrappers.h" #include "test/support/src/vfs_helpers.h" #include "tiledb/common/stdx_string.h" #include "tiledb/platform/platform.h" @@ -72,6 +73,8 @@ struct ConsolidationFx { tiledb_encryption_type_t encryption_type_ = TILEDB_NO_ENCRYPTION; const char* encryption_key_ = nullptr; + bool serialize_ = false; + // Constructors/destructors ConsolidationFx(); @@ -5081,6 +5084,10 @@ TEST_CASE_METHOD( ConsolidationFx, "C API: Test advanced consolidation #1", "[capi][consolidation][adv][adv-1][non-rest]") { +#ifdef TILEDB_SERIALIZATION + serialize_ = true; +#endif + remove_dense_vector(); create_dense_vector(); write_dense_vector_4_fragments(); @@ -5113,6 +5120,13 @@ TEST_CASE_METHOD( REQUIRE(rc == TILEDB_OK); REQUIRE(error == nullptr); + if (serialize_) { + std::vector frag_uris_deserialized; + tiledb_array_consolidation_request_wrapper( + ctx_, tiledb_serialization_type_t(0), nullptr, &frag_uris_deserialized); + REQUIRE(frag_uris_deserialized.empty()); + } + // Consolidate rc = tiledb_array_consolidate(ctx_, dense_vector_uri_.c_str(), config); CHECK(rc == TILEDB_OK); @@ -7158,7 +7172,11 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, "C API: Test consolidation, dense split fragments", - "[capi][consolidation][dense][split-fragments][non-rest]") { + "[capi][consolidation][dense][split-fragments][rest]") { +#ifdef TILEDB_SERIALIZATION + serialize_ = true; +#endif + remove_dense_array(); create_dense_array(); write_dense_subarray(1, 2, 1, 2); @@ -7198,6 +7216,23 @@ TEST_CASE_METHOD( // Consolidate const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1}; + + if (serialize_) { + std::vector frag_uris; + frag_uris.reserve(2); + for (uint64_t i = 0; i < 2; i++) { + frag_uris.emplace_back(uris[i]); + } + + std::vector frag_uris_deserialized; + tiledb_array_consolidation_request_wrapper( + ctx_, + tiledb_serialization_type_t(0), + &frag_uris, + &frag_uris_deserialized); + REQUIRE(frag_uris == frag_uris_deserialized); + } + rc = tiledb_array_consolidate_fragments( ctx_, dense_array_uri_.c_str(), uris, 2, cfg); CHECK(rc == TILEDB_OK); @@ -7234,7 +7269,11 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, "C API: Test consolidation, sparse split fragments", - "[capi][consolidation][sparse][split-fragments][non-rest]") { + "[capi][consolidation][sparse][split-fragments][rest]") { +#ifdef TILEDB_SERIALIZATION + serialize_ = true; +#endif + remove_sparse_array(); create_sparse_array(); write_sparse_row(0); @@ -7274,6 +7313,23 @@ TEST_CASE_METHOD( // Consolidate const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1}; + + if (serialize_) { + std::vector frag_uris; + frag_uris.reserve(2); + for (uint64_t i = 0; i < 2; i++) { + frag_uris.emplace_back(uris[i]); + } + + std::vector frag_uris_deserialized; + tiledb_array_consolidation_request_wrapper( + ctx_, + tiledb_serialization_type_t(0), + &frag_uris, + &frag_uris_deserialized); + REQUIRE(frag_uris == frag_uris_deserialized); + } + rc = tiledb_array_consolidate_fragments( ctx_, sparse_array_uri_.c_str(), uris, 2, cfg); CHECK(rc == TILEDB_OK); diff --git a/test/support/src/serialization_wrappers.cc b/test/support/src/serialization_wrappers.cc index 1ae4d58ea95c..296b7eb9d7f0 100644 --- a/test/support/src/serialization_wrappers.cc +++ b/test/support/src/serialization_wrappers.cc @@ -32,9 +32,12 @@ */ #include "test/support/src/helpers.h" +#include "tiledb/api/c_api/buffer/buffer_api_internal.h" +#include "tiledb/api/c_api/context/context_api_internal.h" #include "tiledb/sm/c_api/tiledb.h" #include "tiledb/sm/c_api/tiledb_serialization.h" #include "tiledb/sm/c_api/tiledb_struct_def.h" +#include "tiledb/sm/serialization/consolidation.h" #include "tiledb/sm/serialization/query.h" #ifdef TILEDB_SERIALIZATION @@ -218,3 +221,28 @@ void tiledb_subarray_serialize( *subarray = deserialized_subarray; #endif } + +void tiledb_array_consolidation_request_wrapper( + tiledb_ctx_t* ctx, + tiledb_serialization_type_t serialize_type, + const std::vector* fragment_uris_in, + std::vector* fragment_uris_out) { + // Serialize and Deserialize + auto buffer = tiledb_buffer_handle_t::make_handle(); + serialization::array_consolidation_request_serialize( + ctx->config(), + fragment_uris_in, + static_cast(serialize_type), + &(buffer->buffer())); + + auto [config, fragment_uris_deser] = + serialization::array_consolidation_request_deserialize( + static_cast(serialize_type), + buffer->buffer()); + + tiledb_buffer_handle_t::break_handle(buffer); + + if (fragment_uris_deser.has_value()) { + *fragment_uris_out = fragment_uris_deser.value(); + } +} diff --git a/test/support/src/serialization_wrappers.h b/test/support/src/serialization_wrappers.h index 55317de32d4a..4c7280f61fe0 100644 --- a/test/support/src/serialization_wrappers.h +++ b/test/support/src/serialization_wrappers.h @@ -141,4 +141,11 @@ int tiledb_fragment_info_serialize( */ void tiledb_subarray_serialize( tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray); + +void tiledb_array_consolidation_request_wrapper( + tiledb_ctx_t* ctx, + tiledb_serialization_type_t serialize_type, + const std::vector* fragment_uris_in, + std::vector* fragment_uris_out); + #endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H diff --git a/tiledb/sm/c_api/tiledb.cc b/tiledb/sm/c_api/tiledb.cc index 26249ea91b64..3116c81836d9 100644 --- a/tiledb/sm/c_api/tiledb.cc +++ b/tiledb/sm/c_api/tiledb.cc @@ -2664,13 +2664,31 @@ int32_t tiledb_array_create_with_key( int32_t tiledb_array_consolidate( tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) { + // Validate input arguments + api::ensure_context_is_valid(ctx); api::ensure_config_is_valid_if_present(config); + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } + + auto input_config = (config == nullptr) ? ctx->config() : config->config(); + if (uri.is_tiledb() && + tiledb::sm::Consolidator::mode_from_config(input_config) == + tiledb::sm::ConsolidationMode::FRAGMENT) { + throw api::CAPIStatusException( + "Please use tiledb_array_consolidate_fragments API for consolidating " + "fragments on remote arrays."); + } + tiledb::sm::Consolidator::array_consolidate( array_uri, tiledb::sm::EncryptionType::NO_ENCRYPTION, nullptr, 0, - (config == nullptr) ? ctx->config() : config->config(), + input_config, ctx->storage_manager()); return TILEDB_OK; } @@ -2682,7 +2700,15 @@ int32_t tiledb_array_consolidate_with_key( const void* encryption_key, uint32_t key_length, tiledb_config_t* config) { - // Sanity checks + // Validate input arguments + api::ensure_context_is_valid(ctx); + api::ensure_config_is_valid_if_present(config); + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } tiledb::sm::Consolidator::array_consolidate( array_uri, @@ -2701,7 +2727,33 @@ int32_t tiledb_array_consolidate_fragments( const char** fragment_uris, const uint64_t num_fragments, tiledb_config_t* config) { - // Sanity checks + // Validate input arguments + api::ensure_context_is_valid(ctx); + api::ensure_config_is_valid_if_present(config); + + if (fragment_uris == nullptr) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input fragment list"); + } + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } + + if (num_fragments < 1) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input number of fragments"); + } + + for (size_t i = 0; i < num_fragments; i++) { + if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid uri(s) in input fragment " + "list"); + } + } // Convert the list of fragments to a vector std::vector uris; diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index b0e711b2b2be..eee2b3e409a5 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -146,21 +146,21 @@ void Consolidator::array_consolidate( throw ConsolidatorException("Cannot consolidate array; Invalid URI"); } - // Check if array exists - ObjectType obj_type; - throw_if_not_ok( - object_type(storage_manager->resources(), array_uri, &obj_type)); - - if (obj_type != ObjectType::ARRAY) { - throw ConsolidatorException( - "Cannot consolidate array; Array does not exist"); - } - if (array_uri.is_tiledb()) { throw_if_not_ok( storage_manager->resources().rest_client()->post_consolidation_to_rest( array_uri, config)); } else { + // Check if array exists + ObjectType obj_type; + throw_if_not_ok( + object_type(storage_manager->resources(), array_uri, &obj_type)); + + if (obj_type != ObjectType::ARRAY) { + throw ConsolidatorException( + "Cannot consolidate array; Array does not exist"); + } + // Get encryption key from config std::string encryption_key_from_cfg; if (!encryption_key) { @@ -210,50 +210,58 @@ void Consolidator::fragments_consolidate( throw ConsolidatorException("Cannot consolidate array; Invalid URI"); } - // Check if array exists - ObjectType obj_type; - throw_if_not_ok( - object_type(storage_manager->resources(), array_uri, &obj_type)); + if (array_uri.is_tiledb()) { + throw_if_not_ok( + storage_manager->resources().rest_client()->post_consolidation_to_rest( + array_uri, config, &fragment_uris)); + } else { + // Check if array exists + ObjectType obj_type; + throw_if_not_ok( + object_type(storage_manager->resources(), array_uri, &obj_type)); - if (obj_type != ObjectType::ARRAY) { - throw ConsolidatorException( - "Cannot consolidate array; Array does not exist"); - } + if (obj_type != ObjectType::ARRAY) { + throw ConsolidatorException( + "Cannot consolidate array; Array does not exist"); + } - // Get encryption key from config - std::string encryption_key_from_cfg; - if (!encryption_key) { - bool found = false; - encryption_key_from_cfg = config.get("sm.encryption_key", &found); - assert(found); - } + // Get encryption key from config + std::string encryption_key_from_cfg; + if (!encryption_key) { + bool found = false; + encryption_key_from_cfg = config.get("sm.encryption_key", &found); + assert(found); + } + + if (!encryption_key_from_cfg.empty()) { + encryption_key = encryption_key_from_cfg.c_str(); + key_length = static_cast(encryption_key_from_cfg.size()); + std::string encryption_type_from_cfg; + bool found = false; + encryption_type_from_cfg = config.get("sm.encryption_type", &found); + assert(found); + auto [st, et] = encryption_type_enum(encryption_type_from_cfg); + throw_if_not_ok(st); + encryption_type = et.value(); - if (!encryption_key_from_cfg.empty()) { - encryption_key = encryption_key_from_cfg.c_str(); - key_length = static_cast(encryption_key_from_cfg.size()); - std::string encryption_type_from_cfg; - bool found = false; - encryption_type_from_cfg = config.get("sm.encryption_type", &found); - assert(found); - auto [st, et] = encryption_type_enum(encryption_type_from_cfg); - throw_if_not_ok(st); - encryption_type = et.value(); - - if (!EncryptionKey::is_valid_key_length( - encryption_type, - static_cast(encryption_key_from_cfg.size()))) { - encryption_key = nullptr; - key_length = 0; + if (!EncryptionKey::is_valid_key_length( + encryption_type, + static_cast(encryption_key_from_cfg.size()))) { + encryption_key = nullptr; + key_length = 0; + } } - } - // Consolidate - auto consolidator = Consolidator::create( - ConsolidationMode::FRAGMENT, config, storage_manager); - auto fragment_consolidator = - dynamic_cast(consolidator.get()); - throw_if_not_ok(fragment_consolidator->consolidate_fragments( - array_name, encryption_type, encryption_key, key_length, fragment_uris)); + // Consolidate + auto fragment_consolidator = + make_shared(HERE(), config, storage_manager); + throw_if_not_ok(fragment_consolidator->consolidate_fragments( + array_name, + encryption_type, + encryption_key, + key_length, + fragment_uris)); + } } void Consolidator::write_consolidated_commits_file( diff --git a/tiledb/sm/rest/rest_client.cc b/tiledb/sm/rest/rest_client.cc index c291323f39ea..2bd3a727fdf3 100644 --- a/tiledb/sm/rest/rest_client.cc +++ b/tiledb/sm/rest/rest_client.cc @@ -1570,10 +1570,12 @@ Status RestClient::ensure_json_null_delimited_string(Buffer* buffer) { } Status RestClient::post_consolidation_to_rest( - const URI& uri, const Config& config) { + const URI& uri, + const Config& config, + const std::vector* fragment_uris) { Buffer buff; - RETURN_NOT_OK(serialization::array_consolidation_request_serialize( - config, serialization_type_, &buff)); + serialization::array_consolidation_request_serialize( + config, fragment_uris, serialization_type_, &buff); // Wrap in a list BufferList serialized; RETURN_NOT_OK(serialized.add_buffer(std::move(buff))); @@ -1817,7 +1819,8 @@ void RestClient::delete_group_from_rest(const URI&, bool) { throw RestClientDisabledException(); } -Status RestClient::post_consolidation_to_rest(const URI&, const Config&) { +Status RestClient::post_consolidation_to_rest( + const URI&, const Config&, const std::vector*) { throw RestClientDisabledException(); } diff --git a/tiledb/sm/rest/rest_client.h b/tiledb/sm/rest/rest_client.h index 72fb1ad20df6..520b71164777 100644 --- a/tiledb/sm/rest/rest_client.h +++ b/tiledb/sm/rest/rest_client.h @@ -390,9 +390,14 @@ class RestClient { * * @param uri Array URI * @param config config + * @param fragment_uris The uris of the fragments to be consolidated if this + * is a request for fragment list consolidation * @return */ - Status post_consolidation_to_rest(const URI& uri, const Config& config); + Status post_consolidation_to_rest( + const URI& uri, + const Config& config, + const std::vector* fragment_uris = nullptr); /** * Post array vacuum request to the REST server. diff --git a/tiledb/sm/serialization/consolidation.cc b/tiledb/sm/serialization/consolidation.cc index 7efababc271d..75c2feebd7ea 100644 --- a/tiledb/sm/serialization/consolidation.cc +++ b/tiledb/sm/serialization/consolidation.cc @@ -43,6 +43,7 @@ // clang-format on #include "tiledb/common/logger_public.h" +#include "tiledb/sm/consolidator/consolidator.h" #include "tiledb/sm/enums/serialization_type.h" #include "tiledb/sm/serialization/config.h" #include "tiledb/sm/serialization/consolidation.h" @@ -53,36 +54,76 @@ namespace tiledb { namespace sm { namespace serialization { +class ConsolidationSerializationException : public StatusException { + public: + explicit ConsolidationSerializationException(const std::string& message) + : StatusException("[TileDB::Serialization][Consolidation]", message) { + } +}; + #ifdef TILEDB_SERIALIZATION -Status array_consolidation_request_to_capnp( +void array_consolidation_request_to_capnp( const Config& config, + const std::vector* fragment_uris, capnp::ArrayConsolidationRequest::Builder* array_consolidation_request_builder) { + // Validate input arguments to be sure that what we are serializing make sense + auto mode = Consolidator::mode_from_config(config); + if (mode != ConsolidationMode::FRAGMENT && + (fragment_uris != nullptr || !fragment_uris->empty())) { + throw ConsolidationSerializationException( + "[array_consolidation_request_to_capnp] Error serializing " + "consolidation request. A non-empty fragment list should only be " + "provided for fragment consoldiation"); + } + auto config_builder = array_consolidation_request_builder->initConfig(); - RETURN_NOT_OK(config_to_capnp(config, &config_builder)); - return Status::Ok(); + throw_if_not_ok(config_to_capnp(config, &config_builder)); + + if (fragment_uris != nullptr && !fragment_uris->empty()) { + auto fragment_list_builder = + array_consolidation_request_builder->initFragments( + fragment_uris->size()); + for (size_t i = 0; i < fragment_uris->size(); i++) { + fragment_list_builder.set(i, (*fragment_uris)[i]); + } + } } -Status array_consolidation_request_from_capnp( +std::pair>> +array_consolidation_request_from_capnp( const capnp::ArrayConsolidationRequest::Reader& - array_consolidation_request_reader, - tdb_unique_ptr* config) { + array_consolidation_request_reader) { auto config_reader = array_consolidation_request_reader.getConfig(); - RETURN_NOT_OK(config_from_capnp(config_reader, config)); - return Status::Ok(); + tdb_unique_ptr decoded_config = nullptr; + throw_if_not_ok(config_from_capnp(config_reader, &decoded_config)); + + std::vector fragment_uris; + if (array_consolidation_request_reader.hasFragments()) { + auto fragment_reader = array_consolidation_request_reader.getFragments(); + fragment_uris.reserve(fragment_reader.size()); + for (const auto& fragment_uri : fragment_reader) { + fragment_uris.emplace_back(fragment_uri); + } + + return {*decoded_config, fragment_uris}; + } else { + return {*decoded_config, std::nullopt}; + } } -Status array_consolidation_request_serialize( +void array_consolidation_request_serialize( const Config& config, + const std::vector* fragment_uris, SerializationType serialize_type, Buffer* serialized_buffer) { try { ::capnp::MallocMessageBuilder message; capnp::ArrayConsolidationRequest::Builder ArrayConsolidationRequestBuilder = message.initRoot(); - RETURN_NOT_OK(array_consolidation_request_to_capnp( - config, &ArrayConsolidationRequestBuilder)); + array_consolidation_request_to_capnp( + config, fragment_uris, &ArrayConsolidationRequestBuilder); serialized_buffer->reset_size(); serialized_buffer->reset_offset(); @@ -94,45 +135,44 @@ Status array_consolidation_request_serialize( const auto json_len = capnp_json.size(); const char nul = '\0'; // size does not include needed null terminator, so add +1 - RETURN_NOT_OK(serialized_buffer->realloc(json_len + 1)); - RETURN_NOT_OK(serialized_buffer->write(capnp_json.cStr(), json_len)); - RETURN_NOT_OK(serialized_buffer->write(&nul, 1)); + throw_if_not_ok(serialized_buffer->realloc(json_len + 1)); + throw_if_not_ok(serialized_buffer->write(capnp_json.cStr(), json_len)); + throw_if_not_ok(serialized_buffer->write(&nul, 1)); break; } case SerializationType::CAPNP: { kj::Array<::capnp::word> protomessage = messageToFlatArray(message); kj::ArrayPtr message_chars = protomessage.asChars(); const auto nbytes = message_chars.size(); - RETURN_NOT_OK(serialized_buffer->realloc(nbytes)); - RETURN_NOT_OK(serialized_buffer->write(message_chars.begin(), nbytes)); + throw_if_not_ok(serialized_buffer->realloc(nbytes)); + throw_if_not_ok( + serialized_buffer->write(message_chars.begin(), nbytes)); break; } default: { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; Unknown serialization type " - "passed")); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "Unknown serialization type passed"); } } } catch (kj::Exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; kj::Exception: " + - std::string(e.getDescription().cStr()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "kj::Exception: " + + std::string(e.getDescription().cStr())); } catch (std::exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; exception " + std::string(e.what()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "exception " + + std::string(e.what())); } - - return Status::Ok(); } -Status array_consolidation_request_deserialize( - Config** config, - SerializationType serialize_type, - const Buffer& serialized_buffer) { +std::pair>> +array_consolidation_request_deserialize( + SerializationType serialize_type, const Buffer& serialized_buffer) { try { - tdb_unique_ptr decoded_config = nullptr; - switch (serialize_type) { case SerializationType::JSON: { ::capnp::JsonCodec json; @@ -146,8 +186,8 @@ Status array_consolidation_request_deserialize( capnp::ArrayConsolidationRequest::Reader array_consolidation_request_reader = array_consolidation_request_builder.asReader(); - RETURN_NOT_OK(array_consolidation_request_from_capnp( - array_consolidation_request_reader, &decoded_config)); + return array_consolidation_request_from_capnp( + array_consolidation_request_reader); break; } case SerializationType::CAPNP: { @@ -159,32 +199,27 @@ Status array_consolidation_request_deserialize( capnp::ArrayConsolidationRequest::Reader array_consolidation_request_reader = reader.getRoot(); - RETURN_NOT_OK(array_consolidation_request_from_capnp( - array_consolidation_request_reader, &decoded_config)); + return array_consolidation_request_from_capnp( + array_consolidation_request_reader); break; } default: { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; Unknown serialization type " - "passed")); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing " + "config; Unknown serialization type passed"); } } - - if (decoded_config == nullptr) - return LOG_STATUS(Status_SerializationError( - "Error serializing config; deserialized config is null")); - - *config = decoded_config.release(); } catch (kj::Exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; kj::Exception: " + - std::string(e.getDescription().cStr()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing config; " + "kj::Exception: " + + std::string(e.getDescription().cStr())); } catch (std::exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; exception " + std::string(e.what()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing config; " + "exception " + + std::string(e.what())); } - - return Status::Ok(); } void consolidation_plan_request_to_capnp( @@ -419,16 +454,21 @@ std::vector> deserialize_consolidation_plan_response( #else -Status array_consolidation_request_serialize( - const Config&, SerializationType, Buffer*) { - return LOG_STATUS(Status_SerializationError( - "Cannot serialize; serialization not enabled.")); +void array_consolidation_request_serialize( + const Config&, + const std::vector*, + SerializationType, + Buffer*) { + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Cannot serialize; " + "serialization not enabled."); } -Status array_consolidation_request_deserialize( - Config**, SerializationType, const Buffer&) { - return LOG_STATUS(Status_SerializationError( - "Cannot deserialize; serialization not enabled.")); +std::pair>> +array_consolidation_request_deserialize(SerializationType, const Buffer&) { + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Cannot deserialize; " + "serialization not enabled."); } void serialize_consolidation_plan_request( diff --git a/tiledb/sm/serialization/consolidation.h b/tiledb/sm/serialization/consolidation.h index d27c6c6e3466..3be366a4dc60 100644 --- a/tiledb/sm/serialization/consolidation.h +++ b/tiledb/sm/serialization/consolidation.h @@ -58,22 +58,24 @@ namespace serialization { * Convert Cap'n Proto message to Consolidation request * * @param consolidation_req_reader cap'n proto class. - * @param config config object to deserialize into. - * @return Status + * @return {config, fragment_uris} config object to deserialize into, and the + * uris of the fragments to be consolidated if any */ -Status array_consolidation_request_from_capnp( - const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader, - Config* Config); +std::pair>> +array_consolidation_request_from_capnp( + const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader); /** * Convert Consolidation Request to Cap'n Proto message. * * @param config config to serialize info from + * @param fragment_uris The uris of the fragments to be consolidated if this is + * a request for fragment list consolidation * @param consolidation_req_builder cap'n proto class. - * @return Status */ -Status array_consolidation_request_to_capnp( +void array_consolidation_request_to_capnp( const Config& config, + const std::vector* fragment_uris, capnp::ArrayConsolidationRequest::Builder* consolidation_req_builder); #endif @@ -81,27 +83,28 @@ Status array_consolidation_request_to_capnp( * Serialize a consolidation request via Cap'n Proto. * * @param config config object to get info to serialize. + * @param fragment_uris The uris of the fragments to be consolidated if this is + * a request for fragment list consolidation * @param serialize_type format to serialize into Cap'n Proto or JSON. * @param serialized_buffer buffer to store serialized bytes in. - * @return Status */ -Status array_consolidation_request_serialize( +void array_consolidation_request_serialize( const Config& config, + const std::vector* fragment_uris, SerializationType serialize_type, Buffer* serialized_buffer); /** * Deserialize consolidation request via Cap'n Proto * - * @param config config object to store the deserialized info in. * @param serialize_type format the data is serialized in: Cap'n Proto of JSON. * @param serialized_buffer buffer to read serialized bytes from. - * @return Status + * @return {config, fragment_uris} config object to deserialize into, and the + * uris of the fragments to be consolidated if any */ -Status array_consolidation_request_deserialize( - Config** config, - SerializationType serialize_type, - const Buffer& serialized_buffer); +std::pair>> +array_consolidation_request_deserialize( + SerializationType serialize_type, const Buffer& serialized_buffer); /** * Serialize a consolidation plan request via Cap'n Proto.