Skip to content

Commit

Permalink
feature: supported to added pluings to stream routes. (#513)
Browse files Browse the repository at this point in the history
* feature: supported to added pluings to stream routes.
* feature: supported MQTT protocol.
  • Loading branch information
membphis authored Sep 16, 2019
1 parent 7990266 commit ad2b67e
Show file tree
Hide file tree
Showing 14 changed files with 523 additions and 52 deletions.
3 changes: 3 additions & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ plugins: # plugin list
- serverless-pre-function
- serverless-post-function
- openid-connect

stream_plugins:
- mqtt-proxy
10 changes: 9 additions & 1 deletion lua/apisix.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function _M.http_init_worker()

router.http_init_worker()
require("apisix.http.service").init_worker()
require("apisix.plugin").init_worker()
plugin.init_worker()
require("apisix.consumer").init_worker()

if core.config == require("apisix.core.config_yaml") then
Expand Down Expand Up @@ -394,6 +394,8 @@ end
function _M.stream_init_worker()
core.log.info("enter stream_init_worker")
router.stream_init_worker()
plugin.init_worker()

load_balancer = require("apisix.balancer").run
end

Expand Down Expand Up @@ -421,6 +423,11 @@ function _M.stream_preread_phase()
return ngx_exit(1)
end

local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
-- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true))

run_plugin("preread", plugins, api_ctx)
end


Expand Down Expand Up @@ -452,6 +459,7 @@ end
function _M.stream_log_phase()
core.log.info("enter stream_log_phase")
-- core.ctx.release_vars(api_ctx)
run_plugin("log")
end


Expand Down
32 changes: 29 additions & 3 deletions lua/apisix/admin/plugins.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
local core = require("apisix.core")
local local_plugins = require("apisix.plugin").plugins_hash
local pairs = pairs
local pcall = pcall
local require = require
local stream_local_plugins = require("apisix.plugin").stream_plugins_hash
local pairs = pairs
local pcall = pcall
local require = require
local table_remove = table.remove

