Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batch process metrics #3070

Merged
merged 18 commits into from
Dec 24, 2020
2 changes: 2 additions & 0 deletions apisix/plugins/http-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ function _M.log(conf, ctx)
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
6 changes: 4 additions & 2 deletions apisix/plugins/prometheus/exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ local DEFAULT_BUCKETS = { 1, 2, 5, 7, 10, 15, 20, 25, 30, 40, 50, 60, 70,

local metrics = {}

local inner_tab_arr = {}

local inner_tab_arr = {}
local function gen_arr(...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this if we can't share the table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

clear_tab(inner_tab_arr)

for i = 1, select('#', ...) do
inner_tab_arr[i] = select(i, ...)
end
Expand Down Expand Up @@ -328,5 +327,8 @@ function _M.metric_data()
return prometheus:metric_data()
end

-- Return this module's `prometheus` object so that other modules can
-- register their own metrics on it.
-- NOTE(review): the batch processor checks the result for truthiness before
-- using it, so this presumably returns nil until the exporter has been
-- initialised -- confirm against the init code.
function _M.get_prometheus()
return prometheus
end

return _M
4 changes: 3 additions & 1 deletion apisix/plugins/sls-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ function _M.log(conf, ctx)
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

log_buffer, err = batch_processor:new(handle_log, process_conf)
Expand Down
2 changes: 2 additions & 0 deletions apisix/plugins/syslog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ function _M.log(conf, ctx)
max_retry_count = conf.max_retry_times,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
4 changes: 3 additions & 1 deletion apisix/plugins/tcp-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ local function remove_stale_objects(premature)
end


function _M.log(conf)
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)

if not stale_timer_running then
Expand Down Expand Up @@ -156,6 +156,8 @@ function _M.log(conf)
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
4 changes: 3 additions & 1 deletion apisix/plugins/udp-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ local function remove_stale_objects(premature)
end


function _M.log(conf)
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)

if not stale_timer_running then
Expand Down Expand Up @@ -140,6 +140,8 @@ function _M.log(conf)
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
Expand Down
1 change: 1 addition & 0 deletions apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ local function create_tracer(conf,ctx)
conf.sample_ratio = 1
end

conf.route_id = ctx.route_id
local reporter = new_reporter(conf)
reporter:init_processor()
local tracer = new_tracer(reporter, new_random_sampler(conf))
Expand Down
7 changes: 5 additions & 2 deletions apisix/plugins/zipkin/reporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ function _M.new(conf)
service_name = service_name,
server_addr = server_addr,
server_port = server_port,
pending_spans_n = 0
pending_spans_n = 0,
route_id = conf.route_id
}, mt)
end

Expand Down Expand Up @@ -144,7 +145,9 @@ function _M.init_processor(self)
batch_max_size = 1000,
max_retry_count = 0,
buffer_duration = 60,
inactive_timeout = 5
inactive_timeout = 5,
route_id = self.route_id,
server_addr = self.server_addr,
}

local flush = function (entries, batch_max_size)
Expand Down
23 changes: 22 additions & 1 deletion apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ local batch_processor_mt = {
}
local execute_func
local create_buffer_timer
local batch_metrics
local prometheus = require("apisix.plugins.prometheus.exporter")


local schema = {
Expand Down Expand Up @@ -131,7 +133,9 @@ function batch_processor:new(func, config)
entry_buffer = { entries = {}, retry_count = 0},
is_timer_running = false,
first_entry_t = 0,
last_entry_t = 0
last_entry_t = 0,
route_id = config.route_id,
server_addr = config.server_addr,
}

return setmetatable(processor, batch_processor_mt)
Expand All @@ -146,8 +150,20 @@ function batch_processor:push(entry)
return
end

if not batch_metrics and prometheus.get_prometheus() and self.name
and self.route_id and self.server_addr then
batch_metrics = prometheus.get_prometheus():gauge("batch_process_entries",
"batch process remaining entries",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.

{"name", "route_id", "server_addr"})
end

