Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream subsystem support tars service discovery #8826

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ stream {
lua_shared_dict etcd-cluster-health-check-stream {* stream.lua_shared_dict["etcd-cluster-health-check-stream"] *};
lua_shared_dict worker-events-stream {* stream.lua_shared_dict["worker-events-stream"] *};
{% if enabled_discoveries["tars"] then %}
lua_shared_dict tars-stream {* stream.lua_shared_dict["tars-stream"] *};
{% end %}
{% if enabled_stream_plugins["limit-conn"] then %}
lua_shared_dict plugin-limit-conn-stream {* stream.lua_shared_dict["plugin-limit-conn-stream"] *};
{% end %}
Expand Down
20 changes: 18 additions & 2 deletions apisix/discovery/tars/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ local tonumber = tonumber
local local_conf = require("apisix.core.config_local").local_conf()
local core = require("apisix.core")
local mysql = require("resty.mysql")
local process = require("ngx.process")
local is_http = ngx.config.subsystem == "http"
local support_process, process = pcall(require, "ngx.process")

local endpoint_dict

Expand Down Expand Up @@ -331,9 +332,24 @@ function _M.nodes(servant)
return get_endpoint(servant)
end

local function get_endpoint_dict()
local shm = "tars"

if not is_http then
shm = shm .. "-stream"
end

return ngx.shared[shm]
end

function _M.init_worker()
endpoint_dict = ngx.shared.tars
if not support_process then
core.log.error("tars discovery not support in subsystem: ", ngx.config.subsystem,
", please check if your openresty version >= 1.19.9.1 or not")
return
end

endpoint_dict = get_endpoint_dict()
if not endpoint_dict then
error("failed to get lua_shared_dict: tars, please check your APISIX version")
end
Expand Down
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ nginx_config: # config for render the template to generate n
lrucache-lock-stream: 10m
plugin-limit-conn-stream: 10m
worker-events-stream: 10m
tars-stream: 1m

# As user can add arbitrary configurations in the snippet,
# it is user's responsibility to check the configurations
Expand Down
5 changes: 5 additions & 0 deletions t/APISIX.pm
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ _EOC_
lua_shared_dict kubernetes-stream 1m;
lua_shared_dict kubernetes-first-stream 1m;
lua_shared_dict kubernetes-second-stream 1m;
lua_shared_dict tars-stream 1m;

upstream apisix_backend {
server 127.0.0.1:1900;
Expand All @@ -405,6 +406,8 @@ _EOC_
}
_EOC_

my $stream_extra_init_by_lua_start = $block->stream_extra_init_by_lua_start // "";

my $stream_init_by_lua_block = $block->stream_init_by_lua_block // <<_EOC_;
if os.getenv("APISIX_ENABLE_LUACOV") == "1" then
require("luacov.runner")("t/apisix.luacov")
Expand All @@ -413,6 +416,8 @@ _EOC_

require "resty.core"

$stream_extra_init_by_lua_start

apisix = require("apisix")
local args = {
dns_resolver = $dns_addrs_tbl_str,
Expand Down
216 changes: 216 additions & 0 deletions t/tars/discovery/stream/tars.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX;

my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
my $version = eval { `$nginx_binary -V 2>&1` };

if ($version =~ m/\/1.19.3/) {
plan(skip_all => "require OpenResty version >= 1.19.9.1");
} else {
plan('no_plan');
}

repeat_each(1);
log_level('warn');
no_root_location();
no_shuffle();
workers(4);

add_block_preprocessor(sub {
my ($block) = @_;

my $yaml_config = <<_EOC_;
apisix:
node_listen: 1984
enable_admin: false
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
tars:
db_conf:
host: 127.0.0.1
port: 3306
database: db_tars
user: root
password: tars2022
full_fetch_interval: 3
incremental_fetch_interval: 1
_EOC_

$block->set_value("yaml_config", $yaml_config);

my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
routes: []
#END
_EOC_

$block->set_value("apisix_yaml", $apisix_yaml);

my $extra_init_by_lua_start = <<_EOC_;
-- reduce incremental_fetch_interval,full_fetch_interval
local schema = require("apisix.discovery.tars.schema")
schema.properties.incremental_fetch_interval.minimum=1
schema.properties.incremental_fetch_interval.default=1
schema.properties.full_fetch_interval.minimum = 3
schema.properties.full_fetch_interval.default = 3
_EOC_

$block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to set the HTTP version in the stream test?

Copy link
Contributor Author

@ronething ronething Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The discovery configuration will affect both the HTTP and steam subsystem simultaneously. If we do not set extra_init_by_lua_start in HTTP, it will cause error like below.

nginx: [error] init_by_lua error: .../apisix/apisix/core/config_local.lua:71: invalid discovery tars configuration: property "full_fetch_interval" validation failed: expected 3 to be at least 90

$block->set_value("stream_extra_init_by_lua_start", $extra_init_by_lua_start);

my $config = $block->config // <<_EOC_;

location /sql {
content_by_lua_block {
local mysql = require("resty.mysql")
local core = require("apisix.core")
local ipairs = ipairs

ngx.req.read_body()
local sql = ngx.req.get_body_data()
core.log.info("get sql ", sql)

local db_conf= {
host="127.0.0.1",
port=3306,
database="db_tars",
user="root",
password="tars2022",
}

local db_cli, err = mysql:new()
if not db_cli then
core.log.error("failed to instantiate mysql: ", err)
return
end
db_cli:set_timeout(3000)

local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
if not ok then
core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
return
end

local res, err, errcode, sqlstate = db_cli:query(sql)
if not res then
ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return
end
ngx.say("DONE")
}
}
_EOC_

$block->set_value("config", $config);

my $stream_config = $block->stream_config // <<_EOC_;
server {
listen 8125;
content_by_lua_block {
local core = require("apisix.core")
local d = require("apisix.discovery.tars")

ngx.sleep(2)

local sock = ngx.req.socket()
local request_body = sock:receive()

core.log.info("get body ", request_body)

local response_body = "{"
local queries = core.json.decode(request_body)
for _,query in ipairs(queries) do
local nodes = d.nodes(query)
if nodes==nil or #nodes==0 then
response_body=response_body.." "..0
else
response_body=response_body.." "..#nodes
end
end
ngx.say(response_body.." }")
}
}

_EOC_

$block->set_value("extra_stream_config", $stream_config);

});

run_tests();

__DATA__

=== TEST 1: create initial server and servant
--- timeout: 3
--- request eval
[
"POST /sql
truncate table t_server_conf",

"POST /sql
truncate table t_adapter_conf",

"POST /sql
insert into t_server_conf(application, server_name, node_name, registry_timestamp,
template_name, setting_state, present_state, server_type)
values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp')",

"POST /sql
insert into t_adapter_conf(application, server_name, node_name, adapter_name, endpoint, servant)
values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter',
'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter',
'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'),
('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter',
'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')",

]
--- response_body eval
[
"DONE\n",
"DONE\n",
"DONE\n",
"DONE\n",
]



=== TEST 2: get count after create servant
--- apisix_yaml
stream_routes:
-
id: 1
server_port: 1985
upstream_id: 1

upstreams:
- nodes:
"127.0.0.1:8125": 1
type: roundrobin
id: 1

#END
--- stream_request
["A.AServer.FirstObj","B.BServer.FirstObj", "C.CServer.FirstObj"]
--- stream_response eval
qr{ 1 1 1 }