local _M = {
Expand Down Expand Up @@ -44,6 +45,31 @@ function _M.check_schema(plugins_conf)
end


function _M.stream_check_schema(plugins_conf)
for name, plugin_conf in pairs(plugins_conf) do
core.log.info("check stream plugin scheme, name: ", name,
": ", core.json.delay_encode(plugin_conf, true))
local plugin_obj = stream_local_plugins[name]
if not plugin_obj then
return false, "unknow plugin [" .. name .. "]"
end

if plugin_obj.check_schema then
local ok = core.schema.check(disable_schema, plugin_conf)
if not ok then
local ok, err = plugin_obj.check_schema(plugin_conf)
if not ok then
return false, "failed to check the configuration of "
.. "stream plugin [" .. name .. "]: " .. err
end
end
end
end

return true
end


function _M.get(name)
local plugin_name = "apisix.plugins." .. name

Expand Down
8 changes: 8 additions & 0 deletions lua/apisix/admin/stream_routes.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local core = require("apisix.core")
local schema_plugin = require("apisix.admin.plugins").stream_check_schema
local tostring = tostring


Expand Down Expand Up @@ -49,6 +50,13 @@ local function check_conf(id, conf, need_id)
end
end

if conf.plugins then
local ok, err = schema_plugin(conf.plugins)
if not ok then
return nil, {error_msg = err}
end
end

return need_id and id or true
end

Expand Down
139 changes: 120 additions & 19 deletions lua/apisix/plugin.lua
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
local require = require
local core = require("apisix.core")
local pkg_loaded = package.loaded
local sort_tab = table.sort
local pcall = pcall
local ipairs = ipairs
local pairs = pairs
local type = type
local require = require
local core = require("apisix.core")
local pkg_loaded = package.loaded
local sort_tab = table.sort
local pcall = pcall
local ipairs = ipairs
local pairs = pairs
local type = type
local local_plugins = core.table.new(32, 0)
local local_plugins_hash = core.table.new(0, 32)
local ngx = ngx
local local_plugins_hash = core.table.new(0, 32)
local stream_local_plugins = core.table.new(32, 0)
local stream_local_plugins_hash = core.table.new(0, 32)
local local_conf


local _M = {
version = 0.2,
load_times = 0,
plugins = local_plugins,
plugins_hash = local_plugins_hash,
version = 0.2,

load_times = 0,
plugins = local_plugins,
plugins_hash = local_plugins_hash,

stream_load_times= 0,
stream_plugins = stream_local_plugins,
stream_plugins_hash = stream_local_plugins_hash,
}


Expand All @@ -24,8 +32,11 @@ local function sort_plugin(l, r)
end


local function load_plugin(name)
local function load_plugin(name, plugins_list, is_stream_plugin)
local pkg_name = "apisix.plugins." .. name
if is_stream_plugin then
pkg_name = "apisix.stream.plugins." .. name
end
pkg_loaded[pkg_name] = nil

local ok, plugin = pcall(require, pkg_name)
Expand All @@ -46,7 +57,7 @@ local function load_plugin(name)
end

plugin.name = name
core.table.insert(local_plugins, plugin)
core.table.insert(plugins_list, plugin)

if plugin.init then
plugin.init()
Expand Down Expand Up @@ -74,7 +85,7 @@ local function load()
for _, name in ipairs(plugin_names) do
if processed[name] == nil then
processed[name] = true
load_plugin(name)
load_plugin(name, local_plugins)
end
end

Expand All @@ -95,11 +106,69 @@ local function load()

_M.load_times = _M.load_times + 1
core.log.info("load plugin times: ", _M.load_times)
return local_plugins
return true
end


local function load_stream()
core.table.clear(stream_local_plugins)
core.table.clear(stream_local_plugins_hash)

local plugin_names = local_conf.stream_plugins
if not plugin_names then
core.log.warn("failed to read stream plugin list form local file")
return true
end

local processed = {}
for _, name in ipairs(plugin_names) do
if processed[name] == nil then
processed[name] = true
load_plugin(name, stream_local_plugins, true)
end
end

-- sort by plugin's priority
if #stream_local_plugins > 1 then
sort_tab(stream_local_plugins, sort_plugin)
end

for i, plugin in ipairs(stream_local_plugins) do
stream_local_plugins_hash[plugin.name] = plugin
if local_conf and local_conf.apisix
and local_conf.apisix.enable_debug then
core.log.warn("loaded stream plugin and sort by priority:",
" ", plugin.priority,
" name: ", plugin.name)
end
end

_M.stream_load_times = _M.stream_load_times + 1
core.log.info("stream plugins: ", core.json.delay_encode())
core.log.info("load stream plugin times: ", _M.stream_load_times)
return true
end
_M.load = load


function _M.load()
local_conf = core.config.local_conf(true)

if ngx.config.subsystem == "http" then
local ok, err = load()
if not ok then
core.log.error("failed to load plugins: ", err)
end
end

local ok, err = load_stream()
if not ok then
core.log.error("failed to load stream plugins: ", err)
end

-- for test
return local_plugins
end


local fetch_api_routes
do
Expand Down Expand Up @@ -172,6 +241,38 @@ function _M.filter(user_route, plugins)
end


function _M.stream_filter(user_route, plugins)
plugins = plugins or core.table.new(#stream_local_plugins * 2, 0)
local user_plugin_conf = user_route.value.plugins
if user_plugin_conf == nil then
if local_conf and local_conf.apisix.enable_debug then
core.response.set_header("Apisix-Plugins", "no plugin")
end
return plugins
end

for _, plugin_obj in ipairs(stream_local_plugins) do
local name = plugin_obj.name
local plugin_conf = user_plugin_conf[name]

if type(plugin_conf) == "table" and not plugin_conf.disable then
core.table.insert(plugins, plugin_obj)
core.table.insert(plugins, plugin_conf)
end
end

if local_conf.apisix.enable_debug then
local t = {}
for i = 1, #plugins, 2 do
core.table.insert(t, plugins[i].name)
end
core.response.set_header("Apisix-Plugins", core.table.concat(t, ", "))
end

return plugins
end


function _M.merge_service_route(service_conf, route_conf)
core.log.info("service conf: ", core.json.delay_encode(service_conf))
-- core.log.info("route conf : ", core.json.delay_encode(route_conf))
Expand Down Expand Up @@ -216,7 +317,7 @@ end


function _M.init_worker()
load()
_M.load()
end


Expand Down
4 changes: 2 additions & 2 deletions lua/apisix/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ end


function _M.stream_init_worker()
local router_stream = require("apisix.stream.router.ip_remote")
router_stream.init_worker()
local router_stream = require("apisix.stream.router.ip_port")
router_stream.stream_init_worker()
_M.router_stream = router_stream
end

Expand Down
3 changes: 2 additions & 1 deletion lua/apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,13 @@ _M.stream_route = {
},
upstream = upstream_schema,
upstream_id = id_schema,
plugins = plugins_schema,
},
anyOf = {
{required = {"remote_addr", "upstream"}},
{required = {"remote_addr", "upstream_id"}},
{required = {"remote_addr", "plugins"}},
},
required = {"remote_addr"},
}


Expand Down
Loading

0 comments on commit ad2b67e

Please sign in to comment.