diff --git a/plugins/experimental/stek_share/common.h b/plugins/experimental/stek_share/common.h index 312ad557d0c..f0cf6c1535d 100644 --- a/plugins/experimental/stek_share/common.h +++ b/plugins/experimental/stek_share/common.h @@ -31,8 +31,9 @@ #include #include #include +#include -#define PLUGIN "stek_share" +#define PLUGIN_NAME "stek_share" class PluginThreads { @@ -64,7 +65,7 @@ class PluginThreads } private: - bool shut_down = false; + std::atomic shut_down = false; std::deque threads_queue; std::mutex threads_mutex; }; diff --git a/plugins/experimental/stek_share/log_store.h b/plugins/experimental/stek_share/log_store.h index 6f6659cdee4..8a4a8c38e9c 100644 --- a/plugins/experimental/stek_share/log_store.h +++ b/plugins/experimental/stek_share/log_store.h @@ -34,33 +34,33 @@ class STEKShareLogStore : public nuraft::log_store __nocopy__(STEKShareLogStore); - uint64_t next_slot() const; + uint64_t next_slot() const override; - uint64_t start_index() const; + uint64_t start_index() const override; - nuraft::ptr last_entry() const; + nuraft::ptr last_entry() const override; - uint64_t append(nuraft::ptr &entry); + uint64_t append(nuraft::ptr &entry) override; - void write_at(uint64_t index, nuraft::ptr &entry); + void write_at(uint64_t index, nuraft::ptr &entry) override; - nuraft::ptr>> log_entries(uint64_t start, uint64_t end); + nuraft::ptr>> log_entries(uint64_t start, uint64_t end) override; nuraft::ptr>> log_entries_ext(uint64_t start, uint64_t end, - int64_t batch_size_hint_in_bytes = 0); + int64_t batch_size_hint_in_bytes = 0) override; - nuraft::ptr entry_at(uint64_t index); + nuraft::ptr entry_at(uint64_t index) override; - uint64_t term_at(uint64_t index); + uint64_t term_at(uint64_t index) override; - nuraft::ptr pack(uint64_t index, int32_t cnt); + nuraft::ptr pack(uint64_t index, int32_t cnt) override; - void apply_pack(uint64_t index, nuraft::buffer &pack); + void apply_pack(uint64_t index, nuraft::buffer &pack) override; - bool compact(uint64_t last_log_index); + bool compact(uint64_t last_log_index) override; bool - flush() + flush() override { return true; } diff --git a/plugins/experimental/stek_share/state_machine.h b/plugins/experimental/stek_share/state_machine.h index 26a3995e851..a096f6c8a39 100644 --- a/plugins/experimental/stek_share/state_machine.h +++ b/plugins/experimental/stek_share/state_machine.h @@ -38,19 +38,19 @@ class STEKShareSM : public nuraft::state_machine ~STEKShareSM() {} nuraft::ptr - pre_commit(const uint64_t log_idx, nuraft::buffer &data) + pre_commit(const uint64_t log_idx, nuraft::buffer &data) override { return nullptr; } nuraft::ptr - commit(const uint64_t log_idx, nuraft::buffer &data) + commit(const uint64_t log_idx, nuraft::buffer &data) override { // Extract bytes from "data". size_t len = 0; nuraft::buffer_serializer bs_data(data); void *byte_array = bs_data.get_bytes(len); - // TSDebug(PLUGIN, "commit %lu: %s", log_idx, hex_str(std::string(reinterpret_cast(byte_array), len)).c_str()); + // TSDebug(PLUGIN_NAME, "commit %lu: %s", log_idx, hex_str(std::string(reinterpret_cast(byte_array), len)).c_str()); assert(len == SSL_TICKET_KEY_SIZE); @@ -89,23 +89,23 @@ class STEKShareSM : public nuraft::state_machine } void - commit_config(const uint64_t log_idx, nuraft::ptr &new_conf) + commit_config(const uint64_t log_idx, nuraft::ptr &new_conf) override { // Nothing to do with configuration change. Just update committed index. last_committed_idx_ = log_idx; } void - rollback(const uint64_t log_idx, nuraft::buffer &data) + rollback(const uint64_t log_idx, nuraft::buffer &data) override { // Nothing to do here since we don't have pre-commit. } int read_logical_snp_obj(nuraft::snapshot &s, void *&user_snp_ctx, uint64_t obj_id, nuraft::ptr &data_out, - bool &is_last_obj) + bool &is_last_obj) override { - // TSDebug(PLUGIN, "read snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id); + // TSDebug(PLUGIN_NAME, "read snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id); is_last_obj = true; @@ -124,9 +124,9 @@ class STEKShareSM : public nuraft::state_machine } void - save_logical_snp_obj(nuraft::snapshot &s, uint64_t &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj) + save_logical_snp_obj(nuraft::snapshot &s, uint64_t &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj) override { - // TSDebug(PLUGIN, "save snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id); + // TSDebug(PLUGIN_NAME, "save snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id); size_t len = 0; nuraft::buffer_serializer bs_data(data); @@ -150,9 +150,9 @@ class STEKShareSM : public nuraft::state_machine } bool - apply_snapshot(nuraft::snapshot &s) + apply_snapshot(nuraft::snapshot &s) override { - // TSDebug(PLUGIN, "apply snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term()); + // TSDebug(PLUGIN_NAME, "apply snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term()); { std::lock_guard l(snapshot_lock_); @@ -168,12 +168,12 @@ class STEKShareSM : public nuraft::state_machine } void - free_user_snp_ctx(void *&user_snp_ctx) + free_user_snp_ctx(void *&user_snp_ctx) override { } nuraft::ptr - last_snapshot() + last_snapshot() override { // Just return the latest snapshot. std::lock_guard l(snapshot_lock_); @@ -184,15 +184,15 @@ class STEKShareSM : public nuraft::state_machine } uint64_t - last_commit_index() + last_commit_index() override { return last_committed_idx_; } void - create_snapshot(nuraft::snapshot &s, nuraft::async_result::handler_type &when_done) + create_snapshot(nuraft::snapshot &s, nuraft::async_result::handler_type &when_done) override { - // TSDebug(PLUGIN, "create snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term()); + // TSDebug(PLUGIN_NAME, "create snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term()); ssl_ticket_key_t local_stek; { diff --git a/plugins/experimental/stek_share/state_manager.h b/plugins/experimental/stek_share/state_manager.h index 63e16249edb..06873799b23 100644 --- a/plugins/experimental/stek_share/state_manager.h +++ b/plugins/experimental/stek_share/state_manager.h @@ -44,45 +44,45 @@ class STEKShareSMGR : public nuraft::state_mgr ~STEKShareSMGR() {} nuraft::ptr - load_config() + load_config() override { return saved_config_; } void - save_config(const nuraft::cluster_config &config) + save_config(const nuraft::cluster_config &config) override { nuraft::ptr buf = config.serialize(); saved_config_ = nuraft::cluster_config::deserialize(*buf); } void - save_state(const nuraft::srv_state &state) + save_state(const nuraft::srv_state &state) override { nuraft::ptr buf = state.serialize(); saved_state_ = nuraft::srv_state::deserialize(*buf); } nuraft::ptr - read_state() + read_state() override { return saved_state_; } nuraft::ptr - load_log_store() + load_log_store() override { return cur_log_store_; } int32_t - server_id() + server_id() override { return my_id_; } void - system_exit(const int exit_code) + system_exit(const int exit_code) override { } diff --git a/plugins/experimental/stek_share/stek_share.cc b/plugins/experimental/stek_share/stek_share.cc index 669b54d4583..9f582318ac8 100644 --- a/plugins/experimental/stek_share/stek_share.cc +++ b/plugins/experimental/stek_share/stek_share.cc @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -47,143 +46,242 @@ static STEKShareServer stek_share_server; static const nuraft::raft_params::return_method_type CALL_TYPE = nuraft::raft_params::blocking; // static const nuraft::raft_params::return_method_type CALL_TYPE = nuraft::raft_params::async_handler; +static std::string conf_file_path; + +std::shared_ptr plugin_config; +std::shared_mutex plugin_config_mutex; + +std::shared_ptr plugin_config_old; +std::shared_mutex plugin_config_old_mutex; + +int load_config_from_file(); +int init_raft(nuraft::ptr sm_instance, std::shared_ptr config); +static void *stek_updater(void *arg); + +std::shared_ptr +get_scoped_config(bool backup = false) +{ + if (backup) { + std::shared_lock lock(plugin_config_old_mutex); + auto config = plugin_config_old; + return config; + } else { + std::shared_lock lock(plugin_config_mutex); + auto config = plugin_config; + return config; + } +} + +void +backup_config(std::shared_ptr config) +{ + std::unique_lock lock(plugin_config_old_mutex); + plugin_config_old = config; +} + +void +restore_config(std::shared_ptr config) +{ + std::unique_lock lock(plugin_config_mutex); + plugin_config = config; +} + static int shutdown_handler(TSCont contp, TSEvent event, void *edata) { if (event == TS_EVENT_LIFECYCLE_SHUTDOWN) { + stek_share_server.raft_launcher.shutdown(); + stek_share_server.reset(); plugin_threads.terminate(); - stek_share_server.launcher_.shutdown(); } return 0; } +static int +message_handler(TSCont contp, TSEvent event, void *edata) +{ + if (event == TS_EVENT_LIFECYCLE_MSG) { + TSPluginMsg *msg = static_cast(edata); + TSDebug(PLUGIN_NAME, "Message to '%s' - %zu bytes of data", msg->tag, msg->data_size); + if (strcmp(PLUGIN_NAME, msg->tag) == 0) { // Message is for us + if (msg->data_size) { + if (strncmp(reinterpret_cast(const_cast(msg->data)), "reload", msg->data_size) == 0) { + // TODO: If in the middle of generating a new STEK, maybe block until new STEK has been generated? + // Not a big problem since new STEK is only generated once every few hours. + TSDebug(PLUGIN_NAME, "Reloading configurations..."); + if (load_config_from_file() == 0) { + stek_share_server.config_reloading = true; + stek_share_server.raft_launcher.shutdown(); + stek_share_server.reset(); + auto config = get_scoped_config(); + if (init_raft(nuraft::cs_new(), config) == 0) { + backup_config(config); + TSDebug(PLUGIN_NAME, "Server ID: %d, Endpoint: %s", config->server_id, config->endpoint.c_str()); + } else { + TSError("[%s] Raft initialization failed with new config, retrying with old config.", PLUGIN_NAME); + auto config_old = get_scoped_config(true); + restore_config(config_old); + if (init_raft(nuraft::cs_new(), config_old) == 0) { + TSDebug(PLUGIN_NAME, "Server ID: %d, Endpoint: %s", config->server_id, config->endpoint.c_str()); + } else { + TSEmergency("[%s] Raft initialization failed with old config.", PLUGIN_NAME); + } + } + } else { + TSError("[%s] Config reload failed.", PLUGIN_NAME); + } + } else { + TSError("[%s] Unexpected msg %*.s", PLUGIN_NAME, static_cast(msg->data_size), + reinterpret_cast(const_cast(msg->data))); + } + } + } + } else { + TSError("[%s] Unexpected event %d", PLUGIN_NAME, event); + } + return TS_EVENT_NONE; +} + bool cert_verification(const std::string &sn) { - if (sn.compare(stek_share_server.cert_verify_str_) != 0) { - TSDebug(PLUGIN, "Cert incorrect, expecting: %s, got: %s", stek_share_server.cert_verify_str_.c_str(), sn.c_str()); + auto config = get_scoped_config(); + if (!config->cert_verify_str.empty() && sn.compare(config->cert_verify_str) != 0) { + TSDebug(PLUGIN_NAME, "Cert incorrect, expecting: %s, got: %s", config->cert_verify_str.c_str(), sn.c_str()); return false; } return true; } int -init_raft(nuraft::ptr sm_instance) +init_raft(nuraft::ptr sm_instance, std::shared_ptr config) { - // State machine. - stek_share_server.smgr_ = - nuraft::cs_new(stek_share_server.server_id_, stek_share_server.endpoint_, stek_share_server.server_list_); - // State manager. - stek_share_server.sm_ = sm_instance; + { + std::unique_lock lock(stek_share_server.smgr_mutex); + stek_share_server.smgr_instance = nuraft::cs_new(config->server_id, config->endpoint, config->server_list); + } + + // State machine. + { + std::unique_lock lock(stek_share_server.sm_mutex); + stek_share_server.sm_instance = sm_instance; + } // ASIO options. nuraft::asio_service::options asio_opts; - asio_opts.thread_pool_size_ = stek_share_server.asio_thread_pool_size_; + asio_opts.thread_pool_size_ = config->asio_thread_pool_size; asio_opts.enable_ssl_ = true; asio_opts.verify_sn_ = cert_verification; - asio_opts.root_cert_file_ = stek_share_server.root_cert_file_; - asio_opts.server_cert_file_ = stek_share_server.server_cert_file_; - asio_opts.server_key_file_ = stek_share_server.server_key_file_; + asio_opts.root_cert_file_ = config->root_cert_file; + asio_opts.server_cert_file_ = config->server_cert_file; + asio_opts.server_key_file_ = config->server_key_file; // Raft parameters. nuraft::raft_params params; - params.heart_beat_interval_ = stek_share_server.heart_beat_interval_; - params.election_timeout_lower_bound_ = stek_share_server.election_timeout_lower_bound_; - params.election_timeout_upper_bound_ = stek_share_server.election_timeout_upper_bound_; - params.reserved_log_items_ = stek_share_server.reserved_log_items_; - params.snapshot_distance_ = stek_share_server.snapshot_distance_; - params.client_req_timeout_ = stek_share_server.client_req_timeout_; + params.heart_beat_interval_ = config->heart_beat_interval; + params.election_timeout_lower_bound_ = config->election_timeout_lower_bound; + params.election_timeout_upper_bound_ = config->election_timeout_upper_bound; + params.reserved_log_items_ = config->reserved_log_items; + params.snapshot_distance_ = config->snapshot_distance; + params.client_req_timeout_ = config->client_req_timeout; // According to this method, "append_log" function should be handled differently. params.return_method_ = CALL_TYPE; // Initialize Raft server. - stek_share_server.raft_instance_ = stek_share_server.launcher_.init(stek_share_server.sm_, stek_share_server.smgr_, nullptr, - stek_share_server.port_, asio_opts, params); + { + std::unique_lock lock(stek_share_server.raft_mutex); + stek_share_server.raft_instance = stek_share_server.raft_launcher.init( + stek_share_server.sm_instance, stek_share_server.smgr_instance, nullptr, config->port, asio_opts, params); + } - if (!stek_share_server.raft_instance_) { - TSDebug(PLUGIN, "Failed to initialize launcher."); + std::shared_lock lock(stek_share_server.raft_mutex); + if (!stek_share_server.raft_instance) { + TSDebug(PLUGIN_NAME, "Failed to initialize launcher."); return -1; } - TSDebug(PLUGIN, "Raft instance initialization done."); return 0; } int -set_server_info(int argc, const char *argv[]) +load_config_from_file() { - // Get server ID. + auto new_config = std::make_shared(); + YAML::Node server_conf; try { - server_conf = YAML::LoadFile(argv[1]); + server_conf = YAML::LoadFile(conf_file_path); } catch (YAML::BadFile &e) { - TSEmergency("[%s] Cannot load configuration file: %s.", PLUGIN, e.what()); + TSDebug(PLUGIN_NAME, "Cannot load configuration file: %s.", e.what()); + return -1; } catch (std::exception &e) { - TSEmergency("[%s] Unknown error while loading configuration file: %s.", PLUGIN, e.what()); + TSDebug(PLUGIN_NAME, "Unknown error while loading configuration file: %s.", e.what()); + return -1; } + // Get server id. if (server_conf["server_id"]) { - stek_share_server.server_id_ = server_conf["server_id"].as(); - if (stek_share_server.server_id_ < 1) { - TSDebug(PLUGIN, "Wrong server id (must be >= 1): %d", stek_share_server.server_id_); + new_config->server_id = server_conf["server_id"].as(); + if (new_config->server_id < 1) { + TSDebug(PLUGIN_NAME, "Wrong server id (must be >= 1): %d", new_config->server_id); return -1; } } else { - TSDebug(PLUGIN, "Must specify server id in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server id in the configuration file."); return -1; } // Get server address and port. if (server_conf["address"]) { - stek_share_server.addr_ = server_conf["address"].as(); + new_config->address = server_conf["address"].as(); } else { - TSDebug(PLUGIN, "Must specify server address in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server address in the configuration file."); return -1; } if (server_conf["port"]) { - stek_share_server.port_ = server_conf["port"].as(); + new_config->port = server_conf["port"].as(); } else { - TSDebug(PLUGIN, "Must specify server port in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server port in the configuration file."); return -1; } - stek_share_server.endpoint_ = stek_share_server.addr_ + ":" + std::to_string(stek_share_server.port_); + new_config->endpoint = new_config->address + ":" + std::to_string(new_config->port); if (server_conf["asio_thread_pool_size"]) { - stek_share_server.asio_thread_pool_size_ = server_conf["asio_thread_pool_size"].as(); + new_config->asio_thread_pool_size = server_conf["asio_thread_pool_size"].as(); } if (server_conf["heart_beat_interval"]) { - stek_share_server.heart_beat_interval_ = server_conf["heart_beat_interval"].as(); + new_config->heart_beat_interval = server_conf["heart_beat_interval"].as(); } if (server_conf["election_timeout_lower_bound"]) { - stek_share_server.election_timeout_lower_bound_ = server_conf["election_timeout_lower_bound"].as(); + new_config->election_timeout_lower_bound = server_conf["election_timeout_lower_bound"].as(); } if (server_conf["election_timeout_upper_bound"]) { - stek_share_server.election_timeout_upper_bound_ = server_conf["election_timeout_upper_bound"].as(); + new_config->election_timeout_upper_bound = server_conf["election_timeout_upper_bound"].as(); } if (server_conf["reserved_log_items"]) { - stek_share_server.reserved_log_items_ = server_conf["reserved_log_items"].as(); + new_config->reserved_log_items = server_conf["reserved_log_items"].as(); } if (server_conf["snapshot_distance"]) { - stek_share_server.snapshot_distance_ = server_conf["snapshot_distance"].as(); + new_config->snapshot_distance = server_conf["snapshot_distance"].as(); } if (server_conf["client_req_timeout"]) { - stek_share_server.client_req_timeout_ = server_conf["client_req_timeout"].as(); + new_config->client_req_timeout = server_conf["client_req_timeout"].as(); } if (server_conf["key_update_interval"]) { - stek_share_server.key_update_interval_ = server_conf["key_update_interval"].as(); + new_config->key_update_interval = std::chrono::seconds(server_conf["key_update_interval"].as()); } else { - TSDebug(PLUGIN, "Must specify server key update interval in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server key update interval in the configuration file."); return -1; } @@ -192,9 +290,11 @@ set_server_info(int argc, const char *argv[]) try { server_list = YAML::LoadFile(server_conf["server_list_file"].as()); } catch (YAML::BadFile &e) { - TSEmergency("[%s] Cannot load server list file: %s.", PLUGIN, e.what()); + TSDebug(PLUGIN_NAME, "Cannot load server list file: %s.", e.what()); + return -1; } catch (std::exception &e) { - TSEmergency("[%s] Unknown error while loading server list file: %s.", PLUGIN, e.what()); + TSDebug(PLUGIN_NAME, "Unknown error while loading server list file: %s.", e.what()); + return -1; } std::string cluster_list_str = ""; @@ -202,52 +302,52 @@ set_server_info(int argc, const char *argv[]) for (auto it = server_list.begin(); it != server_list.end(); ++it) { YAML::Node server_info = it->as(); if (server_info["server_id"] && server_info["address"] && server_info["port"]) { - int server_id = server_info["server_id"].as(); - std::string address = server_info["address"].as(); - int port = server_info["port"].as(); - std::string endpoint = address + ":" + std::to_string(port); - stek_share_server.server_list_[server_id] = endpoint; - cluster_list_str += "\n " + std::to_string(server_id) + ", " + endpoint; + int server_id = server_info["server_id"].as(); + std::string address = server_info["address"].as(); + int port = server_info["port"].as(); + std::string endpoint = address + ":" + std::to_string(port); + new_config->server_list[server_id] = endpoint; + cluster_list_str += "\n " + std::to_string(server_id) + ", " + endpoint; } else { - TSDebug(PLUGIN, "Wrong server list format."); + TSDebug(PLUGIN_NAME, "Wrong server list format."); return -1; } } - TSDebug(PLUGIN, "%s", cluster_list_str.c_str()); + TSDebug(PLUGIN_NAME, "%s", cluster_list_str.c_str()); } else { - TSDebug(PLUGIN, "Must specify server list file in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server list file in the configuration file."); return -1; } // TODO: check cert and key files exist if (server_conf["root_cert_file"]) { - stek_share_server.root_cert_file_ = server_conf["root_cert_file"].as(); + new_config->root_cert_file = server_conf["root_cert_file"].as(); } else { - TSDebug(PLUGIN, "Must specify root ca file in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify root ca file in the configuration file."); return -1; } if (server_conf["server_cert_file"]) { - stek_share_server.server_cert_file_ = server_conf["server_cert_file"].as(); + new_config->server_cert_file = server_conf["server_cert_file"].as(); } else { - TSDebug(PLUGIN, "Must specify server cert file in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server cert file in the configuration file."); return -1; } if (server_conf["server_key_file"]) { - stek_share_server.server_key_file_ = server_conf["server_key_file"].as(); + new_config->server_key_file = server_conf["server_key_file"].as(); } else { - TSDebug(PLUGIN, "Must specify server key file in the configuration file."); + TSDebug(PLUGIN_NAME, "Must specify server key file in the configuration file."); return -1; } if (server_conf["cert_verify_str"]) { - stek_share_server.cert_verify_str_ = server_conf["cert_verify_str"].as(); - } else { - TSDebug(PLUGIN, "Must specify cert verify string in the configuration file."); - return -1; + new_config->cert_verify_str = server_conf["cert_verify_str"].as(); } + std::unique_lock lock(plugin_config_mutex); + plugin_config = new_config; + return 0; } @@ -257,10 +357,10 @@ handle_result(raft_result &result, nuraft::ptr &err) if (result.get_result_code() != nuraft::cmd_result_code::OK) { // Something went wrong. // This means committing this log failed, but the log itself is still in the log store. - TSDebug(PLUGIN, "Replication failed: %d", result.get_result_code()); + TSDebug(PLUGIN_NAME, "Replication failed: %d", result.get_result_code()); return; } - TSDebug(PLUGIN, "Replication succeeded."); + TSDebug(PLUGIN_NAME, "Replication succeeded."); } void @@ -272,11 +372,12 @@ append_log(const void *data, int data_len) bs.put_bytes(data, data_len); // Do append. - nuraft::ptr ret = stek_share_server.raft_instance_->append_entries({new_log}); + std::shared_lock lock(stek_share_server.raft_mutex); + nuraft::ptr ret = stek_share_server.raft_instance->append_entries({new_log}); if (!ret->get_accepted()) { // Log append rejected, usually because this node is not a leader. - TSDebug(PLUGIN, "Replication failed: %d", ret->get_result_code()); + TSDebug(PLUGIN_NAME, "Replication failed: %d", ret->get_result_code()); return; } @@ -299,14 +400,17 @@ append_log(const void *data, int data_len) void print_status() { + auto config = get_scoped_config(); // For debugging - nuraft::ptr ls = stek_share_server.smgr_->load_log_store(); + std::shared_lock smgr_lock(stek_share_server.smgr_mutex); + std::shared_lock raft_lock(stek_share_server.raft_mutex); + nuraft::ptr ls = stek_share_server.smgr_instance->load_log_store(); std::string status_str = ""; - status_str += "\n Server ID: " + std::to_string(stek_share_server.server_id_); - status_str += "\n Leader ID: " + std::to_string(stek_share_server.raft_instance_->get_leader()); + status_str += "\n Server ID: " + std::to_string(config->server_id); + status_str += "\n Leader ID: " + std::to_string(stek_share_server.raft_instance->get_leader()); status_str += "\n Raft log range: " + std::to_string(ls->start_index()) + " - " + std::to_string((ls->next_slot() - 1)); - status_str += "\n Last committed index: " + std::to_string(stek_share_server.raft_instance_->get_committed_log_idx()); - TSDebug(PLUGIN, "%s", status_str.c_str()); + status_str += "\n Last committed index: " + std::to_string(stek_share_server.raft_instance->get_committed_log_idx()); + TSDebug(PLUGIN_NAME, "%s", status_str.c_str()); } static void * @@ -316,105 +420,124 @@ stek_updater(void *arg) ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); ::pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - ssl_ticket_key_t curr_stek; - time_t init_key_time = 0; + TSDebug(PLUGIN_NAME, "Starting STEK updater thread: %lu", ::pthread_self()); - // Initial key to use before syncing up. - TSDebug(PLUGIN, "Generating initial STEK..."); - if (generate_new_stek(&curr_stek, 0 /* fast start */) == 0) { - TSDebug(PLUGIN, "Generate initial STEK succeeded: %s", - hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); + while (!plugin_threads.is_shut_down()) { + ssl_ticket_key_t curr_stek; + std::chrono::time_point init_key_time; - std::memcpy(&stek_share_server.ticket_keys_[0], &curr_stek, SSL_TICKET_KEY_SIZE); + // Initial key to use before syncing up. + TSDebug(PLUGIN_NAME, "Generating initial STEK..."); + if (generate_new_stek(&curr_stek, 0 /* fast start */) == 0) { + TSDebug(PLUGIN_NAME, "Generate initial STEK succeeded: %s", + hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); - TSDebug(PLUGIN, "Updating SSL Ticket Key..."); - if (TSSslTicketKeyUpdate(reinterpret_cast(stek_share_server.ticket_keys_), SSL_TICKET_KEY_SIZE) == TS_ERROR) { - TSDebug(PLUGIN, "Update SSL Ticket Key failed."); + std::memcpy(&stek_share_server.ticket_keys[0], &curr_stek, SSL_TICKET_KEY_SIZE); + + TSDebug(PLUGIN_NAME, "Updating SSL Ticket Key..."); + if (TSSslTicketKeyUpdate(reinterpret_cast(stek_share_server.ticket_keys), SSL_TICKET_KEY_SIZE) == TS_ERROR) { + TSDebug(PLUGIN_NAME, "Update SSL Ticket Key failed."); + } else { + TSDebug(PLUGIN_NAME, "Update SSL Ticket Key succeeded."); + init_key_time = std::chrono::system_clock::now(); + } } else { - TSDebug(PLUGIN, "Update SSL Ticket Key succeeded."); - init_key_time = time(nullptr); + TSFatal("[%s] Generate initial STEK failed.", PLUGIN_NAME); } - } else { - TSFatal("Generate initial STEK failed."); - } - // Since we're using a pre-configured cluster, we need to have >= 3 nodes in the clust - // to initialize. Busy check before that. - while (!plugin_threads.is_shut_down()) { - if (!stek_share_server.raft_instance_->is_initialized()) { - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - continue; - } + // Since we're using a pre-configured cluster, we need to have >= 2 nodes in the cluster + // to initialize. Busy check before that. + auto config = get_scoped_config(); + while (!stek_share_server.config_reloading && !plugin_threads.is_shut_down()) { + std::shared_lock init_lock(stek_share_server.raft_mutex); + if (!stek_share_server.raft_instance || !stek_share_server.raft_instance->is_initialized()) { + init_lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + continue; + } + init_lock.unlock(); + + std::shared_lock leader_lock(stek_share_server.raft_mutex); + if (stek_share_server.raft_instance->is_leader()) { + // We only need to generate new STEK if this server is the leader. + // Otherwise we wake up every 10 seconds to see whether a new STEK has been received. + if (std::chrono::duration_cast(init_key_time.time_since_epoch()).count() != 0 && + std::chrono::system_clock::now() - init_key_time < config->key_update_interval) { + // If we got here after starting up, that means the initial key is still valid and we can send it to everyone else. + stek_share_server.last_updated = init_key_time; + TSDebug(PLUGIN_NAME, "Using initial STEK: %s", + hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); + append_log(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE); + + } else if (std::chrono::system_clock::now() - stek_share_server.last_updated >= config->key_update_interval) { + // Generate a new key as the last one has expired. + // Move the old key from ticket_keys_[0] to ticket_keys_[1], then put the new key in ticket_keys_[0]. + TSDebug(PLUGIN_NAME, "Generating new STEK..."); + if (generate_new_stek(&curr_stek, 1) == 0) { + TSDebug(PLUGIN_NAME, "Generate new STEK succeeded: %s", + hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); - if (stek_share_server.raft_instance_->is_leader()) { - // We only need to generate new STEK if this server is the leader. - // Otherwise we wake up every 10 seconds to see whether a new STEK has been received. - if (init_key_time != 0 && time(nullptr) - init_key_time < stek_share_server.key_update_interval_) { - // If we got here after starting up, that means the initial key is still valid and we can send it to everyone else. - stek_share_server.last_updated_ = init_key_time; - TSDebug(PLUGIN, "Using initial STEK: %s", - hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); - append_log(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE); - - } else if (time(nullptr) - stek_share_server.last_updated_ >= stek_share_server.key_update_interval_) { - // Generate a new key as the last one has expired. - // Move the old key from ticket_keys_[0] to ticket_keys_[1], then put the new key in ticket_keys_[0]. - TSDebug(PLUGIN, "Generating new STEK..."); - if (generate_new_stek(&curr_stek, 1) == 0) { - TSDebug(PLUGIN, "Generate new STEK succeeded: %s", + std::memcpy(&stek_share_server.ticket_keys[1], &stek_share_server.ticket_keys[0], SSL_TICKET_KEY_SIZE); + std::memcpy(&stek_share_server.ticket_keys[0], &curr_stek, SSL_TICKET_KEY_SIZE); + + TSDebug(PLUGIN_NAME, "Updating SSL Ticket Key..."); + if (TSSslTicketKeyUpdate(reinterpret_cast(stek_share_server.ticket_keys), SSL_TICKET_KEY_SIZE * 2) == + TS_ERROR) { + TSDebug(PLUGIN_NAME, "Update SSL Ticket Key failed."); + } else { + stek_share_server.last_updated = std::chrono::system_clock::now(); + TSDebug(PLUGIN_NAME, "Update SSL Ticket Key succeeded."); + TSDebug(PLUGIN_NAME, "Using new STEK: %s", + hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); + append_log(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE); + } + } else { + TSFatal("[%s] Generate new STEK failed.", PLUGIN_NAME); + } + } + // Set this to 0 so we won't enter the code path where we use the initial STEK. + init_key_time = std::chrono::time_point(); + } else { + init_key_time = std::chrono::time_point(); + + std::shared_lock sm_lock(stek_share_server.sm_mutex); + auto sm = dynamic_cast(stek_share_server.sm_instance.get()); + + // Check whether we received a new key. + // TODO: retry updating STEK when failed + if (sm->received_stek(&curr_stek)) { + TSDebug(PLUGIN_NAME, "Received new STEK: %s", hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); - std::memcpy(&stek_share_server.ticket_keys_[1], &stek_share_server.ticket_keys_[0], SSL_TICKET_KEY_SIZE); - std::memcpy(&stek_share_server.ticket_keys_[0], &curr_stek, SSL_TICKET_KEY_SIZE); + // Move the old key from ticket_keys_[0] to ticket_keys_[1], then put the new key in ticket_keys_[0]. + std::memcpy(&stek_share_server.ticket_keys[1], &stek_share_server.ticket_keys[0], SSL_TICKET_KEY_SIZE); + std::memcpy(&stek_share_server.ticket_keys[0], &curr_stek, SSL_TICKET_KEY_SIZE); - TSDebug(PLUGIN, "Updating SSL Ticket Key..."); - if (TSSslTicketKeyUpdate(reinterpret_cast(stek_share_server.ticket_keys_), SSL_TICKET_KEY_SIZE * 2) == TS_ERROR) { - TSDebug(PLUGIN, "Update SSL Ticket Key failed."); + TSDebug(PLUGIN_NAME, "Updating SSL Ticket Key..."); + if (TSSslTicketKeyUpdate(reinterpret_cast(stek_share_server.ticket_keys), SSL_TICKET_KEY_SIZE * 2) == TS_ERROR) { + TSDebug(PLUGIN_NAME, "Update SSL Ticket Key failed."); } else { - stek_share_server.last_updated_ = time(nullptr); - TSDebug(PLUGIN, "Update SSL Ticket Key succeeded."); - TSDebug(PLUGIN, "Using new STEK: %s", - hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); - append_log(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE); + stek_share_server.last_updated = std::chrono::system_clock::now(); + TSDebug(PLUGIN_NAME, "Update SSL Ticket Key succeeded."); } - } else { - TSFatal("Generate new STEK failed."); } } - init_key_time = 0; - } else { - init_key_time = 0; - auto sm = dynamic_cast(stek_share_server.sm_.get()); - - // Check whether we received a new key. - // TODO: retry updating STEK when failed - if (sm->received_stek(&curr_stek)) { - TSDebug(PLUGIN, "Received new STEK: %s", - hex_str(std::string(reinterpret_cast(&curr_stek), SSL_TICKET_KEY_SIZE)).c_str()); - - // Move the old key from ticket_keys_[0] to ticket_keys_[1], then put the new key in ticket_keys_[0]. - std::memcpy(&stek_share_server.ticket_keys_[1], &stek_share_server.ticket_keys_[0], SSL_TICKET_KEY_SIZE); - std::memcpy(&stek_share_server.ticket_keys_[0], &curr_stek, SSL_TICKET_KEY_SIZE); - - TSDebug(PLUGIN, "Updating SSL Ticket Key..."); - if (TSSslTicketKeyUpdate(reinterpret_cast(stek_share_server.ticket_keys_), SSL_TICKET_KEY_SIZE * 2) == TS_ERROR) { - TSDebug(PLUGIN, "Update SSL Ticket Key failed."); - } else { - stek_share_server.last_updated_ = time(nullptr); - TSDebug(PLUGIN, "Update SSL Ticket Key succeeded."); - } - } + leader_lock.unlock(); + + // Wakeup every 10 seconds to check whether there is a new key to use. + // We do this because if a server is lagging behind, either by losing connection or joining late, + // that server might receive multiple keys (the ones it missed) when it reconnects. Since we only need the + // most recent one, and to save time, we check back every 10 seconds in hope that the barrage of incoming + // keys has finished, and if not the second time around it'll definitely has. + std::this_thread::sleep_for(std::chrono::seconds(10)); } - // Wakeup every 10 seconds to check whether there is a new key to use. - // We do this because if a server is lagging behind, either by losing connection or joining late, - // that server might receive multiple keys (the ones it missed) when it reconnects. Since we only need the - // most recent one, and to save time, we check back every 10 seconds in hope that the barrage of incoming - // keys has finished, and if not the second time around it'll definitely has. - std::this_thread::sleep_for(std::chrono::seconds(10)); + stek_share_server.config_reloading = false; } + TSDebug(PLUGIN_NAME, "Stopping STEK updater thread: %lu", ::pthread_self()); + return nullptr; } @@ -430,16 +553,27 @@ TSPluginInit(int argc, const char *argv[]) TSLifecycleHookAdd(TS_LIFECYCLE_SHUTDOWN_HOOK, TSContCreate(shutdown_handler, nullptr)); if (TSPluginRegister(&info) != TS_SUCCESS) { - TSError("Plugin registration failed."); + TSError("[%s] Plugin registration failed.", PLUGIN_NAME); return; } + TSLifecycleHookAdd(TS_LIFECYCLE_MSG_HOOK, TSContCreate(message_handler, nullptr)); + if (argc < 2) { - TSError("Must specify config file."); - } else if (set_server_info(argc, argv) == 0 && init_raft(nuraft::cs_new()) == 0) { - TSDebug(PLUGIN, "Server ID: %d, Endpoint: %s", stek_share_server.server_id_, stek_share_server.endpoint_.c_str()); - TSThreadCreate(stek_updater, nullptr); + TSError("[%s] Must specify config file.", PLUGIN_NAME); } else { - TSError("Raft initialization failed."); + conf_file_path = argv[1]; + if (load_config_from_file() == 0) { + auto config = get_scoped_config(); + if (init_raft(nuraft::cs_new(), config) == 0) { + backup_config(config); + TSDebug(PLUGIN_NAME, "Server ID: %d, Endpoint: %s", config->server_id, config->endpoint.c_str()); + TSThreadCreate(stek_updater, nullptr); + } else { + TSError("[%s] Raft initialization failed.", PLUGIN_NAME); + } + } else { + TSError("[%s] Config load failed.", PLUGIN_NAME); + } } } diff --git a/plugins/experimental/stek_share/stek_share.h b/plugins/experimental/stek_share/stek_share.h index 14a8579bb46..972ed89a47e 100644 --- a/plugins/experimental/stek_share/stek_share.h +++ b/plugins/experimental/stek_share/stek_share.h @@ -21,103 +21,118 @@ limitations under the License. #include #include -#include +#include +#include #include #include "stek_utils.h" -class STEKShareServer +class PluginConfig { public: - STEKShareServer() : server_id_(1), addr_("localhost"), port_(25000), sm_(nullptr), smgr_(nullptr), raft_instance_(nullptr) - { - last_updated_ = 0; - current_log_idx_ = 0; - - // Default ASIO thread pool size: 4. - asio_thread_pool_size_ = 4; - - // Default heart beat interval: 100 ms. - heart_beat_interval_ = 100; - - // Default election timeout: 200~400 ms. - election_timeout_lower_bound_ = 200; - election_timeout_upper_bound_ = 400; - - // Up to 5 logs will be preserved ahead the last snapshot. - reserved_log_items_ = 5; - - // Snapshot will be created for every 5 log appends. - snapshot_distance_ = 5; - - // Client timeout: 3000 ms. - client_req_timeout_ = 3000; - - std::memset(ticket_keys_, 0, SSL_TICKET_KEY_SIZE * 2); - } - - void - reset() + PluginConfig() + : server_id(1), + address("localhost"), + port(25000), + endpoint("localhost:25000"), + asio_thread_pool_size(4), // Default ASIO thread pool size: 4. + heart_beat_interval(100), // Default heart beat interval: 100 ms. + election_timeout_lower_bound(200), // Default election timeout: 200~400 ms. + election_timeout_upper_bound(400), + reserved_log_items(5), // Up to 5 logs will be preserved ahead the last snapshot. + snapshot_distance(5), // Snapshot will be created for every 5 log appends. + client_req_timeout(3000), // Client timeout: 3000 ms. + key_update_interval(60) // Generate new STEK every 60 s. { - sm_.reset(); - smgr_.reset(); - raft_instance_.reset(); } // Server ID. - int server_id_; + int server_id; // Server address. - std::string addr_; + std::string address; // Server port. - int port_; + int port; - // Endpoint: ":". - std::string endpoint_; + // Endpoint: "
:". + std::string endpoint; - // State machine. - nuraft::ptr sm_; + size_t asio_thread_pool_size; - // State manager. - nuraft::ptr smgr_; + int heart_beat_interval; - // Raft launcher. - nuraft::raft_launcher launcher_; + int election_timeout_lower_bound; + int election_timeout_upper_bound; - // Raft server instance. - nuraft::ptr raft_instance_; + int reserved_log_items; - // List of servers to auto add. - std::map server_list_; + int snapshot_distance; + + int client_req_timeout; // STEK update interval. - int key_update_interval_; + std::chrono::seconds key_update_interval; - // When was STEK last updated. - time_t last_updated_; + // List of servers to auto add. + std::map server_list; + + // TLS related stuff. + std::string root_cert_file; + std::string server_cert_file; + std::string server_key_file; + std::string cert_verify_str; +}; + +class STEKShareServer +{ +public: + STEKShareServer() : sm_instance(nullptr), smgr_instance(nullptr), raft_instance(nullptr), current_log_idx(0) + { + std::memset(ticket_keys, 0, SSL_TICKET_KEY_SIZE * 2); + } - uint64_t current_log_idx_; + void + reset() + { + { + std::unique_lock lock(sm_mutex); + sm_instance.reset(); + } + + { + std::unique_lock lock(smgr_mutex); + smgr_instance.reset(); + } + + { + std::unique_lock lock(raft_mutex); + raft_instance.reset(); + } + } - size_t asio_thread_pool_size_; + // State machine. + nuraft::ptr sm_instance; + std::shared_mutex sm_mutex; - int heart_beat_interval_; + // State manager. + nuraft::ptr smgr_instance; + std::shared_mutex smgr_mutex; - int election_timeout_lower_bound_; - int election_timeout_upper_bound_; + // Raft server instance. + nuraft::ptr raft_instance; + std::shared_mutex raft_mutex; - int reserved_log_items_; + // Raft launcher. + nuraft::raft_launcher raft_launcher; - int snapshot_distance_; + std::atomic config_reloading = false; - int client_req_timeout_; + // When was STEK last updated. + std::chrono::time_point last_updated; - // TLS related stuff. - std::string root_cert_file_; - std::string server_cert_file_; - std::string server_key_file_; - std::string cert_verify_str_; + uint64_t current_log_idx; - ssl_ticket_key_t ticket_keys_[2]; + ssl_ticket_key_t ticket_keys[2]; }; diff --git a/plugins/experimental/stek_share/stek_utils.h b/plugins/experimental/stek_share/stek_utils.h index 96cb05364d7..dc9750537ee 100644 --- a/plugins/experimental/stek_share/stek_utils.h +++ b/plugins/experimental/stek_share/stek_utils.h @@ -31,7 +31,7 @@ #define SSL_KEY_LEN 16 -typedef struct ssl_ticket_key // an STEK +typedef struct ssl_ticket_key // a STEK { unsigned char key_name[SSL_KEY_LEN]; // tickets use this name to identify who encrypted unsigned char hmac_secret[SSL_KEY_LEN];