Skip to content

Commit

Permalink
PS-9148: Reworked dictionary / bookshelf thread-safety model
Browse files Browse the repository at this point in the history
https://perconadev.atlassian.net/browse/PS-9148

Both 'dictionary' and 'bookshelf' classes no longer include their own
'std::shared_mutex' to protect data. Instead, we now have a single
'std::shared_mutex' at the 'query_cache' level.

The return value of the 'get_random()' method in both 'dictionary' and
'bookshelf' classes changed from 'optional_string' to 'std::string_view'. Empty
(default constructed) 'std::string_view' is used as an indicator of an
unsuccessful operation.
'get_random()' method in the 'query_cache' class still returns a string by
value to avoid race conditions.

Changed the behaviour of the 'sql_context::execute_dml()' method - it now
throws when SQL errors (like "no table found", etc.) occur.
  • Loading branch information
percona-ysorokin authored and oleksandr-kachan committed May 31, 2024
1 parent 0315759 commit 242221d
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#include "masking_functions/bookshelf_fwd.hpp"

#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>

#include "masking_functions/dictionary_fwd.hpp"
Expand All @@ -28,16 +28,18 @@ namespace masking_functions {

class bookshelf {
public:
bookshelf() = default;
bookshelf();
bookshelf(const dictionary &) = delete;
bookshelf(bookshelf &&) = delete;
bookshelf &operator=(const bookshelf &) = delete;
bookshelf &operator=(bookshelf &&) = delete;
~bookshelf();

bool contains(const std::string &dictionary_name,
const std::string &term) const noexcept;
// returning a copy deliberately for thread safety
optional_string get_random(const std::string &dictionary_name) const noexcept;
// returns empty std::string_view if no such dictionary exist
std::string_view get_random(
const std::string &dictionary_name) const noexcept;
bool remove(const std::string &dictionary_name) noexcept;
bool remove(const std::string &dictionary_name,
const std::string &term) noexcept;
Expand All @@ -49,10 +51,6 @@ class bookshelf {
// transparent_string_like_hash, std::equal_to<>>.
using dictionary_container = std::unordered_map<std::string, dictionary_ptr>;
dictionary_container dictionaries_;
mutable std::shared_mutex dictionaries_mutex_;

dictionary_ptr find_dictionary_internal(
const std::string &dictionary_name) const noexcept;
};

} // namespace masking_functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace masking_functions {

class bookshelf;

using bookshelf_ptr = std::shared_ptr<bookshelf>;
using bookshelf_ptr = std::unique_ptr<bookshelf>;

} // namespace masking_functions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#include "masking_functions/dictionary_fwd.hpp"

#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_set>

