Skip to content

Commit

Permalink
fix: plugin hot reload should work on node
Browse files Browse the repository at this point in the history
  • Loading branch information
spacewander committed Oct 20, 2020
1 parent 26e55a3 commit 26ce6de
Show file tree
Hide file tree
Showing 17 changed files with 655 additions and 81 deletions.
51 changes: 50 additions & 1 deletion apisix/admin/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ local route = require("resty.radixtree")
local plugin = require("apisix.plugin")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_time = ngx.time
local ngx_timer_at = ngx.timer.at
local ngx_worker_id = ngx.worker.id
local tonumber = tonumber
local str_lower = string.lower
local reload_event = "/apisix/admin/plugins/reload"
local ipairs = ipairs
local error = error
local events
local MAX_REQ_BODY = 1024 * 1024 * 1.5 -- 1.5 MiB

Expand Down Expand Up @@ -245,7 +249,7 @@ local function post_reload_plugins()
core.response.exit(401)
end

local success, err = events.post(reload_event, get_method(), ngx.time())
local success, err = events.post(reload_event, get_method(), ngx_time())
if not success then
core.response.exit(500, err)
end
Expand All @@ -254,9 +258,40 @@ local function post_reload_plugins()
end


-- Push the plugin list declared in the local configuration file into etcd,
-- so that every APISIX node (including ones joining later) converges on the
-- same set of enabled plugins. Only ever invoked from worker 0.
-- No return value: failures are logged, not raised, because this runs in
-- timer/event context where an error would only kill the timer.
local function sync_local_conf_to_etcd()
    core.log.warn("sync local conf to etcd")

    local local_conf = core.config.local_conf()
    if not local_conf then
        -- without a readable local config there is nothing to sync
        core.log.error("failed to load local configuration")
        return
    end

    local plugins = {}
    -- HTTP plugins: one {name = ...} entry each.
    -- `or {}` guards against a config that declares no plugins at all.
    for _, name in ipairs(local_conf.plugins or {}) do
        core.table.insert(plugins, {
            name = name,
        })
    end

    -- stream plugins carry an extra `stream = true` marker so readers can
    -- tell the two kinds apart
    for _, name in ipairs(local_conf.stream_plugins or {}) do
        core.table.insert(plugins, {
            name = name,
            stream = true,
        })
    end

    -- store all plugin names under a single key so the whole list can be
    -- updated atomically
    local res, err = core.etcd.set("/plugins", plugins)
    if not res then
        core.log.error("failed to set plugins: ", err)
    end
end


-- Handler for the plugin reload event (registered on "PUT" of reload_event).
-- Every worker reloads its in-memory plugin set; worker 0 additionally
-- persists the locally-configured plugin list to etcd.
-- The event arguments are unused but dictated by resty.worker.events.
local function reload_plugins(data, event, source, pid)
    core.log.info("start to hot reload plugins")
    plugin.load()

    if ngx_worker_id() ~= 0 then
        return
    end

    sync_local_conf_to_etcd()
end


Expand Down Expand Up @@ -294,6 +329,20 @@ function _M.init_worker()
events = require("resty.worker.events")

events.register(reload_plugins, reload_event, "PUT")

if ngx_worker_id() == 0 then
local ok, err = ngx_timer_at(0, function(premature)
if premature then
return
end

sync_local_conf_to_etcd()
end)

if not ok then
error("failed to sync local configure to etcd: " .. err)
end
end
end


Expand Down
1 change: 1 addition & 0 deletions apisix/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ return {
config = config,
sleep = utils.sleep,
json = require("apisix.core.json"),
set = require("apisix.core.set"),
table = require("apisix.core.table"),
request = require("apisix.core.request"),
response = require("apisix.core.response"),
Expand Down
102 changes: 76 additions & 26 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,6 @@ local function sync_data(self)
return false, err
end

if not dir_res.nodes then
dir_res.nodes = {}
end

if self.values then
for i, val in ipairs(self.values) do
if val and val.clean_handlers then
Expand All @@ -203,19 +199,14 @@ local function sync_data(self)
self.values_hash = nil
end

self.values = new_tab(#dir_res.nodes, 0)
self.values_hash = new_tab(0, #dir_res.nodes)

local changed = false
for _, item in ipairs(dir_res.nodes) do
local key = short_key(self, item.key)
local data_valid = true
if type(item.value) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. key,
"], val: ", item.value,
", it shoud be a object")
end

if self.single_item then
self.values = new_tab(1, 0)
self.values_hash = new_tab(0, 1)

local item = dir_res
local data_valid = item.value ~= nil

if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item.value)
Expand All @@ -228,8 +219,8 @@ local function sync_data(self)
if data_valid then
changed = true
insert_tab(self.values, item)
self.values_hash[key] = #self.values
item.value.id = key
self.values_hash[self.key] = #self.values

item.clean_handlers = {}

if self.filter then
Expand All @@ -238,6 +229,48 @@ local function sync_data(self)
end

self:upgrade_version(item.modifiedIndex)

else
if not dir_res.nodes then
dir_res.nodes = {}
end

self.values = new_tab(#dir_res.nodes, 0)
self.values_hash = new_tab(0, #dir_res.nodes)

for _, item in ipairs(dir_res.nodes) do
local key = short_key(self, item.key)
local data_valid = true
if type(item.value) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. key,
"], val: ", item.value,
", it shoud be a object")
end

if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item.value)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.encode(item.value))
end
end

