Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batch process metrics #3070

Merged
merged 18 commits into from
Dec 24, 2020
2 changes: 2 additions & 0 deletions apisix/plugins/http-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ function _M.log(conf, ctx)
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
6 changes: 4 additions & 2 deletions apisix/plugins/prometheus/exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ local DEFAULT_BUCKETS = { 1, 2, 5, 7, 10, 15, 20, 25, 30, 40, 50, 60, 70,

local metrics = {}

local inner_tab_arr = {}

local inner_tab_arr = {}
local function gen_arr(...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this if we can't share the table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

clear_tab(inner_tab_arr)

for i = 1, select('#', ...) do
inner_tab_arr[i] = select(i, ...)
end
Expand Down Expand Up @@ -329,5 +328,8 @@ function _M.metric_data()
return prometheus:metric_data()
end

-- Expose the module-level prometheus client so code outside this module
-- (e.g. apisix/utils/batch-processor.lua in this PR) can register its own
-- metrics such as the "batch_process_entries" gauge.
-- NOTE(review): presumably returns nil before the exporter has been
-- initialized -- callers should nil-check the result (batch-processor.lua
-- does so before calling :gauge()). Confirm against the exporter init path.
function _M.get_prometheus()
    return prometheus
end

return _M
4 changes: 3 additions & 1 deletion apisix/plugins/sls-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ function _M.log(conf, ctx)
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

log_buffer, err = batch_processor:new(handle_log, process_conf)
Expand Down
2 changes: 2 additions & 0 deletions apisix/plugins/syslog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ function _M.log(conf, ctx)
max_retry_count = conf.max_retry_times,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
4 changes: 3 additions & 1 deletion apisix/plugins/tcp-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ local function remove_stale_objects(premature)
end


function _M.log(conf)
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)

if not stale_timer_running then
Expand Down Expand Up @@ -156,6 +156,8 @@ function _M.log(conf)
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
4 changes: 3 additions & 1 deletion apisix/plugins/udp-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ local function remove_stale_objects(premature)
end


function _M.log(conf)
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)

if not stale_timer_running then
Expand Down Expand Up @@ -140,6 +140,8 @@ function _M.log(conf)
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
1 change: 1 addition & 0 deletions apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ local function create_tracer(conf,ctx)
conf.sample_ratio = 1
end

conf.route_id = ctx.route_id
local reporter = new_reporter(conf)
reporter:init_processor()
local tracer = new_tracer(reporter, new_random_sampler(conf))
Expand Down
7 changes: 5 additions & 2 deletions apisix/plugins/zipkin/reporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ function _M.new(conf)
service_name = service_name,
server_addr = server_addr,
server_port = server_port,
pending_spans_n = 0
pending_spans_n = 0,
route_id = conf.route_id
}, mt)
end

Expand Down Expand Up @@ -144,7 +145,9 @@ function _M.init_processor(self)
batch_max_size = 1000,
max_retry_count = 0,
buffer_duration = 60,
inactive_timeout = 5
inactive_timeout = 5,
route_id = self.route_id,
server_addr = self.server_addr,
}

local flush = function (entries, batch_max_size)
Expand Down
23 changes: 22 additions & 1 deletion apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ local batch_processor_mt = {
}
local execute_func
local create_buffer_timer
local batch_metrics
local prometheus = require("apisix.plugins.prometheus.exporter")


local schema = {
Expand Down Expand Up @@ -131,7 +133,9 @@ function batch_processor:new(func, config)
entry_buffer = { entries = {}, retry_count = 0},
is_timer_running = false,
first_entry_t = 0,
last_entry_t = 0
last_entry_t = 0,
route_id = config.route_id,
server_addr = config.server_addr,
}

return setmetatable(processor, batch_processor_mt)
Expand All @@ -146,8 +150,20 @@ function batch_processor:push(entry)
return
end

if not batch_metrics and prometheus.get_prometheus() and self.name
and self.route_id and self.server_addr then
batch_metrics = prometheus.get_prometheus():gauge("batch_process_entries",
"batch process remaining entries",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.

{"name", "route_id", "server_addr"})
end

local entries = self.entry_buffer.entries
table.insert(entries, entry)
-- add batch metric for every route
if batch_metrics then
local label = {self.name, self.route_id, self.server_addr}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look like we can store the label in self?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

