Skip to content

Commit

Permalink
parse expiration time and reload config at time out (#7281)
Browse files Browse the repository at this point in the history
(cherry picked from commit b777c92)
  • Loading branch information
duke8253 authored and zwoop committed Feb 23, 2021
1 parent e33d4fb commit 6c94cf3
Showing 1 changed file with 226 additions and 22 deletions.
248 changes: 226 additions & 22 deletions plugins/s3_auth/s3_auth.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
#include <openssl/sha.h>
#include <openssl/hmac.h>

#include <chrono>
#include <atomic>
#include <thread>
#include <mutex>
#include <shared_mutex>

#include <ts/ts.h>
#include <ts/remap.h>
#include "tscore/ink_config.h"
Expand Down Expand Up @@ -143,7 +149,31 @@ class ConfigCache
S3Config *get(const char *fname);

private:
std::unordered_map<std::string, std::pair<S3Config *, int>> _cache;
struct _ConfigData {
// This is incremented before and after cnf and load_time are set.
// Thus, an odd value indicates an update is in progress.
std::atomic<unsigned> update_status{0};

// A config from a file and the last time it was loaded.
// config should be written before load_time. That way,
// if config is read after load_time, the load time will
// never indicate config is fresh when it isn't.
std::atomic<S3Config *> config;
std::atomic<time_t> load_time;

_ConfigData() {}

_ConfigData(S3Config *config_, time_t load_time_) : config(config_), load_time(load_time_) {}

_ConfigData(_ConfigData &&lhs)
{
update_status = lhs.update_status.load();
config = lhs.config.load();
load_time = lhs.load_time.load();
}
};

std::unordered_map<std::string, _ConfigData> _cache;
static const int _ttl = 60;
};

Expand All @@ -153,6 +183,7 @@ ConfigCache gConfCache;
// One configuration setup
//
int event_handler(TSCont, TSEvent, void *); // Forward declaration
int config_reloader(TSCont, TSEvent, void *);

class S3Config
{
Expand All @@ -162,6 +193,9 @@ class S3Config
if (get_cont) {
_cont = TSContCreate(event_handler, nullptr);
TSContDataSet(_cont, static_cast<void *>(this));

_conf_rld = TSContCreate(config_reloader, TSMutexCreate());
TSContDataSet(_conf_rld, static_cast<void *>(this));
}
}

Expand All @@ -171,6 +205,13 @@ class S3Config
TSfree(_secret);
TSfree(_keyid);
TSfree(_token);
TSfree(_conf_fname);
if (_conf_rld_act) {
TSActionCancel(_conf_rld_act);
}
if (_conf_rld) {
TSContDestroy(_conf_rld);
}
if (_cont) {
TSContDestroy(_cont);
}
Expand Down Expand Up @@ -212,16 +253,19 @@ class S3Config
copy_changes_from(const S3Config *src)
{
if (src->_secret) {
TSfree(_secret);
_secret = TSstrdup(src->_secret);
_secret_len = src->_secret_len;
}

if (src->_keyid) {
TSfree(_keyid);
_keyid = TSstrdup(src->_keyid);
_keyid_len = src->_keyid_len;
}

if (src->_token) {
TSfree(_token);
_token = TSstrdup(src->_token);
_token_len = src->_token_len;
}
Expand Down Expand Up @@ -250,6 +294,13 @@ class S3Config
_region_map = src->_region_map;
_region_map_modified = true;
}

_expiration = src->_expiration;

if (src->_conf_fname) {
TSfree(_conf_fname);
_conf_fname = TSstrdup(src->_conf_fname);
}
}

// Getters
Expand Down Expand Up @@ -319,6 +370,24 @@ class S3Config
return _region_map;
}

long
expiration() const
{
return _expiration;
}

const char *
conf_fname() const
{
return _conf_fname;
}

int
incr_conf_reload_count()
{
return _conf_reload_count++;
}

// Setters
void
set_secret(const char *s)
Expand Down Expand Up @@ -380,6 +449,25 @@ class S3Config
_region_map_modified = true;
}

void
set_expiration(const char *s)
{
_expiration = strtol(s, nullptr, 10);
}

void
set_conf_fname(const char *s)
{
TSfree(_conf_fname);
_conf_fname = TSstrdup(s);
}

