Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(otel): use BatchQueue to avoid worker-level table data race #9504

Merged
merged 4 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kong/pdk/tracing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ local function new_tracer(name, options)
-- @function kong.tracing.process_span
-- @phases log
-- @tparam function processor a function that accepts a span as the parameter
function self.process_span(processor)
function self.process_span(processor, ...)
check_phase(PHASES.log)

if type(processor) ~= "function" then
Expand All @@ -452,7 +452,7 @@ local function new_tracer(name, options)

for _, span in ipairs(ctx.KONG_SPANS) do
if span.tracer.name == self.name then
processor(span)
processor(span, ...)
end
end
end
Expand Down
92 changes: 29 additions & 63 deletions kong/plugins/opentelemetry/handler.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
local new_tab = require "table.new"
local BatchQueue = require "kong.tools.batch_queue"
local http = require "resty.http"
local clone = require "table.clone"
local otlp = require "kong.plugins.opentelemetry.otlp"
local propagation = require "kong.tracing.propagation"
local tablepool = require "tablepool"

local ngx = ngx
local kong = kong
Expand All @@ -14,17 +13,12 @@ local ngx_now = ngx.now
local ngx_update_time = ngx.update_time
local ngx_req = ngx.req
local ngx_get_headers = ngx_req.get_headers
local timer_at = ngx.timer.at
local clear = table.clear
local propagation_parse = propagation.parse
local propagation_set = propagation.set
local tablepool_release = tablepool.release
local tablepool_fetch = tablepool.fetch
local null = ngx.null
local encode_traces = otlp.encode_traces
local transform_span = otlp.transform_span

local POOL_BATCH_SPANS = "KONG_OTLP_BATCH_SPANS"
local _log_prefix = "[otel] "

local OpenTelemetryHandler = {
Expand All @@ -37,9 +31,8 @@ local default_headers = {
}

-- worker-level spans queue
local spans_queue = new_tab(5000, 0)
local queues = {} -- one queue per unique plugin config
local headers_cache = setmetatable({}, { __mode = "k" })
local last_run_cache = setmetatable({}, { __mode = "k" })

local function get_cached_headers(conf_headers)
-- cache http headers
Expand Down Expand Up @@ -80,56 +73,21 @@ local function http_export_request(conf, pb_data, headers)
end
end

local function http_export(premature, conf)
if premature then
return
end

local spans_n = #spans_queue
if spans_n == 0 then
return
end

local function http_export(conf, spans)
local start = ngx_now()
local headers = conf.headers and get_cached_headers(conf.headers) or default_headers
local payload = encode_traces(spans, conf.resource_attributes)

-- batch send spans
local spans_buffer = tablepool_fetch(POOL_BATCH_SPANS, conf.batch_span_count, 0)

for i = 1, spans_n do
local len = (spans_buffer.n or 0) + 1
spans_buffer[len] = spans_queue[i]
spans_buffer.n = len

if len >= conf.batch_span_count then
local pb_data = encode_traces(spans_buffer, conf.resource_attributes)
clear(spans_buffer)

http_export_request(conf, pb_data, headers)
end
end

-- remain spans
if spans_queue.n and spans_queue.n > 0 then
local pb_data = encode_traces(spans_buffer, conf.resource_attributes)
http_export_request(conf, pb_data, headers)
end

-- clear the queue
clear(spans_queue)

tablepool_release(POOL_BATCH_SPANS, spans_buffer)
http_export_request(conf, payload, headers)

ngx_update_time()
local duration = ngx_now() - start
ngx_log(ngx_DEBUG, _log_prefix, "opentelemetry exporter sent " .. spans_n ..
ngx_log(ngx_DEBUG, _log_prefix, "exporter sent " .. #spans ..
" traces to " .. conf.endpoint .. " in " .. duration .. " seconds")
end

local function process_span(span)
if span.should_sample == false
or kong.ctx.plugin.should_sample == false
then
local function process_span(span, queue)
if span.should_sample == false or kong.ctx.plugin.should_sample == false then
-- ignore
return
end
Expand All @@ -142,11 +100,7 @@ local function process_span(span)

local pb_span = transform_span(span)

local len = spans_queue.n or 0
len = len + 1

spans_queue[len] = pb_span
spans_queue.n = len
queue:add(pb_span)
end

function OpenTelemetryHandler:rewrite()
Expand Down Expand Up @@ -184,16 +138,28 @@ end
function OpenTelemetryHandler:log(conf)
ngx_log(ngx_DEBUG, _log_prefix, "total spans in current request: ", ngx.ctx.KONG_SPANS and #ngx.ctx.KONG_SPANS)

-- transform spans
kong.tracing.process_span(process_span)
local queue_id = conf.__key__
local q = queues[queue_id]
if not q then
local process = function(entries)
return http_export(conf, entries)
end

local opts = {
batch_max_size = conf.batch_span_count,
process_delay = conf.batch_flush_delay,
}

local cache_key = conf.__key__
local last = last_run_cache[cache_key] or 0
local now = ngx_now()
if now - last >= conf.batch_flush_delay then
last_run_cache[cache_key] = now
timer_at(0, http_export, conf)
local err
q, err = BatchQueue.new(process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
end
queues[queue_id] = q
end

kong.tracing.process_span(process_span, q)
end

return OpenTelemetryHandler
Loading