batch_metrics:set(#entries, label)
end

if #entries == 1 then
self.first_entry_t = now()
Expand All @@ -173,11 +189,16 @@ function batch_processor:process_buffer()
"buffercount[", #self.entry_buffer.entries ,"]")
self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
self.entry_buffer = { entries = {}, retry_count = 0 }
if batch_metrics then
local label = {self.name, self.route_id, self.server_addr}
batch_metrics:set(0, label)
end
end

for _, batch in ipairs(self.batch_to_process) do
schedule_func_exec(self, 0, batch)
end

self.batch_to_process = {}
end

Expand Down
204 changes: 204 additions & 0 deletions t/plugin/prometheus.t
Original file line number Diff line number Diff line change
Expand Up @@ -1088,3 +1088,207 @@ GET /apisix/prometheus/metrics
--- error_code: 200
--- no_error_log
[error]



=== TEST 62: set batch plugins
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/9',
ngx.HTTP_PUT,
[[{
"methods": ["GET"],
"plugins": {
"prometheus": {},
"syslog": {
"host": "127.0.0.1",
"include_req_body": false,
"max_retry_times": 1,
"tls": false,
"retry_interval": 1,
"batch_max_size": 1000,
"buffer_duration": 60,
"port": 0,
"name": "sys-logger",
"flush_limit": 4096,
"sock_type": "tcp",
"timeout": 3,
"drop_limit": 1048576,
"pool_size": 5
},
"zipkin": {
"endpoint": "http://127.0.0.1:9447",
"service_name": "APISIX",
"sample_ratio": 1
},
"http-logger": {
"inactive_timeout": 5,
"include_req_body": false,
"timeout": 3,
"name": "http-logger",
"retry_delay": 1,
"buffer_duration": 60,
"uri": "http://127.0.0.1:19080/report",
"concat_method": "json",
"batch_max_size": 1000,
"max_retry_count": 0
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uris": ["/batch-process-metrics"]
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 63: hit batch-process-metrics
--- request
GET /batch-process-metrics
--- error_code: 404



=== TEST 64: check sys logger metrics
--- request
GET /apisix/prometheus/metrics
--- error_code: 200
--- response_body_like eval
qr/apisix_batch_process_entries{name="sys-logger",route_id="9"/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Escape {?

Copy link
Contributor Author

@gy09535 gy09535 Dec 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugin set error. 😄




=== TEST 65: check zipkin log metrics
--- request
GET /apisix/prometheus/metrics
--- error_code: 200
--- response_body_like eval
qr/apisix_batch_process_entries{name="zipkin_report",route_id="9"/



=== TEST 66: check http log metrics
--- request
GET /apisix/prometheus/metrics
--- error_code: 200
--- response_body_like eval
qr/apisix_batch_process_entries{name="http-logger",route_id="9"/



=== TEST 67: set batch plugins
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/10',
ngx.HTTP_PUT,
[[{
"methods": ["GET"],
"plugins": {
"prometheus": {},
"tcp-logger": {
"host": "127.0.0.1",
"include_req_body": false,
"timeout": 1000,
"name": "tcp-logger",
"retry_delay": 1,
"buffer_duration": 60,
"port": 0,
"batch_max_size": 1000,
"inactive_timeout": 5,
"tls": false,
"max_retry_count": 0
},
"udp-logger": {
"host": "127.0.0.1",
"port": 0,
"include_req_body": false,
"timeout": 3,
"batch_max_size": 1000,
"name": "udp-logger",
"inactive_timeout": 5,
"buffer_duration": 60
},
"sls-logger": {
"host": "127.0.0.1",
"batch_max_size": 1000,
"name": "sls-logger",
"inactive_timeout": 5,
"logstore": "your_logstore",
"buffer_duration": 60,
"port": 10009,
"max_retry_count": 0,
"retry_delay": 1,
"access_key_id": "your_access_id",
"access_key_secret": "your_key_secret",
"timeout": 5000,
"project": "your_project"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uris": ["/batch-process-metrics-10"]
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 68: trigger metrics batch-process-metrics
--- request
GET /batch-process-metrics-10
--- error_code: 404



=== TEST 69: check tcp log metrics
--- request
GET /apisix/prometheus/metrics
--- error_code: 200
--- response_body_like eval
qr/apisix_batch_process_entries{name="tcp-logger",route_id="10"/



=== TEST 70: check udp log metrics
--- request
GET /apisix/prometheus/metrics
--- error_code: 200
--- response_body_like eval
qr/apisix_batch_process_entries{name="udp-logger",route_id="10"/