void
reset_conf_reload_count()
{
_conf_reload_count = 0;
}

// Parse configs from an external file
bool parse_config(const std::string &filename);

Expand All @@ -391,6 +479,18 @@ class S3Config
TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_REQUEST_HDR_HOOK, _cont);
}

void
schedule_conf_reload(long delay)
{
if (_conf_rld_act != nullptr && !TSActionDone(_conf_rld_act)) {
TSActionCancel(_conf_rld_act);
}
_conf_rld_act = TSContScheduleOnPool(_conf_rld, delay * 1000, TS_THREAD_POOL_NET);
}

std::shared_mutex reload_mutex;
std::atomic_bool reload_waiting = false;

private:
char *_secret = nullptr;
size_t _secret_len = 0;
Expand All @@ -403,12 +503,17 @@ class S3Config
bool _version_modified = false;
bool _virt_host_modified = false;
TSCont _cont = nullptr;
TSCont _conf_rld = nullptr;
TSAction _conf_rld_act = nullptr;
StringSet _v4includeHeaders;
bool _v4includeHeaders_modified = false;
StringSet _v4excludeHeaders;
bool _v4excludeHeaders_modified = false;
StringMap _region_map;
bool _region_map_modified = false;
long _expiration = 0;
char *_conf_fname = nullptr;
int _conf_reload_count = 0;
};

