From 9d55aa27cf0fec1cdc94e8abe36298e1e1b17d45 Mon Sep 17 00:00:00 2001 From: spacewander Date: Mon, 8 Mar 2021 16:44:52 +0800 Subject: [PATCH] feat: load etcd configuration when apisix starts Fix #3370 Signed-off-by: spacewander --- apisix/cli/etcd.lua | 14 +- apisix/constants.lua | 38 ++++++ apisix/core/config_etcd.lua | 258 +++++++++++++++++++++++++----------- apisix/core/config_yaml.lua | 1 + apisix/core/etcd.lua | 8 +- apisix/init.lua | 17 ++- apisix/patch.lua | 177 +++++++++++++++++++++++++ apisix/plugin.lua | 1 + t/APISIX.pm | 2 + t/admin/plugins-reload.t | 11 +- t/core/etcd.t | 44 ++++++ t/misc/patch.t | 57 ++++++++ 12 files changed, 531 insertions(+), 97 deletions(-) create mode 100644 apisix/constants.lua create mode 100644 apisix/patch.lua create mode 100644 t/misc/patch.t diff --git a/apisix/cli/etcd.lua b/apisix/cli/etcd.lua index bb34462271b93..621c001d0cc71 100644 --- a/apisix/cli/etcd.lua +++ b/apisix/cli/etcd.lua @@ -17,6 +17,7 @@ local base64_encode = require("base64").encode local dkjson = require("dkjson") +local constants = require("apisix.constants") local util = require("apisix.cli.util") local file = require("apisix.cli.file") local http = require("socket.http") @@ -25,6 +26,7 @@ local ltn12 = require("ltn12") local type = type local ipairs = ipairs +local pairs = pairs local print = print local tonumber = tonumber local str_format = string.format @@ -248,11 +250,15 @@ function _M.init(env, args) end - for _, dir_name in ipairs({"/routes", "/upstreams", "/services", - "/plugins", "/consumers", "/ssl", - "/global_rules", "/stream_routes", "/proto", - "/plugin_metadata", "/plugin_configs"}) do + local dirs = {} + for name in pairs(constants.HTTP_ETCD_DIRECTORY) do + dirs[name] = true + end + for name in pairs(constants.STREAM_ETCD_DIRECTORY) do + dirs[name] = true + end + for dir_name in pairs(dirs) do local key = (etcd_conf.prefix or "") .. dir_name .. "/" local put_url = host .. "/v3/kv/put" diff --git a/apisix/constants.lua b/apisix/constants.lua new file mode 100644 index 0000000000000..c668959534ab0 --- /dev/null +++ b/apisix/constants.lua @@ -0,0 +1,38 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +return { + HTTP_ETCD_DIRECTORY = { + ["/upstreams"] = true, + ["/plugins"] = true, + ["/ssl"] = true, + ["/stream_routes"] = true, + ["/plugin_metadata"] = true, + ["/routes"] = true, + ["/services"] = true, + ["/consumers"] = true, + ["/global_rules"] = true, + ["/proto"] = true, + ["/plugin_configs"] = true, + }, + STREAM_ETCD_DIRECTORY = { + ["/upstreams"] = true, + ["/plugins"] = true, + ["/ssl"] = true, + ["/stream_routes"] = true, + ["/plugin_metadata"] = true, + }, +} diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index ee8227c5b8e0c..674d06cb59c23 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -20,6 +20,7 @@ local config_local = require("apisix.core.config_local") local log = require("apisix.core.log") local json = require("apisix.core.json") local etcd_apisix = require("apisix.core.etcd") +local core_str = require("apisix.core.string") local etcd = require("resty.etcd") local new_tab = require("table.new") local clone_tab = require("table.clone") @@ -39,7 +40,12 @@ local xpcall = xpcall local debug = debug local error = error local rand = math.random +local constants = require("apisix.constants") + + +local is_http = ngx.config.subsystem == "http" local created_obj = {} +local loaded_configuration = {} local _M = { @@ -81,7 +87,7 @@ local function getkey(etcd_cli, key) end -local function readdir(etcd_cli, key) +local function readdir(etcd_cli, key, formatter) if not etcd_cli then return nil, "not inited" end @@ -96,7 +102,7 @@ local function readdir(etcd_cli, key) return nil, "failed to read etcd dir" end - res, err = etcd_apisix.get_format(res, key .. '/', true) + res, err = etcd_apisix.get_format(res, key .. '/', true, formatter) if not res then return nil, err end @@ -168,46 +174,64 @@ function _M.upgrade_version(self, new_ver) end -local function sync_data(self) - if not self.key then - return nil, "missing 'key' arguments" - end +local function load_full_data(self, dir_res, headers) + local err + local changed = false - if self.need_reload then - local res, err = readdir(self.etcd_cli, self.key) - if not res then - return false, err - end + if self.single_item then + self.values = new_tab(1, 0) + self.values_hash = new_tab(0, 1) - local dir_res, headers = res.body.node or {}, res.headers - log.debug("readdir key: ", self.key, " res: ", - json.delay_encode(dir_res)) - if not dir_res then - return false, err + local item = dir_res + local data_valid = item.value ~= nil + + if data_valid and self.item_schema then + data_valid, err = check_schema(self.item_schema, item.value) + if not data_valid then + log.error("failed to check item data of [", self.key, + "] err:", err, " ,val: ", json.encode(item.value)) + end end - if self.values then - for i, val in ipairs(self.values) do - if val and val.clean_handlers then - for _, clean_handler in ipairs(val.clean_handlers) do - clean_handler(val) - end - val.clean_handlers = nil - end + if data_valid and self.checker then + data_valid, err = self.checker(item.value) + if not data_valid then + log.error("failed to check item data of [", self.key, + "] err:", err, " ,val: ", json.delay_encode(item.value)) end + end - self.values = nil - self.values_hash = nil + if data_valid then + changed = true + insert_tab(self.values, item) + self.values_hash[self.key] = #self.values + + item.clean_handlers = {} + + if self.filter then + self.filter(item) + end end - local changed = false + self:upgrade_version(item.modifiedIndex) - if self.single_item then - self.values = new_tab(1, 0) - self.values_hash = new_tab(0, 1) + else + if not dir_res.nodes then + dir_res.nodes = {} + end - local item = dir_res - local data_valid = item.value ~= nil + self.values = new_tab(#dir_res.nodes, 0) + self.values_hash = new_tab(0, #dir_res.nodes) + + for _, item in ipairs(dir_res.nodes) do + local key = short_key(self, item.key) + local data_valid = true + if type(item.value) ~= "table" then + data_valid = false + log.error("invalid item data of [", self.key .. "/" .. key, + "], val: ", item.value, + ", it should be an object") + end if data_valid and self.item_schema then data_valid, err = check_schema(self.item_schema, item.value) @@ -228,8 +252,9 @@ local function sync_data(self) if data_valid then changed = true insert_tab(self.values, item) - self.values_hash[self.key] = #self.values + self.values_hash[key] = #self.values + item.value.id = key item.clean_handlers = {} if self.filter then @@ -238,67 +263,55 @@ local function sync_data(self) end self:upgrade_version(item.modifiedIndex) + end + end - else - if not dir_res.nodes then - dir_res.nodes = {} - end + if headers then + self:upgrade_version(headers["X-Etcd-Index"]) + end - self.values = new_tab(#dir_res.nodes, 0) - self.values_hash = new_tab(0, #dir_res.nodes) - - for _, item in ipairs(dir_res.nodes) do - local key = short_key(self, item.key) - local data_valid = true - if type(item.value) ~= "table" then - data_valid = false - log.error("invalid item data of [", self.key .. "/" .. key, - "], val: ", item.value, - ", it should be an object") - end + if changed then + self.conf_version = self.conf_version + 1 + end - if data_valid and self.item_schema then - data_valid, err = check_schema(self.item_schema, item.value) - if not data_valid then - log.error("failed to check item data of [", self.key, - "] err:", err, " ,val: ", json.encode(item.value)) - end - end + self.need_reload = false +end - if data_valid and self.checker then - data_valid, err = self.checker(item.value) - if not data_valid then - log.error("failed to check item data of [", self.key, - "] err:", err, " ,val: ", json.delay_encode(item.value)) - end - end - if data_valid then - changed = true - insert_tab(self.values, item) - self.values_hash[key] = #self.values +local function sync_data(self) + if not self.key then + return nil, "missing 'key' arguments" + end - item.value.id = key - item.clean_handlers = {} + if self.need_reload then + local res, err = readdir(self.etcd_cli, self.key) + if not res then + return false, err + end - if self.filter then - self.filter(item) + local dir_res, headers = res.body.node or {}, res.headers + log.debug("readdir key: ", self.key, " res: ", + json.delay_encode(dir_res)) + if not dir_res then + return false, err + end + + if self.values then + for i, val in ipairs(self.values) do + if val and val.clean_handlers then + for _, clean_handler in ipairs(val.clean_handlers) do + clean_handler(val) end + val.clean_handlers = nil end - - self:upgrade_version(item.modifiedIndex) end - end - if headers then - self:upgrade_version(headers["X-Etcd-Index"]) + self.values = nil + self.values_hash = nil end - if changed then - self.conf_version = self.conf_version + 1 - end + load_full_data(self, dir_res, headers) - self.need_reload = false return true end @@ -608,6 +621,16 @@ function _M.new(key, opts) return nil, "missing `key` argument" end + if loaded_configuration[key] then + local res = loaded_configuration[key] + loaded_configuration[key] = nil -- tried to load + + log.notice("use loaded configuration ", key) + + local dir_res, headers = res.body, res.headers + load_full_data(obj, dir_res, headers) + end + ngx_timer_at(0, _automatic_fetch, obj) else @@ -665,4 +688,79 @@ function _M.server_version(self) end +local function create_formatter(prefix) + return function (res) + res.body.nodes = {} + + local dirs + if is_http then + dirs = constants.HTTP_ETCD_DIRECTORY + else + dirs = constants.STREAM_ETCD_DIRECTORY + end + + local curr_dir_data + local curr_key + for _, item in ipairs(res.body.kvs) do + if curr_dir_data then + if core_str.has_prefix(item.key, curr_key) then + table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item)) + goto CONTINUE + end + + curr_dir_data = nil + end + + local key = sub_str(item.key, #prefix + 1) + if dirs[key] then + -- single item + loaded_configuration[key] = { + body = etcd_apisix.kvs_to_node(item), + headers = res.headers, + } + else + local key = sub_str(item.key, #prefix + 1, #item.key - 1) + -- ensure the same key hasn't been handled as single item + if dirs[key] and not loaded_configuration[key] then + loaded_configuration[key] = { + body = { + nodes = {}, + }, + headers = res.headers, + } + curr_dir_data = loaded_configuration[key].body.nodes + curr_key = item.key + end + end + + ::CONTINUE:: + end + + return res + end +end + + +function _M.init() + local etcd_cli, err = get_etcd() + if not etcd_cli then + return nil, "failed to start a etcd instance: " .. err + end + + local local_conf, err = config_local.local_conf() + if not local_conf then + return nil, err + end + + local etcd_conf = local_conf.etcd + local prefix = etcd_conf.prefix + local res, err = readdir(etcd_cli, prefix, create_formatter(prefix)) + if not res then + return nil, err + end + + return true +end + + return _M diff --git a/apisix/core/config_yaml.lua b/apisix/core/config_yaml.lua index 04b450d863b63..fbf3106f09f50 100644 --- a/apisix/core/config_yaml.lua +++ b/apisix/core/config_yaml.lua @@ -360,6 +360,7 @@ end function _M.init() read_apisix_yaml() + return true end diff --git a/apisix/core/etcd.lua b/apisix/core/etcd.lua index 9099c89e54677..6a3072853e788 100644 --- a/apisix/core/etcd.lua +++ b/apisix/core/etcd.lua @@ -55,6 +55,7 @@ end _M.new = new +-- convert ETCD v3 entry to v2 one local function kvs_to_node(kvs) local node = {} node.key = kvs.key @@ -63,6 +64,7 @@ local function kvs_to_node(kvs) node.modifiedIndex = tonumber(kvs.mod_revision) return node end +_M.kvs_to_node = kvs_to_node local function kvs_to_nodes(res) res.body.node.dir = true @@ -84,7 +86,7 @@ end -- When `is_dir` is true, returns the value of both the dir key and its descendants. -- Otherwise, return the value of key only. -function _M.get_format(res, real_key, is_dir) +function _M.get_format(res, real_key, is_dir, formatter) if res.body.error == "etcdserver: user name is empty" then return nil, "insufficient credentials code: 401" end @@ -97,6 +99,10 @@ function _M.get_format(res, real_key, is_dir) res.body.action = "get" + if formatter then + return formatter(res) + end + if not is_dir then local key = res.body.kvs[1].key if key ~= real_key then diff --git a/apisix/init.lua b/apisix/init.lua index a8d851c6bbc6f..beec7e677e756 100644 --- a/apisix/init.lua +++ b/apisix/init.lua @@ -15,6 +15,7 @@ -- limitations under the License. -- local require = require +require("apisix.patch").patch() local core = require("apisix.core") local plugin = require("apisix.plugin") local plugin_config = require("apisix.plugin_config") @@ -80,8 +81,11 @@ function _M.http_init(args) core.log.error("failed to enable privileged_agent: ", err) end - if core.config == require("apisix.core.config_yaml") then - core.config.init() + if core.config.init then + local ok, err = core.config.init() + if not ok then + core.log.error("failed to load the configuration: ", err) + end end end @@ -111,9 +115,9 @@ function _M.http_init_worker() require("apisix.timers").init_worker() + plugin.init_worker() router.http_init_worker() require("apisix.http.service").init_worker() - plugin.init_worker() plugin_config.init_worker() require("apisix.consumer").init_worker() @@ -699,8 +703,11 @@ end function _M.stream_init() core.log.info("enter stream_init") - if core.config == require("apisix.core.config_yaml") then - core.config.init() + if core.config.init then + local ok, err = core.config.init() + if not ok then + core.log.error("failed to load the configuration: ", err) + end end end diff --git a/apisix/patch.lua b/apisix/patch.lua new file mode 100644 index 0000000000000..124e8bd55f1e2 --- /dev/null +++ b/apisix/patch.lua @@ -0,0 +1,177 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local socket = require("socket") +local ssl = require("ssl") +local get_phase = ngx.get_phase +local ngx_socket = ngx.socket +local original_tcp = ngx.socket.tcp +local concat_tab = table.concat +local new_tab = require("table.new") +local ipairs = ipairs +local select = select +local setmetatable = setmetatable +local type = type + + +local _M = {} + + +local function flatten(args) + local buf = new_tab(#args, 0) + for i, v in ipairs(args) do + local ty = type(v) + if ty == "table" then + buf[i] = flatten(v) + elseif ty == "boolean" then + buf[i] = v and "true" or "false" + elseif ty == "nil" then + buf[i] = "nil" + else + buf[i] = v + end + end + return concat_tab(buf) +end + + +local luasocket_wrapper = { + send = function(self, ...) + if select('#', ...) == 1 and type(select(1, ...)) == "string" then + -- fast path + return self.sock:send(...) + end + + -- luasocket's send only accepts a single string + return self.sock:send(flatten({...})) + end, + + getreusedtimes = function () + return 0 + end, + setkeepalive = function () + return true + end, + + settimeout = function (self, time) + if time then + time = time / 1000 + end + return self.sock:settimeout(time) + end, + settimeouts = function (self, connect_time, read_time, write_time) + connect_time = connect_time or 0 + read_time = read_time or 0 + write_time = write_time or 0 + + -- set the max one as the timeout + local time = connect_time + if time < read_time then + time = read_time + end + if time < write_time then + time = write_time + end + + if time > 0 then + time = time / 1000 + else + time = nil + end + return self.sock:settimeout(time) + end, + + sslhandshake = function (self, verify, opts) + if opts == nil then + opts = {} + end + + local params = { + mode = "client", + protocol = opts.ssl_version or "any", + key = opts.key, + certificate = opts.cert, + cafile = opts.cafile, + verify = verify and "peer" or "none", + options = { + "all", + "no_sslv2", + "no_sslv3", + "no_tlsv1" + } + } + + local sec_sock, err = ssl.wrap(self.sock, params) + if not sec_sock then + return false, err + end + + local success + success, err = sec_sock:dohandshake() + if not success then + return false, err + end + + self.sock = sec_sock + return true + end +} + + +local mt = { + __index = function(self, key) + local sock = self.sock + local fn = luasocket_wrapper[key] + if fn then + self[key] = fn + return fn + end + + local origin = sock[key] + if type(origin) ~= "function" then + return origin + end + + fn = function(_, ...) + return origin(sock, ...) + end + + self[key] = fn + return fn + end +} + +local function luasocket_tcp() + local sock = socket.tcp() + return setmetatable({sock = sock}, mt) +end + + +function _M.patch() + -- make linter happy + -- luacheck: ignore + ngx_socket.tcp = function () + local phase = get_phase() + if phase ~= "init" and phase ~= "init_worker" then + return original_tcp() + end + + return luasocket_tcp() + end +end + + +return _M diff --git a/apisix/plugin.lua b/apisix/plugin.lua index 4cbf722c1ff3c..0a270b62b31b0 100644 --- a/apisix/plugin.lua +++ b/apisix/plugin.lua @@ -486,6 +486,7 @@ do _M.load(plugins_conf) end, }) + if not plugins_conf then error("failed to create etcd instance for fetching /plugins : " .. err) end diff --git a/t/APISIX.pm b/t/APISIX.pm index feb82d0c54636..5d8ff8e4ffa2a 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -350,6 +350,7 @@ _EOC_ $block->set_value("stream_server_config", $stream_server_config); } + my $extra_init_by_lua = $block->extra_init_by_lua // ""; my $init_by_lua_block = $block->init_by_lua_block // <<_EOC_; if os.getenv("APISIX_ENABLE_LUACOV") == "1" then require("luacov.runner")("t/apisix.luacov") @@ -363,6 +364,7 @@ _EOC_ dns_resolver = $dns_addrs_tbl_str, } apisix.http_init(args) + $extra_init_by_lua _EOC_ my $http_config = $block->http_config // ''; diff --git a/t/admin/plugins-reload.t b/t/admin/plugins-reload.t index e03597039fe6b..9c60c1785674f 100644 --- a/t/admin/plugins-reload.t +++ b/t/admin/plugins-reload.t @@ -78,15 +78,11 @@ location /t { plugins_conf, err = core.config.new("/plugins", { automatic = true, single_item = true, - filter = function() - -- called twice, one for readir, another for waitdir + filter = function(item) + -- called twice, one for worker start, another for sync data from admin ngx.log(ngx.WARN, "reload plugins on node ", before_reload and "before reload" or "after reload") - local plugins = {} - for _, conf_value in config_util.iterate_values(plugins_conf.values) do - core.table.insert_tail(plugins, unpack(conf_value.value)) - end - ngx.log(ngx.WARN, require("toolkit.json").encode(plugins)) + ngx.log(ngx.WARN, require("toolkit.json").encode(item.value)) end, }) if not plugins_conf then @@ -124,6 +120,7 @@ done qr/reload plugins on node \w+ reload/ --- grep_error_log_out reload plugins on node before reload +reload plugins on node before reload reload plugins on node after reload --- error_log filter(): [{"name":"jwt-auth"},{"name":"mqtt-proxy","stream":true}] diff --git a/t/core/etcd.t b/t/core/etcd.t index 761dd4cb842ad..5a7ac4484c730 100644 --- a/t/core/etcd.t +++ b/t/core/etcd.t @@ -371,3 +371,47 @@ GET /t ab --- no_error_log [error] + + + +=== TEST 7: run etcd in init phase +--- init_by_lua_block + local apisix = require("apisix") + apisix.http_init() + local etcd = require("apisix.core.etcd") + assert(etcd.set("/a", "ab")) + + local res, err = etcd.get("/a") + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.log(ngx.WARN, res.body.node.value) + + local res, err = etcd.delete("/a") + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.log(ngx.WARN, res.status) + + local res, err = etcd.get("/a") + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.log(ngx.WARN, res.status) +--- config + location /t { + return 200; + } +--- request +GET /t +--- no_error_log +[error] +--- grep_error_log eval +qr/init_by_lua:\d+: \S+/ +--- grep_error_log_out +init_by_lua:12: ab +init_by_lua:19: 200 +init_by_lua:26: 404 diff --git a/t/misc/patch.t b/t/misc/patch.t new file mode 100644 index 0000000000000..edce04ea60243 --- /dev/null +++ b/t/misc/patch.t @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +log_level("info"); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if (!defined $block->no_error_log) { + $block->set_value("no_error_log", "[error]"); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: flatten send args +--- extra_init_by_lua +local sock = ngx.socket.tcp() +getmetatable(sock.sock).__index.send = function (_, data) + ngx.log(ngx.WARN, data) + return #data +end +sock:send({1, "a", {1, "b", true}}) +sock:send(1, "a", {1, "b", false}) +--- config + location /t { + return 200; + } +--- grep_error_log eval +qr/send\(\): \S+/ +--- grep_error_log_out +send(): 1a1btrue +send(): 1a1bfalse