Skip to content

Commit

Permalink
feat(dao) implement cascade deletion
Browse files Browse the repository at this point in the history
This implements a "delete hook" system to trigger cascade deletion of
foreign rows in Cassandra. To do so, all foreign columns (representing foreign keys) must now be queryable (which means be indexed by Cassandra).

A "delete hook" is added to a parent DAO if any other DAO has foreign
rows to that DAO. Ex: APIs are parents to plugins_credentials, hence the
`apis` DAO will have a delete hook on the deletion of a row, to also
delete any related `plugins_configurations`.

- Move cascade delete tests to another file, with all use cases of
  current cascade deletion.
- Add a migration for oauth2 plugin to index the `consumer_id` field.
- Remove obsolete overridden delete methods of apis and consumers DAOs.

This should be a solution to #438 and is an improved implementation of #107
  • Loading branch information
thibaultcha committed Aug 25, 2015
1 parent 2e50107 commit 554a19a
Show file tree
Hide file tree
Showing 12 changed files with 386 additions and 189 deletions.
27 changes: 0 additions & 27 deletions kong/dao/cassandra/apis.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,4 @@ function Apis:find_all()
return apis
end

-- @override
function Apis:delete(where_t)
local ok, err = Apis.super.delete(self, where_t)
if not ok then
return false, err
end

-- delete all related plugins configurations
local plugins_dao = self._factory.plugins_configurations
local select_q, columns = query_builder.select(plugins_dao._table, {api_id = where_t.id}, plugins_dao._column_family_details)

for rows, err in plugins_dao:execute(select_q, columns, {api_id = where_t.id}, {auto_paging = true}) do
if err then
return nil, err
end

for _, row in ipairs(rows) do
local ok_del_plugin, err = plugins_dao:delete({id = row.id})
if not ok_del_plugin then
return nil, err
end
end
end

return ok
end

return {apis = Apis}
63 changes: 58 additions & 5 deletions kong/dao/cassandra/base_dao.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ function BaseDao:new(properties)

self._properties = properties
self._statements_cache = {}
self._cascade_delete_hooks = {}
end

-- Marshall an entity. Does nothing by default,
Expand Down Expand Up @@ -258,9 +259,7 @@ function BaseDao:execute(query, columns, args_to_bind, options)
end

-- Execute statement
local results, err = self:_execute(query, args, options)

return results, err
return self:_execute(query, args, options)
end

-- Check all fields marked with a `unique` in the schema do not already exist.
Expand Down Expand Up @@ -551,10 +550,50 @@ function BaseDao:find(page_size, paging_state)
return self:find_by_keys(nil, page_size, paging_state)
end

-- Add a delete hook on a parent DAO of a foreign row.
-- The delete hook will basically "cascade delete" all foreign rows of a parent row.
-- @see cassandra/factory.lua ':load_daos()'
-- @param foreign_dao_name Name (string) of the parent DAO
-- @param foreign_column Name (string) of the foreign column
-- @param parent_column Name (string) of the parent column identifying the parent row
function BaseDao:add_delete_hook(foreign_dao_name, foreign_column, parent_column)

-- The actual delete hook
-- @param deleted_primary_key The value of the deleted row's primary key
-- @return boolean True if success, false otherwise
-- @return table A DAOError in case of error
local delete_hook = function(deleted_primary_key)
local foreign_dao = self._factory[foreign_dao_name]
local select_args = {
[foreign_column] = deleted_primary_key[parent_column]
}

-- Iterate over all rows with the foreign key and delete them.
-- Rows need to be deleted by PRIMARY KEY, and we only have the value of the foreign key, hence we need
-- to retrieve all rows with the foreign key, and then delete them, identifier by their own primary key.
local select_q, columns = query_builder.select(foreign_dao._table, select_args, foreign_dao._column_family_details )
for rows, err in foreign_dao:execute(select_q, columns, select_args, {auto_paging = true}) do
if err then
return false, err
end
for _, row in ipairs(rows) do
local ok_del_foreign_row, err = foreign_dao:delete(row)
if not ok_del_foreign_row then
return false, err
end
end
end

return true
end

table.insert(self._cascade_delete_hooks, delete_hook)
end

-- Delete the row at a given PRIMARY KEY.
-- @param `where_t` A table containing the PRIMARY KEY (columns/values) of the row to delete
-- @return `success` True if deleted, false if otherwise or not found
-- @return `error` Error if any during the query execution
-- @return `error` Error if any during the query execution or the cascade delete hook
function BaseDao:delete(where_t)
assert(self._primary_key ~= nil and type(self._primary_key) == "table" , "Entity does not have a primary_key")
assert(where_t ~= nil and type(where_t) == "table", "where_t must be a table")
Expand All @@ -569,7 +608,21 @@ function BaseDao:delete(where_t)

