feat(balancer) add path, query, uri_capture hash sources
flrgh authored and aboudreault committed Jun 1, 2022
1 parent 2215828 commit 4ca9ce4
Showing 9 changed files with 502 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -201,6 +201,8 @@
Built-in instrumentation types and sampling rate are configurable through
`opentelemetry_tracing` and `opentelemetry_tracing_sampling_rate` options.
[#8724](https://github.com/Kong/kong/pull/8724)
- Added `path`, `uri_capture`, and `query_arg` options to upstream `hash_on`
for load balancing. [#8701](https://github.com/Kong/kong/pull/8701)

#### Plugins

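For reference, a minimal sketch (not part of this commit) of an upstream definition that uses the new hash sources; the field names match the schema changes below, while the upstream name and the argument/capture names are purely illustrative:

-- hypothetical upstream exercising the new hash sources
local upstream = {
  name                      = "example-service",
  hash_on                   = "query_arg",     -- primary input: a query string argument
  hash_on_query_arg         = "userid",        -- which argument to hash on
  hash_fallback             = "uri_capture",   -- used when the argument is absent
  hash_fallback_uri_capture = "user_id",       -- named capture group from the matched route
}
-- hash_on = "path" needs no companion field; the normalized request path is hashed directly.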
48 changes: 48 additions & 0 deletions kong/db/migrations/core/016_280_to_300.lua
@@ -253,6 +253,42 @@ return {
-- Do nothing, accept existing state
END;
$$;
-- add new hash_on_query_arg field to upstreams
DO $$
BEGIN
ALTER TABLE IF EXISTS ONLY "upstreams" ADD "hash_on_query_arg" TEXT;
EXCEPTION WHEN DUPLICATE_COLUMN THEN
-- Do nothing, accept existing state
END;
$$;
-- add new hash_fallback_query_arg field to upstreams
DO $$
BEGIN
ALTER TABLE IF EXISTS ONLY "upstreams" ADD "hash_fallback_query_arg" TEXT;
EXCEPTION WHEN DUPLICATE_COLUMN THEN
-- Do nothing, accept existing state
END;
$$;
-- add new hash_on_uri_capture field to upstreams
DO $$
BEGIN
ALTER TABLE IF EXISTS ONLY "upstreams" ADD "hash_on_uri_capture" TEXT;
EXCEPTION WHEN DUPLICATE_COLUMN THEN
-- Do nothing, accept existing state
END;
$$;
-- add new hash_fallback_uri_capture field to upstreams
DO $$
BEGIN
ALTER TABLE IF EXISTS ONLY "upstreams" ADD "hash_fallback_uri_capture" TEXT;
EXCEPTION WHEN DUPLICATE_COLUMN THEN
-- Do nothing, accept existing state
END;
$$;
]],
teardown = function(connector)
local _, err = connector:query([[
@@ -289,6 +325,18 @@ return {
ALTER TABLE targets ADD cache_key text;
CREATE INDEX IF NOT EXISTS targets_cache_key_idx ON targets(cache_key);
-- add new hash_on_query_arg field to upstreams
ALTER TABLE upstreams ADD hash_on_query_arg text;
-- add new hash_fallback_query_arg field to upstreams
ALTER TABLE upstreams ADD hash_fallback_query_arg text;
-- add new hash_on_uri_capture field to upstreams
ALTER TABLE upstreams ADD hash_on_uri_capture text;
-- add new hash_fallback_uri_capture field to upstreams
ALTER TABLE upstreams ADD hash_fallback_uri_capture text;
]],
teardown = function(connector)
local coordinator = assert(connector:get_stored_connection())
60 changes: 56 additions & 4 deletions kong/db/schema/entities/upstreams.lua
@@ -32,7 +32,7 @@ end
local hash_on = Schema.define {
type = "string",
default = "none",
one_of = { "none", "consumer", "ip", "header", "cookie" }
one_of = { "none", "consumer", "ip", "header", "cookie", "path", "query_arg", "uri_capture" }
}


@@ -79,6 +79,10 @@ local health_threshold = Schema.define {
between = { 0, 100 },
}

local simple_param = Schema.define {
type = "string",
len_min = 1,
}

local NO_DEFAULT = {}

@@ -186,6 +190,10 @@ local r = {
{ hash_fallback_header = typedefs.header_name, },
{ hash_on_cookie = { type = "string", custom_validator = utils.validate_cookie_name }, },
{ hash_on_cookie_path = typedefs.path{ default = "/", }, },
{ hash_on_query_arg = simple_param },
{ hash_fallback_query_arg = simple_param },
{ hash_on_uri_capture = simple_param },
{ hash_fallback_uri_capture = simple_param },
{ slots = { type = "integer", default = 10000, between = { 10, 2^16 }, }, },
{ healthchecks = { type = "record",
default = healthchecks_defaults,
@@ -228,18 +236,62 @@ local r = {
then_field = "hash_fallback", then_match = { one_of = { "none" }, },
}, },

-- hash_fallback must not equal hash_on (headers are allowed)
-- hash_fallback must not equal hash_on (headers and query args are allowed)
{ conditional = {
if_field = "hash_on", if_match = { match = "^consumer$" },
then_field = "hash_fallback", then_match = { one_of = { "none", "ip", "header", "cookie" }, },
then_field = "hash_fallback", then_match = { one_of = { "none", "ip",
"header", "cookie",
"path", "query_arg",
"uri_capture",
}, },
}, },
{ conditional = {
if_field = "hash_on", if_match = { match = "^ip$" },
then_field = "hash_fallback", then_match = { one_of = { "none", "consumer", "header", "cookie" }, },
then_field = "hash_fallback", then_match = { one_of = { "none", "consumer",
"header", "cookie",
"path", "query_arg",
"uri_capture",
}, },
}, },
{ conditional = {
if_field = "hash_on", if_match = { match = "^path$" },
then_field = "hash_fallback", then_match = { one_of = { "none", "consumer",
"header", "cookie",
"query_arg", "ip",
"uri_capture",
}, },
}, },


-- different headers
{ distinct = { "hash_on_header", "hash_fallback_header" }, },

-- hash_on_query_arg must be present when hashing on query_arg
{ conditional = {
if_field = "hash_on", if_match = { match = "^query_arg$" },
then_field = "hash_on_query_arg", then_match = { required = true },
}, },
{ conditional = {
if_field = "hash_fallback", if_match = { match = "^query_arg$" },
then_field = "hash_fallback_query_arg", then_match = { required = true },
}, },

-- query arg and fallback must be different
{ distinct = { "hash_on_query_arg" , "hash_fallback_query_arg" }, },

-- hash_on_uri_capture must be present when hashing on uri_capture
{ conditional = {
if_field = "hash_on", if_match = { match = "^uri_capture$" },
then_field = "hash_on_uri_capture", then_match = { required = true },
}, },
{ conditional = {
if_field = "hash_fallback", if_match = { match = "^uri_capture$" },
then_field = "hash_fallback_uri_capture", then_match = { required = true },
}, },

-- uri capture and fallback must be different
{ distinct = { "hash_on_uri_capture" , "hash_fallback_uri_capture" }, },

},

-- This is a hack to preserve backwards compatibility with regard to the
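To summarize the conditional rules added above, an editor's sketch (plain data, not Kong code) of which hash_fallback sources each primary hash_on value may combine with; the "none" and "cookie" cases are governed by rules outside this hunk and are omitted:

-- fallback sources permitted for each primary hash source (summary of the conditionals above)
local allowed_fallbacks = {
  consumer = { "none", "ip", "header", "cookie", "path", "query_arg", "uri_capture" },
  ip       = { "none", "consumer", "header", "cookie", "path", "query_arg", "uri_capture" },
  path     = { "none", "consumer", "header", "cookie", "query_arg", "ip", "uri_capture" },
}
-- header, query_arg, and uri_capture may also fall back to the same source, provided the
-- companion fields (header name, argument name, capture name) differ, which is what the
-- `distinct` constraints above enforce.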
73 changes: 73 additions & 0 deletions kong/runloop/balancer/init.lua
@@ -43,6 +43,55 @@ if ngx.config.subsystem ~= "stream" then
end


local get_query_arg
do
local sort = table.sort
local get_uri_args = ngx.req.get_uri_args
local limit = 100

-- OpenResty allows us to reuse the table that it populates with the request
-- query args. The table is cleared by `ngx.req.get_uri_args` on each use, so
-- there is no need for the caller (us) to clear or reset it manually.
--
  -- @see https://github.com/openresty/lua-resty-core/pull/288
  -- @see https://github.com/openresty/lua-resty-core/blob/3c3d0786d6e26282e76f39f4fe5577d316a47a09/lib/resty/core/request.lua#L196-L208
local cache = require("table.new")(0, limit)


function get_query_arg(name)
local query, err = get_uri_args(limit, cache)

if err == "truncated" then
log(WARN, "could not fetch all query string args for request, ",
"hash value may be empty/incomplete")

elseif not query then
log(ERR, "failed fetching query string args: ", err or "unknown error")
return
end

local value = query[name]

-- normalization
--
-- 1. convert booleans to string
-- 2. sort and concat multi-value args

if type(value) == "table" then
for i = 1, #value do
value[i] = tostring(value[i])
end
sort(value)
value = table_concat(value, ",")

elseif value ~= nil then
value = tostring(value)
end

return value
end
end
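-- Editor's illustration (not part of this commit): for a request to
-- /anything?x=2&x=1&flag, get_query_arg("x") returns "1,2" (multi-value args are
-- stringified, sorted, then comma-joined) and get_query_arg("flag") returns
-- "true", because a bare argument parses to boolean true before being
-- stringified; get_query_arg("missing") returns nil, which lets the caller move
-- on to the fallback hash source.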

-- Calculates hash-value.
-- Will only be called once per request, on first try.
-- @param upstream the upstream entity
@@ -55,6 +104,8 @@ local function get_value_to_hash(upstream, ctx)

local identifier
local header_field_name = "hash_on_header"
local query_arg_field_name = "hash_on_query_arg"
local uri_capture_name = "hash_on_uri_capture"

for _ = 1,2 do

@@ -95,6 +146,25 @@
}
end

elseif hash_on == "path" then
-- for the sake of simplicity, we're using the NGINX-normalized version of
-- the path here instead of running ngx.var.request_uri through our
-- internal normalization mechanism
identifier = var.uri

elseif hash_on == "query_arg" then
local arg_name = upstream[query_arg_field_name]
identifier = get_query_arg(arg_name)

elseif hash_on == "uri_capture" then
local captures = (ctx.router_matches or EMPTY_T).uri_captures
if captures then
local group = upstream[uri_capture_name]
identifier = captures[group]
end

else
log(ERR, "unknown hash_on value: ", hash_on)
end

if identifier then
@@ -104,6 +174,9 @@
-- we missed the first, so now try the fallback
hash_on = upstream.hash_fallback
header_field_name = "hash_fallback_header"
query_arg_field_name = "hash_fallback_query_arg"
uri_capture_name = "hash_fallback_uri_capture"

if hash_on == "none" then
return nil
end
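To make the control flow of get_value_to_hash() above easier to follow, here is a condensed, self-contained sketch of the two-pass primary/fallback selection applied to the new hash sources. It is an editor's illustration only: pick_hash_value and the request table are hypothetical stand-ins, not Kong APIs.

local function pick_hash_value(upstream, request)
  local hash_on   = upstream.hash_on
  local arg_field = "hash_on_query_arg"
  local cap_field = "hash_on_uri_capture"

  for _ = 1, 2 do
    local identifier

    if hash_on == "path" then
      identifier = request.path                        -- Kong uses var.uri here

    elseif hash_on == "query_arg" then
      identifier = request.query[upstream[arg_field]]  -- Kong uses get_query_arg()

    elseif hash_on == "uri_capture" then
      identifier = (request.uri_captures or {})[upstream[cap_field]]
    end

    if identifier then
      return identifier
    end

    -- the primary source produced nothing: switch to the fallback, at most once
    hash_on   = upstream.hash_fallback
    arg_field = "hash_fallback_query_arg"
    cap_field = "hash_fallback_uri_capture"

    if hash_on == "none" then
      return nil
    end
  end
end

For example, with hash_on = "uri_capture" and hash_on_uri_capture = "user_id", the route's named capture becomes the balancer hash input; when the capture is absent, the fallback source and its companion field are consulted exactly once.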
82 changes: 82 additions & 0 deletions spec/01-unit/01-db/01-schema/09-upstreams_spec.lua
@@ -142,8 +142,90 @@ describe("load upstreams", function()
ok, errs = validate({ hash_on = "ip", hash_fallback = "ip" })
assert.falsy(ok)
assert.truthy(errs.hash_fallback)
ok, errs = validate({ hash_on = "path", hash_fallback = "path" })
assert.falsy(ok)
assert.truthy(errs.hash_fallback)
end)

it("hash_on = 'query_arg' makes hash_on_query_arg required", function()
local ok, errs = validate({ hash_on = "query_arg" })
assert.falsy(ok)
assert.truthy(errs.hash_on_query_arg)
end)

it("hash_fallback = 'query_arg' makes hash_fallback_query_arg required", function()
local ok, errs = validate({ hash_on = "ip", hash_fallback = "query_arg" })
assert.falsy(ok)
assert.truthy(errs.hash_fallback_query_arg)
end)

it("hash_on and hash_fallback must be different query args", function()
local ok, errs = validate({ hash_on = "query_arg", hash_on_query_arg = "same",
hash_fallback = "query_arg", hash_fallback_query_arg = "same" })
assert.falsy(ok)
assert.not_nil(errs["@entity"])
assert.same(
{
"values of these fields must be distinct: 'hash_on_query_arg', 'hash_fallback_query_arg'"
},
errs["@entity"]
)
end)

it("hash_on_query_arg and hash_fallback_query_arg must not be empty strings", function()
local ok, errs = validate({
name = "test",
hash_on = "query_arg",
hash_on_query_arg = "",
})
assert.is_nil(ok)
assert.not_nil(errs.hash_on_query_arg)

ok, errs = validate({
name = "test",
hash_on = "query_arg",
hash_on_query_arg = "ok",
hash_fallback = "query_arg",
hash_fallback_query_arg = "",
})
assert.is_nil(ok)
assert.not_nil(errs.hash_fallback_query_arg)
end)

it("hash_on and hash_fallback must be different uri captures", function()
local ok, errs = validate({ hash_on = "uri_capture", hash_on_uri_capture = "same",
hash_fallback = "uri_capture", hash_fallback_uri_capture = "same" })
assert.falsy(ok)
assert.not_nil(errs["@entity"])
assert.same(
{
"values of these fields must be distinct: 'hash_on_uri_capture', 'hash_fallback_uri_capture'"
},
errs["@entity"]
)
end)

it("hash_on_uri_capture and hash_fallback_uri_capture must not be empty strings", function()
local ok, errs = validate({
name = "test",
hash_on = "uri_capture",
hash_on_uri_capture = "",
})
assert.is_nil(ok)
assert.not_nil(errs.hash_on_uri_capture)

ok, errs = validate({
name = "test",
hash_on = "uri_capture",
hash_on_uri_capture = "ok",
hash_fallback = "uri_capture",
hash_fallback_uri_capture = "",
})
assert.is_nil(ok)
assert.not_nil(errs.hash_fallback_uri_capture)
end)


it("produces defaults", function()
local u = {
name = "www.example.com",
@@ -1770,6 +1770,10 @@ describe("declarative config: flatten", function()
hash_on_cookie = null,
hash_on_cookie_path = "/",
hash_on_header = null,
hash_on_query_arg = null,
hash_fallback_query_arg = null,
hash_on_uri_capture = null,
hash_fallback_uri_capture = null,
healthchecks = {
active = {
concurrency = 10,
@@ -1822,6 +1826,10 @@ describe("declarative config: flatten", function()
hash_on_cookie = null,
hash_on_cookie_path = "/",
hash_on_header = null,
hash_on_query_arg = null,
hash_fallback_query_arg = null,
hash_on_uri_capture = null,
hash_fallback_uri_capture = null,
healthchecks = {
active = {
concurrency = 10,