Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(db): add postgres triggers to cleanup expired rows on ttl-enabled schema tables #10389

Merged
merged 4 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@
- Allow configuring Labels for data planes to provide metadata information.
Labels are only compatible with hybrid mode deployments with Kong Konnect (SaaS)
[#10471](https://github.com/Kong/kong/pull/10471)
- Add Postgres triggers on the core entites and entities in bundled plugins to delete the
expired rows in an efficient and timely manner.
[#10389](https://github.com/Kong/kong/pull/10389)

#### Admin API

Expand Down Expand Up @@ -162,6 +165,11 @@
[#10405](https://github.com/Kong/kong/pull/10405)
- Postgres TTL cleanup timer now runs a batch delete loop on each ttl enabled table with a number of 50.000 rows per batch.
[#10407](https://github.com/Kong/kong/pull/10407)
- Postgres TTL cleanup timer now runs every 5 minutes instead of every 60 seconds.
[#10389](https://github.com/Kong/kong/pull/10389)
- Postgres TTL cleanup timer now deletes expired rows based on database server-side timestamp to avoid potential
problems caused by the difference of clock time between Kong and database server.
[#10389](https://github.com/Kong/kong/pull/10389)

#### PDK

Expand Down
5 changes: 5 additions & 0 deletions kong-3.2.1-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ build = {
["kong.plugins.key-auth.migrations.000_base_key_auth"] = "kong/plugins/key-auth/migrations/000_base_key_auth.lua",
["kong.plugins.key-auth.migrations.002_130_to_140"] = "kong/plugins/key-auth/migrations/002_130_to_140.lua",
["kong.plugins.key-auth.migrations.003_200_to_210"] = "kong/plugins/key-auth/migrations/003_200_to_210.lua",
["kong.plugins.key-auth.migrations.004_320_to_330"] = "kong/plugins/key-auth/migrations/004_320_to_330.lua",
["kong.plugins.key-auth.handler"] = "kong/plugins/key-auth/handler.lua",
["kong.plugins.key-auth.schema"] = "kong/plugins/key-auth/schema.lua",
["kong.plugins.key-auth.daos"] = "kong/plugins/key-auth/daos.lua",
Expand All @@ -302,6 +303,7 @@ build = {
["kong.plugins.oauth2.migrations.003_130_to_140"] = "kong/plugins/oauth2/migrations/003_130_to_140.lua",
["kong.plugins.oauth2.migrations.004_200_to_210"] = "kong/plugins/oauth2/migrations/004_200_to_210.lua",
["kong.plugins.oauth2.migrations.005_210_to_211"] = "kong/plugins/oauth2/migrations/005_210_to_211.lua",
["kong.plugins.oauth2.migrations.006_320_to_330"] = "kong/plugins/oauth2/migrations/006_320_to_330.lua",
["kong.plugins.oauth2.handler"] = "kong/plugins/oauth2/handler.lua",
["kong.plugins.oauth2.secret"] = "kong/plugins/oauth2/secret.lua",
["kong.plugins.oauth2.access"] = "kong/plugins/oauth2/access.lua",
Expand All @@ -327,6 +329,7 @@ build = {
["kong.plugins.rate-limiting.migrations.000_base_rate_limiting"] = "kong/plugins/rate-limiting/migrations/000_base_rate_limiting.lua",
["kong.plugins.rate-limiting.migrations.003_10_to_112"] = "kong/plugins/rate-limiting/migrations/003_10_to_112.lua",
["kong.plugins.rate-limiting.migrations.004_200_to_210"] = "kong/plugins/rate-limiting/migrations/004_200_to_210.lua",
["kong.plugins.rate-limiting.migrations.005_320_to_330"] = "kong/plugins/rate-limiting/migrations/005_320_to_330.lua",
["kong.plugins.rate-limiting.expiration"] = "kong/plugins/rate-limiting/expiration.lua",
["kong.plugins.rate-limiting.handler"] = "kong/plugins/rate-limiting/handler.lua",
["kong.plugins.rate-limiting.schema"] = "kong/plugins/rate-limiting/schema.lua",
Expand Down Expand Up @@ -443,6 +446,7 @@ build = {
["kong.plugins.acme.handler"] = "kong/plugins/acme/handler.lua",
["kong.plugins.acme.migrations.000_base_acme"] = "kong/plugins/acme/migrations/000_base_acme.lua",
["kong.plugins.acme.migrations.001_280_to_300"] = "kong/plugins/acme/migrations/001_280_to_300.lua",
["kong.plugins.acme.migrations.002_320_to_330"] = "kong/plugins/acme/migrations/002_320_to_330.lua",
["kong.plugins.acme.migrations"] = "kong/plugins/acme/migrations/init.lua",
["kong.plugins.acme.schema"] = "kong/plugins/acme/schema.lua",
["kong.plugins.acme.storage.kong"] = "kong/plugins/acme/storage/kong.lua",
Expand All @@ -464,6 +468,7 @@ build = {
["kong.plugins.session.storage.kong"] = "kong/plugins/session/storage/kong.lua",
["kong.plugins.session.migrations.000_base_session"] = "kong/plugins/session/migrations/000_base_session.lua",
["kong.plugins.session.migrations.001_add_ttl_index"] = "kong/plugins/session/migrations/001_add_ttl_index.lua",
["kong.plugins.session.migrations.002_320_to_330"] = "kong/plugins/session/migrations/002_320_to_330.lua",
["kong.plugins.session.migrations"] = "kong/plugins/session/migrations/init.lua",

["kong.plugins.proxy-cache.handler"] = "kong/plugins/proxy-cache/handler.lua",
Expand Down
36 changes: 35 additions & 1 deletion kong/db/migrations/core/019_320_to_330.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,41 @@ return {
-- Do nothing, accept existing state
END;
$$;
]]

CREATE OR REPLACE FUNCTION batch_delete_expired_rows() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT('WITH rows AS (SELECT ctid FROM %s WHERE %s < CURRENT_TIMESTAMP AT TIME ZONE ''UTC'' ORDER BY %s LIMIT 2 FOR UPDATE SKIP LOCKED) DELETE FROM %s WHERE ctid IN (TABLE rows)', TG_TABLE_NAME, TG_ARGV[0], TG_ARGV[0], TG_TABLE_NAME);
RETURN NULL;
END;
$$;

DROP TRIGGER IF EXISTS "cluster_events_ttl_trigger" ON "cluster_events";

DO $$
BEGIN
CREATE TRIGGER "cluster_events_ttl_trigger"
AFTER INSERT ON "cluster_events"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("expire_at");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;


DROP TRIGGER IF EXISTS "clustering_data_planes_ttl_trigger" ON "clustering_data_planes";

DO $$
BEGIN
CREATE TRIGGER "clustering_data_planes_ttl_trigger"
AFTER INSERT ON "clustering_data_planes"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;
]],
},

cassandra = {
Expand Down
28 changes: 20 additions & 8 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -332,24 +332,36 @@ function _mt:init_worker(strategies)
WITH rows AS (
SELECT ctid
FROM %s
WHERE %s < CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
WHERE %s < TO_TIMESTAMP(%s) AT TIME ZONE 'UTC'
ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED)
DELETE
FROM %s
WHERE ctid IN (TABLE rows);]], table_name_escaped, column_name, column_name, table_name_escaped):gsub("CURRENT_TIMESTAMP", "TO_TIMESTAMP(%%s)")
WHERE ctid IN (TABLE rows);]], table_name_escaped, column_name, "%s", column_name, table_name_escaped)
end

return timer_every(self.config.ttl_cleanup_interval, function(premature)
if premature then
return
end

-- Fetch the end timestamp from database to avoid problems caused by the difference
-- between nodes and database time.
local cleanup_end_timestamp
local ok, err = self:query("SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP AT TIME ZONE 'UTC') AS NOW;")
if not ok then
log(WARN, "unable to fetch current timestamp from PostgreSQL database (",
err, ")")
return
end

cleanup_end_timestamp = ok[1]["now"]

for i, statement in ipairs(cleanup_statements) do
local cleanup_start_time = self:escape_literal(tonumber(fmt("%.3f", now_updated())))
local _tracing_cleanup_start_time = now()

while true do -- batch delete looping
-- avoid using CURRENT_TIMESTAMP in the real query to prevent infinite loop
local ok, err = self:query(fmt(statement, cleanup_start_time))
-- using the server-side timestamp in the whole loop to prevent infinite loop
local ok, err = self:query(fmt(statement, cleanup_end_timestamp))
if not ok then
if err then
log(WARN, "unable to clean expired rows from table '",
Expand All @@ -368,8 +380,8 @@ ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED)
end
end

local cleanup_end_time = now_updated()
local time_elapsed = tonumber(fmt("%.3f", cleanup_end_time - cleanup_start_time))
local _tracing_cleanup_end_time = now()
local time_elapsed = tonumber(fmt("%.3f", _tracing_cleanup_end_time - _tracing_cleanup_start_time))
log(DEBUG, "cleaning up expired rows from table '", table_names[i],
"' took ", time_elapsed, " seconds")
end
Expand Down Expand Up @@ -947,7 +959,7 @@ function _M.new(kong_config)
--- not used directly by pgmoon, but used internally in connector to set the keepalive timeout
keepalive_timeout = kong_config.pg_keepalive_timeout,
--- non user-faced parameters
ttl_cleanup_interval = kong_config._debug_pg_ttl_cleanup_interval or 60,
ttl_cleanup_interval = kong_config._debug_pg_ttl_cleanup_interval or 300,
}

local refs = kong_config["$refs"]
Expand Down
20 changes: 20 additions & 0 deletions kong/plugins/acme/migrations/002_320_to_330.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
return {
postgres = {
up = [[
DROP TRIGGER IF EXISTS "acme_storage_ttl_trigger" ON "acme_storage";

DO $$
BEGIN
CREATE TRIGGER "acme_storage_ttl_trigger"
AFTER INSERT ON "acme_storage"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;
]],
},
cassandra = {
up = "",
}
}
1 change: 1 addition & 0 deletions kong/plugins/acme/migrations/init.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
return {
"000_base_acme",
"001_280_to_300",
"002_320_to_330",
}
20 changes: 20 additions & 0 deletions kong/plugins/key-auth/migrations/004_320_to_330.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
return {
postgres = {
up = [[
DROP TRIGGER IF EXISTS "keyauth_credentials_ttl_trigger" ON "keyauth_credentials";

DO $$
BEGIN
CREATE TRIGGER "keyauth_credentials_ttl_trigger"
AFTER INSERT ON "keyauth_credentials"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;
]],
},
cassandra = {
up = [[]],
}
}
1 change: 1 addition & 0 deletions kong/plugins/key-auth/migrations/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ return {
"000_base_key_auth",
"002_130_to_140",
"003_200_to_210",
"004_320_to_330",
}
33 changes: 33 additions & 0 deletions kong/plugins/oauth2/migrations/006_320_to_330.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
return {
postgres = {
up = [[
DROP TRIGGER IF EXISTS "oauth2_authorization_codes_ttl_trigger" ON "oauth2_authorization_codes";

DO $$
BEGIN
CREATE TRIGGER "oauth2_authorization_codes_ttl_trigger"
AFTER INSERT ON "oauth2_authorization_codes"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;


DROP TRIGGER IF EXISTS "oauth2_tokens_ttl_trigger" ON "oauth2_tokens";

DO $$
BEGIN
CREATE TRIGGER "oauth2_tokens_ttl_trigger"
AFTER INSERT ON "oauth2_tokens"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;
]],
},
cassandra = {
up = [[]],
}
}
1 change: 1 addition & 0 deletions kong/plugins/oauth2/migrations/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ return {
"003_130_to_140",
"004_200_to_210",
"005_210_to_211",
"006_320_to_330",
}
22 changes: 22 additions & 0 deletions kong/plugins/rate-limiting/migrations/005_320_to_330.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
return {
postgres = {
up = [[
DROP TRIGGER IF EXISTS "ratelimiting_metrics_ttl_trigger" ON "ratelimiting_metrics";

DO $$
BEGIN
CREATE TRIGGER "ratelimiting_metrics_ttl_trigger"
AFTER INSERT ON "ratelimiting_metrics"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;
]],
},

cassandra = {
up = [[
]],
},
}
1 change: 1 addition & 0 deletions kong/plugins/rate-limiting/migrations/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ return {
"000_base_rate_limiting",
"003_10_to_112",
"004_200_to_210",
"005_320_to_330",
}
21 changes: 21 additions & 0 deletions kong/plugins/session/migrations/002_320_to_330.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
return {
postgres = {
up = [[
DROP TRIGGER IF EXISTS "sessions_ttl_trigger" ON "sessions";

DO $$
BEGIN
CREATE TRIGGER "sessions_ttl_trigger"
AFTER INSERT ON "sessions"
FOR EACH STATEMENT
EXECUTE PROCEDURE batch_delete_expired_rows("ttl");
EXCEPTION WHEN UNDEFINED_COLUMN OR UNDEFINED_TABLE THEN
-- Do nothing, accept existing state
END$$;
]],
},

cassandra = {
up = [[]],
},
}
1 change: 1 addition & 0 deletions kong/plugins/session/migrations/init.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
return {
"000_base_session",
"001_add_ttl_index",
"002_320_to_330",
}
2 changes: 1 addition & 1 deletion kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pg_semaphore_timeout = 60000
pg_keepalive_timeout = NONE
pg_pool_size = NONE
pg_backlog = NONE
_debug_pg_ttl_cleanup_interval = 60
_debug_pg_ttl_cleanup_interval = 300

pg_ro_host = NONE
pg_ro_port = NONE
Expand Down
7 changes: 7 additions & 0 deletions spec/05-migration/db/migrations/core/019_320_to_330_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,11 @@ describe("database migration", function()
assert.table_has_column("workspaces", "updated_at", "timestamp with time zone", "timestamp")
assert.table_has_column("clustering_data_planes", "updated_at", "timestamp with time zone", "timestamp")
end)

if uh.database_type() == "postgres" then
uh.all_phases("has created the expected triggers", function ()
assert.database_has_trigger("cluster_events_ttl_trigger")
assert.database_has_trigger("clustering_data_planes_ttl_trigger")
end)
end
end)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local uh = require "spec/upgrade_helpers"

describe("database migration", function ()
if uh.database_type() == "postgres" then
uh.all_phases("has created the expected triggers", function ()
assert.database_has_trigger("acme_storage_ttl_trigger")
end)
end
end)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local uh = require "spec/upgrade_helpers"

describe("database migration", function ()
if uh.database_type() == "postgres" then
uh.all_phases("has created the expected triggers", function ()
assert.database_has_trigger("keyauth_credentials_ttl_trigger")
end)
end
end)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local uh = require "spec/upgrade_helpers"

describe("database migration", function ()
if uh.database_type() == "postgres" then
uh.all_phases("has created the expected triggers", function ()
assert.database_has_trigger("oauth2_authorization_codes_ttl_trigger")
assert.database_has_trigger("oauth2_tokens_ttl_trigger")
end)
end
end)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local uh = require "spec/upgrade_helpers"

describe("database migration", function ()
if uh.database_type() == "postgres" then
uh.all_phases("has created the expected triggers", function ()
assert.database_has_trigger("ratelimiting_metrics_ttl_trigger")
end)
end
end)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local uh = require "spec/upgrade_helpers"

describe("database migration", function ()
if uh.database_type() == "postgres" then
uh.all_phases("has created the expected triggers", function ()
assert.database_has_trigger("sessions_ttl_trigger")
end)
end
end)
Loading