local t_primary_key = extract_primary_key(where_t, self._primary_key, self._clustering_key)
local delete_q, where_columns = query_builder.delete(self._table, t_primary_key)
return self:execute(delete_q, where_columns, where_t)
local results, err = self:execute(delete_q, where_columns, where_t)
if err then
return false, err
end

-- Delete successful, trigger cascade delete hooks if any.
local foreign_err
for _, hook in ipairs(self._cascade_delete_hooks) do
foreign_err = select(2, hook(t_primary_key))
if foreign_err then
return false, foreign_err
end
end

return results
end

-- Truncate the table of this DAO
Expand Down
30 changes: 1 addition & 29 deletions kong/dao/cassandra/consumers.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
local BaseDao = require "kong.dao.cassandra.base_dao"
local query_builder = require "kong.dao.cassandra.query_builder"
local consumers_schema = require "kong.dao.schemas.consumers"

local Consumers = BaseDao:extend()
Expand All @@ -11,31 +10,4 @@ function Consumers:new(properties)
Consumers.super.new(self, properties)
end

-- @override
function Consumers:delete(where_t)
local ok, err = Consumers.super.delete(self, where_t)
if not ok then
return false, err
end

local plugins_dao = self._factory.plugins_configurations
local select_q, columns = query_builder.select(plugins_dao._table, {consumer_id = where_t.id}, plugins_dao._column_family_details)

-- delete all related plugins configurations
for rows, err in plugins_dao:execute(select_q, columns, {consumer_id = where_t.id}, {auto_paging = true}) do
if err then
return nil, err
end

for _, row in ipairs(rows) do
local ok_del_plugin, err = plugins_dao:delete({id = row.id})
if not ok_del_plugin then
return nil, err
end
end
end

return ok
end

return { consumers = Consumers }
return {consumers = Consumers}
51 changes: 40 additions & 11 deletions kong/dao/cassandra/factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,53 @@ function CassandraFactory:new(properties, plugins)

-- Load plugins DAOs
if plugins then
for _, v in ipairs(plugins) do
local loaded, plugin_daos_mod = utils.load_module_if_exists("kong.plugins."..v..".daos")
if loaded then
if ngx then
ngx.log(ngx.DEBUG, "Loading DAO for plugin: "..v)
end
self:load_daos(plugin_daos_mod)
elseif ngx then
ngx.log(ngx.DEBUG, "No DAO loaded for plugin: "..v)
self:load_plugins(plugins)
end
end

-- Load an array of plugins (array of plugins names). If any of those plugins have DAOs,
-- they will be loaded into the factory.
-- @param plugins Array of plugins names
function CassandraFactory:load_plugins(plugins)
for _, v in ipairs(plugins) do
local loaded, plugin_daos_mod = utils.load_module_if_exists("kong.plugins."..v..".daos")
if loaded then
if ngx then
ngx.log(ngx.DEBUG, "Loading DAO for plugin: "..v)
end
self:load_daos(plugin_daos_mod)
elseif ngx then
ngx.log(ngx.DEBUG, "No DAO loaded for plugin: "..v)
end
end
end

-- Load a plugin's DAOs (plugins can have more than one DAO) in the factory and create cascade delete hooks.
-- Cascade delete hooks are triggered when a parent of a foreign row is deleted.
-- @param plugin_daos A table with key/values representing daos names and instances.
function CassandraFactory:load_daos(plugin_daos)
local dao
for name, plugin_dao in pairs(plugin_daos) do
self.daos[name] = plugin_dao(self._properties)
self.daos[name]._factory = self
dao = plugin_dao(self._properties)
dao._factory = self
self.daos[name] = dao
if dao._schema then
-- Check for any foreign relations to trigger cascade deletes
for field_name, field in pairs(dao._schema.fields) do
if field.foreign ~= nil then
-- Foreign key columns need to be queryable, hence they need to have an index
assert(field.queryable, "Foreign property "..field_name.." of shema "..name.." must be queryable (have an index)")

local parent_dao_name, parent_column = unpack(stringy.split(field.foreign, ":"))
assert(parent_dao_name ~= nil, "Foreign property "..field_name.." of schema "..name.." must contain 'parent_dao:parent_column")
assert(parent_column ~= nil, "Foreign property "..field_name.." of schema "..name.." must contain 'parent_dao:parent_column")

