diff --git a/CHANGELOG.md b/CHANGELOG.md index be8a05d302d0..68038762991d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,9 @@ - Allow configuring Labels for data planes to provide metadata information. Labels are only compatible with hybrid mode deployments with Kong Konnect (SaaS) [#10471](https://github.com/Kong/kong/pull/10471) +- Add Postgres triggers on the core entites and entities in bundled plugins to delete the + expired rows in an efficient and timely manner. + [#10389](https://github.com/Kong/kong/pull/10389) #### Admin API @@ -162,6 +165,11 @@ [#10405](https://github.com/Kong/kong/pull/10405) - Postgres TTL cleanup timer now runs a batch delete loop on each ttl enabled table with a number of 50.000 rows per batch. [#10407](https://github.com/Kong/kong/pull/10407) +- Postgres TTL cleanup timer now runs every 5 minutes instead of every 60 seconds. + [#10389](https://github.com/Kong/kong/pull/10389) +- Postgres TTL cleanup timer now deletes expired rows based on database server-side timestamp to avoid potential + problems caused by the difference of clock time between Kong and database server. + [#10389](https://github.com/Kong/kong/pull/10389) #### PDK diff --git a/kong-3.2.1-0.rockspec b/kong-3.2.1-0.rockspec index 021acd428fe2..bcdebe6f7c95 100644 --- a/kong-3.2.1-0.rockspec +++ b/kong-3.2.1-0.rockspec @@ -293,6 +293,7 @@ build = { ["kong.plugins.key-auth.migrations.000_base_key_auth"] = "kong/plugins/key-auth/migrations/000_base_key_auth.lua", ["kong.plugins.key-auth.migrations.002_130_to_140"] = "kong/plugins/key-auth/migrations/002_130_to_140.lua", ["kong.plugins.key-auth.migrations.003_200_to_210"] = "kong/plugins/key-auth/migrations/003_200_to_210.lua", + ["kong.plugins.key-auth.migrations.004_320_to_330"] = "kong/plugins/key-auth/migrations/004_320_to_330.lua", ["kong.plugins.key-auth.handler"] = "kong/plugins/key-auth/handler.lua", ["kong.plugins.key-auth.schema"] = "kong/plugins/key-auth/schema.lua", ["kong.plugins.key-auth.daos"] = "kong/plugins/key-auth/daos.lua", @@ -302,6 +303,7 @@ build = { ["kong.plugins.oauth2.migrations.003_130_to_140"] = "kong/plugins/oauth2/migrations/003_130_to_140.lua", ["kong.plugins.oauth2.migrations.004_200_to_210"] = "kong/plugins/oauth2/migrations/004_200_to_210.lua", ["kong.plugins.oauth2.migrations.005_210_to_211"] = "kong/plugins/oauth2/migrations/005_210_to_211.lua", + ["kong.plugins.oauth2.migrations.006_320_to_330"] = "kong/plugins/oauth2/migrations/006_320_to_330.lua", ["kong.plugins.oauth2.handler"] = "kong/plugins/oauth2/handler.lua", ["kong.plugins.oauth2.secret"] = "kong/plugins/oauth2/secret.lua", ["kong.plugins.oauth2.access"] = "kong/plugins/oauth2/access.lua", @@ -327,6 +329,7 @@ build = { ["kong.plugins.rate-limiting.migrations.000_base_rate_limiting"] = "kong/plugins/rate-limiting/migrations/000_base_rate_limiting.lua", ["kong.plugins.rate-limiting.migrations.003_10_to_112"] = "kong/plugins/rate-limiting/migrations/003_10_to_112.lua", ["kong.plugins.rate-limiting.migrations.004_200_to_210"] = "kong/plugins/rate-limiting/migrations/004_200_to_210.lua", + ["kong.plugins.rate-limiting.migrations.005_320_to_330"] = "kong/plugins/rate-limiting/migrations/005_320_to_330.lua", ["kong.plugins.rate-limiting.expiration"] = "kong/plugins/rate-limiting/expiration.lua", ["kong.plugins.rate-limiting.handler"] = "kong/plugins/rate-limiting/handler.lua", ["kong.plugins.rate-limiting.schema"] = "kong/plugins/rate-limiting/schema.lua", @@ -443,6 +446,7 @@ build = { ["kong.plugins.acme.handler"] = "kong/plugins/acme/handler.lua", ["kong.plugins.acme.migrations.000_base_acme"] = "kong/plugins/acme/migrations/000_base_acme.lua", ["kong.plugins.acme.migrations.001_280_to_300"] = "kong/plugins/acme/migrations/001_280_to_300.lua", + ["kong.plugins.acme.migrations.002_320_to_330"] = "kong/plugins/acme/migrations/002_320_to_330.lua", ["kong.plugins.acme.migrations"] = "kong/plugins/acme/migrations/init.lua", ["kong.plugins.acme.schema"] = "kong/plugins/acme/schema.lua", ["kong.plugins.acme.storage.kong"] = "kong/plugins/acme/storage/kong.lua", @@ -464,6 +468,7 @@ build = { ["kong.plugins.session.storage.kong"] = "kong/plugins/session/storage/kong.lua", ["kong.plugins.session.migrations.000_base_session"] = "kong/plugins/session/migrations/000_base_session.lua", ["kong.plugins.session.migrations.001_add_ttl_index"] = "kong/plugins/session/migrations/001_add_ttl_index.lua", + ["kong.plugins.session.migrations.002_320_to_330"] = "kong/plugins/session/migrations/002_320_to_330.lua", ["kong.plugins.session.migrations"] = "kong/plugins/session/migrations/init.lua", ["kong.plugins.proxy-cache.handler"] = "kong/plugins/proxy-cache/handler.lua", diff --git a/kong/db/migrations/core/019_320_to_330.lua b/kong/db/migrations/core/019_320_to_330.lua index f512b146ac2a..bb9b2e70ec95 100644 --- a/kong/db/migrations/core/019_320_to_330.lua +++ b/kong/db/migrations/core/019_320_to_330.lua @@ -16,7 +16,41 @@ return { -- Do nothing, accept existing state END; $$; - ]] + + CREATE OR REPLACE FUNCTION batch_delete_expired_rows() RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + EXECUTE FORMAT('WITH rows AS (SELECT ctid FROM %s WHERE %s < CURRENT_TIMESTAMP AT TIME ZONE ''UTC'' ORDER BY %s LIMIT 2 FOR UPDATE SKIP LOCKED) DELETE FROM %s WHERE ctid IN (TABLE rows)', TG_TABLE_NAME, TG_ARGV[0], TG_ARGV[0], TG_TABLE_NAME); + RETURN NULL; + END; + $$; + + DROP TRIGGER IF EXISTS "cluster_events_ttl_trigger" ON "cluster_events"; + + DO $$ + BEGIN + CREATE TRIGGER "cluster_events_ttl_trigger" + AFTER INSERT ON "cluster_events" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("expire_at"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + + + DROP TRIGGER IF EXISTS "clustering_data_planes_ttl_trigger" ON "clustering_data_planes"; + + DO $$ + BEGIN + CREATE TRIGGER "clustering_data_planes_ttl_trigger" + AFTER INSERT ON "clustering_data_planes" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + ]], }, cassandra = { diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 2f6c22319791..5a9aa70e8819 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -332,11 +332,11 @@ function _mt:init_worker(strategies) WITH rows AS ( SELECT ctid FROM %s - WHERE %s < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + WHERE %s < TO_TIMESTAMP(%s) AT TIME ZONE 'UTC' ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED) DELETE FROM %s - WHERE ctid IN (TABLE rows);]], table_name_escaped, column_name, column_name, table_name_escaped):gsub("CURRENT_TIMESTAMP", "TO_TIMESTAMP(%%s)") + WHERE ctid IN (TABLE rows);]], table_name_escaped, column_name, "%s", column_name, table_name_escaped) end return timer_every(self.config.ttl_cleanup_interval, function(premature) @@ -344,12 +344,24 @@ ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED) return end + -- Fetch the end timestamp from database to avoid problems caused by the difference + -- between nodes and database time. + local cleanup_end_timestamp + local ok, err = self:query("SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP AT TIME ZONE 'UTC') AS NOW;") + if not ok then + log(WARN, "unable to fetch current timestamp from PostgreSQL database (", + err, ")") + return + end + + cleanup_end_timestamp = ok[1]["now"] + for i, statement in ipairs(cleanup_statements) do - local cleanup_start_time = self:escape_literal(tonumber(fmt("%.3f", now_updated()))) + local _tracing_cleanup_start_time = now() while true do -- batch delete looping - -- avoid using CURRENT_TIMESTAMP in the real query to prevent infinite loop - local ok, err = self:query(fmt(statement, cleanup_start_time)) + -- using the server-side timestamp in the whole loop to prevent infinite loop + local ok, err = self:query(fmt(statement, cleanup_end_timestamp)) if not ok then if err then log(WARN, "unable to clean expired rows from table '", @@ -368,8 +380,8 @@ ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED) end end - local cleanup_end_time = now_updated() - local time_elapsed = tonumber(fmt("%.3f", cleanup_end_time - cleanup_start_time)) + local _tracing_cleanup_end_time = now() + local time_elapsed = tonumber(fmt("%.3f", _tracing_cleanup_end_time - _tracing_cleanup_start_time)) log(DEBUG, "cleaning up expired rows from table '", table_names[i], "' took ", time_elapsed, " seconds") end @@ -947,7 +959,7 @@ function _M.new(kong_config) --- not used directly by pgmoon, but used internally in connector to set the keepalive timeout keepalive_timeout = kong_config.pg_keepalive_timeout, --- non user-faced parameters - ttl_cleanup_interval = kong_config._debug_pg_ttl_cleanup_interval or 60, + ttl_cleanup_interval = kong_config._debug_pg_ttl_cleanup_interval or 300, } local refs = kong_config["$refs"] diff --git a/kong/plugins/acme/migrations/002_320_to_330.lua b/kong/plugins/acme/migrations/002_320_to_330.lua new file mode 100644 index 000000000000..634778af4e7a --- /dev/null +++ b/kong/plugins/acme/migrations/002_320_to_330.lua @@ -0,0 +1,20 @@ +return { + postgres = { + up = [[ + DROP TRIGGER IF EXISTS "acme_storage_ttl_trigger" ON "acme_storage"; + + DO $$ + BEGIN + CREATE TRIGGER "acme_storage_ttl_trigger" + AFTER INSERT ON "acme_storage" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + ]], + }, + cassandra = { + up = "", + } +} diff --git a/kong/plugins/acme/migrations/init.lua b/kong/plugins/acme/migrations/init.lua index 1745666a43c9..bb8bb45beb45 100644 --- a/kong/plugins/acme/migrations/init.lua +++ b/kong/plugins/acme/migrations/init.lua @@ -1,4 +1,5 @@ return { "000_base_acme", "001_280_to_300", + "002_320_to_330", } diff --git a/kong/plugins/key-auth/migrations/004_320_to_330.lua b/kong/plugins/key-auth/migrations/004_320_to_330.lua new file mode 100644 index 000000000000..55aee17b5ef1 --- /dev/null +++ b/kong/plugins/key-auth/migrations/004_320_to_330.lua @@ -0,0 +1,20 @@ +return { + postgres = { + up = [[ + DROP TRIGGER IF EXISTS "keyauth_credentials_ttl_trigger" ON "keyauth_credentials"; + + DO $$ + BEGIN + CREATE TRIGGER "keyauth_credentials_ttl_trigger" + AFTER INSERT ON "keyauth_credentials" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + ]], + }, + cassandra = { + up = [[]], + } +} diff --git a/kong/plugins/key-auth/migrations/init.lua b/kong/plugins/key-auth/migrations/init.lua index e2904a85ae75..11bc63b53db3 100644 --- a/kong/plugins/key-auth/migrations/init.lua +++ b/kong/plugins/key-auth/migrations/init.lua @@ -2,4 +2,5 @@ return { "000_base_key_auth", "002_130_to_140", "003_200_to_210", + "004_320_to_330", } diff --git a/kong/plugins/oauth2/migrations/006_320_to_330.lua b/kong/plugins/oauth2/migrations/006_320_to_330.lua new file mode 100644 index 000000000000..78404872e796 --- /dev/null +++ b/kong/plugins/oauth2/migrations/006_320_to_330.lua @@ -0,0 +1,33 @@ +return { + postgres = { + up = [[ + DROP TRIGGER IF EXISTS "oauth2_authorization_codes_ttl_trigger" ON "oauth2_authorization_codes"; + + DO $$ + BEGIN + CREATE TRIGGER "oauth2_authorization_codes_ttl_trigger" + AFTER INSERT ON "oauth2_authorization_codes" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + + + DROP TRIGGER IF EXISTS "oauth2_tokens_ttl_trigger" ON "oauth2_tokens"; + + DO $$ + BEGIN + CREATE TRIGGER "oauth2_tokens_ttl_trigger" + AFTER INSERT ON "oauth2_tokens" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + ]], + }, + cassandra = { + up = [[]], + } +} diff --git a/kong/plugins/oauth2/migrations/init.lua b/kong/plugins/oauth2/migrations/init.lua index 287a19e9c5eb..fe6bd04bf0a4 100644 --- a/kong/plugins/oauth2/migrations/init.lua +++ b/kong/plugins/oauth2/migrations/init.lua @@ -3,4 +3,5 @@ return { "003_130_to_140", "004_200_to_210", "005_210_to_211", + "006_320_to_330", } diff --git a/kong/plugins/rate-limiting/migrations/005_320_to_330.lua b/kong/plugins/rate-limiting/migrations/005_320_to_330.lua new file mode 100644 index 000000000000..4a175e709562 --- /dev/null +++ b/kong/plugins/rate-limiting/migrations/005_320_to_330.lua @@ -0,0 +1,22 @@ +return { + postgres = { + up = [[ + DROP TRIGGER IF EXISTS "ratelimiting_metrics_ttl_trigger" ON "ratelimiting_metrics"; + + DO $$ + BEGIN + CREATE TRIGGER "ratelimiting_metrics_ttl_trigger" + AFTER INSERT ON "ratelimiting_metrics" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + ]], + }, + + cassandra = { + up = [[ + ]], + }, +} diff --git a/kong/plugins/rate-limiting/migrations/init.lua b/kong/plugins/rate-limiting/migrations/init.lua index f0465556a5b2..74c3d402d1e8 100644 --- a/kong/plugins/rate-limiting/migrations/init.lua +++ b/kong/plugins/rate-limiting/migrations/init.lua @@ -2,4 +2,5 @@ return { "000_base_rate_limiting", "003_10_to_112", "004_200_to_210", + "005_320_to_330", } diff --git a/kong/plugins/session/migrations/002_320_to_330.lua b/kong/plugins/session/migrations/002_320_to_330.lua new file mode 100644 index 000000000000..cd8398f0838e --- /dev/null +++ b/kong/plugins/session/migrations/002_320_to_330.lua @@ -0,0 +1,21 @@ +return { + postgres = { + up = [[ + DROP TRIGGER IF EXISTS "sessions_ttl_trigger" ON "sessions"; + + DO $$ + BEGIN + CREATE TRIGGER "sessions_ttl_trigger" + AFTER INSERT ON "sessions" + FOR EACH STATEMENT + EXECUTE PROCEDURE batch_delete_expired_rows("ttl"); + EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN + -- Do nothing, accept existing state + END$$; + ]], + }, + + cassandra = { + up = [[]], + }, +} diff --git a/kong/plugins/session/migrations/init.lua b/kong/plugins/session/migrations/init.lua index a2d95bc5e46a..71910b2e7551 100644 --- a/kong/plugins/session/migrations/init.lua +++ b/kong/plugins/session/migrations/init.lua @@ -1,4 +1,5 @@ return { "000_base_session", "001_add_ttl_index", + "002_320_to_330", } diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 215914662d2f..e8f198495ea2 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -115,7 +115,7 @@ pg_semaphore_timeout = 60000 pg_keepalive_timeout = NONE pg_pool_size = NONE pg_backlog = NONE -_debug_pg_ttl_cleanup_interval = 60 +_debug_pg_ttl_cleanup_interval = 300 pg_ro_host = NONE pg_ro_port = NONE diff --git a/spec/05-migration/db/migrations/core/019_320_to_330_spec.lua b/spec/05-migration/db/migrations/core/019_320_to_330_spec.lua index cf6e9e6f7ab6..718ee7835198 100644 --- a/spec/05-migration/db/migrations/core/019_320_to_330_spec.lua +++ b/spec/05-migration/db/migrations/core/019_320_to_330_spec.lua @@ -12,4 +12,11 @@ describe("database migration", function() assert.table_has_column("workspaces", "updated_at", "timestamp with time zone", "timestamp") assert.table_has_column("clustering_data_planes", "updated_at", "timestamp with time zone", "timestamp") end) + + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("cluster_events_ttl_trigger") + assert.database_has_trigger("clustering_data_planes_ttl_trigger") + end) + end end) diff --git a/spec/05-migration/plugins/acme/migrations/002_320_to_330_spec.lua b/spec/05-migration/plugins/acme/migrations/002_320_to_330_spec.lua new file mode 100644 index 000000000000..5a891246cf86 --- /dev/null +++ b/spec/05-migration/plugins/acme/migrations/002_320_to_330_spec.lua @@ -0,0 +1,9 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function () + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("acme_storage_ttl_trigger") + end) + end +end) diff --git a/spec/05-migration/plugins/key-auth/migrations/004_320_to_330_spec.lua b/spec/05-migration/plugins/key-auth/migrations/004_320_to_330_spec.lua new file mode 100644 index 000000000000..70a5c92bc0d8 --- /dev/null +++ b/spec/05-migration/plugins/key-auth/migrations/004_320_to_330_spec.lua @@ -0,0 +1,9 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function () + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("keyauth_credentials_ttl_trigger") + end) + end +end) diff --git a/spec/05-migration/plugins/oauth2/migrations/006_320_to_330_spec.lua b/spec/05-migration/plugins/oauth2/migrations/006_320_to_330_spec.lua new file mode 100644 index 000000000000..55c0d3451ed1 --- /dev/null +++ b/spec/05-migration/plugins/oauth2/migrations/006_320_to_330_spec.lua @@ -0,0 +1,10 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function () + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("oauth2_authorization_codes_ttl_trigger") + assert.database_has_trigger("oauth2_tokens_ttl_trigger") + end) + end +end) diff --git a/spec/05-migration/plugins/rate-limiting/migrations/005_320_to_330_spec.lua b/spec/05-migration/plugins/rate-limiting/migrations/005_320_to_330_spec.lua new file mode 100644 index 000000000000..fc3bb0a21043 --- /dev/null +++ b/spec/05-migration/plugins/rate-limiting/migrations/005_320_to_330_spec.lua @@ -0,0 +1,9 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function () + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("ratelimiting_metrics_ttl_trigger") + end) + end +end) diff --git a/spec/05-migration/plugins/session/migrations/002_320_to_330_spec.lua b/spec/05-migration/plugins/session/migrations/002_320_to_330_spec.lua new file mode 100644 index 000000000000..9c233db6c510 --- /dev/null +++ b/spec/05-migration/plugins/session/migrations/002_320_to_330_spec.lua @@ -0,0 +1,9 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function () + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("sessions_ttl_trigger") + end) + end +end) diff --git a/spec/upgrade_helpers.lua b/spec/upgrade_helpers.lua index 09edf20956cf..0a697d4a1f55 100644 --- a/spec/upgrade_helpers.lua +++ b/spec/upgrade_helpers.lua @@ -19,6 +19,35 @@ local function get_database() return db end +local function database_has_trigger(state, arguments) + local trigger_name = arguments[1] + local db = get_database() + local res, err + if database_type() == 'cassandra' then + res, err = db.connector:query(string.format( + "select *" + .. " from system_schema.triggers" + .. " where trigger_name = '%s'", + trigger_name)) + elseif database_type() == 'postgres' then + res, err = db.connector:query(string.format( + "select true" + .. " from pg_trigger" + .. " where tgname = '%s'", + trigger_name)) + else + return false + end + if err then + return false + end + return not(not(res[1])) +end + +say:set("assertion.database_has_trigger.positive", "Expected database to have trigger %s") +say:set("assertion.database_has_trigger.negative", "Expected database not to have trigger %s") +assert:register("assertion", "database_has_trigger", database_has_trigger, "assertion.database_has_trigger.positive", "assertion.database_has_trigger.negative") + local function table_has_column(state, arguments) local table = arguments[1] local column_name = arguments[2]