From 73d2046ac5f98ace424575157373e8f1cb9c2400 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Thu, 28 Apr 2022 01:42:48 -0700 Subject: [PATCH] Revert "Revert "feat(dao) use `cache_key` for target uniqueness detection" (#8705)" This reverts commit 579537b494af136184aca3952a799291585b8b47. --- CHANGELOG.md | 5 + kong/api/routes/upstreams.lua | 23 ---- kong/db/dao/targets.lua | 9 -- kong/db/migrations/core/016_280_to_300.lua | 120 ++++++++++++++++++ kong/db/migrations/core/init.lua | 1 + kong/db/schema/entities/targets.lua | 1 + .../04-admin_api/08-targets_routes_spec.lua | 11 +- 7 files changed, 130 insertions(+), 40 deletions(-) create mode 100644 kong/db/migrations/core/016_280_to_300.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 57ac0ec0f100..d5fcb4033122 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -134,6 +134,11 @@ ### Additions +#### Core + +- Added `cache_key` on target entity for uniqueness detection. + [#8179](https://github.com/Kong/kong/pull/8179) + #### Plugins - **Zipkin**: add support for including HTTP path in span name diff --git a/kong/api/routes/upstreams.lua b/kong/api/routes/upstreams.lua index 3ecf418d3e97..c463e541132e 100644 --- a/kong/api/routes/upstreams.lua +++ b/kong/api/routes/upstreams.lua @@ -112,21 +112,6 @@ local function target_endpoint(self, db, callback) end -local function update_existent_target(self, db) - local upstream = endpoints.select_entity(self, db, db.upstreams.schema) - local filter = { target = unescape_uri(self.params.target) } - local opts = endpoints.extract_options(self.args.uri, db.targets.schema, "select") - local target = db.targets:select_by_upstream_filter(upstream, filter, opts) - - if target then - self.params.targets = db.targets.schema:extract_pk_values(target) - return endpoints.update_entity(self, db, db.targets.schema) - end - - return nil -end - - return { ["/upstreams/:upstreams/health"] = { GET = function(self, db) @@ -181,14 +166,6 @@ return { "upstream", "page_for_upstream"), PUT = function(self, db) - local entity, _, err_t = update_existent_target(self, db) - if err_t then - return endpoints.handle_error(err_t) - end - if entity then - return kong.response.exit(200, entity, { ["Deprecation"] = "true" }) - end - local create = endpoints.post_collection_endpoint(kong.db.targets.schema, kong.db.upstreams.schema, "upstream") return create(self, db) diff --git a/kong/db/dao/targets.lua b/kong/db/dao/targets.lua index 76169745234f..ef0027a05e9f 100644 --- a/kong/db/dao/targets.lua +++ b/kong/db/dao/targets.lua @@ -47,15 +47,6 @@ function _TARGETS:insert(entity, options) entity.target = formatted_target end - local workspace = workspaces.get_workspace_id() - local opts = { nulls = true, workspace = workspace } - for existent in self:each_for_upstream(entity.upstream, nil, opts) do - if existent.target == entity.target then - local err_t = self.errors:unique_violation({ target = existent.target }) - return nil, tostring(err_t), err_t - end - end - return self.super.insert(self, entity, options) end diff --git a/kong/db/migrations/core/016_280_to_300.lua b/kong/db/migrations/core/016_280_to_300.lua new file mode 100644 index 000000000000..f8a710227aac --- /dev/null +++ b/kong/db/migrations/core/016_280_to_300.lua @@ -0,0 +1,120 @@ +-- remove repeated targets, the older ones are not useful anymore. targets with +-- weight 0 will be kept, as we cannot tell which were deleted and which were +-- explicitly set as 0. 
+local function c_remove_unused_targets(coordinator) + local cassandra = require "cassandra" + local upstream_targets = {} + for rows, err in coordinator:iterate("SELECT id, upstream_id, target, created_at FROM targets") do + if err then + return nil, err + end + + for _, row in ipairs(rows) do + local key = string.format("%s:%s", row.upstream_id, row.target) + + if not upstream_targets[key] then + upstream_targets[key] = { + id = row.id, + created_at = row.created_at, + } + else + local to_remove + if row.created_at > upstream_targets[key].created_at then + to_remove = upstream_targets[key].id + upstream_targets[key] = { + id = row.id, + created_at = row.created_at, + } + else + to_remove = row.id + end + local _, err = coordinator:execute("DELETE FROM targets WHERE id = ?", { + cassandra.uuid(to_remove) + }) + + if err then + return nil, err + end + end + end + end + + return true +end + + +-- update cache_key for targets +local function c_update_target_cache_key(coordinator) + local cassandra = require "cassandra" + for rows, err in coordinator:iterate("SELECT id, upstream_id, target, ws_id FROM targets") do + if err then + return nil, err + end + + for _, row in ipairs(rows) do + local cache_key = string.format("targets:%s:%s::::%s", row.upstream_id, row.target, row.ws_id) + + local _, err = coordinator:execute("UPDATE targets SET cache_key = ? WHERE id = ? IF EXISTS", { + cache_key, cassandra.uuid(row.id) + }) + + if err then + return nil, err + end + end + end + + return true +end + + +return { + postgres = { + up = [[ + DO $$ + BEGIN + ALTER TABLE IF EXISTS ONLY "targets" ADD COLUMN "cache_key" TEXT UNIQUE; + EXCEPTION WHEN duplicate_column THEN + -- Do nothing, accept existing state + END; + $$; + ]], + teardown = function(connector) + local _, err = connector:query([[ + DELETE FROM targets t1 + USING targets t2 + WHERE t1.created_at < t2.created_at + AND t1.upstream_id = t2.upstream_id + AND t1.target = t2.target; + UPDATE targets SET cache_key = CONCAT('targets:', upstream_id, ':', target, '::::', ws_id); + ]]) + + if err then + return nil, err + end + + return true + end + }, + + cassandra = { + up = [[ + ALTER TABLE targets ADD cache_key text; + CREATE INDEX IF NOT EXISTS targets_cache_key_idx ON targets(cache_key); + ]], + teardown = function(connector) + local coordinator = assert(connector:get_stored_connection()) + local _, err = c_remove_unused_targets(coordinator) + if err then + return nil, err + end + + _, err = c_update_target_cache_key(coordinator) + if err then + return nil, err + end + + return true + end + }, +} diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index 49b8ad5ccd94..8ecd62ba8ad1 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -13,4 +13,5 @@ return { "013_220_to_230", "014_230_to_270", "015_270_to_280", + "016_280_to_300" } diff --git a/kong/db/schema/entities/targets.lua b/kong/db/schema/entities/targets.lua index 89346233e0a4..64b39e85fbca 100644 --- a/kong/db/schema/entities/targets.lua +++ b/kong/db/schema/entities/targets.lua @@ -20,6 +20,7 @@ return { name = "targets", dao = "kong.db.dao.targets", primary_key = { "id" }, + cache_key = { "upstream", "target" }, endpoint_key = "target", workspaceable = true, fields = { diff --git a/spec/02-integration/04-admin_api/08-targets_routes_spec.lua b/spec/02-integration/04-admin_api/08-targets_routes_spec.lua index bed290208d67..d7a737690c3b 100644 --- a/spec/02-integration/04-admin_api/08-targets_routes_spec.lua +++ 
b/spec/02-integration/04-admin_api/08-targets_routes_spec.lua @@ -142,7 +142,7 @@ describe("Admin API #" .. strategy, function() end end) - it_content_types("updates and does not create duplicated targets (#deprecated)", function(content_type) + it_content_types("refuses to create duplicated targets", function(content_type) return function() local upstream = bp.upstreams:insert { slots = 10 } local res = assert(client:send { @@ -159,10 +159,9 @@ describe("Admin API #" .. strategy, function() assert.equal("single-target.test:8080", json.target) assert.is_number(json.created_at) assert.is_string(json.id) - local id = json.id assert.are.equal(1, json.weight) - local res = assert(client:send { + local res2 = assert(client:send { method = "PUT", path = "/upstreams/" .. upstream.name .. "/targets/", body = { @@ -171,11 +170,7 @@ describe("Admin API #" .. strategy, function() }, headers = {["Content-Type"] = content_type} }) - local body = assert.response(res).has.status(200) - local json = cjson.decode(body) - assert.are.equal(100, json.weight) - assert.are.equal(id, json.id) - assert.equal("true", res.headers["Deprecation"]) + assert.response(res2).has.status(409) end end)
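
For readers following the migration, here is a minimal sketch of what the new uniqueness key looks like. It assumes only the format string used in `c_update_target_cache_key` and the Postgres `CONCAT` backfill above; the helper name `target_cache_key` and the IDs in the example are made up for illustration and do not appear in the patch:

    -- cache_key layout backfilled by the 016_280_to_300 migration:
    --   targets:<upstream_id>:<target>::::<ws_id>
    local function target_cache_key(upstream_id, target, ws_id)
      return string.format("targets:%s:%s::::%s", upstream_id, target, ws_id)
    end

    print(target_cache_key(
      "f9e0c38a-2c0b-4a6f-a234-56789abcdef0",   -- hypothetical upstream id
      "single-target.test:8080",                -- target address used in the spec
      "0dc6f45b-8f8d-40d2-a504-473544ee190b"))  -- hypothetical workspace id
    -- prints:
    -- targets:f9e0c38a-2c0b-4a6f-a234-56789abcdef0:single-target.test:8080::::0dc6f45b-8f8d-40d2-a504-473544ee190b

Because the Postgres column is declared `TEXT UNIQUE` and the schema now sets `cache_key = { "upstream", "target" }`, inserting a second target with the same upstream and address fails as a unique violation at the database layer rather than being caught by the per-upstream scan removed from `kong/db/dao/targets.lua`. The Admin API surfaces this as the 409 asserted in the updated spec, replacing the deprecated PUT-as-update path (and its `Deprecation: true` response header) removed from `kong/api/routes/upstreams.lua`.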