if data_valid then
changed = true
insert_tab(self.values, item)
self.values_hash[key] = #self.values

item.value.id = key
item.clean_handlers = {}

if self.filter then
self.filter(item)
end
end

self:upgrade_version(item.modifiedIndex)
end
end

if headers then
Expand Down Expand Up @@ -285,9 +318,16 @@ local function sync_data(self)
end

local res_copy = res
-- waitdir will return [res] even for self.single_item = true
for _, res in ipairs(res_copy) do
local key = short_key(self, res.key)
if res.value and type(res.value) ~= "table" then
local key
if self.single_item then
key = self.key
else
key = short_key(self, res.key)
end

if res.value and not self.single_item and type(res.value) ~= "table" then
self:upgrade_version(res.modifiedIndex)
return false, "invalid item data of [" .. self.key .. "/" .. key
.. "], val: " .. res.value
Expand All @@ -314,10 +354,6 @@ local function sync_data(self)
return false
end

if self.filter then
self.filter(res)
end

local pre_index = self.values_hash[key]
if pre_index then
local pre_val = self.values[pre_index]
Expand All @@ -329,7 +365,10 @@ local function sync_data(self)
end

if res.value then
res.value.id = key
if not self.single_item then
res.value.id = key
end

self.values[pre_index] = res
res.clean_handlers = {}
log.info("update data by key: ", key)
Expand All @@ -345,7 +384,10 @@ local function sync_data(self)
res.clean_handlers = {}
insert_tab(self.values, res)
self.values_hash[key] = #self.values
res.value.id = key
if not self.single_item then
res.value.id = key
end

log.info("insert data by key: ", key)
end

Expand All @@ -372,6 +414,12 @@ local function sync_data(self)
self.sync_times = 0
end

-- /plugins' filter need to known self.values when it is called
-- so the filter should be called after self.values set.
if self.filter then
self.filter(res)
end

self.conf_version = self.conf_version + 1
end

Expand Down Expand Up @@ -476,6 +524,7 @@ function _M.new(key, opts)
local item_schema = opts and opts.item_schema
local filter_fun = opts and opts.filter
local timeout = opts and opts.timeout
local single_item = opts and opts.single_item

local obj = setmetatable({
etcd_cli = nil,
Expand All @@ -493,6 +542,7 @@ function _M.new(key, opts)
last_err = nil,
last_err_time = nil,
timeout = timeout,
single_item = single_item,
filter = filter_fun,
}, mt)

Expand Down
73 changes: 53 additions & 20 deletions apisix/core/config_yaml.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,18 @@ local function sync_data(self)
self.values = nil
end

self.values = new_tab(#items, 0)
self.values_hash = new_tab(0, #items)
if self.single_item then
-- treat items as a single item
self.values = new_tab(1, 0)
self.values_hash = new_tab(0, 1)

local err
for i, item in ipairs(items) do
local id = tostring(i)
local data_valid = true
if type(item) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. id,
"], val: ", json.delay_encode(item),
", it shoud be a object")
end

local key = item.id or "arr_" .. i
local item = items
local conf_item = {value = item, modifiedIndex = apisix_yaml_ctime,
key = "/" .. self.key .. "/" .. key}
key = "/" .. self.key}

if data_valid and self.item_schema then
local data_valid = true
local err
if self.item_schema then
data_valid, err = check_schema(self.item_schema, item)
if not data_valid then
log.error("failed to check item data of [", self.key,
Expand All @@ -176,16 +169,54 @@ local function sync_data(self)

if data_valid then
insert_tab(self.values, conf_item)
local item_id = conf_item.value.id or self.key .. "#" .. id
item_id = tostring(item_id)
self.values_hash[item_id] = #self.values
conf_item.value.id = item_id
self.values_hash[self.key] = #self.values
conf_item.clean_handlers = {}

if self.filter then
self.filter(conf_item)
end
end

else
self.values = new_tab(#items, 0)
self.values_hash = new_tab(0, #items)

local err
for i, item in ipairs(items) do
local id = tostring(i)
local data_valid = true
if type(item) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. id,
"], val: ", json.delay_encode(item),
", it shoud be a object")
end

local key = item.id or "arr_" .. i
local conf_item = {value = item, modifiedIndex = apisix_yaml_ctime,
key = "/" .. self.key .. "/" .. key}

if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item))
end
end

if data_valid then
insert_tab(self.values, conf_item)
local item_id = conf_item.value.id or self.key .. "#" .. id
item_id = tostring(item_id)
self.values_hash[item_id] = #self.values
conf_item.value.id = item_id
conf_item.clean_handlers = {}

if self.filter then
self.filter(conf_item)
end
end
end
end

self.conf_version = apisix_yaml_ctime
Expand Down Expand Up @@ -263,6 +294,7 @@ function _M.new(key, opts)
local automatic = opts and opts.automatic
local item_schema = opts and opts.item_schema
local filter_fun = opts and opts.filter
local single_item = opts and opts.single_item

-- like /routes and /upstreams, remove first char `/`
if key then
Expand All @@ -281,6 +313,7 @@ function _M.new(key, opts)
last_err = nil,
last_err_time = nil,
key = key,
single_item = single_item,
filter = filter_fun,
}, mt)

Expand Down
Loading

0 comments on commit 26ce6de

Please sign in to comment.