Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(clustering) do granular hashes like #8519 does for old protocol. #8671

Merged
merged 4 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions kong/clustering/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ function _M:calculate_config_hash(config_table)
}
end

local function fill_empty_hashes(hashes)
for _, field_name in ipairs{
"config",
"routes",
"services",
"plugins",
"upstreams",
"targets",
} do
hashes[field_name] = hashes[field_name] or DECLARATIVE_EMPTY_CONFIG_HASH
end
end

function _M:request_version_negotiation()
local response_data, err = version_negotiation.request_version_handshake(self.conf, self.cert, self.cert_key)
Expand All @@ -212,6 +224,10 @@ function _M:update_config(config_table, config_hash, update_cache, hashes)
config_hash, hashes = self:calculate_config_hash(config_table)
end

if hashes then
fill_empty_hashes(hashes)
end

local current_hash = declarative.get_current_hash()
if current_hash == config_hash then
ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, ",
Expand All @@ -234,8 +250,8 @@ function _M:update_config(config_table, config_hash, update_cache, hashes)
-- NOTE: no worker mutex needed as this code can only be
-- executed by worker 0

local res, err =
declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
local res
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
if not res then
return nil, err
end
Expand Down
5 changes: 3 additions & 2 deletions kong/clustering/wrpc_control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ function _M:export_deflated_reconfigure_payload()
end
end

local config_hash = self:calculate_config_hash(config_table)
local config_hash, hashes = self:calculate_config_hash(config_table)
config_version = config_version + 1

-- store serialized plugins map for troubleshooting purposes
Expand All @@ -162,7 +162,8 @@ function _M:export_deflated_reconfigure_payload()
self.config_call_rpc, self.config_call_args = assert(service:encode_args("ConfigService.SyncConfig", {
config = config_table,
version = config_version,
hash = config_hash,
config_hash = config_hash,
hashes = hashes,
}))

return config_table, nil
Expand Down
7 changes: 5 additions & 2 deletions kong/clustering/wrpc_data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ local function get_config_service()
data.config.format_version = nil

peer.config_obj.next_config = data.config
peer.config_obj.next_hash = data.hash
peer.config_obj.next_hash = data.config_hash
peer.config_obj.next_hashes = data.hashes
peer.config_obj.next_config_version = tonumber(data.version)
if peer.config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
Expand Down Expand Up @@ -196,11 +197,12 @@ function _M:communicate(premature)
end
local config_table = self.next_config
local config_hash = self.next_hash
local hashes = self.next_hashes
if config_table and self.next_config_version > last_config_version then
ngx_log(ngx_INFO, _log_prefix, "received config #", self.next_config_version, log_suffix)

local pok, res
pok, res, err = xpcall(self.update_config, debug.traceback, self, config_table, config_hash, true)
pok, res, err = xpcall(self.update_config, debug.traceback, self, config_table, config_hash, true, hashes)
if pok then
last_config_version = self.next_config_version
if not res then
Expand All @@ -214,6 +216,7 @@ function _M:communicate(premature)
if self.next_config == config_table then
self.next_config = nil
self.next_hash = nil
self.next_hashes = nil
end
end

Expand Down
4 changes: 2 additions & 2 deletions kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ do
local DECLARATIVE_LOCK_KEY = "declarative:lock"

-- make sure no matter which path it exits, we released the lock.
function declarative.load_into_cache_with_events(entities, meta, hash)
function declarative.load_into_cache_with_events(entities, meta, hash, hashes)
local kong_shm = ngx.shared.kong

local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
Expand All @@ -1007,7 +1007,7 @@ do
return nil, err
end

ok, err = load_into_cache_with_events_no_lock(entities, meta, hash)
ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
kong_shm:delete(DECLARATIVE_LOCK_KEY)

return ok, err
Expand Down
12 changes: 11 additions & 1 deletion kong/include/kong/services/config/v1/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ message PluginVersion {
string version = 2;
}

message GranularHashes {
string config = 1;
string routes = 2;
string services = 3;
string plugins = 4;
string upstreams = 5;
string targets = 6;
}

message SyncConfigRequest {
// Config represents a configuration of Kong Gateway.
// This is same as the declarative configuration of Kong.
Expand All @@ -101,7 +110,8 @@ message SyncConfigRequest {
uint64 version = 2;

// raw binary hash of the config data.
string hash = 3;
string config_hash = 3;
GranularHashes hashes = 4;
}

message SyncConfigResponse {
Expand Down
66 changes: 66 additions & 0 deletions spec/01-unit/19-hybrid/02-clustering_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -209,5 +209,71 @@ describe("kong.clustering", function()
end
end)

describe("granular hashes", function()
local DECLARATIVE_EMPTY_CONFIG_HASH = require("kong.constants").DECLARATIVE_EMPTY_CONFIG_HASH

it("filled with empty hash values for missing config fields", function()
local value = {}

for _ = 1, 10 do
local hash, hashes = clustering.calculate_config_hash(clustering, value)
assert.is_string(hash)
assert.equal("aaf38faf0b5851d711027bb4d812d50d", hash)
assert.is_table(hashes)
assert.same({
config = "aaf38faf0b5851d711027bb4d812d50d",
routes = DECLARATIVE_EMPTY_CONFIG_HASH,
services = DECLARATIVE_EMPTY_CONFIG_HASH,
plugins = DECLARATIVE_EMPTY_CONFIG_HASH,
upstreams = DECLARATIVE_EMPTY_CONFIG_HASH,
targets = DECLARATIVE_EMPTY_CONFIG_HASH,
}, hashes)
end
end)

it("has sensible values for existing fields", function()
local value = {
routes = {},
services = {},
plugins = {},
}

for _ = 1, 10 do
local hash, hashes = clustering.calculate_config_hash(clustering, value)
assert.is_string(hash)
assert.equal("768533baebe6e0d46de8d5f8a0c05bf0", hash)
assert.is_table(hashes)
assert.same({
config = "768533baebe6e0d46de8d5f8a0c05bf0",
routes = "99914b932bd37a50b983c5e7c90ae93b",
services = "99914b932bd37a50b983c5e7c90ae93b",
plugins = "99914b932bd37a50b983c5e7c90ae93b",
upstreams = DECLARATIVE_EMPTY_CONFIG_HASH,
targets = DECLARATIVE_EMPTY_CONFIG_HASH,
}, hashes)
end

value = {
upstreams = {},
targets = {},
}

for _ = 1, 10 do
local hash, hashes = clustering.calculate_config_hash(clustering, value)
assert.is_string(hash)
assert.equal("6c5fb69169a0fabb24dcfa3a5d7a14b0", hash)
assert.is_table(hashes)
assert.same({
config = "6c5fb69169a0fabb24dcfa3a5d7a14b0",
routes = DECLARATIVE_EMPTY_CONFIG_HASH,
services = DECLARATIVE_EMPTY_CONFIG_HASH,
plugins = DECLARATIVE_EMPTY_CONFIG_HASH,
upstreams = "99914b932bd37a50b983c5e7c90ae93b",
targets = "99914b932bd37a50b983c5e7c90ae93b",
}, hashes)
end
end)
end)

end)
end)