Skip to content

Commit

Permalink
feat: stream subsystem support tars service discovery (#8826)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronething authored Feb 17, 2023
1 parent e688881 commit 641481d
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 2 deletions.
4 changes: 4 additions & 0 deletions apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ stream {
lua_shared_dict etcd-cluster-health-check-stream {* stream.lua_shared_dict["etcd-cluster-health-check-stream"] *};
lua_shared_dict worker-events-stream {* stream.lua_shared_dict["worker-events-stream"] *};
{% if enabled_discoveries["tars"] then %}
lua_shared_dict tars-stream {* stream.lua_shared_dict["tars-stream"] *};
{% end %}
{% if enabled_stream_plugins["limit-conn"] then %}
lua_shared_dict plugin-limit-conn-stream {* stream.lua_shared_dict["plugin-limit-conn-stream"] *};
{% end %}
Expand Down
20 changes: 18 additions & 2 deletions apisix/discovery/tars/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ local tonumber = tonumber
local local_conf = require("apisix.core.config_local").local_conf()
local core = require("apisix.core")
local mysql = require("resty.mysql")
local process = require("ngx.process")
local is_http = ngx.config.subsystem == "http"
local support_process, process = pcall(require, "ngx.process")

local endpoint_dict

Expand Down Expand Up @@ -331,9 +332,24 @@ function _M.nodes(servant)
return get_endpoint(servant)
end

local function get_endpoint_dict()
local shm = "tars"

if not is_http then
shm = shm .. "-stream"
end

return ngx.shared[shm]
end

function _M.init_worker()
endpoint_dict = ngx.shared.tars
if not support_process then
core.log.error("tars discovery not support in subsystem: ", ngx.config.subsystem,
", please check if your openresty version >= 1.19.9.1 or not")
return
end

endpoint_dict = get_endpoint_dict()
if not endpoint_dict then
error("failed to get lua_shared_dict: tars, please check your APISIX version")
end
Expand Down
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ nginx_config: # config for render the template to generate n
lrucache-lock-stream: 10m
plugin-limit-conn-stream: 10m
worker-events-stream: 10m
tars-stream: 1m

# As user can add arbitrary configurations in the snippet,
# it is user's responsibility to check the configurations
Expand Down
5 changes: 5 additions & 0 deletions t/APISIX.pm
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ _EOC_
lua_shared_dict kubernetes-stream 1m;
lua_shared_dict kubernetes-first-stream 1m;
lua_shared_dict kubernetes-second-stream 1m;
lua_shared_dict tars-stream 1m;
upstream apisix_backend {
server 127.0.0.1:1900;
Expand All @@ -405,6 +406,8 @@ _EOC_
}
_EOC_

my $stream_extra_init_by_lua_start = $block->stream_extra_init_by_lua_start // "";

my $stream_init_by_lua_block = $block->stream_init_by_lua_block // <<_EOC_;
if os.getenv("APISIX_ENABLE_LUACOV") == "1" then
require("luacov.runner")("t/apisix.luacov")
Expand All @@ -413,6 +416,8 @@ _EOC_
require "resty.core"
$stream_extra_init_by_lua_start
apisix = require("apisix")
local args = {
dns_resolver = $dns_addrs_tbl_str,
Expand Down
216 changes: 216 additions & 0 deletions t/tars/discovery/stream/tars.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX;

my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
my $version = eval { `$nginx_binary -V 2>&1` };

if ($version =~ m/\/1.19.3/) {
plan(skip_all => "require OpenResty version >= 1.19.9.1");
} else {
plan('no_plan');
}

repeat_each(1);
log_level('warn');
no_root_location();
no_shuffle();
workers(4);

add_block_preprocessor(sub {
my ($block) = @_;

my $yaml_config = <<_EOC_;
apisix:
node_listen: 1984
enable_admin: false
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
tars:
db_conf:
host: 127.0.0.1
port: 3306
database: db_tars
user: root
password: tars2022
full_fetch_interval: 3
incremental_fetch_interval: 1
_EOC_

$block->set_value("yaml_config", $yaml_config);

my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_

$block->set_value("apisix_yaml", $apisix_yaml);

my $extra_init_by_lua_start = <<_EOC_;
-- reduce incremental_fetch_interval,full_fetch_interval
local schema = require("apisix.discovery.tars.schema")
schema.properties.incremental_fetch_interval.minimum=1
schema.properties.incremental_fetch_interval.default=1
schema.properties.full_fetch_interval.minimum = 3
schema.properties.full_fetch_interval.default = 3
_EOC_

$block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start);
$block->set_value("stream_extra_init_by_lua_start", $extra_init_by_lua_start);

my $config = $block->config // <<_EOC_;
location /sql {
content_by_lua_block {
local mysql = require("resty.mysql")
local core = require("apisix.core")
local ipairs = ipairs
ngx.req.read_body()
local sql = ngx.req.get_body_data()
core.log.info("get sql ", sql)
local db_conf= {
host="127.0.0.1",
port=3306,
database="db_tars",
user="root",
password="tars2022",
}
local db_cli, err = mysql:new()
if not db_cli then
core.log.error("failed to instantiate mysql: ", err)
return
end
db_cli:set_timeout(3000)
local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
if not ok then
core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
return
end
local res, err, errcode, sqlstate = db_cli:query(sql)
if not res then
ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return
end
ngx.say("DONE")
}
}
_EOC_

$block->set_value("config", $config);

my $stream_config = $block->stream_config // <<_EOC_;
server {
listen 8125;
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.tars")
ngx.sleep(2)
local sock = ngx.req.socket()
local request_body = sock:receive()
core.log.info("get body ", request_body)
local response_body = "{"
local queries = core.json.decode(request_body)
for _,query in ipairs(queries) do
local nodes = d.nodes(query)
if nodes==nil or #nodes==0 then
response_body=response_body.." "..0
else
response_body=response_body.." "..#nodes
end
end
ngx.say(response_body.." }")
}
}
_EOC_

$block->set_value("extra_stream_config", $stream_config);

});

run_tests();

__DATA__
=== TEST 1: create initial server and servant
--- timeout: 3
--- request eval
[
"POST /sql
truncate table t_server_conf",
"POST /sql
truncate table t_adapter_conf",
"POST /sql
insert into t_server_conf(application, server_name, node_name, registry_timestamp,
template_name, setting_state, present_state, server_type)
values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp')",
"POST /sql
insert into t_adapter_conf(application, server_name, node_name, adapter_name, endpoint, servant)
values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter',
'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter',
'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'),
('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter',
'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')",
]
--- response_body eval
[
"DONE\n",
"DONE\n",
"DONE\n",
"DONE\n",
]
=== TEST 2: get count after create servant
--- apisix_yaml
stream_routes:
-
id: 1
server_port: 1985
upstream_id: 1
upstreams:
- nodes:
"127.0.0.1:8125": 1
type: roundrobin
id: 1
#END
--- stream_request
["A.AServer.FirstObj","B.BServer.FirstObj", "C.CServer.FirstObj"]
--- stream_response eval
qr{ 1 1 1 }

0 comments on commit 641481d

Please sign in to comment.