From ad2b67ee81a9571022eec2b651342ef36f46945f Mon Sep 17 00:00:00 2001 From: YuanSheng Wang Date: Mon, 16 Sep 2019 10:58:27 +0800 Subject: [PATCH] feature: supported to added pluings to stream routes. (#513) * feature: supported to added pluings to stream routes. * feature: supported MQTT protocol. --- conf/config.yaml | 3 + lua/apisix.lua | 10 +- lua/apisix/admin/plugins.lua | 32 +++- lua/apisix/admin/stream_routes.lua | 8 + lua/apisix/plugin.lua | 139 +++++++++++++-- lua/apisix/router.lua | 4 +- lua/apisix/schema_def.lua | 3 +- lua/apisix/stream/plugins/mqtt-proxy.lua | 165 ++++++++++++++++++ .../router/{ip_remote.lua => ip_port.lua} | 36 +++- t/APISIX.pm | 15 +- t/admin/stream-routes.t | 61 +++++++ t/apisix.luacov | 28 +-- t/stream-node/sanity.t | 2 + t/stream-plugin/mqtt-proxy.t | 69 ++++++++ 14 files changed, 523 insertions(+), 52 deletions(-) create mode 100644 lua/apisix/stream/plugins/mqtt-proxy.lua rename lua/apisix/stream/router/{ip_remote.lua => ip_port.lua} (61%) create mode 100644 t/stream-plugin/mqtt-proxy.t diff --git a/conf/config.yaml b/conf/config.yaml index c8d718bc02e0..0f6184a6b576 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -50,3 +50,6 @@ plugins: # plugin list - serverless-pre-function - serverless-post-function - openid-connect + +stream_plugins: + - mqtt-proxy diff --git a/lua/apisix.lua b/lua/apisix.lua index 8585fc8fb2ec..c09d67bb0e19 100644 --- a/lua/apisix.lua +++ b/lua/apisix.lua @@ -57,7 +57,7 @@ function _M.http_init_worker() router.http_init_worker() require("apisix.http.service").init_worker() - require("apisix.plugin").init_worker() + plugin.init_worker() require("apisix.consumer").init_worker() if core.config == require("apisix.core.config_yaml") then @@ -394,6 +394,8 @@ end function _M.stream_init_worker() core.log.info("enter stream_init_worker") router.stream_init_worker() + plugin.init_worker() + load_balancer = require("apisix.balancer").run end @@ -421,6 +423,11 @@ function _M.stream_preread_phase() return ngx_exit(1) end + local plugins = core.tablepool.fetch("plugins", 32, 0) + api_ctx.plugins = plugin.stream_filter(matched_route, plugins) + -- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true)) + + run_plugin("preread", plugins, api_ctx) end @@ -452,6 +459,7 @@ end function _M.stream_log_phase() core.log.info("enter stream_log_phase") -- core.ctx.release_vars(api_ctx) + run_plugin("log") end diff --git a/lua/apisix/admin/plugins.lua b/lua/apisix/admin/plugins.lua index d26609bdbe34..31d75bea0283 100644 --- a/lua/apisix/admin/plugins.lua +++ b/lua/apisix/admin/plugins.lua @@ -1,8 +1,9 @@ local core = require("apisix.core") local local_plugins = require("apisix.plugin").plugins_hash -local pairs = pairs -local pcall = pcall -local require = require +local stream_local_plugins = require("apisix.plugin").stream_plugins_hash +local pairs = pairs +local pcall = pcall +local require = require local table_remove = table.remove local _M = { @@ -44,6 +45,31 @@ function _M.check_schema(plugins_conf) end +function _M.stream_check_schema(plugins_conf) + for name, plugin_conf in pairs(plugins_conf) do + core.log.info("check stream plugin scheme, name: ", name, + ": ", core.json.delay_encode(plugin_conf, true)) + local plugin_obj = stream_local_plugins[name] + if not plugin_obj then + return false, "unknow plugin [" .. name .. "]" + end + + if plugin_obj.check_schema then + local ok = core.schema.check(disable_schema, plugin_conf) + if not ok then + local ok, err = plugin_obj.check_schema(plugin_conf) + if not ok then + return false, "failed to check the configuration of " + .. "stream plugin [" .. name .. "]: " .. err + end + end + end + end + + return true +end + + function _M.get(name) local plugin_name = "apisix.plugins." .. name diff --git a/lua/apisix/admin/stream_routes.lua b/lua/apisix/admin/stream_routes.lua index 93fb6c715074..dd0563e8dfa0 100644 --- a/lua/apisix/admin/stream_routes.lua +++ b/lua/apisix/admin/stream_routes.lua @@ -1,4 +1,5 @@ local core = require("apisix.core") +local schema_plugin = require("apisix.admin.plugins").stream_check_schema local tostring = tostring @@ -49,6 +50,13 @@ local function check_conf(id, conf, need_id) end end + if conf.plugins then + local ok, err = schema_plugin(conf.plugins) + if not ok then + return nil, {error_msg = err} + end + end + return need_id and id or true end diff --git a/lua/apisix/plugin.lua b/lua/apisix/plugin.lua index d37b6e8bd7c2..473473f88110 100644 --- a/lua/apisix/plugin.lua +++ b/lua/apisix/plugin.lua @@ -1,21 +1,29 @@ -local require = require -local core = require("apisix.core") -local pkg_loaded = package.loaded -local sort_tab = table.sort -local pcall = pcall -local ipairs = ipairs -local pairs = pairs -local type = type +local require = require +local core = require("apisix.core") +local pkg_loaded = package.loaded +local sort_tab = table.sort +local pcall = pcall +local ipairs = ipairs +local pairs = pairs +local type = type local local_plugins = core.table.new(32, 0) -local local_plugins_hash = core.table.new(0, 32) +local ngx = ngx +local local_plugins_hash = core.table.new(0, 32) +local stream_local_plugins = core.table.new(32, 0) +local stream_local_plugins_hash = core.table.new(0, 32) local local_conf local _M = { - version = 0.2, - load_times = 0, - plugins = local_plugins, - plugins_hash = local_plugins_hash, + version = 0.2, + + load_times = 0, + plugins = local_plugins, + plugins_hash = local_plugins_hash, + + stream_load_times= 0, + stream_plugins = stream_local_plugins, + stream_plugins_hash = stream_local_plugins_hash, } @@ -24,8 +32,11 @@ local function sort_plugin(l, r) end -local function load_plugin(name) +local function load_plugin(name, plugins_list, is_stream_plugin) local pkg_name = "apisix.plugins." .. name + if is_stream_plugin then + pkg_name = "apisix.stream.plugins." .. name + end pkg_loaded[pkg_name] = nil local ok, plugin = pcall(require, pkg_name) @@ -46,7 +57,7 @@ local function load_plugin(name) end plugin.name = name - core.table.insert(local_plugins, plugin) + core.table.insert(plugins_list, plugin) if plugin.init then plugin.init() @@ -74,7 +85,7 @@ local function load() for _, name in ipairs(plugin_names) do if processed[name] == nil then processed[name] = true - load_plugin(name) + load_plugin(name, local_plugins) end end @@ -95,11 +106,69 @@ local function load() _M.load_times = _M.load_times + 1 core.log.info("load plugin times: ", _M.load_times) - return local_plugins + return true +end + + +local function load_stream() + core.table.clear(stream_local_plugins) + core.table.clear(stream_local_plugins_hash) + + local plugin_names = local_conf.stream_plugins + if not plugin_names then + core.log.warn("failed to read stream plugin list form local file") + return true + end + + local processed = {} + for _, name in ipairs(plugin_names) do + if processed[name] == nil then + processed[name] = true + load_plugin(name, stream_local_plugins, true) + end + end + + -- sort by plugin's priority + if #stream_local_plugins > 1 then + sort_tab(stream_local_plugins, sort_plugin) + end + + for i, plugin in ipairs(stream_local_plugins) do + stream_local_plugins_hash[plugin.name] = plugin + if local_conf and local_conf.apisix + and local_conf.apisix.enable_debug then + core.log.warn("loaded stream plugin and sort by priority:", + " ", plugin.priority, + " name: ", plugin.name) + end + end + + _M.stream_load_times = _M.stream_load_times + 1 + core.log.info("stream plugins: ", core.json.delay_encode()) + core.log.info("load stream plugin times: ", _M.stream_load_times) + return true end -_M.load = load +function _M.load() + local_conf = core.config.local_conf(true) + + if ngx.config.subsystem == "http" then + local ok, err = load() + if not ok then + core.log.error("failed to load plugins: ", err) + end + end + + local ok, err = load_stream() + if not ok then + core.log.error("failed to load stream plugins: ", err) + end + + -- for test + return local_plugins +end + local fetch_api_routes do @@ -172,6 +241,38 @@ function _M.filter(user_route, plugins) end +function _M.stream_filter(user_route, plugins) + plugins = plugins or core.table.new(#stream_local_plugins * 2, 0) + local user_plugin_conf = user_route.value.plugins + if user_plugin_conf == nil then + if local_conf and local_conf.apisix.enable_debug then + core.response.set_header("Apisix-Plugins", "no plugin") + end + return plugins + end + + for _, plugin_obj in ipairs(stream_local_plugins) do + local name = plugin_obj.name + local plugin_conf = user_plugin_conf[name] + + if type(plugin_conf) == "table" and not plugin_conf.disable then + core.table.insert(plugins, plugin_obj) + core.table.insert(plugins, plugin_conf) + end + end + + if local_conf.apisix.enable_debug then + local t = {} + for i = 1, #plugins, 2 do + core.table.insert(t, plugins[i].name) + end + core.response.set_header("Apisix-Plugins", core.table.concat(t, ", ")) + end + + return plugins +end + + function _M.merge_service_route(service_conf, route_conf) core.log.info("service conf: ", core.json.delay_encode(service_conf)) -- core.log.info("route conf : ", core.json.delay_encode(route_conf)) @@ -216,7 +317,7 @@ end function _M.init_worker() - load() + _M.load() end diff --git a/lua/apisix/router.lua b/lua/apisix/router.lua index ceb1ce1b652c..6e588be4af98 100644 --- a/lua/apisix/router.lua +++ b/lua/apisix/router.lua @@ -38,8 +38,8 @@ end function _M.stream_init_worker() - local router_stream = require("apisix.stream.router.ip_remote") - router_stream.init_worker() + local router_stream = require("apisix.stream.router.ip_port") + router_stream.stream_init_worker() _M.router_stream = router_stream end diff --git a/lua/apisix/schema_def.lua b/lua/apisix/schema_def.lua index 70e9f8f794d7..5631edfe51c1 100644 --- a/lua/apisix/schema_def.lua +++ b/lua/apisix/schema_def.lua @@ -404,12 +404,13 @@ _M.stream_route = { }, upstream = upstream_schema, upstream_id = id_schema, + plugins = plugins_schema, }, anyOf = { {required = {"remote_addr", "upstream"}}, {required = {"remote_addr", "upstream_id"}}, + {required = {"remote_addr", "plugins"}}, }, - required = {"remote_addr"}, } diff --git a/lua/apisix/stream/plugins/mqtt-proxy.lua b/lua/apisix/stream/plugins/mqtt-proxy.lua new file mode 100644 index 000000000000..7357b6ccb937 --- /dev/null +++ b/lua/apisix/stream/plugins/mqtt-proxy.lua @@ -0,0 +1,165 @@ +local core = require("apisix.core") +local balancer = require("ngx.balancer") +local ngx_exit = ngx.exit +local str_byte = string.byte +local str_sub = string.sub + + +local schema = { + type = "object", + properties = { + protocol_name = {type = "string"}, + protocol_level = {type = "integer"}, + upstream = { + type = "object", + properties = { + ip = {type = "string"}, + port = {type = "number"}, + } + } + }, + required = {"protocol_name", "protocol_level", "upstream"}, +} + + +local plugin_name = "mqtt-proxy" + + +local _M = { + version = 0.1, + priority = 1000, + name = plugin_name, + schema = schema, +} + + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + + if not ok then + return false, err + end + + return true +end + + +local function parse_mqtt(data) + local res = {} + res.packet_type_flags_byte = str_byte(data, 1, 1) + if res.packet_type_flags_byte < 16 or res.packet_type_flags_byte > 32 then + return nil, "Received unexpected MQTT packet type+flags: " + .. res.packet_type_flags_byte + end + + local parsed_pos = 1 + res.remaining_len = 0 + local multiplier = 1 + for i = 2, 5 do + parsed_pos = i + local byte = str_byte(data, i, i) + res.remaining_len = res.remaining_len + bit.band(byte, 127) * multiplier + multiplier = multiplier * 128 + if bit.band(byte, 128) == 0 then + break + end + end + + local protocol_len = str_byte(data, parsed_pos + 1, parsed_pos + 1) * 256 + + str_byte(data, parsed_pos + 2, parsed_pos + 2) + parsed_pos = parsed_pos + 2 + res.protocol = str_sub(data, parsed_pos + 1, parsed_pos + protocol_len) + parsed_pos = parsed_pos + protocol_len + + res.protocol_ver = str_byte(data, parsed_pos + 1, parsed_pos + 1) + parsed_pos = parsed_pos + 1 + if res.protocol_ver == 4 then + parsed_pos = parsed_pos + 3 + elseif res.protocol_ver == 5 then + parsed_pos = parsed_pos + 9 + end + + local client_id_len = str_byte(data, parsed_pos + 1, parsed_pos + 1) * 256 + + str_byte(data, parsed_pos + 2, parsed_pos + 2) + parsed_pos = parsed_pos + 2 + + if parsed_pos + client_id_len > #data then + res.expect_len = parsed_pos + client_id_len + return res + end + + res.client_id = str_sub(data, parsed_pos + 1, parsed_pos + client_id_len) + parsed_pos = parsed_pos + client_id_len + + res.expect_len = parsed_pos + return res +end + + +function _M.preread(conf, ctx) + core.log.warn("plugin rewrite phase, conf: ", core.json.encode(conf)) + -- core.log.warn(" ctx: ", core.json.encode(ctx, true)) + local sock = ngx.req.socket() + local data, err = sock:peek(16) + if not data then + core.log.error("failed to read first 16 bytes: ", err) + return ngx_exit(1) + end + + local res, err = parse_mqtt(data) + if not res then + core.log.error("failed to parse the first 16 bytes: ", err) + return ngx_exit(1) + end + + if res.expect_len > #data then + data, err = sock:peek(res.expect_len) + if not data then + core.log.error("failed to read ", res.expect_len, " bytes: ", err) + return ngx_exit(1) + end + + res = parse_mqtt(data) + if res.expect_len > #data then + core.log.error("failed to parse mqtt request, expect len: ", + res.expect_len, " but got ", #data) + return ngx_exit(1) + end + end + + if res.protocol and res.protocol ~= conf.protocol_name then + core.log.error("expect protocol name: ", conf.protocol_name + ", but got ", res.protocol) + return ngx_exit(1) + end + + if res.protocol_ver and res.protocol_ver ~= conf.protocol_level then + core.log.error("expect protocol level: ", conf.protocol_level + ", but got ", res.protocol_ver) + return ngx_exit(1) + end + + core.log.info("mqtt client id: ", res.client_id) +end + + +function _M.log(conf, ctx) + core.log.info("plugin log phase, conf: ", core.json.encode(conf)) +end + + +function _M.balancer(conf, ctx) + core.log.info("plugin balancer phase, conf: ", core.json.encode(conf)) + -- ctx.balancer_name = plugin_name + local up = conf.upstream + ctx.balancer_name = plugin_name + + local ok, err = balancer.set_current_peer(up.ip, up.port) + if not ok then + core.log.error("failed to set server peer: ", err) + return ngx_exit(1) + end +end + + +return _M diff --git a/lua/apisix/stream/router/ip_remote.lua b/lua/apisix/stream/router/ip_port.lua similarity index 61% rename from lua/apisix/stream/router/ip_remote.lua rename to lua/apisix/stream/router/ip_port.lua index 14d4060b7b5a..24e8fec05c6f 100644 --- a/lua/apisix/stream/router/ip_remote.lua +++ b/lua/apisix/stream/router/ip_port.lua @@ -10,6 +10,30 @@ local user_routes local _M = {version = 0.1} +local function match_opts(route, api_ctx) + local vars = api_ctx.var + + -- todo: use resty-ipmatcher to support multiple ip address + if route.value.remote_addr and + route.value.remote_addr ~= vars.remote_addr then + return false + end + + if route.value.server_addr and + route.value.server_addr ~= vars.server_addr then + return false + end + + -- todo: use resty-ipmatcher to support multiple ip address + if route.value.server_port and + route.value.server_port ~= vars.server_port then + return false + end + + return true +end + + function _M.match(api_ctx) local routes = _M.routes() if not routes then @@ -18,14 +42,9 @@ function _M.match(api_ctx) end core.log.info("stream routes: ", core.json.delay_encode(routes)) - local remote_addr = api_ctx.var.remote_addr - -- local server_addr = api_ctx.var.server_addr - -- local server_port = api_ctx.var.server_port - - -- todo: need a better way - core.log.info("remote addr: ", remote_addr) for _, route in ipairs(routes) do - if route.value.remote_addr == remote_addr then + local hit = match_opts(route, api_ctx) + if hit then api_ctx.matched_route = route return true end @@ -45,12 +64,13 @@ function _M.routes() end -function _M.init_worker() +function _M.stream_init_worker() local err user_routes, err = core.config.new("/stream_routes", { automatic = true, item_schema = core.schema.stream_route }) + if not user_routes then error("failed to create etcd instance for fetching /stream_routes : " .. err) diff --git a/t/APISIX.pm b/t/APISIX.pm index 7876b4c306c9..4b638a031765 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -54,11 +54,12 @@ _EOC_ } init_by_lua_block { + -- if os.getenv("APISIX_ENABLE_LUACOV") == "1" then + -- require("luacov.runner")("t/apisix.luacov") + -- jit.off() + -- end + require "resty.core" - if os.getenv("APISIX_ENABLE_LUACOV") == "1" then - require("luacov.runner")("t/apisix.luacov") - jit.off() - end apisix = require("apisix") apisix.stream_init() @@ -73,8 +74,9 @@ _EOC_ listen 1995; content_by_lua_block { + local sock = ngx.req.socket() + local data = sock:receive("1") ngx.say("hello world") - ngx.exit(1) } } _EOC_ @@ -102,12 +104,13 @@ _EOC_ } my $init_by_lua_block = $block->init_by_lua_block // <<_EOC_; - require "resty.core" if os.getenv("APISIX_ENABLE_LUACOV") == "1" then require("luacov.runner")("t/apisix.luacov") jit.off() end + require "resty.core" + apisix = require("apisix") apisix.http_init() _EOC_ diff --git a/t/admin/stream-routes.t b/t/admin/stream-routes.t index b6f9f70efc47..9fea8c103943 100644 --- a/t/admin/stream-routes.t +++ b/t/admin/stream-routes.t @@ -181,3 +181,64 @@ GET /t [delete] code: 200 message: passed --- no_error_log [error] + + + +=== TEST 5: set route with plugin +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4, + "upstream": { + "ip": "127.0.0.1", + "port": 1980 + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 6: delete route(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, message = t('/apisix/admin/stream_routes/1', + ngx.HTTP_DELETE, + nil, + [[{ + "action": "delete" + }]] + ) + ngx.say("[delete] code: ", code, " message: ", message) + } + } +--- request +GET /t +--- response_body +[delete] code: 200 message: passed +--- no_error_log +[error] diff --git a/t/apisix.luacov b/t/apisix.luacov index fa776dafd170..77e252ca9b50 100644 --- a/t/apisix.luacov +++ b/t/apisix.luacov @@ -1,15 +1,19 @@ return { - modules = { - ["lua.*"] = "lua", - ["lua/apisix/*"] = "apisix", - ["lua/apisix/admin/*"] = "admin", - ["lua/apisix/core/*"] = "core", - ["lua/apisix/http/*"] = "http", - ["lua/apisix/http/router/*"] = "http/router", - ["lua/apisix/plugins/*"] = "plugins", - ["lua/apisix/plugins/grpc-transcode/*"] = "plugins/grpc-transcode", - ["lua/apisix/plugins/prometheus/*"] = "plugins/prometheus", - ["lua/apisix/plugins/zipkin/*"] = "plugins/zipkin", + modules = { + ["lua.*"] = "lua", + ["lua/apisix/*"] = "apisix", + ["lua/apisix/admin/*"] = "admin", + ["lua/apisix/core/*"] = "core", + ["lua/apisix/http/*"] = "http", + ["lua/apisix/http/router/*"] = "http/router", + ["lua/apisix/plugins/*"] = "plugins", + ["lua/apisix/plugins/grpc-transcode/*"] = "plugins/grpc-transcode", + ["lua/apisix/plugins/prometheus/*"] = "plugins/prometheus", + ["lua/apisix/plugins/zipkin/*"] = "plugins/zipkin", + + -- can not enable both at http and stream, will fix it later. + -- ["lua/apisix/stream/*"] = "stream", + -- ["lua/apisix/stream/plugins/*"] = "stream/plugins", + -- ["lua/apisix/stream/router/*"] = "stream/router", }, - savestepsize = 10 } diff --git a/t/stream-node/sanity.t b/t/stream-node/sanity.t index 51c627b3db3a..84fdeb543c64 100644 --- a/t/stream-node/sanity.t +++ b/t/stream-node/sanity.t @@ -42,6 +42,8 @@ passed === TEST 2: hit route --- stream_enable +--- stream_request eval +mmm --- stream_response hello world --- no_error_log diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t new file mode 100644 index 000000000000..10ced24fbc69 --- /dev/null +++ b/t/stream-plugin/mqtt-proxy.t @@ -0,0 +1,69 @@ +BEGIN { + $ENV{TEST_NGINX_USE_HUP} = 1; +} + +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_shuffle(); +no_root_location(); + +run_tests; + +__DATA__ + +=== TEST 1: set route(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4, + "upstream": { + "ip": "127.0.0.1", + "port": 1995 + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: invalid header +--- stream_enable +--- stream_request eval +mmm +--- error_log +Received unexpected MQTT packet type+flags + + + +=== TEST 3: hit route +--- stream_enable +--- stream_request eval +"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" +--- stream_response +hello world +--- no_error_log +[error]