-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support weighting #7
base: master
Are you sure you want to change the base?
Changes from all commits
090bc35
bd1a9cc
c7335ab
9df7619
64872fd
39f333a
a07501a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,13 @@ | |
-- | ||
-- | ||
-- | ||
local next = next | ||
|
||
-- Dependencies | ||
local http = require "resty.http" | ||
local balancer = require "ngx.balancer" | ||
local json = require "cjson" | ||
local resty_roundrobin = require "resty.roundrobin" | ||
|
||
local WATCH_RETRY_TIMER = 0.5 | ||
|
||
|
@@ -18,7 +20,7 @@ end | |
local _M = new_tab(0, 5) -- Change the second number. | ||
|
||
_M.VERSION = "0.04" | ||
_M._cache = {} | ||
_M._cache = {} -- To save the "service" object | ||
|
||
local function _sanitize_uri(consul_uri) | ||
-- TODO: Ensure that uri has <proto>://<host>[:<port>] scheme | ||
|
@@ -64,28 +66,35 @@ local function _parse_service(response) | |
if passing then | ||
local s = v["Service"] | ||
local na = v["Node"]["Address"] | ||
table.insert(service.upstreams, { | ||
address = s["Address"] ~= "" and s["Address"] or na, | ||
port = s["Port"], | ||
}) | ||
local address = s["Address"] ~= "" and s["Address"] or na | ||
local port = s["Port"] | ||
-- If the weight parameter fails to be obtained or is passed incorrectly, set it to 1 | ||
local weight = s["Weights"]["Passing"] or 1 | ||
if type(weight) == "number" and weight > 0 then | ||
service.upstreams[address .. ":" .. port] = weight | ||
ngx.log(ngx.INFO, "consul.balancer: add " .. address .. ":" .. port .. " to upstreams, weight " .. weight) | ||
else | ||
ngx.log(ngx.ALERT, "consul.balancer: upstream " .. address .. ":" .. port .. " weight set error:" .. weight) | ||
end | ||
end | ||
end | ||
if service.upstreams ==nil or next(service.upstreams) ==nil then | ||
ngx.log(ngx.ERR, "[FATAL] consul.balancer: upstream list is nil") | ||
return nil, "upstream list is nil" | ||
end | ||
local rr_up = resty_roundrobin:new(service.upstreams) | ||
service.rr_up = rr_up | ||
return service | ||
end | ||
|
||
local function _persist(service_name, service) | ||
if _M.shared_cache then | ||
_M.shared_cache:set(service_name, json.encode(service)) | ||
return | ||
end | ||
-- Shared cache requires encode & decode to store data. The "find" method of rr_up objects will be lost. | ||
-- Therefore, the rr_up objects are stored directly using table. | ||
_M._cache[service_name] = service | ||
end | ||
|
||
local function _aquire(service_name) | ||
if _M.shared_cache then | ||
local service_json = _M.shared_cache:get(service_name) | ||
return service_json and json.decode(service_json) or nil | ||
end | ||
-- When using table to store an rr_up object, simply return it | ||
return _M._cache[service_name] | ||
end | ||
|
||
|
@@ -162,7 +171,7 @@ local function _watch(premature, service_descriptor) | |
hc:set_timeout(360000) -- consul api has a default of 5 minutes for tcp long poll | ||
local service_index = 0 | ||
ngx.log(ngx.NOTICE, "consul.balancer: started watching for changes in ", service_descriptor.name) | ||
while true do | ||
while not ngx.worker.exiting() do | ||
local uri = _build_service_uri(service_descriptor, service_index) | ||
local service, err = _refresh(hc, uri) | ||
if service == nil then | ||
|
@@ -179,10 +188,7 @@ local function _watch(premature, service_descriptor) | |
end | ||
|
||
function _M.watch(consul_uri, service_list) | ||
-- start watching on first worker only, skip for others (if shared storage provided) | ||
if _M.shared_cache and ngx.worker.id() > 0 then | ||
return | ||
end | ||
-- Each worker process is independent and independently watches consul upstream changes | ||
-- TODO: Reconsider scope for this variable. | ||
_M._consul_uri = _sanitize_uri(consul_uri) | ||
for k,v in pairs(service_list) do | ||
|
@@ -196,35 +202,17 @@ function _M.round_robin(service_name) | |
ngx.log(ngx.ERR, "consul.balancer: no entry found for service: ", service_name) | ||
return ngx.exit(500) | ||
end | ||
if service.upstreams == nil or #service.upstreams == 0 then | ||
ngx.log(ngx.ERR, "consul.balancer: no peers for service: ", service_name) | ||
local rr_up = service.rr_up | ||
if rr_up == nil then | ||
ngx.log(ngx.ERR, "consul.balancer: no roundrobin object for service: ", service_name) | ||
return ngx.exit(500) | ||
end | ||
if service.state == nil or service.state > #service.upstreams then | ||
service.state = 1 | ||
end | ||
-- TODO: https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/balancer.md#get_last_failure | ||
-- set max tries only at first attempt | ||
if not balancer.get_last_failure() then | ||
balancer.set_more_tries(#service.upstreams - 1) | ||
end | ||
-- Picking next upstream | ||
local upstream = service.upstreams[service.state] | ||
service.state = service.state + 1 | ||
_persist(service_name, service) | ||
local ok, err = balancer.set_current_peer(upstream["address"], upstream["port"]) | ||
local server = rr_up:find() | ||
local ok, err = balancer.set_current_peer(server) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could not find firm proof that this function behaves correctly if passed a string containing ':' separated host/port pair. I checked here: https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/balancer.md#set_current_peer Please verify this by testing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The After testing, it is feasible, it is possible that the |
||
if not ok then | ||
ngx.log(ngx.ERR, "consul.balancer: failed to set the current peer: ", err) | ||
return ngx.exit(500) | ||
end | ||
end | ||
|
||
function _M.set_shared_dict_name(dict_name) | ||
_M.shared_cache = ngx.shared[dict_name] | ||
if not _M.shared_cache then | ||
ngx.log(ngx.ERR, "consul.balancer: unable to access shared dict ", dict_name) | ||
return ngx.exit(ngx.ERROR) | ||
end | ||
end | ||
|
||
return _M |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This lookup may unwind the stack if response structure does not contain 'Weight' key.
I could not determine which consul version added this field into the API response.
If this key aways exist, then the error handling part of the following code is unnecessary.
If this key exists only after some Consul version, then all clients using earlier version will stop functioning permanently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to solve the consul issues #1088 and 4198, the
Weights
field was added in cosnul 1.2.3 version,See consul-1.2.3 AgentWeights and #4468.Also, When I tested with Consul v1.4.0, the response will always have the
Weights
field by default, whether or not I passed in theWeights
field when I signed up for the service.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then just as a precaution, slap in Readme that this new code only works with Consul >1.2.3 :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I'll add dependencies later.