namespace masking_functions {
Expand All @@ -34,9 +34,13 @@ class dictionary {
dictionary &operator=(const dictionary &) = delete;
dictionary &operator=(dictionary &&) = delete;

~dictionary() = default;

bool is_empty() const noexcept { return terms_.empty(); }

bool contains(const std::string &term) const noexcept;
// returning a copy deliberately for thread safety
optional_string get_random() const;
// returns empty std::string_view if the dictionary is empty
std::string_view get_random() const noexcept;
bool insert(const std::string &term);
bool remove(const std::string &term) noexcept;

Expand All @@ -46,7 +50,6 @@ class dictionary {
// transparent_string_like_hash, std::equal_to<>>.
using term_container = std::unordered_set<std::string>;
term_container terms_;
mutable std::shared_mutex terms_mutex_;
};

} // namespace masking_functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@
#define MASKING_FUNCTIONS_DICTIONARY_FWD_HPP

#include <memory>
#include <optional>
#include <string>

namespace masking_functions {

using optional_string = std::optional<std::string>;

class dictionary;

using dictionary_ptr = std::shared_ptr<dictionary>;
using dictionary_ptr = std::unique_ptr<dictionary>;

} // namespace masking_functions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>

#include <mysql/components/services/psi_thread.h>
Expand All @@ -35,6 +36,7 @@ namespace masking_functions {

class query_cache {
public:
// passing unique_ptr by value to transfer ownership
query_cache(query_builder_ptr query_builder,
std::uint64_t flusher_interval_seconds);
query_cache(const query_cache &other) = delete;
Expand All @@ -45,29 +47,30 @@ class query_cache {

bool contains(const std::string &dictionary_name,
const std::string &term) const;
optional_string get_random(const std::string &dictionary_name) const;
// returns a copy of the string to avoid race conditions
// an empty string is returned if the dictionary does not exist
std::string get_random(const std::string &dictionary_name) const;
bool remove(const std::string &dictionary_name);
bool remove(const std::string &dictionary_name, const std::string &term);
bool insert(const std::string &dictionary_name, const std::string &term);

void reload_cache();

private:
query_builder_ptr m_query_builder;
query_builder_ptr dict_query_builder_;

// TODO: in c++20 change this to std::atomic<bookshelf_ptr> and
// remove deprecated atomic_load() / atomic_store()
mutable bookshelf_ptr m_dict_cache;
mutable bookshelf_ptr dict_cache_;
mutable std::shared_mutex dict_cache_mutex_;

std::uint64_t m_flusher_interval_seconds;
std::atomic<bool> m_is_flusher_stopped;
std::mutex m_flusher_mutex;
std::condition_variable m_flusher_condition_var;
std::uint64_t flusher_interval_seconds_;
std::atomic<bool> is_flusher_stopped_;
std::mutex flusher_mutex_;
std::condition_variable flusher_condition_var_;

PSI_thread_key m_psi_flusher_thread_key;
my_thread_handle m_flusher_thread;
my_thread_attr_t m_flusher_thread_attr;
std::unique_ptr<THD> m_flusher_thd;
PSI_thread_key psi_flusher_thread_key_;
my_thread_handle flusher_thread_;
my_thread_attr_t flusher_thread_attr_;
std::unique_ptr<THD> flusher_thd_;

void init_thd() noexcept;
void release_thd() noexcept;
Expand All @@ -76,8 +79,11 @@ class query_cache {
static void *run_dict_flusher(void *arg);

bookshelf_ptr create_dict_cache_internal() const;
// returning deliberately by value to increase reference counter
bookshelf_ptr get_pinned_dict_cache_internal() const;
using shared_lock_type = std::shared_lock<std::shared_mutex>;
using unique_lock_type = std::unique_lock<std::shared_mutex>;
const bookshelf &acquire_dict_cache_shared(
shared_lock_type &read_lock, unique_lock_type &write_lock) const;
bookshelf &acquire_dict_cache_unique(unique_lock_type &write_lock) const;
};

} // namespace masking_functions
Expand Down
78 changes: 30 additions & 48 deletions components/masking_functions/src/masking_functions/bookshelf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,64 @@

#include "masking_functions/bookshelf.hpp"

#include <mutex>
#include <optional>
#include <shared_mutex>

#include "masking_functions/dictionary.hpp"

namespace masking_functions {

bookshelf::bookshelf() = default;
bookshelf::~bookshelf() = default;

bool bookshelf::contains(const std::string &dictionary_name,
const std::string &term) const noexcept {
const auto dict{find_dictionary_internal(dictionary_name)};
if (!dict) {
const auto dictionary_it{dictionaries_.find(dictionary_name)};
if (dictionary_it == std::cend(dictionaries_)) {
return false;
}
return dict->contains(term);
return dictionary_it->second->contains(term);
}

// returning a copy deliberately for thread safety
optional_string bookshelf::get_random(
std::string_view bookshelf::get_random(
const std::string &dictionary_name) const noexcept {
const auto dict{find_dictionary_internal(dictionary_name)};
if (!dict) {
return std::nullopt;
const auto dictionary_it{dictionaries_.find(dictionary_name)};
if (dictionary_it == std::cend(dictionaries_)) {
return {};
}
return dict->get_random();
return dictionary_it->second->get_random();
}

bool bookshelf::remove(const std::string &dictionary_name) noexcept {
std::unique_lock dictionaries_write_lock{dictionaries_mutex_};
return dictionaries_.erase(dictionary_name) != 0U;
}

bool bookshelf::remove(const std::string &dictionary_name,
const std::string &term) noexcept {
const auto dict{find_dictionary_internal(dictionary_name)};
if (!dict) {
const auto dictionary_it{dictionaries_.find(dictionary_name)};
if (dictionary_it == std::end(dictionaries_)) {
return false;
}
return dict->remove(term);
// after this operation we may have a dictionary with no terms in it -
// it is fine and much safer than trying to re-acquire a write lock and
// removing the dictionary from the bookshelf when it has 0 terms.
const auto result{dictionary_it->second->remove(term)};
if (dictionary_it->second->is_empty()) {
dictionaries_.erase(dictionary_it);
}
return result;
}

bool bookshelf::insert(const std::string &dictionary_name,
const std::string &term) {
auto dict{find_dictionary_internal(dictionary_name)};
if (dict) {
return dict->insert(term);
// here we use try_emplace as an combined version of find and
// insert
const auto [dictionary_it,
inserted]{dictionaries_.try_emplace(dictionary_name)};
if (!inserted) {
return dictionary_it->second->insert(term);
}

// if no dictionary with such name alteady exist, we need to
// create it under a write lock
{
std::unique_lock dictionaries_write_lock{dictionaries_mutex_};
// it may happen that between the read and write locks another thread
// already created the dictionary with the same name - checking again
dict = std::make_shared<dictionary>(term);
const auto [dictionary_it,
inserted]{dictionaries_.emplace(dictionary_name, dict)};
if (inserted) {
return true;
}
dict = dictionary_it->second;
}
return dict->insert(term);
}

dictionary_ptr bookshelf::find_dictionary_internal(
const std::string &dictionary_name) const noexcept {
std::shared_lock dictionaries_read_lock{dictionaries_mutex_};
const auto dictionary_it{dictionaries_.find(dictionary_name)};
if (dictionary_it == std::cend(dictionaries_)) {
return {};
try {
dictionary_it->second = std::make_unique<dictionary>(term);
} catch (...) {
dictionaries_.erase(dictionary_it);
throw;
}
return dictionary_it->second;
return true;
}

} // namespace masking_functions
21 changes: 8 additions & 13 deletions components/masking_functions/src/masking_functions/dictionary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,33 @@
#include "masking_functions/random_string_generators.hpp"

#include <iterator>
#include <mutex>
#include <optional>

namespace masking_functions {

dictionary::dictionary(const std::string &term) : terms_{}, terms_mutex_{} {
terms_.emplace(term);
}
dictionary::dictionary(const std::string &term)
: // here we use std::unordered_set iterator range constructor with
// single 'term' element converted to a fake range
terms_{&term, std::next(&term)} {}

bool dictionary::contains(const std::string &term) const noexcept {
std::shared_lock terms_read_lock{terms_mutex_};
// TODO: in c++20 change to terms_.contains(term)
return terms_.count(term) > 0U;
}

optional_string dictionary::get_random() const {
std::shared_lock terms_read_lock{terms_mutex_};

std::string_view dictionary::get_random() const noexcept {
if (terms_.empty()) {
return std::nullopt;
return {};
}

const auto random_index{random_number(0, terms_.size() - 1U)};
return *std::next(terms_.begin(), random_index);
return *std::next(std::begin(terms_), random_index);
}

bool dictionary::insert(const std::string &term) {
std::unique_lock terms_write_lock{terms_mutex_};
return terms_.emplace(term).second;
}

bool dictionary::remove(const std::string &term) noexcept {
std::unique_lock terms_write_lock{terms_mutex_};
return terms_.erase(term) > 0U;
}

Expand Down
Loading

0 comments on commit 242221d

Please sign in to comment.