diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index 7083d4201ceb..bc6af4bf2c5c 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -141,6 +141,10 @@ stream { lua_shared_dict etcd-cluster-health-check-stream {* stream.lua_shared_dict["etcd-cluster-health-check-stream"] *}; lua_shared_dict worker-events-stream {* stream.lua_shared_dict["worker-events-stream"] *}; + {% if enabled_discoveries["tars"] then %} + lua_shared_dict tars-stream {* stream.lua_shared_dict["tars-stream"] *}; + {% end %} + {% if enabled_stream_plugins["limit-conn"] then %} lua_shared_dict plugin-limit-conn-stream {* stream.lua_shared_dict["plugin-limit-conn-stream"] *}; {% end %} diff --git a/apisix/discovery/tars/init.lua b/apisix/discovery/tars/init.lua index f621791dd1a0..14f658dce5d2 100644 --- a/apisix/discovery/tars/init.lua +++ b/apisix/discovery/tars/init.lua @@ -22,7 +22,8 @@ local tonumber = tonumber local local_conf = require("apisix.core.config_local").local_conf() local core = require("apisix.core") local mysql = require("resty.mysql") -local process = require("ngx.process") +local is_http = ngx.config.subsystem == "http" +local support_process, process = pcall(require, "ngx.process") local endpoint_dict @@ -331,9 +332,24 @@ function _M.nodes(servant) return get_endpoint(servant) end +local function get_endpoint_dict() + local shm = "tars" + + if not is_http then + shm = shm .. "-stream" + end + + return ngx.shared[shm] +end function _M.init_worker() - endpoint_dict = ngx.shared.tars + if not support_process then + core.log.error("tars discovery not support in subsystem: ", ngx.config.subsystem, + ", please check if your openresty version >= 1.19.9.1 or not") + return + end + + endpoint_dict = get_endpoint_dict() if not endpoint_dict then error("failed to get lua_shared_dict: tars, please check your APISIX version") end diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 3fcd7505bc0d..69cdf55f79bd 100755 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -165,6 +165,7 @@ nginx_config: # config for render the template to generate n lrucache-lock-stream: 10m plugin-limit-conn-stream: 10m worker-events-stream: 10m + tars-stream: 1m # As user can add arbitrary configurations in the snippet, # it is user's responsibility to check the configurations diff --git a/t/APISIX.pm b/t/APISIX.pm index 8e9e72d1174b..e0ad704277b5 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -396,6 +396,7 @@ _EOC_ lua_shared_dict kubernetes-stream 1m; lua_shared_dict kubernetes-first-stream 1m; lua_shared_dict kubernetes-second-stream 1m; + lua_shared_dict tars-stream 1m; upstream apisix_backend { server 127.0.0.1:1900; @@ -405,6 +406,8 @@ _EOC_ } _EOC_ + my $stream_extra_init_by_lua_start = $block->stream_extra_init_by_lua_start // ""; + my $stream_init_by_lua_block = $block->stream_init_by_lua_block // <<_EOC_; if os.getenv("APISIX_ENABLE_LUACOV") == "1" then require("luacov.runner")("t/apisix.luacov") @@ -413,6 +416,8 @@ _EOC_ require "resty.core" + $stream_extra_init_by_lua_start + apisix = require("apisix") local args = { dns_resolver = $dns_addrs_tbl_str, diff --git a/t/tars/discovery/stream/tars.t b/t/tars/discovery/stream/tars.t new file mode 100644 index 000000000000..b7c55a0f49f5 --- /dev/null +++ b/t/tars/discovery/stream/tars.t @@ -0,0 +1,216 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX; + +my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; +my $version = eval { `$nginx_binary -V 2>&1` }; + +if ($version =~ m/\/1.19.3/) { + plan(skip_all => "require OpenResty version >= 1.19.9.1"); +} else { + plan('no_plan'); +} + +repeat_each(1); +log_level('warn'); +no_root_location(); +no_shuffle(); +workers(4); + +add_block_preprocessor(sub { + my ($block) = @_; + + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 + enable_admin: false +deployment: + role: data_plane + role_data_plane: + config_provider: yaml +discovery: + tars: + db_conf: + host: 127.0.0.1 + port: 3306 + database: db_tars + user: root + password: tars2022 + full_fetch_interval: 3 + incremental_fetch_interval: 1 +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + + my $apisix_yaml = $block->apisix_yaml // <<_EOC_; +routes: [] +#END +_EOC_ + + $block->set_value("apisix_yaml", $apisix_yaml); + + my $extra_init_by_lua_start = <<_EOC_; + -- reduce incremental_fetch_interval,full_fetch_interval + local schema = require("apisix.discovery.tars.schema") + schema.properties.incremental_fetch_interval.minimum=1 + schema.properties.incremental_fetch_interval.default=1 + schema.properties.full_fetch_interval.minimum = 3 + schema.properties.full_fetch_interval.default = 3 +_EOC_ + + $block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start); + $block->set_value("stream_extra_init_by_lua_start", $extra_init_by_lua_start); + + my $config = $block->config // <<_EOC_; + + location /sql { + content_by_lua_block { + local mysql = require("resty.mysql") + local core = require("apisix.core") + local ipairs = ipairs + + ngx.req.read_body() + local sql = ngx.req.get_body_data() + core.log.info("get sql ", sql) + + local db_conf= { + host="127.0.0.1", + port=3306, + database="db_tars", + user="root", + password="tars2022", + } + + local db_cli, err = mysql:new() + if not db_cli then + core.log.error("failed to instantiate mysql: ", err) + return + end + db_cli:set_timeout(3000) + + local ok, err, errcode, sqlstate = db_cli:connect(db_conf) + if not ok then + core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate) + return + end + + local res, err, errcode, sqlstate = db_cli:query(sql) + if not res then + ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".") + return + end + ngx.say("DONE") + } + } +_EOC_ + + $block->set_value("config", $config); + + my $stream_config = $block->stream_config // <<_EOC_; + server { + listen 8125; + content_by_lua_block { + local core = require("apisix.core") + local d = require("apisix.discovery.tars") + + ngx.sleep(2) + + local sock = ngx.req.socket() + local request_body = sock:receive() + + core.log.info("get body ", request_body) + + local response_body = "{" + local queries = core.json.decode(request_body) + for _,query in ipairs(queries) do + local nodes = d.nodes(query) + if nodes==nil or #nodes==0 then + response_body=response_body.." "..0 + else + response_body=response_body.." "..#nodes + end + end + ngx.say(response_body.." }") + } + } + +_EOC_ + + $block->set_value("extra_stream_config", $stream_config); + +}); + +run_tests(); + +__DATA__ + +=== TEST 1: create initial server and servant +--- timeout: 3 +--- request eval +[ +"POST /sql +truncate table t_server_conf", + +"POST /sql +truncate table t_adapter_conf", + +"POST /sql +insert into t_server_conf(application, server_name, node_name, registry_timestamp, + template_name, setting_state, present_state, server_type) +values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'), + ('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'), + ('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp')", + +"POST /sql +insert into t_adapter_conf(application, server_name, node_name, adapter_name, endpoint, servant) +values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter', + 'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'), + ('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter', + 'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'), + ('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter', + 'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')", + +] +--- response_body eval +[ + "DONE\n", + "DONE\n", + "DONE\n", + "DONE\n", +] + + + +=== TEST 2: get count after create servant +--- apisix_yaml +stream_routes: + - + id: 1 + server_port: 1985 + upstream_id: 1 + +upstreams: + - nodes: + "127.0.0.1:8125": 1 + type: roundrobin + id: 1 + +#END +--- stream_request +["A.AServer.FirstObj","B.BServer.FirstObj", "C.CServer.FirstObj"] +--- stream_response eval +qr{ 1 1 1 }