From d9b0c85bd984da4cc8c976d24004d06326805adf Mon Sep 17 00:00:00 2001 From: Fei Deng Date: Wed, 14 Oct 2020 12:49:13 -0500 Subject: [PATCH] parse expiration time and reload config at time out --- plugins/s3_auth/s3_auth.cc | 248 +++++++++++++++++++++++++++++++++---- 1 file changed, 226 insertions(+), 22 deletions(-) diff --git a/plugins/s3_auth/s3_auth.cc b/plugins/s3_auth/s3_auth.cc index 696fcab279d..5b381ab395f 100644 --- a/plugins/s3_auth/s3_auth.cc +++ b/plugins/s3_auth/s3_auth.cc @@ -37,6 +37,12 @@ #include #include +#include +#include +#include +#include +#include + #include #include #include "tscore/ink_config.h" @@ -143,7 +149,31 @@ class ConfigCache S3Config *get(const char *fname); private: - std::unordered_map> _cache; + struct _ConfigData { + // This is incremented before and after cnf and load_time are set. + // Thus, an odd value indicates an update is in progress. + std::atomic update_status{0}; + + // A config from a file and the last time it was loaded. + // config should be written before load_time. That way, + // if config is read after load_time, the load time will + // never indicate config is fresh when it isn't. + std::atomic config; + std::atomic load_time; + + _ConfigData() {} + + _ConfigData(S3Config *config_, time_t load_time_) : config(config_), load_time(load_time_) {} + + _ConfigData(_ConfigData &&lhs) + { + update_status = lhs.update_status.load(); + config = lhs.config.load(); + load_time = lhs.load_time.load(); + } + }; + + std::unordered_map _cache; static const int _ttl = 60; }; @@ -153,6 +183,7 @@ ConfigCache gConfCache; // One configuration setup // int event_handler(TSCont, TSEvent, void *); // Forward declaration +int config_reloader(TSCont, TSEvent, void *); class S3Config { @@ -162,6 +193,9 @@ class S3Config if (get_cont) { _cont = TSContCreate(event_handler, nullptr); TSContDataSet(_cont, static_cast(this)); + + _conf_rld = TSContCreate(config_reloader, TSMutexCreate()); + TSContDataSet(_conf_rld, static_cast(this)); } } @@ -171,6 +205,13 @@ class S3Config TSfree(_secret); TSfree(_keyid); TSfree(_token); + TSfree(_conf_fname); + if (_conf_rld_act) { + TSActionCancel(_conf_rld_act); + } + if (_conf_rld) { + TSContDestroy(_conf_rld); + } if (_cont) { TSContDestroy(_cont); } @@ -212,16 +253,19 @@ class S3Config copy_changes_from(const S3Config *src) { if (src->_secret) { + TSfree(_secret); _secret = TSstrdup(src->_secret); _secret_len = src->_secret_len; } if (src->_keyid) { + TSfree(_keyid); _keyid = TSstrdup(src->_keyid); _keyid_len = src->_keyid_len; } if (src->_token) { + TSfree(_token); _token = TSstrdup(src->_token); _token_len = src->_token_len; } @@ -250,6 +294,13 @@ class S3Config _region_map = src->_region_map; _region_map_modified = true; } + + _expiration = src->_expiration; + + if (src->_conf_fname) { + TSfree(_conf_fname); + _conf_fname = TSstrdup(src->_conf_fname); + } } // Getters @@ -319,6 +370,24 @@ class S3Config return _region_map; } + long + expiration() const + { + return _expiration; + } + + const char * + conf_fname() const + { + return _conf_fname; + } + + int + incr_conf_reload_count() + { + return _conf_reload_count++; + } + // Setters void set_secret(const char *s) @@ -380,6 +449,25 @@ class S3Config _region_map_modified = true; } + void + set_expiration(const char *s) + { + _expiration = strtol(s, nullptr, 10); + } + + void + set_conf_fname(const char *s) + { + TSfree(_conf_fname); + _conf_fname = TSstrdup(s); + } + + void + reset_conf_reload_count() + { + _conf_reload_count = 0; + } + // Parse configs from an external file bool parse_config(const std::string &filename); @@ -391,6 +479,18 @@ class S3Config TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_REQUEST_HDR_HOOK, _cont); } + void + schedule_conf_reload(long delay) + { + if (_conf_rld_act != nullptr && !TSActionDone(_conf_rld_act)) { + TSActionCancel(_conf_rld_act); + } + _conf_rld_act = TSContScheduleOnPool(_conf_rld, delay * 1000, TS_THREAD_POOL_NET); + } + + std::shared_mutex reload_mutex; + std::atomic_bool reload_waiting = false; + private: char *_secret = nullptr; size_t _secret_len = 0; @@ -403,12 +503,17 @@ class S3Config bool _version_modified = false; bool _virt_host_modified = false; TSCont _cont = nullptr; + TSCont _conf_rld = nullptr; + TSAction _conf_rld_act = nullptr; StringSet _v4includeHeaders; bool _v4includeHeaders_modified = false; StringSet _v4excludeHeaders; bool _v4excludeHeaders_modified = false; StringMap _region_map; bool _region_map_modified = false; + long _expiration = 0; + char *_conf_fname = nullptr; + int _conf_reload_count = 0; }; bool @@ -465,6 +570,8 @@ S3Config::parse_config(const std::string &config_fname) set_exclude_headers(pos2 + 19); } else if (0 == strncasecmp(pos2, "v4-region-map=", 14)) { set_region_map(pos2 + 14); + } else if (0 == strncasecmp(pos2, "expiration=", 11)) { + set_expiration(pos2 + 11); } else { // ToDo: warnings? } @@ -486,6 +593,8 @@ S3Config::parse_config(const std::string &config_fname) S3Config * ConfigCache::get(const char *fname) { + S3Config *s3; + struct timeval tv; gettimeofday(&tv, nullptr); @@ -496,40 +605,56 @@ ConfigCache::get(const char *fname) auto it = _cache.find(config_fname); if (it != _cache.end()) { - if (tv.tv_sec > (it->second.second + _ttl)) { - // Update the cached configuration file. - S3Config *s3 = new S3Config(false); // false == this config does not get the continuation - - TSDebug(PLUGIN_NAME, "Configuration from %s is stale, reloading", config_fname.c_str()); - it->second.second = tv.tv_sec; - if (s3->parse_config(config_fname)) { - it->second.first = s3; + unsigned update_status = it->second.update_status; + if (tv.tv_sec > (it->second.load_time + _ttl)) { + if (!(update_status & 1) && it->second.update_status.compare_exchange_strong(update_status, update_status + 1)) { + TSDebug(PLUGIN_NAME, "Configuration from %s is stale, reloading", config_fname.c_str()); + s3 = new S3Config(false); // false == this config does not get the continuation + + if (s3->parse_config(config_fname)) { + s3->set_conf_fname(fname); + } else { + // Failed the configuration parse... Set the cache response to nullptr + delete s3; + s3 = nullptr; + TSAssert(!"Configuration parsing / caching failed"); + } + + delete it->second.config; + it->second.config = s3; + it->second.load_time = tv.tv_sec; + + // Update is complete. + ++it->second.update_status; } else { - // Failed the configuration parse... Set the cache response to nullptr - delete s3; - it->second.first = nullptr; + // This thread lost the race with another thread that is also reloading + // the config for this file. Wait for the other thread to finish reloading. + while (it->second.update_status & 1) { + // Hopefully yielding will sleep the thread at least until the next + // scheduler interrupt, preventing a busy wait. + std::this_thread::yield(); + } + s3 = it->second.config; } } else { TSDebug(PLUGIN_NAME, "Configuration from %s is fresh, reusing", config_fname.c_str()); + s3 = it->second.config; } - return it->second.first; } else { // Create a new cached file. - S3Config *s3 = new S3Config(false); // false == this config does not get the continuation + s3 = new S3Config(false); // false == this config does not get the continuation + TSDebug(PLUGIN_NAME, "Parsing and caching configuration from %s, version:%d", config_fname.c_str(), s3->version()); if (s3->parse_config(config_fname)) { - _cache[config_fname] = std::make_pair(s3, tv.tv_sec); - TSDebug(PLUGIN_NAME, "Parsing and caching configuration from %s, version:%d", config_fname.c_str(), s3->version()); + s3->set_conf_fname(fname); + _cache.emplace(config_fname, _ConfigData(s3, tv.tv_sec)); } else { delete s3; - return nullptr; + s3 = nullptr; + TSAssert(!"Configuration parsing / caching failed"); } - - return s3; } - - TSAssert(!"Configuration parsing / caching failed"); - return nullptr; + return s3; } /////////////////////////////////////////////////////////////////////////////// @@ -876,6 +1001,11 @@ event_handler(TSCont cont, TSEvent event, void *edata) switch (event) { case TS_EVENT_HTTP_SEND_REQUEST_HDR: if (request.initialize()) { + while (s3->reload_waiting) { + std::this_thread::yield(); + } + + std::shared_lock lock(s3->reload_mutex); status = request.authorize(s3); } @@ -897,6 +1027,63 @@ event_handler(TSCont cont, TSEvent event, void *edata) return 0; } +// If the token has more than one hour to expire, reload is scheduled one hour before expiration. +// If the token has less than one hour to expire, reload is scheduled 15 minutes before expiration. +// If the token has less than 15 minutes to expire, reload is scheduled at the expiration time. +static long +cal_reload_delay(long time_diff) +{ + if (time_diff > 3600) { + return time_diff - 3600; + } else if (time_diff > 900) { + return time_diff - 900; + } else { + return time_diff; + } +} + +int +config_reloader(TSCont cont, TSEvent event, void *edata) +{ + TSDebug(PLUGIN_NAME, "reloading configs"); + S3Config *s3 = static_cast(TSContDataGet(cont)); + S3Config *file_config = gConfCache.get(s3->conf_fname()); + + if (!file_config || !file_config->valid()) { + TSError("[%s] requires both shared and AWS secret configuration", PLUGIN_NAME); + return TS_ERROR; + } + + s3->reload_waiting = true; + { + std::unique_lock lock(s3->reload_mutex); + s3->copy_changes_from(file_config); + } + s3->reload_waiting = false; + + if (s3->expiration() == 0) { + TSDebug(PLUGIN_NAME, "disabling auto config reload"); + } else { + // auto reload is scheduled to be 5 minutes before the expiration time to get some headroom + long time_diff = s3->expiration() - + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + if (time_diff > 0) { + long delay = cal_reload_delay(time_diff); + TSDebug(PLUGIN_NAME, "scheduling config reload with %ld seconds delay", delay); + s3->reset_conf_reload_count(); + s3->schedule_conf_reload(delay); + } else { + TSDebug(PLUGIN_NAME, "config expiration time is in the past, re-checking in 1 minute"); + if (s3->incr_conf_reload_count() == 10) { + TSError("[%s] tried to reload config automatically but failed, please try manual reloading the config", PLUGIN_NAME); + } + s3->schedule_conf_reload(60); + } + } + + return TS_SUCCESS; +} + /////////////////////////////////////////////////////////////////////////////// // Initialize the plugin. // @@ -1000,6 +1187,23 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE return TS_ERROR; } + if (s3->expiration() == 0) { + TSDebug(PLUGIN_NAME, "disabling auto config reload"); + } else { + // auto reload is scheduled to be 5 minutes before the expiration time to get some headroom + long time_diff = s3->expiration() - + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + if (time_diff > 0) { + long delay = cal_reload_delay(time_diff); + TSDebug(PLUGIN_NAME, "scheduling config reload with %ld seconds delay", delay); + s3->reset_conf_reload_count(); + s3->schedule_conf_reload(delay); + } else { + TSDebug(PLUGIN_NAME, "config expiration time is in the past, re-checking in 1 minute"); + s3->schedule_conf_reload(60); + } + } + *ih = static_cast(s3); TSDebug(PLUGIN_NAME, "New rule: access_key=%s, virtual_host=%s, version=%d", s3->keyid(), s3->virt_host() ? "yes" : "no", s3->version());