-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
447 Consistently use KeyNotFoundException
, DuplicateKeyException
and PermissionException
in Storage
s
#584
447 Consistently use KeyNotFoundException
, DuplicateKeyException
and PermissionException
in Storage
s
#584
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -50,6 +50,7 @@ inline void LmdbStorage::do_write_internal(Composite<KeySegmentPair>&& kvs, ::lm | |||||
ARCTICDB_SUBSAMPLE(LmdbPut, 0) | ||||||
int res = ::mdb_put(txn.handle(), dbi.handle(), &mdb_key, &mdb_val, MDB_RESERVE | overwrite_flag); | ||||||
if (res == MDB_KEYEXIST) { | ||||||
// Since LMDB is in-memory, we can efficiently detect: (see its doc for details) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't quite follow this comment. What can we efficiently detect? |
||||||
throw DuplicateKeyException(kv.variant_key()); | ||||||
} else if (res != 0) { | ||||||
throw std::runtime_error(fmt::format("Invalid lmdb error code {} while putting key {}", | ||||||
|
@@ -85,15 +86,16 @@ inline void LmdbStorage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts o | |||||
if(!failed_deletes.empty()) { | ||||||
ARCTICDB_SUBSAMPLE(LmdbStorageCommit, 0) | ||||||
txn.commit(); | ||||||
throw KeyNotFoundException(Composite<VariantKey>(std::move(failed_deletes))); | ||||||
throw KeyNotFoundException(Composite<VariantKey>(std::move(failed_deletes)), | ||||||
"do_update called with upsert=false on non-exist key(s): {}"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
do_write_internal(std::move(kvs), txn); | ||||||
ARCTICDB_SUBSAMPLE(LmdbStorageCommit, 0) | ||||||
txn.commit(); | ||||||
} | ||||||
|
||||||
template<class Visitor> | ||||||
void LmdbStorage::do_read(Composite<VariantKey>&& ks, Visitor &&visitor, storage::ReadKeyOpts) { | ||||||
void LmdbStorage::do_read(Composite<VariantKey>&& ks, Visitor &&visitor, storage::ReadKeyOpts) { | ||||||
ARCTICDB_SAMPLE(LmdbStorageRead, 0) | ||||||
auto txn = ::lmdb::txn::begin(env(), nullptr, MDB_RDONLY); | ||||||
|
||||||
|
@@ -145,7 +147,7 @@ inline bool LmdbStorage::do_key_exists(const VariantKey&key) { | |||||
MDB_val mdb_val; | ||||||
return ::lmdb::dbi_get(txn, dbi.handle(), &mdb_key, &mdb_val); | ||||||
} catch (const ::lmdb::not_found_error &ex) { | ||||||
ARCTICDB_DEBUG(log::storage(), "Caught lmdn not found error: {}", ex.what()); | ||||||
ARCTICDB_DEBUG(log::storage(), "Caught LMDB not found error: {}", ex.what()); | ||||||
return false; | ||||||
} | ||||||
} | ||||||
|
@@ -176,11 +178,14 @@ inline std::vector<VariantKey> LmdbStorage::do_remove_internal(Composite<Variant | |||||
} | ||||||
} | ||||||
} catch (const std::exception& e) { | ||||||
// TODO https://github.com/man-group/ArcticDB/issues/518 | ||||||
if (!opts.ignores_missing_key_) { | ||||||
for (auto &k : group.values()) { | ||||||
log::storage().warn("Failed to delete segment for key {}", variant_key_view(k) ); | ||||||
failed_deletes.push_back(k); | ||||||
} | ||||||
} else { | ||||||
log::storage().debug("Ignoring exception during remove: {}", e.what()); | ||||||
} | ||||||
} | ||||||
}); | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,8 +33,9 @@ namespace arcticdb::storage::memory { | |
key_vec[key] = kv.segment(); | ||
}, | ||
[&](const AtomKey &key) { | ||
util::check(key_vec.find(key) == key_vec.end(), | ||
"Cannot replace atom key in in-memory storage"); | ||
if (key_vec.find(key) != key_vec.end()) { | ||
throw DuplicateKeyException(key); | ||
} | ||
|
||
key_vec[key] = kv.segment(); | ||
} | ||
|
@@ -55,7 +56,10 @@ namespace arcticdb::storage::memory { | |
for (auto &kv : group.values()) { | ||
auto it = key_vec.find(kv.variant_key()); | ||
|
||
util::check_rte(opts.upsert_ || it != key_vec.end(), "update called with upsert=false but key does not exist"); | ||
if (!opts.upsert_ && it == key_vec.end()) { | ||
throw KeyNotFoundException(std::move(kv.variant_key()), | ||
"update called with upsert=false but key does not exist: {}"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error message is different than the error message for LMDB. I think we should unify them. (Minor) I wouldn't move |
||
} | ||
|
||
if(it != key_vec.end()) { | ||
key_vec.erase(it); | ||
|
@@ -111,7 +115,7 @@ inline bool MemoryStorage::do_key_exists(const VariantKey& key) { | |
ARCTICDB_DEBUG(log::storage(), "Read key {}: {}, with {} bytes of data", variant_key_type(k), variant_key_view(k)); | ||
key_vec.erase(it); | ||
} else if (!opts.ignores_missing_key_) { | ||
util::raise_rte("Failed to find segment for key {}",variant_key_view(k)); | ||
throw KeyNotFoundException(std::move(k)); | ||
} | ||
} | ||
}); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -10,6 +10,7 @@ | |||||
#include <bsoncxx/string/to_string.hpp> | ||||||
#include <bsoncxx/builder/stream/document.hpp> | ||||||
#include <bsoncxx/types.hpp> | ||||||
#include <bsoncxx/json.hpp> | ||||||
#include <mongocxx/uri.hpp> | ||||||
#include <arcticdb/entity/variant_key.hpp> | ||||||
#include <arcticdb/storage/mongo/mongo_instance.hpp> | ||||||
|
@@ -21,7 +22,8 @@ | |||||
#include <arcticdb/util/exponential_backoff.hpp> | ||||||
#include <mongocxx/model/replace_one.hpp> | ||||||
#include <mongocxx/config/version.hpp> | ||||||
#include <arcticdb/util/composite.hpp> | ||||||
#include <mongocxx/exception/bulk_write_exception.hpp> | ||||||
#include <mongocxx/exception/server_error_code.hpp> | ||||||
|
||||||
namespace arcticdb::storage::mongo { | ||||||
|
||||||
|
@@ -154,6 +156,31 @@ auto build_document(storage::KeySegmentPair &kv) { | |||||
|
||||||
return basic_builder.extract(); | ||||||
} | ||||||
|
||||||
/* | ||||||
* Mongo error handling notes: | ||||||
* All exceptions are subtypes of https://mongocxx.org/api/current/classmongocxx_1_1exception.html | ||||||
* - mongocxx::logic_error is thrown for invalid API calls and has no error number | ||||||
* - mongocxx::operation_exception for wrapping errors from libmongoc with an std::error_code | ||||||
* See https://mongoc.org/libmongoc/current/errors.html for docs | ||||||
* https://github.com/mongodb/mongo-cxx-driver/blob/master/src/mongocxx/exception/error_code.hpp for constants | ||||||
*/ | ||||||
|
||||||
/** Mongo often fails to set the what() string, so we have to manually format the exception */ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
[[noreturn]] void translate_operation_exception(const mongocxx::operation_exception& e) { | ||||||
std::string json; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Minor) Isn't the naming slightly misleading? As far as I understand |
||||||
if (e.raw_server_error()) { | ||||||
try { | ||||||
json = bsoncxx::to_json(e.raw_server_error()->view()); | ||||||
} catch (const std::exception& e) { | ||||||
json = fmt::format("(Failed to format server response either: {})", e.what()); | ||||||
} | ||||||
} else { | ||||||
json = "No response from server"; | ||||||
} | ||||||
raise<ErrorCode::E_MONGO_OP_FAILED>("Inner {} error {}: {}. {}\n{}", | ||||||
e.code().category().name(), e.code().value(), e.code().message(), e.what(), json); | ||||||
} | ||||||
} //namespace detail | ||||||
|
||||||
class MongoClientImpl { | ||||||
|
@@ -263,11 +290,25 @@ void MongoClientImpl::write_segment(const std::string &database_name, | |||||
replace.upsert(true); | ||||||
auto bulk_write = collection.create_bulk_write(); | ||||||
bulk_write.append(replace); | ||||||
auto result = bulk_write.execute(); | ||||||
util::check(bool(result), "Mongo error while putting key {}", kv.key_view()); | ||||||
try { | ||||||
auto result = bulk_write.execute(); | ||||||
// We don't use the "mongoc_write_concern_set_w" that allows nullopt to be returned, but check it anyway: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to check it? Maybe a comment on why we check it anyway. |
||||||
check_retryable<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>( | ||||||
result.has_value(), "Mongo did not acknowledge write for key {}", kv.key_view()); | ||||||
} catch(const mongocxx::operation_exception& e) { | ||||||
detail::translate_operation_exception(e); | ||||||
} | ||||||
} else { | ||||||
auto result = collection.insert_one(doc.view()); | ||||||
util::check(bool(result), "Mongo error while putting key {}", kv.key_view()); | ||||||
try { | ||||||
// Note: since our `key` field in the `doc` is not the unique `_id` field, and `ensure_collection()`, which | ||||||
// adds an index on it, is never called, we cannot guarantee the unique-ness of the AtomKey in the storage | ||||||
// even with `insert_one`. | ||||||
auto result = collection.insert_one(doc.view()); | ||||||
check_retryable<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>( | ||||||
result.has_value(), "Mongo did not acknowledge write for key {}", kv.key_view()); | ||||||
} catch(const mongocxx::operation_exception& e) { | ||||||
detail::translate_operation_exception(e); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -292,9 +333,18 @@ void MongoClientImpl::update_segment(const std::string &database_name, | |||||
replace.upsert(upsert); | ||||||
auto bulk_write = collection.create_bulk_write(); | ||||||
bulk_write.append(replace); | ||||||
auto result = bulk_write.execute(); | ||||||
util::check(bool(result), "Mongo error while updating key {}", kv.key_view()); | ||||||
util::check(upsert || result->modified_count() > 0, "update called with upsert=false but key does not exist"); | ||||||
try { | ||||||
auto result = bulk_write.execute(); | ||||||
// We don't use the "mongoc_write_concern_set_w" that allows nullopt to be returned, but check it anyway: | ||||||
check_retryable<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>( | ||||||
result.has_value(), "Mongo did not acknowledge write for key {}", kv.key_view()); | ||||||
if (!upsert && result->modified_count() == 0) { | ||||||
throw KeyNotFoundException(std::move(kv.variant_key()), | ||||||
"update_segment called with upsert=false on non-exist key: {}"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
} catch(const mongocxx::operation_exception& e) { | ||||||
detail::translate_operation_exception(e); | ||||||
} | ||||||
} | ||||||
|
||||||
storage::KeySegmentPair MongoClientImpl::read_segment(const std::string &database_name, | ||||||
|
@@ -310,31 +360,28 @@ storage::KeySegmentPair MongoClientImpl::read_segment(const std::string &databas | |||||
auto database = client->database(database_name); //TODO maybe cache | ||||||
auto collection = database[collection_name]; | ||||||
|
||||||
try { | ||||||
ARCTICDB_SUBSAMPLE(MongoStorageReadFindOne, 0) | ||||||
auto stream_id = variant_key_id(key); | ||||||
if(StorageFailureSimulator::instance()->configured()) | ||||||
StorageFailureSimulator::instance()->go(FailureType::READ); | ||||||
ARCTICDB_SUBSAMPLE(MongoStorageReadFindOne, 0) | ||||||
auto stream_id = variant_key_id(key); | ||||||
if(StorageFailureSimulator::instance()->configured()) | ||||||
StorageFailureSimulator::instance()->go(FailureType::READ); | ||||||
Comment on lines
+365
to
+366
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Personal preference) I believe that wrapping if/else/for/while bodies in |
||||||
|
||||||
try { | ||||||
auto result = collection.find_one(document{} << "key" << fmt::format("{}", key) << "stream_id" << | ||||||
fmt::format("{}", stream_id) << finalize); | ||||||
fmt::format("{}", stream_id) << finalize); | ||||||
if (result) { | ||||||
const auto &doc = result->view(); | ||||||
const auto& doc = result->view(); | ||||||
auto size = doc["total_size"].get_int64().value; | ||||||
entity::VariantKey stored_key{ detail::variant_key_from_document(doc, key) }; | ||||||
entity::VariantKey stored_key{detail::variant_key_from_document(doc, key)}; | ||||||
util::check(stored_key == key, "Key mismatch: {} != {}"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a general question not related to this PR in particular. Do we want to leak |
||||||
return storage::KeySegmentPair( | ||||||
std::move(stored_key), | ||||||
Segment::from_bytes(const_cast<uint8_t *>(result->view()["data"].get_binary().bytes), std::size_t(size), true) | ||||||
Segment::from_bytes(const_cast<uint8_t*>(doc["data"].get_binary().bytes), std::size_t(size), true) | ||||||
); | ||||||
} else { | ||||||
// find_one returned nothing, if this was an exception it would fall through to the catch below. | ||||||
throw std::runtime_error(fmt::format("Missing key in mongo: {} for symbol: {}", key, stream_id)); | ||||||
throw KeyNotFoundException(key); | ||||||
} | ||||||
} | ||||||
catch(const std::exception& ex) { | ||||||
log::storage().info("Segment read error: {}", ex.what()); | ||||||
throw storage::KeyNotFoundException{Composite<VariantKey>{VariantKey{key}}}; | ||||||
} catch(const mongocxx::operation_exception& e) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're not catching |
||||||
detail::translate_operation_exception(e); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -355,10 +402,8 @@ bool MongoClientImpl::key_exists(const std::string &database_name, | |||||
ARCTICDB_SUBSAMPLE(MongoStorageKeyExistsFindOne, 0) | ||||||
auto result = collection.find_one(document{} << "key" << fmt::format("{}", key) << finalize); | ||||||
return static_cast<bool>(result); | ||||||
} | ||||||
catch(const std::exception& ex) { | ||||||
log::storage().error(fmt::format("Key exists error: {}", ex.what())); | ||||||
throw; | ||||||
} catch(const mongocxx::operation_exception& e) { | ||||||
detail::translate_operation_exception(e); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -375,21 +420,25 @@ void MongoClientImpl::remove_keyvalue(const std::string &database_name, | |||||
auto collection = database[collection_name]; | ||||||
ARCTICDB_SUBSAMPLE(MongoStorageRemoveGetCol, 0) | ||||||
mongocxx::stdx::optional<mongocxx::result::delete_result> result; | ||||||
if (std::holds_alternative<RefKey>(key)) { | ||||||
result = collection.delete_many(document{} << "key" << fmt::format("{}", key) << "stream_id" << | ||||||
fmt::format("{}", variant_key_id(key)) << finalize); | ||||||
} else { | ||||||
result = collection.delete_one(document{} << "key" << fmt::format("{}", key) << "stream_id" << | ||||||
fmt::format("{}", variant_key_id(key)) << finalize); | ||||||
auto filter = document{} << "key" << fmt::format("{}", key) << "stream_id" << fmt::format("{}", variant_key_id(key)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Minor) I think one large |
||||||
<< finalize; | ||||||
try { | ||||||
if (std::holds_alternative<RefKey>(key)) { | ||||||
result = collection.delete_many(std::move(filter)); | ||||||
} else { | ||||||
result = collection.delete_one(std::move(filter)); | ||||||
} | ||||||
} catch(const mongocxx::operation_exception& e) { | ||||||
detail::translate_operation_exception(e); | ||||||
} | ||||||
ARCTICDB_SUBSAMPLE(MongoStorageRemoveDelOne, 0) | ||||||
if (result) { | ||||||
std::int32_t deleted_count = result->deleted_count(); | ||||||
util::warn(deleted_count == 1, "Expect to delete a single document with key {}", | ||||||
key); // possible values are 0 and 1 here when returned from delete_one | ||||||
if (deleted_count == 0) { | ||||||
throw KeyNotFoundException(key); | ||||||
} | ||||||
} else | ||||||
throw std::runtime_error(fmt::format("Mongo error deleting data for key {}", key)); | ||||||
|
||||||
raise_retryable<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>("Mongo did not acknowledge deletion for key {}", key); | ||||||
} | ||||||
|
||||||
void MongoClientImpl::iterate_type(const std::string &database_name, | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,12 +85,25 @@ inline void MongoStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) { | |
auto fmt_db = [](auto &&k) { return variant_key_type(k); }; | ||
ARCTICDB_SAMPLE(MongoStorageRemove, 0) | ||
|
||
(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach([&](auto &&group) { | ||
Composite<VariantKey> failed_deletes; | ||
|
||
(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach([this, &failed_deletes](auto &&group) { | ||
for (auto &k : group.values()) { | ||
auto collection = collection_name(variant_key_type(k)); | ||
client_->remove_keyvalue(db_, collection, k); | ||
try { | ||
client_->remove_keyvalue(db_, collection, k); | ||
} catch(KeyNotFoundException& e) { | ||
failed_deletes.push_back(std::move(e.keys())); | ||
} catch(const std::exception& e) { | ||
// TODO https://github.com/man-group/ArcticDB/issues/518 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure it's safe to catch here and continue deleting the other stuff? You seem worried about the same thing in issue #518, but the original implementation seems to bail out here if any of the deletes fail? I might be misunderstanding, since your ticket does say,
|
||
log::storage().warn("Error during remove: {}", e.what()); | ||
failed_deletes.push_back(std::move(k)); | ||
} | ||
} | ||
}); | ||
if (!failed_deletes.empty()) { | ||
throw KeyNotFoundException(std::move(failed_deletes)); | ||
} | ||
} | ||
|
||
template<class Visitor> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any docs update needed? eg
ArcticDB/docs/mkdocs/docs/error_messages.md
Line 151 in 3330bf3