-- Add delete hook to the parent DAO
local parent_dao = self[parent_dao_name]
parent_dao:add_delete_hook(name, field_name, parent_column)
end
end
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/basicauth/daos.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ local SCHEMA = {
fields = {
id = { type = "id", dao_insert_value = true },
created_at = { type = "timestamp", dao_insert_value = true },
consumer_id = { type = "id", required = true, foreign = "consumers:id" },
consumer_id = { type = "id", required = true, queryable = true, foreign = "consumers:id" },
username = { type = "string", required = true, unique = true, queryable = true },
password = { type = "string" }
}
Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/keyauth/daos.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ local SCHEMA = {
fields = {
id = { type = "id", dao_insert_value = true },
created_at = { type = "timestamp", dao_insert_value = true },
consumer_id = { type = "id", required = true, foreign = "consumers:id" },
consumer_id = { type = "id", required = true, queryable = true, foreign = "consumers:id" },
key = { type = "string", required = false, unique = true, queryable = true, func = generate_if_missing }
}
}
Expand Down
8 changes: 4 additions & 4 deletions kong/plugins/oauth2/access.lua
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ local function issue_token(conf)
response_params = {[ERROR] = "access_denied", error_description = "You must use HTTPS"}
else
local grant_type = parameters[GRANT_TYPE]
if not (grant_type == GRANT_AUTHORIZATION_CODE or
grant_type == GRANT_REFRESH_TOKEN or
(conf.enable_client_credentials and grant_type == GRANT_CLIENT_CREDENTIALS) or
if not (grant_type == GRANT_AUTHORIZATION_CODE or
grant_type == GRANT_REFRESH_TOKEN or
(conf.enable_client_credentials and grant_type == GRANT_CLIENT_CREDENTIALS) or
(conf.enable_password_grant and grant_type == GRANT_PASSWORD)) then
response_params = {[ERROR] = "invalid_request", error_description = "Invalid "..GRANT_TYPE}
end
Expand Down Expand Up @@ -353,7 +353,7 @@ function _M.execute(conf)
-- Check if the API has a path and if it's being invoked with the path resolver
local path_prefix = (ngx.ctx.api.path and stringy.startswith(ngx.var.request_uri, ngx.ctx.api.path)) and ngx.ctx.api.path or ""
if stringy.endswith(path_prefix, "/") then
path_prefix = path_prefix:sub(1, path_prefix:len() - 1)
path_prefix = path_prefix:sub(1, path_prefix:len() - 1)
end

if ngx.req.get_method() == "POST" then
Expand Down
4 changes: 2 additions & 2 deletions kong/plugins/oauth2/daos.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ local OAUTH2_CREDENTIALS_SCHEMA = {
primary_key = {"id"},
fields = {
id = { type = "id", dao_insert_value = true },
consumer_id = { type = "id", required = true, foreign = "consumers:id" },
consumer_id = { type = "id", required = true, queryable = true, foreign = "consumers:id" },
name = { type = "string", required = true },
client_id = { type = "string", required = false, unique = true, queryable = true, func = generate_if_missing },
client_secret = { type = "string", required = false, unique = true, func = generate_if_missing },
Expand All @@ -45,7 +45,7 @@ local OAUTH2_TOKENS_SCHEMA = {
primary_key = {"id"},
fields = {
id = { type = "id", dao_insert_value = true },
credential_id = { type = "id", required = true, foreign = "oauth2_credentials:id" },
credential_id = { type = "id", required = true, queryable = true, foreign = "oauth2_credentials:id" },
token_type = { type = "string", required = true, enum = { BEARER }, default = BEARER },
access_token = { type = "string", required = false, unique = true, queryable = true, immutable = true, func = generate_if_missing },
refresh_token = { type = "string", required = false, unique = true, queryable = true, immutable = true, func = generate_refresh_token },
Expand Down
13 changes: 13 additions & 0 deletions kong/plugins/oauth2/migrations/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ local Migrations = {
DROP TABLE oauth2_tokens;
]]
end
},
{
name = "2015-08-24-215800_cascade_delete_index",
up = function()
return [[
CREATE INDEX IF NOT EXISTS oauth2_credential_id_idx ON oauth2_tokens(credential_id);
]]
end,
down = function()
return [[
DROP INDEX oauth2_credential_id_idx;
]]
end
}
}

Expand Down
Loading

0 comments on commit 554a19a

Please sign in to comment.