Skip to content

Commit

Permalink
feat(balancer) add path and query as hash sources
Browse files Browse the repository at this point in the history
  • Loading branch information
flrgh committed May 11, 2022
1 parent b440737 commit 8c5a917
Show file tree
Hide file tree
Showing 11 changed files with 327 additions and 9 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@

### Additions

#### Core

- Added `path` and `query_arg` options to upstream `hash_on` for load balancing.
[#8701](https://github.com/Kong/kong/pull/8701)

#### Plugins

- **Zipkin**: add support for including HTTP path in span name
Expand Down
1 change: 1 addition & 0 deletions kong-2.8.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ build = {
["kong.db.migrations.core.013_220_to_230"] = "kong/db/migrations/core/013_220_to_230.lua",
["kong.db.migrations.core.014_230_to_270"] = "kong/db/migrations/core/014_230_to_270.lua",
["kong.db.migrations.core.015_270_to_280"] = "kong/db/migrations/core/015_270_to_280.lua",
["kong.db.migrations.core.016_280_to_300"] = "kong/db/migrations/core/016_280_to_300.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.210_to_211"] = "kong/db/migrations/operations/210_to_211.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
Expand Down
33 changes: 33 additions & 0 deletions kong/db/migrations/core/016_280_to_300.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- Core migration adding query-arg hashing support to upstreams.
-- Both backends gain two new nullable text columns on the "upstreams"
-- table: hash_on_query_arg and hash_fallback_query_arg.
return {
-- Postgres: each ALTER is wrapped in a DO block that swallows
-- DUPLICATE_COLUMN, so re-running the migration is harmless.
postgres = {
up = [[
-- add new hash_on_query_arg field to upstreams
DO $$
BEGIN
ALTER TABLE IF EXISTS ONLY "upstreams" ADD "hash_on_query_arg" TEXT;
EXCEPTION WHEN DUPLICATE_COLUMN THEN
-- Do nothing, accept existing state
END;
$$;
-- add new hash_fallback_query_arg field to upstreams
DO $$
BEGIN
ALTER TABLE IF EXISTS ONLY "upstreams" ADD "hash_fallback_query_arg" TEXT;
EXCEPTION WHEN DUPLICATE_COLUMN THEN
-- Do nothing, accept existing state
END;
$$;
]],
},

-- Cassandra: plain ALTERs (no duplicate-column guard in CQL here).
cassandra = {
up = [[
-- add new hash_on_query_arg field to upstreams
ALTER TABLE upstreams ADD hash_on_query_arg text;
-- add new hash_fallback_query_arg field to upstreams
ALTER TABLE upstreams ADD hash_fallback_query_arg text;
]],
},
}
1 change: 1 addition & 0 deletions kong/db/migrations/core/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ return {
"013_220_to_230",
"014_230_to_270",
"015_270_to_280",
"016_280_to_300",
}
41 changes: 37 additions & 4 deletions kong/db/schema/entities/upstreams.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ end
-- Allowed inputs for upstream load-balancer hashing; "path" and
-- "query_arg" are the values added by this change, each backed by its
-- own hash_on_* configuration field.
-- NOTE: the diff hunk retained both the pre- and post-change `one_of`
-- lines back to back (a syntax error in a table constructor); only the
-- post-change list belongs in the file.
local hash_on = Schema.define {
  type = "string",
  default = "none",
  one_of = { "none", "consumer", "ip", "header", "cookie", "path", "query_arg" }
}


Expand Down Expand Up @@ -79,6 +79,10 @@ local health_threshold = Schema.define {
between = { 0, 100 },
}

-- Name of the request query argument to hash on; any non-empty string
-- (shared by hash_on_query_arg and hash_fallback_query_arg below).
local query_arg = Schema.define {
type = "string",
len_min = 1,
}

local NO_DEFAULT = {}

Expand Down Expand Up @@ -186,6 +190,8 @@ local r = {
{ hash_fallback_header = typedefs.header_name, },
{ hash_on_cookie = { type = "string", custom_validator = utils.validate_cookie_name }, },
{ hash_on_cookie_path = typedefs.path{ default = "/", }, },
{ hash_on_query_arg = query_arg },
{ hash_fallback_query_arg = query_arg },
{ slots = { type = "integer", default = 10000, between = { 10, 2^16 }, }, },
{ healthchecks = { type = "record",
default = healthchecks_defaults,
Expand Down Expand Up @@ -228,18 +234,45 @@ local r = {
then_field = "hash_fallback", then_match = { one_of = { "none" }, },
}, },

-- hash_fallback must not equal hash_on (headers are allowed)
-- hash_fallback must not equal hash_on (headers and query args are allowed)
{ conditional = {
if_field = "hash_on", if_match = { match = "^consumer$" },
then_field = "hash_fallback", then_match = { one_of = { "none", "ip", "header", "cookie" }, },
then_field = "hash_fallback", then_match = { one_of = { "none", "ip",
"header", "cookie",
"path", "query_arg",
}, },
}, },
{ conditional = {
if_field = "hash_on", if_match = { match = "^ip$" },
then_field = "hash_fallback", then_match = { one_of = { "none", "consumer", "header", "cookie" }, },
then_field = "hash_fallback", then_match = { one_of = { "none", "consumer",
"header", "cookie",
"path", "query_arg",
}, },
}, },
{ conditional = {
if_field = "hash_on", if_match = { match = "^path$" },
then_field = "hash_fallback", then_match = { one_of = { "none", "consumer",
"header", "cookie",
"query_arg", "ip",
}, },
}, },


-- different headers
{ distinct = { "hash_on_header", "hash_fallback_header" }, },

-- hash_on_query_arg must be present when hashing on query_arg
{ conditional = {
if_field = "hash_on", if_match = { match = "^query_arg$" },
then_field = "hash_on_query_arg", then_match = { required = true },
}, },
{ conditional = {
if_field = "hash_fallback", if_match = { match = "^query_arg$" },
then_field = "hash_fallback_query_arg", then_match = { required = true },
}, },

-- query arg and fallback must be different
{ distinct = { "hash_on_query_arg" , "hash_fallback_query_arg" }, },
},

-- This is a hack to preserve backwards compatibility with regard to the
Expand Down
49 changes: 49 additions & 0 deletions kong/runloop/balancer/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ if ngx.config.subsystem ~= "stream" then
end


local get_query_arg
do
  local get_uri_args = ngx.req.get_uri_args

  -- Upper bound on the number of query args read per request; anything
  -- beyond this is dropped and get_uri_args reports err == "truncated".
  local limit = 100
  local cache

  -- OpenResty added the ability to reuse the result table in 0.10.21
  --
  -- The table is cleared by ngx.req.get_uri_args, so there is no need for the
  -- caller to clear or reset it manually
  --
  -- @see https://github.com/openresty/lua-resty-core/pull/288
  if ngx.config.ngx_lua_version >= 10021 then
    cache = require("table.new")(0, limit)
  end

  --- Fetch a single query string argument of the current request.
  --
  -- Multi-valued args are joined into one string with "," so that the
  -- hash input is stable; value-less args pass through as the boolean
  -- returned by get_uri_args, and absent args yield nil.
  --
  -- @tparam string name the query arg name (hash_on_query_arg or
  --   hash_fallback_query_arg of the upstream)
  -- @treturn string|boolean|nil the value to feed into the hash, or nil
  function get_query_arg(name)
    local query, err = get_uri_args(limit, cache)

    if err == "truncated" then
      log(WARN, "could not fetch all query string args for request, ",
                "hash value may be empty/incomplete")

    elseif err then
      log(ERR, "failed fetching query string args: ", err)

      -- defensive: the original indexed `query` unconditionally; if the
      -- fetch failed outright there is nothing to index, so bail out
      if not query then
        return
      end
    end

    local value = query[name]
    if type(value) == "table" then
      value = table_concat(value, ",")
    end

    return value
  end
end

-- Calculates hash-value.
-- Will only be called once per request, on first try.
-- @param upstream the upstream entity
Expand All @@ -57,6 +92,7 @@ local function get_value_to_hash(upstream, ctx)

local identifier
local header_field_name = "hash_on_header"
local query_arg_field_name = "hash_on_query_arg"

for _ = 1,2 do

Expand Down Expand Up @@ -97,6 +133,18 @@ local function get_value_to_hash(upstream, ctx)
}
end

elseif hash_on == "path" then
-- for the sake of simplicity, we're using the NGINX-normalized version of
-- the path here instead of running ngx.var.request_uri through our
-- internal normalization mechanism
identifier = var.uri

elseif hash_on == "query_arg" then
local arg_name = upstream[query_arg_field_name]
identifier = get_query_arg(arg_name)

else
log(ERR, "unknown hash_on value: ", hash_on)
end

if identifier then
Expand All @@ -106,6 +154,7 @@ local function get_value_to_hash(upstream, ctx)
-- we missed the first, so now try the fallback
hash_on = upstream.hash_fallback
header_field_name = "hash_fallback_header"
query_arg_field_name = "hash_fallback_query_arg"
if hash_on == "none" then
return nil
end
Expand Down
48 changes: 48 additions & 0 deletions spec/01-unit/01-db/01-schema/09-upstreams_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,54 @@ describe("load upstreams", function()
ok, errs = validate({ hash_on = "ip", hash_fallback = "ip" })
assert.falsy(ok)
assert.truthy(errs.hash_fallback)
ok, errs = validate({ hash_on = "path", hash_fallback = "path" })
assert.falsy(ok)
assert.truthy(errs.hash_fallback)
end)

-- Choosing query_arg as the primary hash input requires naming the arg.
it("hash_on = 'query_arg' makes hash_on_query_arg required", function()
local ok, errs = validate({ hash_on = "query_arg" })
assert.falsy(ok)
assert.truthy(errs.hash_on_query_arg)
end)

-- Same requirement when query_arg is used as the fallback hash input.
it("hash_fallback = 'query_arg' makes hash_fallback_query_arg required", function()
local ok, errs = validate({ hash_on = "ip", hash_fallback = "query_arg" })
assert.falsy(ok)
assert.truthy(errs.hash_fallback_query_arg)
end)

-- Primary and fallback query args must differ, otherwise the fallback
-- could never produce a different hash; the schema reports this as an
-- entity-level "distinct" violation rather than a per-field error.
it("hash_on and hash_fallback must be different query args", function()
local ok, errs = validate({ hash_on = "query_arg", hash_on_query_arg = "same",
hash_fallback = "query_arg", hash_fallback_query_arg = "same" })
assert.falsy(ok)
assert.not_nil(errs["@entity"])
assert.same(
{
"values of these fields must be distinct: 'hash_on_query_arg', 'hash_fallback_query_arg'"
},
errs["@entity"]
)
end)

-- The len_min = 1 constraint on the query-arg fields rejects "" for both
-- the primary and the fallback arg name.
it("hash_on_query_arg and hash_fallback_query_arg must not be empty strings", function()
local ok, errs = validate({
name = "test",
hash_on = "query_arg",
hash_on_query_arg = "",
})
assert.is_nil(ok)
assert.not_nil(errs.hash_on_query_arg)

ok, errs = validate({
name = "test",
hash_on = "query_arg",
hash_on_query_arg = "ok",
hash_fallback = "query_arg",
hash_fallback_query_arg = "",
})
assert.is_nil(ok)
assert.not_nil(errs.hash_fallback_query_arg)
end)

it("produces defaults", function()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1770,6 +1770,8 @@ describe("declarative config: flatten", function()
hash_on_cookie = null,
hash_on_cookie_path = "/",
hash_on_header = null,
hash_on_query_arg = null,
hash_fallback_query_arg = null,
healthchecks = {
active = {
concurrency = 10,
Expand Down Expand Up @@ -1822,6 +1824,8 @@ describe("declarative config: flatten", function()
hash_on_cookie = null,
hash_on_cookie_path = "/",
hash_on_header = null,
hash_on_query_arg = null,
hash_fallback_query_arg = null,
healthchecks = {
active = {
concurrency = 10,
Expand Down
6 changes: 3 additions & 3 deletions spec/02-integration/04-admin_api/07-upstreams_routes_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ describe("Admin API: #" .. strategy, function()
body = assert.res_status(400, res)
local json = cjson.decode(body)
assert.equals("schema violation", json.name)
assert.same({ hash_on = "expected one of: none, consumer, ip, header, cookie" }, json.fields)
assert.same({ hash_on = "expected one of: none, consumer, ip, header, cookie, path, query_arg" }, json.fields)

-- Invalid hash_fallback entries
res = assert(client:send {
Expand All @@ -260,7 +260,7 @@ describe("Admin API: #" .. strategy, function()
assert.equals("schema violation", json.name)
assert.same({
["@entity"] = { [[failed conditional validation given value of field 'hash_on']] },
hash_fallback = "expected one of: none, ip, header, cookie",
hash_fallback = "expected one of: none, ip, header, cookie, path, query_arg",
}, json.fields)

-- same hash entries
Expand All @@ -278,7 +278,7 @@ describe("Admin API: #" .. strategy, function()
local json = cjson.decode(body)
assert.same({
["@entity"] = { [[failed conditional validation given value of field 'hash_on']] },
hash_fallback = "expected one of: none, ip, header, cookie",
hash_fallback = "expected one of: none, ip, header, cookie, path, query_arg",
}, json.fields)

-- Invalid header
Expand Down
Loading

0 comments on commit 8c5a917

Please sign in to comment.