local entries = self.entry_buffer.entries
table.insert(entries, entry)
-- add batch metric for every route
if batch_metrics then
self.label = {self.name, self.route_id, self.server_addr}
batch_metrics:set(#entries, self.label)
end

if #entries == 1 then
self.first_entry_t = now()
Expand All @@ -173,11 +189,16 @@ function batch_processor:process_buffer()
"buffercount[", #self.entry_buffer.entries ,"]")
self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
self.entry_buffer = { entries = {}, retry_count = 0 }
if batch_metrics then
self.label = {self.name, self.route_id, self.server_addr}
batch_metrics:set(0, self.label)
end
end

for _, batch in ipairs(self.batch_to_process) do
schedule_func_exec(self, 0, batch)
end

self.batch_to_process = {}
end

Expand Down
9 changes: 9 additions & 0 deletions doc/plugins/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Or you can goto [Grafana official](https://grafana.com/grafana/dashboards/11719)
* `Bandwidth`: Total Bandwidth (egress/ingress) flowing through apisix. This metric is available per service and as a sum across all services.
* `etcd reachability`: A gauge type with a value of 0 or 1, representing whether etcd can be reached by apisix.
* `Connections`: Various Nginx connection metrics like active, reading, writing, and number of accepted connections.
* `Batch process entries`: A gauge type. When plugins that send data through the batch processor are in use (such as sys logger, http logger, sls logger, tcp logger, udp logger and zipkin), the number of entries in the batch processor that haven't been sent yet is counted in this metric.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should mention the batch processor.
Some suggestions:
surplus entries => entries
not sended => hasn't been sent
statistics => counted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed.


Here is the original metric data of apisix:

Expand All @@ -138,6 +139,14 @@ apisix_bandwidth{type="egress",service="foo.com"} 2379
apisix_bandwidth{type="ingress",service="127.0.0.2"} 83
apisix_bandwidth{type="ingress",service="bar.com"} 76
apisix_bandwidth{type="ingress",service="foo.com"} 988
# HELP apisix_batch_process_entries batch process remaining entries
# TYPE apisix_batch_process_entries gauge
apisix_batch_process_entries{name="http-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="sls-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="tcp-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="udp-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="sys-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="zipkin_report",route_id="9",server_addr="127.0.0.1"} 1
# HELP apisix_etcd_reachable Config server etcd reachable from Apisix, 0 is unreachable
# TYPE apisix_etcd_reachable gauge
apisix_etcd_reachable 1
Expand Down
10 changes: 9 additions & 1 deletion doc/zh-cn/plugins/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ plugin_attr:
* `Bandwidth`: 流经apisix的总带宽(可分出口带宽和入口带宽). 每个服务指标或者是所有服务指标的总和都可以统计到。
* `etcd reachability`: apisix 连接 etcd 的可用性,用 0 和 1来表示。
* `Connections`: 各种的 Nginx 连接指标,如 active(正处理的活动连接数),reading(nginx 读取到客户端的 Header 信息数),writing(nginx 返回给客户端的 Header 信息数),已建立的连接数。

* `Batch process entries`: 批处理未发送数据统计,当你使用了批处理发送插件,比如:sys logger, http logger, sls logger, tcp logger, udp logger 和 zipkin, 那么你将会在此指标中看到当前尚未发送的数据量。
这里是apisix的原始的指标数据集:

```
Expand All @@ -136,6 +136,14 @@ apisix_bandwidth{type="egress",service="foo.com"} 2379
apisix_bandwidth{type="ingress",service="127.0.0.2"} 83
apisix_bandwidth{type="ingress",service="bar.com"} 76
apisix_bandwidth{type="ingress",service="foo.com"} 988
# HELP apisix_batch_process_entries batch process remaining entries
# TYPE apisix_batch_process_entries gauge
apisix_batch_process_entries{name="http-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="sls-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="tcp-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="udp-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="sys-logger",route_id="9",server_addr="127.0.0.1"} 1
apisix_batch_process_entries{name="zipkin_report",route_id="9",server_addr="127.0.0.1"} 1
# HELP apisix_etcd_reachable Config server etcd reachable from Apisix, 0 is unreachable
# TYPE apisix_etcd_reachable gauge
apisix_etcd_reachable 1
Expand Down
Loading