bool
Expand Down Expand Up @@ -465,6 +570,8 @@ S3Config::parse_config(const std::string &config_fname)
set_exclude_headers(pos2 + 19);
} else if (0 == strncasecmp(pos2, "v4-region-map=", 14)) {
set_region_map(pos2 + 14);
} else if (0 == strncasecmp(pos2, "expiration=", 11)) {
set_expiration(pos2 + 11);
} else {
// ToDo: warnings?
}
Expand All @@ -486,6 +593,8 @@ S3Config::parse_config(const std::string &config_fname)
S3Config *
ConfigCache::get(const char *fname)
{
S3Config *s3;

struct timeval tv;

gettimeofday(&tv, nullptr);
Expand All @@ -496,40 +605,56 @@ ConfigCache::get(const char *fname)
auto it = _cache.find(config_fname);

if (it != _cache.end()) {
if (tv.tv_sec > (it->second.second + _ttl)) {
// Update the cached configuration file.
S3Config *s3 = new S3Config(false); // false == this config does not get the continuation

TSDebug(PLUGIN_NAME, "Configuration from %s is stale, reloading", config_fname.c_str());
it->second.second = tv.tv_sec;
if (s3->parse_config(config_fname)) {
it->second.first = s3;
unsigned update_status = it->second.update_status;
if (tv.tv_sec > (it->second.load_time + _ttl)) {
if (!(update_status & 1) && it->second.update_status.compare_exchange_strong(update_status, update_status + 1)) {
TSDebug(PLUGIN_NAME, "Configuration from %s is stale, reloading", config_fname.c_str());
s3 = new S3Config(false); // false == this config does not get the continuation

if (s3->parse_config(config_fname)) {
s3->set_conf_fname(fname);
} else {
// Failed the configuration parse... Set the cache response to nullptr
delete s3;
s3 = nullptr;
TSAssert(!"Configuration parsing / caching failed");
}

delete it->second.config;
it->second.config = s3;
it->second.load_time = tv.tv_sec;

// Update is complete.
++it->second.update_status;
} else {
// Failed the configuration parse... Set the cache response to nullptr
delete s3;
it->second.first = nullptr;
// This thread lost the race with another thread that is also reloading
// the config for this file. Wait for the other thread to finish reloading.
while (it->second.update_status & 1) {
// Hopefully yielding will sleep the thread at least until the next
// scheduler interrupt, preventing a busy wait.
std::this_thread::yield();
}
s3 = it->second.config;
}
} else {
TSDebug(PLUGIN_NAME, "Configuration from %s is fresh, reusing", config_fname.c_str());
s3 = it->second.config;
}
return it->second.first;
} else {
// Create a new cached file.
S3Config *s3 = new S3Config(false); // false == this config does not get the continuation
s3 = new S3Config(false); // false == this config does not get the continuation

TSDebug(PLUGIN_NAME, "Parsing and caching configuration from %s, version:%d", config_fname.c_str(), s3->version());
if (s3->parse_config(config_fname)) {
_cache[config_fname] = std::make_pair(s3, tv.tv_sec);
TSDebug(PLUGIN_NAME, "Parsing and caching configuration from %s, version:%d", config_fname.c_str(), s3->version());
s3->set_conf_fname(fname);
_cache.emplace(config_fname, _ConfigData(s3, tv.tv_sec));
} else {
delete s3;
return nullptr;
s3 = nullptr;
TSAssert(!"Configuration parsing / caching failed");
}

return s3;
}

TSAssert(!"Configuration parsing / caching failed");
return nullptr;
return s3;
}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -876,6 +1001,11 @@ event_handler(TSCont cont, TSEvent event, void *edata)
switch (event) {
case TS_EVENT_HTTP_SEND_REQUEST_HDR:
if (request.initialize()) {
while (s3->reload_waiting) {
std::this_thread::yield();
}

std::shared_lock lock(s3->reload_mutex);
status = request.authorize(s3);
}

Expand All @@ -897,6 +1027,63 @@ event_handler(TSCont cont, TSEvent event, void *edata)
return 0;
}

// If the token has more than one hour to expire, reload is scheduled one hour before expiration.
// If the token has less than one hour to expire, reload is scheduled 15 minutes before expiration.
// If the token has less than 15 minutes to expire, reload is scheduled at the expiration time.
static long
cal_reload_delay(long time_diff)
{
if (time_diff > 3600) {
return time_diff - 3600;
} else if (time_diff > 900) {
return time_diff - 900;
} else {
return time_diff;
}
}

int
config_reloader(TSCont cont, TSEvent event, void *edata)
{
TSDebug(PLUGIN_NAME, "reloading configs");
S3Config *s3 = static_cast<S3Config *>(TSContDataGet(cont));
S3Config *file_config = gConfCache.get(s3->conf_fname());

if (!file_config || !file_config->valid()) {
TSError("[%s] requires both shared and AWS secret configuration", PLUGIN_NAME);
return TS_ERROR;
}

s3->reload_waiting = true;
{
std::unique_lock lock(s3->reload_mutex);
s3->copy_changes_from(file_config);
}
s3->reload_waiting = false;

if (s3->expiration() == 0) {
TSDebug(PLUGIN_NAME, "disabling auto config reload");
} else {
// auto reload is scheduled to be 5 minutes before the expiration time to get some headroom
long time_diff = s3->expiration() -
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
if (time_diff > 0) {
long delay = cal_reload_delay(time_diff);
TSDebug(PLUGIN_NAME, "scheduling config reload with %ld seconds delay", delay);
s3->reset_conf_reload_count();
s3->schedule_conf_reload(delay);
} else {
TSDebug(PLUGIN_NAME, "config expiration time is in the past, re-checking in 1 minute");
if (s3->incr_conf_reload_count() == 10) {
TSError("[%s] tried to reload config automatically but failed, please try manual reloading the config", PLUGIN_NAME);
}
s3->schedule_conf_reload(60);
}
}

return TS_SUCCESS;
}

///////////////////////////////////////////////////////////////////////////////
// Initialize the plugin.
//
Expand Down Expand Up @@ -1000,6 +1187,23 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE
return TS_ERROR;
}

if (s3->expiration() == 0) {
TSDebug(PLUGIN_NAME, "disabling auto config reload");
} else {
// auto reload is scheduled to be 5 minutes before the expiration time to get some headroom
long time_diff = s3->expiration() -
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
if (time_diff > 0) {
long delay = cal_reload_delay(time_diff);
TSDebug(PLUGIN_NAME, "scheduling config reload with %ld seconds delay", delay);
s3->reset_conf_reload_count();
s3->schedule_conf_reload(delay);
} else {
TSDebug(PLUGIN_NAME, "config expiration time is in the past, re-checking in 1 minute");
s3->schedule_conf_reload(60);
}
}

*ih = static_cast<void *>(s3);
TSDebug(PLUGIN_NAME, "New rule: access_key=%s, virtual_host=%s, version=%d", s3->keyid(), s3->virt_host() ? "yes" : "no",
s3->version());
Expand Down

0 comments on commit 6c94cf3

Please sign in to comment.