From 1529774ae06323c0b6799c731b51b3ab010a2f6d Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Sun, 6 Dec 2020 22:41:24 +0800 Subject: [PATCH 01/17] [WIP] feature: endpoint choose by health check --- lib/resty/etcd.lua | 25 ++++- lib/resty/etcd/utils.lua | 8 ++ lib/resty/etcd/v3.lua | 237 +++++++++++++++++++++++++++++---------- 3 files changed, 207 insertions(+), 63 deletions(-) diff --git a/lib/resty/etcd.lua b/lib/resty/etcd.lua index dd6a9c71..63721b2f 100644 --- a/lib/resty/etcd.lua +++ b/lib/resty/etcd.lua @@ -1,9 +1,10 @@ -local etcdv2 = require("resty.etcd.v2") -local etcdv3 = require("resty.etcd.v3") -local utils = require("resty.etcd.utils") -local typeof = require("typeof") -local require = require -local pcall = pcall +local etcdv2 = require("resty.etcd.v2") +local etcdv3 = require("resty.etcd.v3") +local utils = require("resty.etcd.utils") +local typeof = require("typeof") +local require = require +local pcall = pcall +local ngx_shared = ngx.shared local prefix_v3 = { ["3.5."] = "/v3", ["3.4."] = "/v3", @@ -46,6 +47,18 @@ function _M.new(opts) return nil, 'opts must be table' end + -- health_check has value means enable etcd cluster health check + if opts.health_check then + local shared_dict = ngx_shared[opts.health_check.shm_name] + if not shared_dict then + return nil, "failed to get ngx.shared dict: " .. opts.health_check.shm_name + end + + opts.health_check.failure_window = opts.health_check.failure_window or 1 + opts.health_check.failure_times = opts.health_check.failure_times or 1 + opts.health_check.disable_duration = opts.health_check.disable_duration or 100 + end + opts.timeout = opts.timeout or 5 -- 5 sec opts.http_host = opts.http_host or "http://127.0.0.1:2379" opts.ttl = opts.ttl or -1 diff --git a/lib/resty/etcd/utils.lua b/lib/resty/etcd/utils.lua index c9766e3f..a6acc1d9 100644 --- a/lib/resty/etcd/utils.lua +++ b/lib/resty/etcd/utils.lua @@ -84,6 +84,7 @@ end local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO +local ngx_WARN= ngx.WARN local function log_error(...) return ngx_log(ngx_ERR, ...) end @@ -95,6 +96,13 @@ local function log_info( ... ) end _M.log_info = log_info + +local function log_warn( ... ) + return ngx_log(ngx_WARN, ...) +end +_M.log_warn = log_warn + + local function verify_key(key) if not key or #key == 0 then return false, "key should not be empty" diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index e1117caf..e3be8803 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -19,7 +19,11 @@ local decode_json = cjson.decode local encode_json = cjson.encode local encode_base64 = ngx.encode_base64 local decode_base64 = ngx.decode_base64 -local INIT_COUNT_RESIZE = 2e8 +local tostring = tostring +local string_format = string.format +local math_floor = math.floor +local ngx_shared = ngx.shared +local ngx_timer_at = ngx.timer.at local _M = {} @@ -28,7 +32,64 @@ local mt = { __index = _M } -- define local refresh function variable local refresh_jwt_token -local function _request_uri(self, method, uri, opts, timeout, ignore_auth) + +local function get_id(time, window) + return tostring(math_floor(time / window)) +end + + +local function get_counter_key(http_host, time, window) + local id = get_id(time, window) + return string_format("%s.%s.counter", http_host, id) +end + + +local function incr(key, shm_name, failure_window) + local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, failure_window) + if err then + return nil, err + end + + if forcible then + utils.log_warn("shared dict: ", shm_name, " is full, valid items forcibly overwritten") + end + return new_value, nil +end + + +local function restore(disable_duration, endpoint) + local ok, err = ngx_timer_at(disable_duration, function(premature) + if premature then + return + end + + endpoint.health_status = 1 + end) + + if not ok then + utils.log_error("failed to start timer to restore etcd endpoint to health: ", err) + end +end + + +local function report_failure(self, endpoint) + local key = get_counter_key(endpoint.http_host, now(), self.failure_window) + local failure_count, err = incr(key, self.shm_name, self.failure_window) + if err then + utils.log_error("failed to start timer to restore etcd endpoint to health: ", err) + return + end + + -- only trigger once + if failure_count == self.failure_times then + endpoint.health_status = 0 + restore(self.disable_duration, endpoint) + return + end +end + + +local function _request_uri(self, endpoint, method, uri, opts, timeout, ignore_auth) local body if opts and opts.body and tab_nkeys(opts.body) > 0 then @@ -56,6 +117,7 @@ local function _request_uri(self, method, uri, opts, timeout, ignore_auth) local http_cli, err = utils.http.new() if err then + report_failure(self, endpoint) return nil, err end @@ -73,6 +135,7 @@ local function _request_uri(self, method, uri, opts, timeout, ignore_auth) }) if err then + report_failure(self, endpoint) return nil, err end @@ -108,6 +171,17 @@ function _M.new(opts) local user = opts.user local password = opts.password local ssl_verify = opts.ssl_verify + local failure_window + local failure_times + local disable_duration + local shm_name + if opts.health_check then + failure_window = opts.health_check.failure_window + failure_times = opts.health_check.failure_times + disable_duration = opts.health_check.disable_duration + shm_name = opts.health_check.shm_name + end + if not typeof.uint(timeout) then return nil, 'opts.timeout must be unsigned integer' @@ -137,6 +211,20 @@ function _M.new(opts) return nil, 'opts.password must be string or ignore' end + if opts.health_check then + if failure_window and not typeof.uint(failure_window) then + return nil, 'opts.health_check.failure_window must be unsigned integer or ignore' + end + + if failure_times and not typeof.uint(failure_times) then + return nil, 'opts.health_check.failure_times must be unsigned integer or ignore' + end + + if disable_duration and not typeof.uint(disable_duration) then + return nil, 'opts.health_check.disable_duration must be unsigned integer or ignore' + end + end + local endpoints = {} local http_hosts if type(http_host) == 'string' then -- signle node @@ -153,29 +241,49 @@ function _M.new(opts) end tab_insert(endpoints, { - full_prefix = host .. utils.normalize(api_prefix), - http_host = host, - scheme = m[1], - host = m[2] or "127.0.0.1", - port = m[3] or "2379", - api_prefix = api_prefix, + full_prefix = host .. utils.normalize(api_prefix), + http_host = host, + scheme = m[1], + host = m[2] or "127.0.0.1", + port = m[3] or "2379", + api_prefix = api_prefix, + health_status = 1, }) end + if opts.health_check then + return setmetatable({ + last_auth_time = now(), -- save last Authentication time + jwt_token = nil, -- last Authentication token + is_auth = not not (user and password), + user = user, + password = password, + timeout = timeout, + ttl = ttl, + is_cluster = #endpoints > 1, + endpoints = endpoints, + key_prefix = key_prefix, + ssl_verify = ssl_verify, + failure_window = failure_window, + failure_times = failure_times, + disable_duration = disable_duration, + shm_name = shm_name, + }, mt) + end + return setmetatable({ - last_auth_time = now(), -- save last Authentication time - jwt_token = nil, -- last Authentication token - is_auth = not not (user and password), - user = user, - password = password, - timeout = timeout, - ttl = ttl, - is_cluster = #endpoints > 1, - endpoints = endpoints, - key_prefix = key_prefix, - ssl_verify = ssl_verify, - }, - mt) + last_auth_time = now(), -- save last Authentication time + jwt_token = nil, -- last Authentication token + is_auth = not not (user and password), + user = user, + password = password, + timeout = timeout, + ttl = ttl, + is_cluster = #endpoints > 1, + endpoints = endpoints, + key_prefix = key_prefix, + ssl_verify = ssl_verify, + }, mt) end @@ -186,13 +294,11 @@ local function choose_endpoint(self) return endpoints[1] end - self.init_count = (self.init_count or 0) + 1 - local pos = self.init_count % endpoints_len + 1 - if self.init_count >= INIT_COUNT_RESIZE then - self.init_count = 0 + for _, endpoint in ipairs(endpoints) do + if endpoint.health_status == 1 then + return endpoint + end end - - return endpoints[pos] end -- return refresh_is_ok, error @@ -210,8 +316,9 @@ function refresh_jwt_token(self) password = self.password, } } - local res, err = _request_uri(self, 'POST', - choose_endpoint(self).full_prefix .. "/auth/authenticate", + local endpoint= choose_endpoint(self) + local res, err = _request_uri(self, endpoint, 'POST', + endpoint.full_prefix .. "/auth/authenticate", opts, 5, true) -- default authenticate timeout 5 second if err then return nil, err @@ -275,9 +382,10 @@ local function set(self, key, val, attr) } } + local endpoint= choose_endpoint(self) local res - res, err = _request_uri(self, 'POST', - choose_endpoint(self).full_prefix .. "/kv/put", + res, err = _request_uri(self, endpoint, 'POST', + endpoint.full_prefix .. "/kv/put", opts, self.timeout) if err then return nil, err @@ -382,9 +490,10 @@ local function get(self, key, attr) } } + local endpoint= choose_endpoint(self) local res - res, err = _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/kv/range", + res, err = _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/kv/range", opts, attr and attr.timeout or self.timeout) if res and res.status == 200 then @@ -423,8 +532,9 @@ local function delete(self, key, attr) }, } - return _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/kv/deleterange", + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/kv/deleterange", opts, self.timeout) end @@ -446,13 +556,14 @@ local function txn(self, opts_arg, compare, success, failure) }, } - return _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/kv/txn", + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/kv/txn", opts, timeout or self.timeout) end -local function request_chunk(self, method, scheme, host, port, path, opts, timeout) +local function request_chunk(self, endpoint, method, scheme, host, port, path, opts, timeout) local body, err, _ if opts and opts.body and tab_nkeys(opts.body) > 0 then body, err = encode_json(opts.body) @@ -492,6 +603,7 @@ local function request_chunk(self, method, scheme, host, port, path, opts, timeo ok, err = http_cli:connect(host, port) if not ok then + report_failure(self, endpoint) return nil, err end @@ -543,6 +655,8 @@ local function request_chunk(self, method, scheme, host, port, path, opts, timeo body, err = decode_json(body) if not body then return nil, "failed to decode json body: " .. (err or " unkwon") + elseif body.error and body.error.http_code >= 500 then + report_failure(self, endpoint) end if body.result.events then @@ -652,7 +766,7 @@ local function watch(self, key, attr) local endpoint = choose_endpoint(self) - local callback_fun, err, http_cli = request_chunk(self, 'POST', + local callback_fun, err, http_cli = request_chunk(self, endpoint, 'POST', endpoint.scheme, endpoint.host, endpoint.port, @@ -883,8 +997,9 @@ function _M.grant(self, ttl, id) }, } - return _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/lease/grant", opts) + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "POST", + endpoint .. "/lease/grant", opts) end function _M.revoke(self, id) @@ -898,8 +1013,9 @@ function _M.revoke(self, id) }, } - return _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/kv/lease/revoke", opts) + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/kv/lease/revoke", opts) end function _M.keepalive(self, id) @@ -913,8 +1029,9 @@ function _M.keepalive(self, id) }, } - return _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/lease/keepalive", opts) + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/lease/keepalive", opts) end function _M.timetolive(self, id, keys) @@ -930,8 +1047,9 @@ function _M.timetolive(self, id, keys) }, } - local res, err = _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/kv/lease/timetolive", opts) + local endpoint= choose_endpoint(self) + local res, err = _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/kv/lease/timetolive", opts) if res and res.status == 200 then if res.body.keys and tab_nkeys(res.body.keys) > 0 then @@ -945,34 +1063,39 @@ function _M.timetolive(self, id, keys) end function _M.leases(self) - return _request_uri(self, "POST", - choose_endpoint(self).full_prefix .. "/lease/leases") + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "POST", + endpoint.full_prefix .. "/lease/leases") end -- /version function _M.version(self) - return _request_uri(self, "GET", - choose_endpoint(self).http_host .. "/version", + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "GET", + endpoint.http_host .. "/version", nil, self.timeout) end -- /stats function _M.stats_leader(self) - return _request_uri(self, "GET", - choose_endpoint(self).http_host .. "/v2/stats/leader", + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "GET", + endpoint.http_host .. "/v2/stats/leader", nil, self.timeout) end function _M.stats_self(self) - return _request_uri(self, "GET", - choose_endpoint(self).http_host .. "/v2/stats/self", + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "GET", + endpoint.http_host .. "/v2/stats/self", nil, self.timeout) end function _M.stats_store(self) - return _request_uri(self, "GET", - choose_endpoint(self).http_host .. "/v2/stats/store", + local endpoint= choose_endpoint(self) + return _request_uri(self, endpoint, "GET", + endpoint.http_host .. "/v2/stats/store", nil, self.timeout) end From c2cd9e390f5733ed933fba7d6c18faa4e44f4780 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Sun, 6 Dec 2020 23:45:22 +0800 Subject: [PATCH 02/17] fix error --- lib/resty/etcd/v3.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index e3be8803..4743a67d 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -999,7 +999,7 @@ function _M.grant(self, ttl, id) local endpoint= choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint .. "/lease/grant", opts) + endpoint.full_prefix .. "/lease/grant", opts) end function _M.revoke(self, id) From 1d67b9e8a9eb2128ce1c5ddc26a9156b7bfa8d13 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Mon, 7 Dec 2020 01:46:58 +0800 Subject: [PATCH 03/17] add some test cases --- lib/resty/etcd/v3.lua | 4 +- t/v3/health_check.t | 234 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 t/v3/health_check.t diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 4743a67d..f35b8bbe 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -63,6 +63,7 @@ local function restore(disable_duration, endpoint) return end + utils.log_info("restore an endpoint to health: ", endpoint.http_host) endpoint.health_status = 1 end) @@ -73,10 +74,11 @@ end local function report_failure(self, endpoint) + utils.log_info("report an endpoint failure: ", endpoint.http_host) local key = get_counter_key(endpoint.http_host, now(), self.failure_window) local failure_count, err = incr(key, self.shm_name, self.failure_window) if err then - utils.log_error("failed to start timer to restore etcd endpoint to health: ", err) + utils.log_error("failed to incr etcd endpoint fail times: ", err) return end diff --git a/t/v3/health_check.t b/t/v3/health_check.t new file mode 100644 index 00000000..67306002 --- /dev/null +++ b/t/v3/health_check.t @@ -0,0 +1,234 @@ +use Test::Nginx::Socket::Lua; + +log_level('info'); +no_long_string(); +repeat_each(1); +workers(2); + +my $etcd_version = `etcd --version`; +if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.1./ || $etcd_version =~ /^etcd Version: 3.2./) { + plan(skip_all => "etcd is too old, skip v3 protocol"); +} else { + my $enable_tls = $ENV{ETCD_ENABLE_TLS}; + if ($enable_tls eq "TRUE") { + plan(skip_all => "skip test cases with auth when TLS is enabled"); + } else { + plan 'no_plan'; + } +} + +our $HttpConfig = <<'_EOC_'; + lua_socket_log_errors off; + lua_package_path 'lib/?.lua;/usr/local/share/lua/5.3/?.lua;/usr/share/lua/5.1/?.lua;;'; + lua_shared_dict etcd_cluster_health_check 8m; +_EOC_ + +run_tests(); + +__DATA__ + +=== TEST 1: disable health check by default +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + }) + + assert(etcd.shm_name == nil) + assert(etcd.failure_times == nil) + assert(etcd.failure_window == nil) + assert(etcd.disable_duration == nil) + } + } +--- request +GET /t +--- no_error_log +[error] + + + +=== TEST 2: failed enable health check with wrong shm_name +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "wrong_shm_name", + }, + }) + + ngx.say(err) + } + } +--- request +GET /t +--- no_error_log +[error] +--- response_body +failed to get ngx.shared dict: wrong_shm_name + + + +=== TEST 3: valid default config values +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + }, + }) + + assert(etcd.failure_times == 1) + assert(etcd.failure_window == 1) + assert(etcd.disable_duration == 100) + } + } +--- request +GET /t +--- no_error_log +[error] + + + +=== TEST 4: report unhealthy endpoint +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + }, + }) + + local res, err = etcd:set("/test", { a='abc'}) + } + } +--- request +GET /t +--- error_log eval +qr/report an endpoint failure: http:\/\/127.0.0.1:42379/ + + + +=== TEST 5: restore endpoint to health +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + disable_duration = 0 + }, + }) + + local res, err = etcd:set("/test", { a='abc'}) + ngx.sleep(0.1) + } + } +--- request +GET /t +--- error_log eval +qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ + + + +=== TEST 6: one endpoint only trigger mark unhealthy and restore once +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd1, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + disable_duration = 1 + }, + }) + + etcd1:set("/test", { a='abc'}) + etcd1:set("/test", { a='abc'}) + + local etcd2, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + disable_duration = 1 + }, + }) + + etcd2:set("/test", { a='abc'}) + etcd2:set("/test", { a='abc'}) + + assert(tostring(etcd1) ~= tostring(etcd2)) + local pending_count = ngx.timer.pending_count() + assert(pending_count == 1) + + ngx.sleep(1.5) + ngx.say("all down") + } + } +--- request +GET /t +--- timeout: 5 +--- response_body +all down From 55621675c7d59d248f1d530fa5d67366d70bd9cc Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Tue, 8 Dec 2020 00:44:15 +0800 Subject: [PATCH 04/17] 1. optimization code 2. add test cases --- lib/resty/etcd/v3.lua | 17 +-------- t/v3/health_check.t | 89 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 19 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index f35b8bbe..ad928968 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -19,9 +19,6 @@ local decode_json = cjson.decode local encode_json = cjson.encode local encode_base64 = ngx.encode_base64 local decode_base64 = ngx.decode_base64 -local tostring = tostring -local string_format = string.format -local math_floor = math.floor local ngx_shared = ngx.shared local ngx_timer_at = ngx.timer.at @@ -33,17 +30,6 @@ local mt = { __index = _M } local refresh_jwt_token -local function get_id(time, window) - return tostring(math_floor(time / window)) -end - - -local function get_counter_key(http_host, time, window) - local id = get_id(time, window) - return string_format("%s.%s.counter", http_host, id) -end - - local function incr(key, shm_name, failure_window) local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, failure_window) if err then @@ -75,8 +61,7 @@ end local function report_failure(self, endpoint) utils.log_info("report an endpoint failure: ", endpoint.http_host) - local key = get_counter_key(endpoint.http_host, now(), self.failure_window) - local failure_count, err = incr(key, self.shm_name, self.failure_window) + local failure_count, err = incr(endpoint.http_host, self.shm_name, self.failure_window) if err then utils.log_error("failed to incr etcd endpoint fail times: ", err) return diff --git a/t/v3/health_check.t b/t/v3/health_check.t index 67306002..bae31a14 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -118,7 +118,90 @@ GET /t -=== TEST 4: report unhealthy endpoint +=== TEST 4: verify `failure_window` works +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + failure_window = 3, + failure_times = 5, + }, + }) + + etcd:set("/test", { a='abc'}) + local counter = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + assert(counter == 1) + ngx.sleep(1) + + etcd:set("/test", { a='abc'}) + counter = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + assert(counter == 2) + ngx.sleep(2) + + etcd:set("/test", { a='abc'}) + counter = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + assert(counter == 1) + + ngx.say("all down") + } + } +--- request +GET /t +--- timeout: 10 +--- response_body +all down + + + +=== TEST 5: verify `failure_times` works +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + failure_window = 3, + failure_times = 2, + disable_duration = 0, + }, + }) + + etcd:set("/test", { a='abc'}) + etcd:set("/test", { a='abc'}) + local pending_count = ngx.timer.pending_count() + assert(pending_count == 1) + ngx.say("all down") + } + } +--- request +GET /t +--- timeout: 10 +--- response_body +all down + + + +=== TEST 6: report unhealthy endpoint --- http_config eval: $::HttpConfig --- config location /t { @@ -147,7 +230,7 @@ qr/report an endpoint failure: http:\/\/127.0.0.1:42379/ -=== TEST 5: restore endpoint to health +=== TEST 7: restore endpoint to health --- http_config eval: $::HttpConfig --- config location /t { @@ -178,7 +261,7 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ -=== TEST 6: one endpoint only trigger mark unhealthy and restore once +=== TEST 8: one endpoint only trigger mark unhealthy and restore once --- http_config eval: $::HttpConfig --- config location /t { From 8a63a571b2a3341eb87c75ea3c78c5fd7b30cdbf Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 01:47:42 +0800 Subject: [PATCH 05/17] 1. resolve conflicts first 2. add test cases 3. add doc 4. solve code review --- cluster_health_check.md | 86 +++++++++++++++++++++++++++++++++++++++++ lib/resty/etcd/v3.lua | 85 +++++++++++++++++++++++++++++++++++----- t/v3/health_check.t | 49 +++++++++++++++++++---- 3 files changed, 203 insertions(+), 17 deletions(-) create mode 100644 cluster_health_check.md diff --git a/cluster_health_check.md b/cluster_health_check.md new file mode 100644 index 00000000..8efa3434 --- /dev/null +++ b/cluster_health_check.md @@ -0,0 +1,86 @@ +Etcd Cluster Health Check +======== + +Synopsis +======== + +```nginx +http { + # required declares a shared memory zone to store endpoints's health status + lua_shared_dict healthcheck_shm 1m; + server { + location = /healthcheck { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + # minimal configuration to enable etcd cluster health check + cluster_healthcheck ={ + shm_name = 'healthcheck_shm', + } + }) + } + } + } +} +``` + +Description +======== + +Implement a passive health check mechanism, when the connection/read/write fails occurs, recorded as a endpoint' failure. + +In a certain period of time, if there are n consecutive failures, the endpoint is marked unhealthy, the unhealthy endpoint will not be choosed to connect for a certain period of time in the future. + +Health check mechanism would switch endpoint only when the previously choosed endpoint is marked unhealthy. + +Config +======== + +The default configuration is as follows: + +```lua +health_check = { + shm_name = "healthcheck_shm", + failure_window = 1, + failure_times = 1, + disable_duration = 100 +} +``` + +when use `require "resty.etcd" .new` to create a connection, you can override the default configuration like + +```lua + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + failure_window = 3, + failure_times = 2, + disable_duration = 10, + }, + }) +``` + +configurations that are not overridden will use the default configuration. + +- `shm_name` : the shared memory zone for storing the health status of endpoints. +- `failure_window` : the duration of endpoint occurs n consecutive failures(in seconds). +- `failure_times` : the times of failures that occurred before the endpoint was marked as unhealthy. +- `disable_duration` : the duration of the unhealthy endpoint will not be choosed to connect(in seconds). + +### tips +- enable the cluster health check by config the `health_check` diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 29d6331a..fb1c9f3a 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -21,7 +21,6 @@ local encode_json = cjson.encode local encode_base64 = ngx.encode_base64 local decode_base64 = ngx.decode_base64 local semaphore = require("ngx.semaphore") -local INIT_COUNT_RESIZE = 2e8 local ngx_shared = ngx.shared local ngx_timer_at = ngx.timer.at @@ -32,7 +31,53 @@ local mt = { __index = _M } -- define local refresh function variable local refresh_jwt_token -local function _request_uri(self, method, uri, opts, timeout, ignore_auth) +local function incr(key, shm_name, failure_window) + local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, failure_window) + if err then + return nil, err + end + + if forcible then + utils.log_warn("shared dict: ", shm_name, " is full, valid items forcibly overwritten") + end + return new_value, nil +end + + +local function restore(disable_duration, endpoint) + local ok, err = ngx_timer_at(disable_duration, function(premature) + if premature then + return + end + + utils.log_info("restore an endpoint to health: ", endpoint.http_host) + endpoint.health_status = 1 + end) + + if not ok then + utils.log_error("failed to start timer to restore etcd endpoint to health: ", err) + end +end + + +local function report_failure(self, endpoint) + utils.log_info("report an endpoint failure: ", endpoint.http_host) + local failure_count, err = incr(endpoint.http_host, self.shm_name, self.failure_window) + if err then + utils.log_error("failed to incr etcd endpoint fail times: ", err) + return + end + + -- only trigger once + if failure_count == self.failure_times then + endpoint.health_status = 0 + restore(self.disable_duration, endpoint) + return + end +end + + +local function _request_uri(self, endpoint, method, uri, opts, timeout, ignore_auth) utils.log_info("v3 request uri: ", uri, ", timeout: ", timeout) local body @@ -61,7 +106,9 @@ local function _request_uri(self, method, uri, opts, timeout, ignore_auth) local http_cli, err = utils.http.new() if err then - report_failure(self, endpoint) + if self.failure_times then + report_failure(self, endpoint) + end return nil, err end @@ -79,7 +126,9 @@ local function _request_uri(self, method, uri, opts, timeout, ignore_auth) }) if err then - report_failure(self, endpoint) + if self.failure_times then + report_failure(self, endpoint) + end return nil, err end @@ -126,7 +175,6 @@ function _M.new(opts) shm_name = opts.health_check.shm_name end - if not typeof.uint(timeout) then return nil, 'opts.timeout must be unsigned integer' end @@ -248,11 +296,24 @@ local function choose_endpoint(self) return endpoints[1] end - for _, endpoint in ipairs(endpoints) do - if endpoint.health_status == 1 then - return endpoint + -- choose endpoint by health check + if self.failure_times then + for _, endpoint in ipairs(endpoints) do + if endpoint.health_status == 1 then + return endpoint + end end + utils.log_error("has no health etcd endpoint") + return nil end + + self.init_count = (self.init_count or 0) + 1 + local pos = self.init_count % endpoints_len + 1 + if self.init_count >= INIT_COUNT_RESIZE then + self.init_count = 0 + end + + return endpoints[pos] end @@ -592,7 +653,9 @@ local function request_chunk(self, endpoint, method, scheme, host, port, path, o ok, err = http_cli:connect(host, port) if not ok then - report_failure(self, endpoint) + if self.failure_times then + report_failure(self, endpoint) + end return nil, err end @@ -645,7 +708,9 @@ local function request_chunk(self, endpoint, method, scheme, host, port, path, o if not body then return nil, "failed to decode json body: " .. (err or " unkwon") elseif body.error and body.error.http_code >= 500 then - report_failure(self, endpoint) + if self.failure_times then + report_failure(self, endpoint) + end end if body.result.events then diff --git a/t/v3/health_check.t b/t/v3/health_check.t index bae31a14..0361dafb 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -47,16 +47,51 @@ __DATA__ assert(etcd.failure_times == nil) assert(etcd.failure_window == nil) assert(etcd.disable_duration == nil) + + local res, err = etcd:set("/health_check", "disabled") + res, err = etcd:get("/health_check") + ngx.say(res.body.kvs[1].value) } } --- request GET /t --- no_error_log [error] +--- response_body +disabled + + + +=== TEST 2: compatible etcd failure without health check +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + }) + + local res, err = etcd:set("/test", { a='abc'}) + ngx.say(err) + } + } +--- request +GET /t +--- no_error_log +[error] +--- response_body +connection refused -=== TEST 2: failed enable health check with wrong shm_name +=== TEST 3: failed enable health check with wrong shm_name --- http_config eval: $::HttpConfig --- config location /t { @@ -87,7 +122,7 @@ failed to get ngx.shared dict: wrong_shm_name -=== TEST 3: valid default config values +=== TEST 4: valid default config values --- http_config eval: $::HttpConfig --- config location /t { @@ -118,7 +153,7 @@ GET /t -=== TEST 4: verify `failure_window` works +=== TEST 5: verify `failure_window` works --- http_config eval: $::HttpConfig --- config location /t { @@ -164,7 +199,7 @@ all down -=== TEST 5: verify `failure_times` works +=== TEST 6: verify `failure_times` works --- http_config eval: $::HttpConfig --- config location /t { @@ -201,7 +236,7 @@ all down -=== TEST 6: report unhealthy endpoint +=== TEST 7: report unhealthy endpoint --- http_config eval: $::HttpConfig --- config location /t { @@ -230,7 +265,7 @@ qr/report an endpoint failure: http:\/\/127.0.0.1:42379/ -=== TEST 7: restore endpoint to health +=== TEST 8: restore endpoint to health --- http_config eval: $::HttpConfig --- config location /t { @@ -261,7 +296,7 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ -=== TEST 8: one endpoint only trigger mark unhealthy and restore once +=== TEST 9: one endpoint only trigger mark unhealthy and restore once --- http_config eval: $::HttpConfig --- config location /t { From a20959026ec8f8491117d743d108fd62d578d8e3 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 09:37:04 +0800 Subject: [PATCH 06/17] solve CI error --- lib/resty/etcd/v3.lua | 4 ++-- t/v3/health_check.t | 43 +++++++------------------------------------ 2 files changed, 9 insertions(+), 38 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index fb1c9f3a..a499837a 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -23,6 +23,7 @@ local decode_base64 = ngx.decode_base64 local semaphore = require("ngx.semaphore") local ngx_shared = ngx.shared local ngx_timer_at = ngx.timer.at +local INIT_COUNT_RESIZE = 2e8 local _M = {} @@ -284,8 +285,7 @@ function _M.new(opts) endpoints = endpoints, key_prefix = key_prefix, ssl_verify = ssl_verify, - }, - mt) + }, mt) end diff --git a/t/v3/health_check.t b/t/v3/health_check.t index 0361dafb..e544cda4 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -62,36 +62,7 @@ disabled -=== TEST 2: compatible etcd failure without health check ---- http_config eval: $::HttpConfig ---- config - location /t { - content_by_lua_block { - local etcd, err = require "resty.etcd" .new({ - protocol = "v3", - http_host = { - "http://127.0.0.1:42379", - "http://127.0.0.1:22379", - "http://127.0.0.1:32379", - }, - user = 'root', - password = 'abc123', - }) - - local res, err = etcd:set("/test", { a='abc'}) - ngx.say(err) - } - } ---- request -GET /t ---- no_error_log -[error] ---- response_body -connection refused - - - -=== TEST 3: failed enable health check with wrong shm_name +=== TEST 2: failed enable health check with wrong shm_name --- http_config eval: $::HttpConfig --- config location /t { @@ -122,7 +93,7 @@ failed to get ngx.shared dict: wrong_shm_name -=== TEST 4: valid default config values +=== TEST 3: valid default config values --- http_config eval: $::HttpConfig --- config location /t { @@ -153,7 +124,7 @@ GET /t -=== TEST 5: verify `failure_window` works +=== TEST 4: verify `failure_window` works --- http_config eval: $::HttpConfig --- config location /t { @@ -199,7 +170,7 @@ all down -=== TEST 6: verify `failure_times` works +=== TEST 5: verify `failure_times` works --- http_config eval: $::HttpConfig --- config location /t { @@ -236,7 +207,7 @@ all down -=== TEST 7: report unhealthy endpoint +=== TEST 6: report unhealthy endpoint --- http_config eval: $::HttpConfig --- config location /t { @@ -265,7 +236,7 @@ qr/report an endpoint failure: http:\/\/127.0.0.1:42379/ -=== TEST 8: restore endpoint to health +=== TEST 7: restore endpoint to health --- http_config eval: $::HttpConfig --- config location /t { @@ -296,7 +267,7 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ -=== TEST 9: one endpoint only trigger mark unhealthy and restore once +=== TEST 8: one endpoint only trigger mark unhealthy and restore once --- http_config eval: $::HttpConfig --- config location /t { From bc141bb6737a4e0f5ab92f7df3efa135588d7c8a Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 10:00:48 +0800 Subject: [PATCH 07/17] update doc --- cluster_health_check.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cluster_health_check.md b/cluster_health_check.md index 8efa3434..ae17c3a6 100644 --- a/cluster_health_check.md +++ b/cluster_health_check.md @@ -36,9 +36,9 @@ Description Implement a passive health check mechanism, when the connection/read/write fails occurs, recorded as a endpoint' failure. -In a certain period of time, if there are n consecutive failures, the endpoint is marked unhealthy, the unhealthy endpoint will not be choosed to connect for a certain period of time in the future. +In a `failure_window`, if there are `failure_times` consecutive failures, the endpoint is marked as unhealthy, the unhealthy endpoint will not be choosed to connect for a `disable_duration` time in the future. -Health check mechanism would switch endpoint only when the previously choosed endpoint is marked unhealthy. +Health check mechanism would switch endpoint only when the previously choosed endpoint is marked as unhealthy. Config ======== @@ -77,7 +77,7 @@ when use `require "resty.etcd" .new` to create a connection, you can override th configurations that are not overridden will use the default configuration. -- `shm_name` : the shared memory zone for storing the health status of endpoints. +- `shm_name` : the declarative `lua_shared_dict` is used to store the health status of endpoints. - `failure_window` : the duration of endpoint occurs n consecutive failures(in seconds). - `failure_times` : the times of failures that occurred before the endpoint was marked as unhealthy. - `disable_duration` : the duration of the unhealthy endpoint will not be choosed to connect(in seconds). From 1837faba9eebce76b2559373a850bb7b9ce02b5c Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 11:26:23 +0800 Subject: [PATCH 08/17] solve code review --- cluster_health_check.md | 9 +++++---- lib/resty/etcd/v3.lua | 35 +++++++++-------------------------- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/cluster_health_check.md b/cluster_health_check.md index ae17c3a6..a2096963 100644 --- a/cluster_health_check.md +++ b/cluster_health_check.md @@ -8,6 +8,7 @@ Synopsis http { # required declares a shared memory zone to store endpoints's health status lua_shared_dict healthcheck_shm 1m; + server { location = /healthcheck { content_by_lua_block { @@ -67,10 +68,10 @@ when use `require "resty.etcd" .new` to create a connection, you can override th user = 'root', password = 'abc123', health_check = { - shm_name = "etcd_cluster_health_check", - failure_window = 3, - failure_times = 2, - disable_duration = 10, + shm_name = "etcd_cluster_health_check", + failure_window = 3, + failure_times = 2, + disable_duration = 10, }, }) ``` diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index a499837a..19bcb840 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -234,12 +234,12 @@ function _M.new(opts) end tab_insert(endpoints, { - full_prefix = host .. utils.normalize(api_prefix), - http_host = host, - scheme = m[1], - host = m[2] or "127.0.0.1", - port = m[3] or "2379", - api_prefix = api_prefix, + full_prefix = host .. utils.normalize(api_prefix), + http_host = host, + scheme = m[1], + host = m[2] or "127.0.0.1", + port = m[3] or "2379", + api_prefix = api_prefix, health_status = 1, }) end @@ -249,8 +249,7 @@ function _M.new(opts) return nil, err end - if opts.health_check then - return setmetatable({ + return setmetatable({ last_auth_time = now(), -- save last Authentication time last_refresh_jwt_err = nil, sema = sema, @@ -268,24 +267,8 @@ function _M.new(opts) failure_times = failure_times, disable_duration = disable_duration, shm_name = shm_name, - }, mt) - end - - return setmetatable({ - last_auth_time = now(), -- save last Authentication time - last_refresh_jwt_err = nil, - sema = sema, - jwt_token = nil, -- last Authentication token - is_auth = not not (user and password), - user = user, - password = password, - timeout = timeout, - ttl = ttl, - is_cluster = #endpoints > 1, - endpoints = endpoints, - key_prefix = key_prefix, - ssl_verify = ssl_verify, - }, mt) + }, + mt) end From ef6ce28ac7b44b5a461790b3ff5cef7ed9fc9620 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 11:33:51 +0800 Subject: [PATCH 09/17] keep etcd.lua code style --- lib/resty/etcd.lua | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/resty/etcd.lua b/lib/resty/etcd.lua index 63721b2f..17dfac6f 100644 --- a/lib/resty/etcd.lua +++ b/lib/resty/etcd.lua @@ -1,9 +1,9 @@ -local etcdv2 = require("resty.etcd.v2") -local etcdv3 = require("resty.etcd.v3") -local utils = require("resty.etcd.utils") -local typeof = require("typeof") -local require = require -local pcall = pcall +local etcdv2 = require("resty.etcd.v2") +local etcdv3 = require("resty.etcd.v3") +local utils = require("resty.etcd.utils") +local typeof = require("typeof") +local require = require +local pcall = pcall local ngx_shared = ngx.shared local prefix_v3 = { ["3.5."] = "/v3", From a1ed237f07e6dc63a13ec44be41f4adeee1d428a Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 12:43:32 +0800 Subject: [PATCH 10/17] optimized code style --- lib/resty/etcd/v3.lua | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 19bcb840..4ca96b80 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -341,7 +341,7 @@ function refresh_jwt_token(self, timeout) password = self.password, } } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) local res, err = _request_uri(self, endpoint, 'POST', endpoint.full_prefix .. "/auth/authenticate", opts, timeout, true) @@ -415,7 +415,7 @@ local function set(self, key, val, attr) } } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) local res res, err = _request_uri(self, endpoint, 'POST', endpoint.full_prefix .. "/kv/put", @@ -523,7 +523,7 @@ local function get(self, key, attr) } } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) local res res, err = _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/kv/range", @@ -565,7 +565,7 @@ local function delete(self, key, attr) }, } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/kv/deleterange", opts, self.timeout) @@ -589,7 +589,7 @@ local function txn(self, opts_arg, compare, success, failure) }, } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/kv/txn", opts, timeout or self.timeout) @@ -1034,7 +1034,7 @@ function _M.grant(self, ttl, id) }, } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/lease/grant", opts) end @@ -1050,7 +1050,7 @@ function _M.revoke(self, id) }, } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/kv/lease/revoke", opts) end @@ -1066,7 +1066,7 @@ function _M.keepalive(self, id) }, } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/lease/keepalive", opts) end @@ -1084,7 +1084,7 @@ function _M.timetolive(self, id, keys) }, } - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) local res, err = _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/kv/lease/timetolive", opts) @@ -1100,7 +1100,7 @@ function _M.timetolive(self, id, keys) end function _M.leases(self) - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", endpoint.full_prefix .. "/lease/leases") end @@ -1108,7 +1108,7 @@ end -- /version function _M.version(self) - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", endpoint.http_host .. "/version", nil, self.timeout) @@ -1116,21 +1116,21 @@ end -- /stats function _M.stats_leader(self) - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", endpoint.http_host .. "/v2/stats/leader", nil, self.timeout) end function _M.stats_self(self) - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", endpoint.http_host .. "/v2/stats/self", nil, self.timeout) end function _M.stats_store(self) - local endpoint= choose_endpoint(self) + local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", endpoint.http_host .. "/v2/stats/store", nil, self.timeout) From 8c5f8e24861ed068ae9f1c99c7b0ef790eee3742 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 22:35:32 +0800 Subject: [PATCH 11/17] save --- cluster_health_check.md | 31 ++++++++++++++++--------------- lib/resty/etcd.lua | 4 ++-- lib/resty/etcd/utils.lua | 2 +- lib/resty/etcd/v3.lua | 38 +++++++++++++++++++------------------- t/v3/health_check.t | 20 ++++++++++---------- 5 files changed, 48 insertions(+), 47 deletions(-) diff --git a/cluster_health_check.md b/cluster_health_check.md index a2096963..2bf94071 100644 --- a/cluster_health_check.md +++ b/cluster_health_check.md @@ -21,9 +21,13 @@ http { }, user = 'root', password = 'abc123', - # minimal configuration to enable etcd cluster health check - cluster_healthcheck ={ + + # the health check feature is optional, and can be enabled with the following configuration. + health_check = { shm_name = 'healthcheck_shm', + fail_timeout = 1, + max_fails = 1, + disable_duration = 100 } }) } @@ -35,9 +39,9 @@ http { Description ======== -Implement a passive health check mechanism, when the connection/read/write fails occurs, recorded as a endpoint' failure. +Implement a passive health check mechanism, when the connection/read/write fails occurs, recorded as an endpoint' failure. -In a `failure_window`, if there are `failure_times` consecutive failures, the endpoint is marked as unhealthy, the unhealthy endpoint will not be choosed to connect for a `disable_duration` time in the future. +In a `fail_timeout`, if there are `max_fails` consecutive failures, the endpoint is marked as unhealthy, the unhealthy endpoint will not be choosed to connect for a `disable_duration` time in the future. Health check mechanism would switch endpoint only when the previously choosed endpoint is marked as unhealthy. @@ -49,8 +53,8 @@ The default configuration is as follows: ```lua health_check = { shm_name = "healthcheck_shm", - failure_window = 1, - failure_times = 1, + fail_timeout = 1, + max_fails = 1, disable_duration = 100 } ``` @@ -69,8 +73,8 @@ when use `require "resty.etcd" .new` to create a connection, you can override th password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - failure_window = 3, - failure_times = 2, + fail_timeout = 3, + max_fails = 2, disable_duration = 10, }, }) @@ -78,10 +82,7 @@ when use `require "resty.etcd" .new` to create a connection, you can override th configurations that are not overridden will use the default configuration. -- `shm_name` : the declarative `lua_shared_dict` is used to store the health status of endpoints. -- `failure_window` : the duration of endpoint occurs n consecutive failures(in seconds). -- `failure_times` : the times of failures that occurred before the endpoint was marked as unhealthy. -- `disable_duration` : the duration of the unhealthy endpoint will not be choosed to connect(in seconds). - -### tips -- enable the cluster health check by config the `health_check` +- `shm_name`: the declarative `lua_shared_dict` is used to store the health status of endpoints. +- `fail_timeout`: set the time during which a number of failed attempts must happen for the endpoint to be marked unavailable(in seconds). +- `max_fails`: set the number of failed attempts that must occur during the `fail_timeout` period for the endpoint to be marked unavailable +- `disable_duration`: the time for which the unhealthy endpoint won't be choosed to connect(in seconds). diff --git a/lib/resty/etcd.lua b/lib/resty/etcd.lua index 17dfac6f..5946af44 100644 --- a/lib/resty/etcd.lua +++ b/lib/resty/etcd.lua @@ -54,8 +54,8 @@ function _M.new(opts) return nil, "failed to get ngx.shared dict: " .. opts.health_check.shm_name end - opts.health_check.failure_window = opts.health_check.failure_window or 1 - opts.health_check.failure_times = opts.health_check.failure_times or 1 + opts.health_check.fail_timeout = opts.health_check.fail_timeout or 1 + opts.health_check.max_fails = opts.health_check.max_fails or 1 opts.health_check.disable_duration = opts.health_check.disable_duration or 100 end diff --git a/lib/resty/etcd/utils.lua b/lib/resty/etcd/utils.lua index a6acc1d9..3272d998 100644 --- a/lib/resty/etcd/utils.lua +++ b/lib/resty/etcd/utils.lua @@ -84,7 +84,7 @@ end local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO -local ngx_WARN= ngx.WARN +local ngx_WARN = ngx.WARN local function log_error(...) return ngx_log(ngx_ERR, ...) end diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 4ca96b80..1f688d6f 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -32,8 +32,8 @@ local mt = { __index = _M } -- define local refresh function variable local refresh_jwt_token -local function incr(key, shm_name, failure_window) - local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, failure_window) +local function count(key, shm_name, fail_timeout) + local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, fail_timeout) if err then return nil, err end @@ -63,14 +63,14 @@ end local function report_failure(self, endpoint) utils.log_info("report an endpoint failure: ", endpoint.http_host) - local failure_count, err = incr(endpoint.http_host, self.shm_name, self.failure_window) + local failure_count, err = count(endpoint.http_host, self.shm_name, self.fail_timeout) if err then utils.log_error("failed to incr etcd endpoint fail times: ", err) return end -- only trigger once - if failure_count == self.failure_times then + if failure_count == self.max_fails then endpoint.health_status = 0 restore(self.disable_duration, endpoint) return @@ -107,7 +107,7 @@ local function _request_uri(self, endpoint, method, uri, opts, timeout, ignore_a local http_cli, err = utils.http.new() if err then - if self.failure_times then + if self.max_fails then report_failure(self, endpoint) end return nil, err @@ -127,7 +127,7 @@ local function _request_uri(self, endpoint, method, uri, opts, timeout, ignore_a }) if err then - if self.failure_times then + if self.max_fails then report_failure(self, endpoint) end return nil, err @@ -165,13 +165,13 @@ function _M.new(opts) local user = opts.user local password = opts.password local ssl_verify = opts.ssl_verify - local failure_window - local failure_times + local fail_timeout + local max_fails local disable_duration local shm_name if opts.health_check then - failure_window = opts.health_check.failure_window - failure_times = opts.health_check.failure_times + fail_timeout = opts.health_check.fail_timeout + max_fails = opts.health_check.max_fails disable_duration = opts.health_check.disable_duration shm_name = opts.health_check.shm_name end @@ -205,12 +205,12 @@ function _M.new(opts) end if opts.health_check then - if failure_window and not typeof.uint(failure_window) then - return nil, 'opts.health_check.failure_window must be unsigned integer or ignore' + if fail_timeout and not typeof.uint(fail_timeout) then + return nil, 'opts.health_check.fail_timeout must be unsigned integer or ignore' end - if failure_times and not typeof.uint(failure_times) then - return nil, 'opts.health_check.failure_times must be unsigned integer or ignore' + if max_fails and not typeof.uint(max_fails) then + return nil, 'opts.health_check.max_fails must be unsigned integer or ignore' end if disable_duration and not typeof.uint(disable_duration) then @@ -263,8 +263,8 @@ function _M.new(opts) endpoints = endpoints, key_prefix = key_prefix, ssl_verify = ssl_verify, - failure_window = failure_window, - failure_times = failure_times, + fail_timeout = fail_timeout, + max_fails = max_fails, disable_duration = disable_duration, shm_name = shm_name, }, @@ -280,7 +280,7 @@ local function choose_endpoint(self) end -- choose endpoint by health check - if self.failure_times then + if self.max_fails then for _, endpoint in ipairs(endpoints) do if endpoint.health_status == 1 then return endpoint @@ -636,7 +636,7 @@ local function request_chunk(self, endpoint, method, scheme, host, port, path, o ok, err = http_cli:connect(host, port) if not ok then - if self.failure_times then + if self.max_fails then report_failure(self, endpoint) end return nil, err @@ -691,7 +691,7 @@ local function request_chunk(self, endpoint, method, scheme, host, port, path, o if not body then return nil, "failed to decode json body: " .. (err or " unkwon") elseif body.error and body.error.http_code >= 500 then - if self.failure_times then + if self.max_fails then report_failure(self, endpoint) end end diff --git a/t/v3/health_check.t b/t/v3/health_check.t index e544cda4..885bdd3b 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -44,8 +44,8 @@ __DATA__ }) assert(etcd.shm_name == nil) - assert(etcd.failure_times == nil) - assert(etcd.failure_window == nil) + assert(etcd.max_fails == nil) + assert(etcd.fail_timeout == nil) assert(etcd.disable_duration == nil) local res, err = etcd:set("/health_check", "disabled") @@ -112,8 +112,8 @@ failed to get ngx.shared dict: wrong_shm_name }, }) - assert(etcd.failure_times == 1) - assert(etcd.failure_window == 1) + assert(etcd.max_fails == 1) + assert(etcd.fail_timeout == 1) assert(etcd.disable_duration == 100) } } @@ -124,7 +124,7 @@ GET /t -=== TEST 4: verify `failure_window` works +=== TEST 4: verify `fail_timeout` works --- http_config eval: $::HttpConfig --- config location /t { @@ -140,8 +140,8 @@ GET /t password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - failure_window = 3, - failure_times = 5, + fail_timeout = 3, + max_fails = 5, }, }) @@ -170,7 +170,7 @@ all down -=== TEST 5: verify `failure_times` works +=== TEST 5: verify `max_fails` works --- http_config eval: $::HttpConfig --- config location /t { @@ -186,8 +186,8 @@ all down password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - failure_window = 3, - failure_times = 2, + fail_timeout = 3, + max_fails = 2, disable_duration = 0, }, }) From 7c0fdc01a7aa383171eb3b4daa6d1bff67a66144 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Wed, 9 Dec 2020 23:28:29 +0800 Subject: [PATCH 12/17] solve code style --- lib/resty/etcd/v3.lua | 49 +++++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 1f688d6f..1a80ea9f 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -165,16 +165,11 @@ function _M.new(opts) local user = opts.user local password = opts.password local ssl_verify = opts.ssl_verify - local fail_timeout - local max_fails - local disable_duration - local shm_name - if opts.health_check then - fail_timeout = opts.health_check.fail_timeout - max_fails = opts.health_check.max_fails - disable_duration = opts.health_check.disable_duration - shm_name = opts.health_check.shm_name - end + local health_check = opts.health_check or {} + local fail_timeout = health_check.fail_timeout + local max_fails = health_check.max_fails + local disable_duration = health_check.disable_duration + local shm_name = health_check.shm_name if not typeof.uint(timeout) then return nil, 'opts.timeout must be unsigned integer' @@ -343,7 +338,7 @@ function refresh_jwt_token(self, timeout) } local endpoint = choose_endpoint(self) local res, err = _request_uri(self, endpoint, 'POST', - endpoint.full_prefix .. "/auth/authenticate", + endpoint.full_prefix .. "/auth/authenticate", opts, timeout, true) self.requesting_token = false @@ -418,8 +413,8 @@ local function set(self, key, val, attr) local endpoint = choose_endpoint(self) local res res, err = _request_uri(self, endpoint, 'POST', - endpoint.full_prefix .. "/kv/put", - opts, self.timeout) + endpoint.full_prefix .. "/kv/put", + opts, self.timeout) if err then return nil, err end @@ -526,8 +521,8 @@ local function get(self, key, attr) local endpoint = choose_endpoint(self) local res res, err = _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/kv/range", - opts, attr and attr.timeout or self.timeout) + endpoint.full_prefix .. "/kv/range", + opts, attr and attr.timeout or self.timeout) if res and res.status == 200 then if res.body.kvs and tab_nkeys(res.body.kvs) > 0 then @@ -567,8 +562,8 @@ local function delete(self, key, attr) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/kv/deleterange", - opts, self.timeout) + endpoint.full_prefix .. "/kv/deleterange", + opts, self.timeout) end local function txn(self, opts_arg, compare, success, failure) @@ -591,7 +586,7 @@ local function txn(self, opts_arg, compare, success, failure) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/kv/txn", + endpoint.full_prefix .. "/kv/txn", opts, timeout or self.timeout) end @@ -1036,7 +1031,7 @@ function _M.grant(self, ttl, id) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/lease/grant", opts) + endpoint.full_prefix .. "/lease/grant", opts) end function _M.revoke(self, id) @@ -1052,7 +1047,7 @@ function _M.revoke(self, id) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/kv/lease/revoke", opts) + endpoint.full_prefix .. "/kv/lease/revoke", opts) end function _M.keepalive(self, id) @@ -1068,7 +1063,7 @@ function _M.keepalive(self, id) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/lease/keepalive", opts) + endpoint.full_prefix .. "/lease/keepalive", opts) end function _M.timetolive(self, id, keys) @@ -1086,7 +1081,7 @@ function _M.timetolive(self, id, keys) local endpoint = choose_endpoint(self) local res, err = _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/kv/lease/timetolive", opts) + endpoint.full_prefix .. "/kv/lease/timetolive", opts) if res and res.status == 200 then if res.body.keys and tab_nkeys(res.body.keys) > 0 then @@ -1102,7 +1097,7 @@ end function _M.leases(self) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "POST", - endpoint.full_prefix .. "/lease/leases") + endpoint.full_prefix .. "/lease/leases") end @@ -1110,7 +1105,7 @@ end function _M.version(self) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", - endpoint.http_host .. "/version", + endpoint.http_host .. "/version", nil, self.timeout) end @@ -1118,21 +1113,21 @@ end function _M.stats_leader(self) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", - endpoint.http_host .. "/v2/stats/leader", + endpoint.http_host .. "/v2/stats/leader", nil, self.timeout) end function _M.stats_self(self) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", - endpoint.http_host .. "/v2/stats/self", + endpoint.http_host .. "/v2/stats/self", nil, self.timeout) end function _M.stats_store(self) local endpoint = choose_endpoint(self) return _request_uri(self, endpoint, "GET", - endpoint.http_host .. "/v2/stats/store", + endpoint.http_host .. "/v2/stats/store", nil, self.timeout) end From 818057fd010d1d6d7d08d021f74b22afd2d164b0 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 10 Dec 2020 00:00:00 +0800 Subject: [PATCH 13/17] save --- lib/resty/etcd/v3.lua | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 1a80ea9f..1fbb5423 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -32,7 +32,7 @@ local mt = { __index = _M } -- define local refresh function variable local refresh_jwt_token -local function count(key, shm_name, fail_timeout) +local function fault_count(key, shm_name, fail_timeout) local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, fail_timeout) if err then return nil, err @@ -61,16 +61,16 @@ local function restore(disable_duration, endpoint) end -local function report_failure(self, endpoint) +local function report_fault(self, endpoint) utils.log_info("report an endpoint failure: ", endpoint.http_host) - local failure_count, err = count(endpoint.http_host, self.shm_name, self.fail_timeout) + local fails, err = fault_count(self, endpoint.http_host, self.shm_name, self.fail_timeout) if err then utils.log_error("failed to incr etcd endpoint fail times: ", err) return end -- only trigger once - if failure_count == self.max_fails then + if fails == self.max_fails then endpoint.health_status = 0 restore(self.disable_duration, endpoint) return @@ -108,7 +108,7 @@ local function _request_uri(self, endpoint, method, uri, opts, timeout, ignore_a local http_cli, err = utils.http.new() if err then if self.max_fails then - report_failure(self, endpoint) + report_fault(self, endpoint) end return nil, err end @@ -128,7 +128,7 @@ local function _request_uri(self, endpoint, method, uri, opts, timeout, ignore_a if err then if self.max_fails then - report_failure(self, endpoint) + report_fault(self, endpoint) end return nil, err end @@ -274,7 +274,7 @@ local function choose_endpoint(self) return endpoints[1] end - -- choose endpoint by health check + -- if enable health check if self.max_fails then for _, endpoint in ipairs(endpoints) do if endpoint.health_status == 1 then @@ -632,7 +632,7 @@ local function request_chunk(self, endpoint, method, scheme, host, port, path, o ok, err = http_cli:connect(host, port) if not ok then if self.max_fails then - report_failure(self, endpoint) + report_fault(self, endpoint) end return nil, err end @@ -687,7 +687,7 @@ local function request_chunk(self, endpoint, method, scheme, host, port, path, o return nil, "failed to decode json body: " .. (err or " unkwon") elseif body.error and body.error.http_code >= 500 then if self.max_fails then - report_failure(self, endpoint) + report_fault(self, endpoint) end end From 7b2abb039ef5651cd93eea23d4a70713b595620c Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 10 Dec 2020 15:00:15 +0800 Subject: [PATCH 14/17] revert --- lib/resty/etcd/v3.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 1fbb5423..3c587918 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -63,7 +63,7 @@ end local function report_fault(self, endpoint) utils.log_info("report an endpoint failure: ", endpoint.http_host) - local fails, err = fault_count(self, endpoint.http_host, self.shm_name, self.fail_timeout) + local fails, err = fault_count(endpoint.http_host, self.shm_name, self.fail_timeout) if err then utils.log_error("failed to incr etcd endpoint fail times: ", err) return From 7790f9614af6309c2a7ac09646655c1d65abb2c0 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 10 Dec 2020 15:13:44 +0800 Subject: [PATCH 15/17] add test case --- t/v3/health_check.t | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/t/v3/health_check.t b/t/v3/health_check.t index 885bdd3b..198825c8 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -321,3 +321,36 @@ GET /t --- timeout: 5 --- response_body all down + + + +=== TEST 9: no healthy endpoints when enable health check +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + user = 'root', + password = 'abc123', + health_check = { + shm_name = "etcd_cluster_health_check", + }, + }) + + for _, endpoint in ipairs(etcd.endpoints) do + endpoint.health_status = 0 + end + local res, err = etcd:set("/test", { a='abc'}) + } + } +--- request +GET /t +--- error_code: 500 +--- error_log eval +qr/has no health etcd endpoint/ From 7a65f694fc2ab36c20dd676d6185ef35cd14a657 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Sun, 13 Dec 2020 17:45:45 +0800 Subject: [PATCH 16/17] fails count shared in worker, store in lua_shared_dict unhealthy endpoint trigger by different etcd client configurations --- lib/resty/etcd/v3.lua | 6 +++++- t/v3/health_check.t | 49 ++++++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 3c587918..c5db9366 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -23,6 +23,7 @@ local decode_base64 = ngx.decode_base64 local semaphore = require("ngx.semaphore") local ngx_shared = ngx.shared local ngx_timer_at = ngx.timer.at +local worker_id = ngx.worker.id local INIT_COUNT_RESIZE = 2e8 local _M = {} @@ -31,6 +32,7 @@ local mt = { __index = _M } -- define local refresh function variable local refresh_jwt_token +local fails local function fault_count(key, shm_name, fail_timeout) local new_value, err, forcible = ngx_shared[shm_name]:incr(key, 1, 0, fail_timeout) @@ -63,7 +65,9 @@ end local function report_fault(self, endpoint) utils.log_info("report an endpoint failure: ", endpoint.http_host) - local fails, err = fault_count(endpoint.http_host, self.shm_name, self.fail_timeout) + local key = worker_id() .. "-" .. endpoint.http_host + local err + fails, err = fault_count(key, self.shm_name, self.fail_timeout) if err then utils.log_error("failed to incr etcd endpoint fail times: ", err) return diff --git a/t/v3/health_check.t b/t/v3/health_check.t index 198825c8..51abd9a4 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -145,18 +145,19 @@ GET /t }, }) - etcd:set("/test", { a='abc'}) - local counter = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + local key = ngx.worker.id() .. "-" .. "http://127.0.0.1:42379" + etcd:set("/fail_timeout", "works") + local counter = ngx.shared["etcd_cluster_health_check"]:get(key) assert(counter == 1) ngx.sleep(1) - etcd:set("/test", { a='abc'}) - counter = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + etcd:set("/fail_timeout", "works") + counter = ngx.shared["etcd_cluster_health_check"]:get(key) assert(counter == 2) ngx.sleep(2) - etcd:set("/test", { a='abc'}) - counter = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + etcd:set("/fail_timeout", "works") + counter = ngx.shared["etcd_cluster_health_check"]:get(key) assert(counter == 1) ngx.say("all down") @@ -192,8 +193,8 @@ all down }, }) - etcd:set("/test", { a='abc'}) - etcd:set("/test", { a='abc'}) + etcd:set("/max_fails", "works") + etcd:set("/max_fails", "works") local pending_count = ngx.timer.pending_count() assert(pending_count == 1) ngx.say("all down") @@ -226,7 +227,7 @@ all down }, }) - local res, err = etcd:set("/test", { a='abc'}) + local res, err = etcd:set("/report", "unhealthy") } } --- request @@ -256,7 +257,7 @@ qr/report an endpoint failure: http:\/\/127.0.0.1:42379/ }, }) - local res, err = etcd:set("/test", { a='abc'}) + local res, err = etcd:set("/restore", "unhealthy") ngx.sleep(0.1) } } @@ -267,7 +268,7 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ -=== TEST 8: one endpoint only trigger mark unhealthy and restore once +=== TEST 8: endpoint fails shared by Lua VM, trigger by different etcd client configurations --- http_config eval: $::HttpConfig --- config location /t { @@ -283,12 +284,14 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - disable_duration = 1 + max_fails = 3, + fail_timeout = 3, + disable_duration = 10, }, }) - - etcd1:set("/test", { a='abc'}) - etcd1:set("/test", { a='abc'}) + etcd1:set("/shared_in_worker", "etcd1") + etcd1:set("/shared_in_worker", "etcd1") + etcd1:set("/shared_in_worker", "etcd1") local etcd2, err = require "resty.etcd" .new({ protocol = "v3", @@ -301,18 +304,22 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - disable_duration = 1 + max_fails = 5, + fail_timeout = 3, + disable_duration = 10, }, }) + etcd2:set("/shared_in_worker", "etcd2") + etcd2:set("/shared_in_worker", "etcd2") - etcd2:set("/test", { a='abc'}) - etcd2:set("/test", { a='abc'}) + local key = ngx.worker.id() .. "-" .. "http://127.0.0.1:42379" + local fails, err = ngx.shared["etcd_cluster_health_check"]:get(key) + assert(fails == 5) assert(tostring(etcd1) ~= tostring(etcd2)) local pending_count = ngx.timer.pending_count() - assert(pending_count == 1) + assert(pending_count == 2) - ngx.sleep(1.5) ngx.say("all down") } } @@ -346,7 +353,7 @@ all down for _, endpoint in ipairs(etcd.endpoints) do endpoint.health_status = 0 end - local res, err = etcd:set("/test", { a='abc'}) + local res, err = etcd:set("/no_healthy_endpoint", "yes") } } --- request From a44c2640b01d2255a6ed581d0219fd051ac0db28 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Sun, 13 Dec 2020 22:49:17 +0800 Subject: [PATCH 17/17] remove `disable_duration`, followed by nginx passive health checks design --- cluster_health_check.md | 10 +++------- lib/resty/etcd.lua | 3 +-- lib/resty/etcd/v3.lua | 14 ++++---------- t/v3/health_check.t | 14 ++++---------- 4 files changed, 12 insertions(+), 29 deletions(-) diff --git a/cluster_health_check.md b/cluster_health_check.md index 2bf94071..31b57f05 100644 --- a/cluster_health_check.md +++ b/cluster_health_check.md @@ -27,7 +27,6 @@ http { shm_name = 'healthcheck_shm', fail_timeout = 1, max_fails = 1, - disable_duration = 100 } }) } @@ -41,7 +40,7 @@ Description Implement a passive health check mechanism, when the connection/read/write fails occurs, recorded as an endpoint' failure. -In a `fail_timeout`, if there are `max_fails` consecutive failures, the endpoint is marked as unhealthy, the unhealthy endpoint will not be choosed to connect for a `disable_duration` time in the future. +In a `fail_timeout`, if there are `max_fails` consecutive failures, the endpoint is marked as unhealthy, the unhealthy endpoint will not be choosed to connect for a `fail_timeout` time in the future. Health check mechanism would switch endpoint only when the previously choosed endpoint is marked as unhealthy. @@ -55,7 +54,6 @@ health_check = { shm_name = "healthcheck_shm", fail_timeout = 1, max_fails = 1, - disable_duration = 100 } ``` @@ -75,7 +73,6 @@ when use `require "resty.etcd" .new` to create a connection, you can override th shm_name = "etcd_cluster_health_check", fail_timeout = 3, max_fails = 2, - disable_duration = 10, }, }) ``` @@ -83,6 +80,5 @@ when use `require "resty.etcd" .new` to create a connection, you can override th configurations that are not overridden will use the default configuration. - `shm_name`: the declarative `lua_shared_dict` is used to store the health status of endpoints. -- `fail_timeout`: set the time during which a number of failed attempts must happen for the endpoint to be marked unavailable(in seconds). -- `max_fails`: set the number of failed attempts that must occur during the `fail_timeout` period for the endpoint to be marked unavailable -- `disable_duration`: the time for which the unhealthy endpoint won't be choosed to connect(in seconds). +- `fail_timeout`: sets the time during which a number of failed attempts must happen for the endpoint to be marked unavailable, and also the time for which the endpoint is marked unavailable(default is 10 seconds). +- `max_fails`: sets the number of failed attempts that must occur during the `fail_timeout` period for the endpoint to be marked unavailable (default is 1 attempt). diff --git a/lib/resty/etcd.lua b/lib/resty/etcd.lua index 5946af44..eec33cd7 100644 --- a/lib/resty/etcd.lua +++ b/lib/resty/etcd.lua @@ -54,9 +54,8 @@ function _M.new(opts) return nil, "failed to get ngx.shared dict: " .. opts.health_check.shm_name end - opts.health_check.fail_timeout = opts.health_check.fail_timeout or 1 + opts.health_check.fail_timeout = opts.health_check.fail_timeout or 10 opts.health_check.max_fails = opts.health_check.max_fails or 1 - opts.health_check.disable_duration = opts.health_check.disable_duration or 100 end opts.timeout = opts.timeout or 5 -- 5 sec diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index c5db9366..3498db45 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -47,8 +47,8 @@ local function fault_count(key, shm_name, fail_timeout) end -local function restore(disable_duration, endpoint) - local ok, err = ngx_timer_at(disable_duration, function(premature) +local function restore(fail_timeout, endpoint) + local ok, err = ngx_timer_at(fail_timeout, function(premature) if premature then return end @@ -76,7 +76,7 @@ local function report_fault(self, endpoint) -- only trigger once if fails == self.max_fails then endpoint.health_status = 0 - restore(self.disable_duration, endpoint) + restore(self.fail_timeout, endpoint) return end end @@ -172,7 +172,6 @@ function _M.new(opts) local health_check = opts.health_check or {} local fail_timeout = health_check.fail_timeout local max_fails = health_check.max_fails - local disable_duration = health_check.disable_duration local shm_name = health_check.shm_name if not typeof.uint(timeout) then @@ -211,10 +210,6 @@ function _M.new(opts) if max_fails and not typeof.uint(max_fails) then return nil, 'opts.health_check.max_fails must be unsigned integer or ignore' end - - if disable_duration and not typeof.uint(disable_duration) then - return nil, 'opts.health_check.disable_duration must be unsigned integer or ignore' - end end local endpoints = {} @@ -264,8 +259,7 @@ function _M.new(opts) ssl_verify = ssl_verify, fail_timeout = fail_timeout, max_fails = max_fails, - disable_duration = disable_duration, - shm_name = shm_name, + shm_name = shm_name, }, mt) end diff --git a/t/v3/health_check.t b/t/v3/health_check.t index 51abd9a4..3213d993 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -46,7 +46,6 @@ __DATA__ assert(etcd.shm_name == nil) assert(etcd.max_fails == nil) assert(etcd.fail_timeout == nil) - assert(etcd.disable_duration == nil) local res, err = etcd:set("/health_check", "disabled") res, err = etcd:get("/health_check") @@ -113,8 +112,7 @@ failed to get ngx.shared dict: wrong_shm_name }) assert(etcd.max_fails == 1) - assert(etcd.fail_timeout == 1) - assert(etcd.disable_duration == 100) + assert(etcd.fail_timeout == 10) } } --- request @@ -187,9 +185,8 @@ all down password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - fail_timeout = 3, + fail_timeout = 1, max_fails = 2, - disable_duration = 0, }, }) @@ -253,12 +250,12 @@ qr/report an endpoint failure: http:\/\/127.0.0.1:42379/ password = 'abc123', health_check = { shm_name = "etcd_cluster_health_check", - disable_duration = 0 + fail_timeout = 1, }, }) local res, err = etcd:set("/restore", "unhealthy") - ngx.sleep(0.1) + ngx.sleep(1.1) } } --- request @@ -286,7 +283,6 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ shm_name = "etcd_cluster_health_check", max_fails = 3, fail_timeout = 3, - disable_duration = 10, }, }) etcd1:set("/shared_in_worker", "etcd1") @@ -306,7 +302,6 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ shm_name = "etcd_cluster_health_check", max_fails = 5, fail_timeout = 3, - disable_duration = 10, }, }) etcd2:set("/shared_in_worker", "etcd2") @@ -325,7 +320,6 @@ qr/restore an endpoint to health: http:\/\/127.0.0.1:42379/ } --- request GET /t ---- timeout: 5 --- response_body all down