Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(balancer): add latency balancer algorithm #9787

Merged
merged 26 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
efc323c
feat: support ewma
oowl Aug 9, 2022
0a714f5
fix(balancer): ewma log update score bug fix
oowl Nov 20, 2022
448c535
tests(balancer) add ewma spec
oowl Nov 21, 2022
3a7fe48
tests(balancer): add ewma balancer spec
oowl Nov 22, 2022
e11de80
fix(balancer): add rockspec
oowl Nov 22, 2022
13560d5
fix(balancer): fix db entities overwrite function
oowl Nov 22, 2022
fefd7f7
tests(balancer): make ewma balancer spec happy
oowl Nov 22, 2022
b1c3d8e
tests(balancer): fix ewma unit test spec
oowl Nov 22, 2022
76b78fc
tests(balancer): fix ewma integration test
oowl Nov 22, 2022
8870cd8
tests(balancer): add ewma integration test case
oowl Nov 23, 2022
7c151d3
fix(balancer): cleanup ewma code
oowl Nov 23, 2022
f8865de
tests(balancer): make ewma integration test happy
oowl Nov 23, 2022
4c222da
fix(balancer): fix ewma code style
oowl Jan 14, 2023
f74e5fd
fix(balancer): fix ewma code style
oowl Jan 14, 2023
cecfc81
fix(balancer): fix ewma code style
oowl Jan 14, 2023
0d904ee
Update spec/01-unit/09-balancer/06-ewma_spec.lua
oowl Jan 14, 2023
c930b5c
Update spec/01-unit/09-balancer/06-ewma_spec.lua
oowl Jan 14, 2023
c448f93
fix: fix test
oowl Jan 15, 2023
f2de8ef
fix test
oowl Jan 15, 2023
3b08dff
fix code
oowl Jan 17, 2023
46cac10
fix test
oowl Jan 17, 2023
444e5bb
fix test
oowl Jan 17, 2023
15a6621
fix test
oowl Jan 18, 2023
3779bec
feat(balancer): rename ewma to latency
oowl Jan 19, 2023
7a67d7f
feat(balancer): rename ewma to latency
oowl Jan 19, 2023
802e333
feat(balancer): fix log message
oowl Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kong-3.2.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ build = {
["kong.runloop.balancer.consistent_hashing"] = "kong/runloop/balancer/consistent_hashing.lua",
["kong.runloop.balancer.healthcheckers"] = "kong/runloop/balancer/healthcheckers.lua",
["kong.runloop.balancer.least_connections"] = "kong/runloop/balancer/least_connections.lua",
["kong.runloop.balancer.latency"] = "kong/runloop/balancer/latency.lua",
["kong.runloop.balancer.round_robin"] = "kong/runloop/balancer/round_robin.lua",
["kong.runloop.balancer.targets"] = "kong/runloop/balancer/targets.lua",
["kong.runloop.balancer.upstreams"] = "kong/runloop/balancer/upstreams.lua",
Expand Down
7 changes: 6 additions & 1 deletion kong/db/schema/entities/upstreams.lua
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ local r = {
{ name = { type = "string", required = true, unique = true, custom_validator = validate_name }, },
{ algorithm = { type = "string",
default = "round-robin",
one_of = { "consistent-hashing", "least-connections", "round-robin" },
one_of = { "consistent-hashing", "least-connections", "round-robin", "latency" },
}, },
{ hash_on = hash_on },
{ hash_fallback = hash_on },
Expand Down Expand Up @@ -307,6 +307,11 @@ local r = {
algorithm = value,
hash_on = null,
}
elseif value == "latency" then
return {
algorithm = value,
hash_on = null,
}
else
return {
algorithm = value,
Expand Down
13 changes: 13 additions & 0 deletions kong/runloop/balancer/balancers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ local function create_balancer_exclusive(upstream)
["consistent-hashing"] = require("kong.runloop.balancer.consistent_hashing"),
["least-connections"] = require("kong.runloop.balancer.least_connections"),
["round-robin"] = require("kong.runloop.balancer.round_robin"),
["latency"] = require("kong.runloop.balancer.latency"),
}
end

Expand Down Expand Up @@ -571,4 +572,16 @@ function balancer_mt:getPeer(...)
return self.algorithm:getPeer(...)
end

--- Passes the post-request notification on to the configured algorithm.
-- Algorithms that do not implement an `afterBalance` hook are skipped.
-- @param ... arguments handed unchanged to the algorithm's `afterBalance`
-- @return the algorithm hook's return values; `nil` plus an error message
-- when the balancer is not healthy; nothing when there is no hook to call
function balancer_mt:afterBalance(...)
  if not self.healthy then
    return nil, "Balancer is unhealthy"
  end

  local algorithm = self.algorithm
  local hook = algorithm and algorithm.afterBalance
  if hook then
    -- same as algorithm:afterBalance(...)
    return hook(algorithm, ...)
  end
end

return balancers_M
8 changes: 7 additions & 1 deletion kong/runloop/balancer/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,17 @@ local function set_host_header(balancer_data, upstream_scheme, upstream_host, is
return true
end


--- Notifies the balancer that handled a request that the request is done.
-- Does nothing when there is no balancer data or no balancer handle.
-- @param balancer_data table (or nil) carrying `balancer` and `balancer_handle`
-- @param ctx value forwarded as first argument to the balancer's `afterBalance`
local function after_balance(balancer_data, ctx)
  if not balancer_data then
    return
  end

  local handle = balancer_data.balancer_handle
  if not handle then
    return
  end

  balancer_data.balancer:afterBalance(ctx, handle)
end

return {
init = init,
execute = execute,
after_balance = after_balance,
on_target_event = targets.on_target_event,
on_upstream_event = upstreams.on_upstream_event,
get_upstream_by_name = upstreams.get_upstream_by_name,
Expand Down
252 changes: 252 additions & 0 deletions kong/runloop/balancer/latency.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
--------------------------------------------------------------------------
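-- latency balancer algorithm, scoring upstream addresses by an exponentially
-- weighted moving average (EWMA) of their round-trip times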
--
-- Original Authors: Shiv Nagarajan & Scott Francis
-- Accessed: March 12, 2018
-- Inspiration drawn from:
-- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
-- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala


local balancers = require "kong.runloop.balancer.balancers"

local pairs = pairs
local ipairs = ipairs
local math = math
local math_exp = math.exp
local ngx_now = ngx.now
local ngx_log = ngx.log
local ngx_WARN = ngx.WARN
local ngx_DEBUG = ngx.DEBUG

local table_nkeys = table.nkeys
local table_clear = table.clear
local table_insert = table.insert

local DECAY_TIME = 10 -- this value is in seconds
local PICK_SET_SIZE = 2

local new_addresses = {}

local ewma = {}
ewma.__index = ewma

--- Blends a previous EWMA value with a new round-trip-time sample.
-- The previous value is weighted by exp(-elapsed / DECAY_TIME) and the sample
-- by the remainder, so the older the previous value, the less it counts.
-- @param ewma previous moving-average value
-- @param last_touched_at time at which `ewma` was last stored
-- @param rtt new sample to blend in
-- @param now current time, same unit as `last_touched_at`
-- @return the decayed moving average
local function decay_ewma(ewma, last_touched_at, rtt, now)
  local elapsed = now - last_touched_at
  if not (elapsed > 0) then
    -- a non-positive elapsed time is treated as "no time has passed"
    elapsed = 0
  end

  local weight = math_exp(-elapsed / DECAY_TIME)
  return ewma * weight + rtt * (1.0 - weight)
end


-- The slow-start EWMA is the initial score handed to newly introduced
-- addresses, so that they do not immediately attract too many requests.
-- It is the mean score over all currently available addresses; an address
-- that has no score yet contributes 0 to that mean.
--
-- Side effect: stores the number of available addresses in
-- `self.address_count` (only when at least one address is available).
--
-- Returns the mean score, or nil when no address is available.
local function calculate_slow_start_ewma(self)
  local sum, count = 0, 0

  for _, target in ipairs(self.balancer.targets) do
    for _, address in ipairs(target.addresses) do
      if address.available then
        count = count + 1
        sum = sum + (self.ewma[address] or 0)
      end
    end
  end

  if count == 0 then
    ngx_log(ngx_DEBUG, "no ewma value exists for the endpoints")
    return nil
  end

  self.address_count = count
  return sum / count
end


--- Re-synchronises the score tables with the balancer's current addresses.
-- Scores of addresses that are no longer available are dropped, and every
-- available address without a score is seeded with the slow-start value.
-- Uses the module-level `new_addresses` table as scratch space.
function ewma:afterHostUpdate()
  -- collect the set of currently available addresses
  table_clear(new_addresses)
  for _, target in ipairs(self.balancer.targets) do
    for _, address in ipairs(target.addresses) do
      if address.available then
        new_addresses[address] = true
      end
    end
  end

  local scores = self.ewma
  local touched_at = self.ewma_last_touched_at

  -- forget everything we know about addresses that went away
  for address in pairs(scores) do
    if not new_addresses[address] then
      scores[address] = nil
      touched_at[address] = nil
    end
  end

  local initial_score = calculate_slow_start_ewma(self)
  if initial_score == nil then
    -- nothing available, hence nothing to seed
    return
  end

  -- seed the addresses that have no score yet
  local now = ngx_now()
  for address in pairs(new_addresses) do
    if not scores[address] then
      scores[address] = initial_score
      touched_at[address] = now
    end
  end
end


--- Computes the decayed score of an address at the current time.
-- A missing score or a missing timestamp is read as 0.
-- @param self the latency balancer instance
-- @param address address whose score is wanted
-- @param rtt sample to blend into the score
-- @param update when truthy, the result and the current time are stored back
-- @return the decayed score
local function get_or_update_ewma(self, address, rtt, update)
  local now = ngx_now()
  local score = decay_ewma(self.ewma[address] or 0,
                           self.ewma_last_touched_at[address] or 0,
                           rtt,
                           now)

  if update then
    self.ewma[address] = score
    self.ewma_last_touched_at[address] = now
  end

  return score
end


-- Extracts the timing of the most recent upstream attempt from an nginx
-- upstream timing variable.
--
-- When a request was tried against more than one upstream address, nginx
-- reports one value per attempt, separated by commas and colons (e.g.
-- "0.003, 0.120"). A plain `tonumber` yields nil on such a value, which would
-- turn the sample into 0. The last value is used here because
-- `handle.address` is the address chosen by the most recent `getPeer` call.
--
-- Returns 0 when the variable is missing or its last entry is not a number
-- (e.g. "-").
local function last_upstream_time(value)
  if value == nil then
    return 0
  end

  local single = tonumber(value)
  if single then
    return single
  end

  if type(value) ~= "string" then
    return 0
  end

  local last = value:match("([%d%.]+)%s*$")
  return last and tonumber(last) or 0
end


--- Records the round-trip time of a finished request against its address.
-- The round-trip time is the upstream connect time plus the upstream
-- response time, as reported by nginx.
-- @param ctx request context (unused here)
-- @param handle balancer handle whose `address` field served the request
-- @return the updated score of the address, or `nil` plus an error message
-- when nginx reports no upstream address for the request
function ewma:afterBalance(ctx, handle)
  local ngx_var = ngx.var
  local response_time = last_upstream_time(ngx_var.upstream_response_time)
  local connect_time = last_upstream_time(ngx_var.upstream_connect_time)
  local rtt = connect_time + response_time
  local upstream = ngx_var.upstream_addr
  local address = handle.address
  if upstream then
    ngx_log(ngx_DEBUG, "ewma after balancer rtt: ", rtt)
    return get_or_update_ewma(self, address, rtt, true)
  end

  return nil, "no upstream addr found"
end


--- Picks the address with the lowest weighted score among the first `k`.
-- The weighted score is the current (decayed) score divided by the
-- address weight; on a tie the earliest entry wins.
-- @param self the latency balancer instance
-- @param address array of candidate addresses
-- @param k number of leading entries of `address` to compare
-- @return the winning address and its weighted score
local function pick_and_score(self, address, k)
  local winner = address[1]
  local winner_score = get_or_update_ewma(self, winner, 0, false) / winner.weight

  for i = 2, k do
    local candidate = address[i]
    local candidate_score = get_or_update_ewma(self, candidate, 0, false) / candidate.weight
    if candidate_score < winner_score then
      winner, winner_score = candidate, candidate_score
    end
  end

  return winner, winner_score
end


--- Selects the address the next request (or retry) should be sent to.
-- @param cache_only forwarded to `balancers.getAddressPeer`
-- @param handle handle returned by a previous call for the same request
-- (which makes this call a retry), or nil for a first attempt
-- @param value_to_hash unused by this algorithm
-- @return ip, port, host and the handle on success; `nil` plus an error
-- otherwise
function ewma:getPeer(cache_only, handle, value_to_hash)
  if not handle then
    -- first attempt for this request
    handle = {
      failedAddresses = setmetatable({}, {__mode = "k"}),
      retryCount = 0,
    }
  else
    -- a handle was passed in, so this is a retry: remember the address
    -- that was tried last as failed
    handle.retryCount = handle.retryCount + 1
    if not handle.failedAddresses then
      handle.failedAddresses = setmetatable({}, {__mode = "k"})
    end
    handle.failedAddresses[handle.address] = true
  end

  if not self.balancer.healthy then
    return nil, balancers.errors.ERR_BALANCER_UNHEALTHY
  end

  -- start from whichever scored address the table yields first
  local address = next(self.ewma)
  if address == nil then
    -- No peers are available
    return nil, balancers.errors.ERR_NO_PEERS_AVAILABLE, nil
  end

  local failed = handle.failedAddresses
  local address_count = self.address_count
  local ip, port, host

  repeat
    if address_count > 1 then
      local k = PICK_SET_SIZE
      if address_count < PICK_SET_SIZE then
        k = address_count
      end

      -- candidates are all scored addresses that have not failed yet
      local candidates, candidate_count = {}, 0
      for addr in pairs(self.ewma) do
        if not failed[addr] then
          candidate_count = candidate_count + 1
          candidates[candidate_count] = addr
        end
      end

      if candidate_count == 0 then
        ngx_log(ngx_WARN, "all endpoints have been retried")
        return nil, balancers.errors.ERR_NO_PEERS_AVAILABLE
      end

      local score
      if candidate_count == 1 then
        address = candidates[1]
        score = get_or_update_ewma(self, address, 0, false)
      else
        -- k is raised to the candidate count, so every candidate is scored
        if candidate_count > k then
          k = candidate_count
        end
        address, score = pick_and_score(self, candidates, k)
      end
      ngx_log(ngx_DEBUG, "get ewma score: ", score)
    end

    -- check the address selected, and get an IP
    ip, port, host = balancers.getAddressPeer(address, cache_only)
    if ip then
      -- success, exit
      handle.address = address
      break
    end

    failed[address] = true
    -- on failure `port` carries the error; only a DNS update is retried here
  until port ~= balancers.errors.ERR_DNS_UPDATED

  return ip, port, host, handle
end


--- Creates a latency balancer algorithm instance.
-- @param opts options table; `opts.balancer` is the balancer this
-- algorithm instance works for
-- @return the new instance, with its scores already seeded from the
-- balancer's current addresses
function ewma.new(opts)
  assert(type(opts) == "table", "Expected an options table, but got: "..type(opts))

  local state = {
    balancer = opts.balancer,
    address_count = 0,
    ewma = {},                  -- address => score
    ewma_last_touched_at = {},  -- address => time the score was last stored
  }
  local self = setmetatable(state, ewma)

  self:afterHostUpdate()

  ngx_log(ngx_DEBUG, "latency balancer created")

  return self
end


return ewma
1 change: 1 addition & 0 deletions kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,7 @@ return {
balancer_data.balancer_handle:release()
end
end
balancer.after_balance(balancer_data, ctx)
end
}
}
Loading