diff --git a/.requirements b/.requirements index 342e886fef5..82e14ec44ff 100644 --- a/.requirements +++ b/.requirements @@ -7,7 +7,7 @@ PCRE=8.45 LUA_KONG_NGINX_MODULE=4d19e8d19c6dbc07eba5cf6f5ebacad95266f928 # 0.6.0 LUA_RESTY_LMDB=951926f20b674a0622236a0e331b359df1c02d9b # 1.3.0 -LUA_RESTY_EVENTS=2f6fa23eb3d0b76a3b35fd915711200e90bc6732 # 0.1.6 +LUA_RESTY_EVENTS=8448a92cec36ac04ea522e78f6496ba03c9b1fd8 # 0.2.0 LUA_RESTY_WEBSOCKET=60eafc3d7153bceb16e6327074e0afc3d94b1316 # 0.4.0 ATC_ROUTER=72cc8fddeac024c54c9c1fa5a25c28a72d79080e # 1.1.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dacd6bf3a0..ab1474f0fe9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,9 +151,10 @@ [#11099](https://github.com/Kong/kong/pull/11099) - Bumped kong-lapis from 1.8.3.1 to 1.14.0.2 [#10841](https://github.com/Kong/kong/pull/10841) -- Bumped lua-resty-events from 0.1.4 to 0.1.6 +- Bumped lua-resty-events from 0.1.4 to 0.2.0 [#10883](https://github.com/Kong/kong/pull/10883) [#11083](https://github.com/Kong/kong/pull/11083) + [#11214](https://github.com/Kong/kong/pull/11214) - Bumped lua-resty-session from 4.0.3 to 4.0.4 [#11011](https://github.com/Kong/kong/pull/11011) - Bumped OpenSSL from 1.1.1t to 3.1.1 diff --git a/kong.conf.default b/kong.conf.default index b960bf2f188..88c7fe0c4c4 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -269,6 +269,7 @@ # Similarly to `error_template_html`, the template # is required to contain one single `%s` placeholder for # the error message. + #------------------------------------------------------------------------------ # HYBRID MODE #------------------------------------------------------------------------------ diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 8b53bedf9b4..a9be1525c15 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -358,6 +358,8 @@ local CONF_PARSERS = { }, }, + worker_events_max_payload = { typ = "number" }, + upstream_keepalive_pool_size = { typ = "number" }, upstream_keepalive_max_requests = { typ = "number" }, upstream_keepalive_idle_timeout = { typ = "number" }, diff --git a/kong/global.lua b/kong/global.lua index 4b2764a38a5..0dad6430551 100644 --- a/kong/global.lua +++ b/kong/global.lua @@ -178,21 +178,31 @@ function _GLOBAL.init_worker_events() -- `kong.configuration.prefix` is already normalized to an absolute path, -- but `ngx.config.prefix()` is not - local prefix = configuration - and configuration.prefix - or require("pl.path").abspath(ngx.config.prefix()) + local prefix = configuration and + configuration.prefix or + require("pl.path").abspath(ngx.config.prefix()) - local sock = ngx.config.subsystem == "stream" - and "stream_worker_events.sock" - or "worker_events.sock" + local sock = ngx.config.subsystem == "stream" and + "stream_worker_events.sock" or + "worker_events.sock" local listening = "unix:" .. prefix .. "/" .. sock + local max_payload_len = configuration and + configuration.worker_events_max_payload + + if max_payload_len and max_payload_len > 65535 then -- default is 64KB + ngx.log(ngx.WARN, + "Increasing 'worker_events_max_payload' value has potential " .. + "negative impact on Kong's response latency and memory usage") + end + opts = { unique_timeout = 5, -- life time of unique event data in lrucache broker_id = 0, -- broker server runs in nginx worker #0 listening = listening, -- unix socket for broker listening max_queue_len = 1024 * 50, -- max queue len for events buffering + max_payload_len = max_payload_len, -- max payload size in bytes } worker_events = require "resty.events.compat" diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index a3cbdbed5a9..de837bc4ac8 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -45,6 +45,7 @@ cluster_dp_labels = NONE lmdb_environment_path = dbless.lmdb lmdb_map_size = 2048m mem_cache_size = 128m +worker_events_max_payload = 65535 ssl_cert = NONE ssl_cert_key = NONE client_ssl = off diff --git a/spec/02-integration/07-sdk/06-worker_events_spec.lua b/spec/02-integration/07-sdk/06-worker_events_spec.lua new file mode 100644 index 00000000000..e7e56e1535b --- /dev/null +++ b/spec/02-integration/07-sdk/06-worker_events_spec.lua @@ -0,0 +1,123 @@ +local helpers = require "spec.helpers" + +local worker_events_mock = [[ + server { + server_name example.com; + listen %d; + + location = /payload { + content_by_lua_block { + local SOURCE = "foo" + local EVENT = ngx.var.http_payload_type + + local worker_events = kong.worker_events + local payload_received + + local function wait_until(validator, timeout) + local deadline = ngx.now() + (timeout or 5) + local res + repeat + worker_events.poll() + res = validator() + until res or ngx.now() >= deadline + return res + end + + -- subscribe + local ok, err = worker_events.register(function(data) + payload_received = data + end, SOURCE, EVENT) + + -- when payload is a string + local PAYLOAD = string.rep("X", %d) + + -- when payload is a table + if EVENT == "table" then + PAYLOAD = { + foo = "bar", + data = PAYLOAD, + } + end + + local ok, err = worker_events.post(SOURCE, EVENT, PAYLOAD) + if not ok then + ngx.status = ngx.HTTP_INTERNAL_SERVER_ERROR + ngx.say("post failed, err: " .. err) + return + end + + assert(wait_until(function() + if EVENT == "string" then + return PAYLOAD == payload_received + else + return require("pl.tablex").deepcompare(PAYLOAD, payload_received) + end + end, 1)) + + ngx.status = ngx.HTTP_OK + ngx.say("ok") + } + } + } +]] + + +local max_payloads = { 60 * 1024, 140 * 1024, } + + +for _, max_payload in ipairs(max_payloads) do + local business_port = 34567 + local payload_size = 70 * 1024 + + local fixtures = { + http_mock = { + worker_events = string.format(worker_events_mock, + business_port, payload_size) + }, + } + + local size_allowed = max_payload > payload_size + local less_or_greater = size_allowed and ">" or "<" + + describe("worker_events [when max_payload " .. less_or_greater .. " payload_size]", function() + local strategy = "off" + local test_cases = {"string", "table", } + + lazy_setup(function() + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + worker_events_max_payload = max_payload, + }, nil, nil, fixtures)) + end) + + lazy_teardown(function () + assert(helpers.stop_kong()) + end) + + for _, payload_type in ipairs(test_cases) do + it("max_payload = " .. max_payload .. ", type = " .. payload_type, function() + + local res = helpers.proxy_client(nil, business_port):get( + "/payload", { + headers = { + host = "example.com", + payload_type = payload_type, + } + }) + + local status_code = 200 + local msg = "ok" + + if not size_allowed then + status_code = 500 + msg = "post failed, err: " .. + "failed to publish event: payload exceeds the limitation (".. max_payload .. ")" + end + + local body = assert.res_status(status_code, res) + assert.equal(body, msg) + end) + end + end) +end