Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opentelemetry): support variable resource attributes #13839

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
message: |
**Opentelemetry**: Support variable resource attributes
type: feature
scope: Plugin
18 changes: 16 additions & 2 deletions kong/plugins/opentelemetry/handler.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@

local otel_traces = require "kong.plugins.opentelemetry.traces"
local otel_logs = require "kong.plugins.opentelemetry.logs"
local otel_utils = require "kong.plugins.opentelemetry.utils"
local dynamic_hook = require "kong.dynamic_hook"
local o11y_logs = require "kong.observability.logs"
local kong_meta = require "kong.meta"
samugi marked this conversation as resolved.
Show resolved Hide resolved

local _log_prefix = otel_utils._log_prefix
local ngx_log = ngx.log
local ngx_WARN = ngx.WARN


local OpenTelemetryHandler = {
Expand Down Expand Up @@ -42,14 +46,24 @@ end


function OpenTelemetryHandler:log(conf)
-- Read resource attributes variable
local options = {}
if conf.resource_attributes then
local compiled, err = otel_utils.read_resource_attributes(conf.resource_attributes)
if not compiled then
ngx_log(ngx_WARN, _log_prefix, "resource attributes template failed to compile: ", err)
end
options.compiled_resource_attributes = compiled
end

-- Traces
if conf.traces_endpoint then
otel_traces.log(conf)
otel_traces.log(conf, options)
end

-- Logs
if conf.logs_endpoint then
otel_logs.log(conf)
otel_logs.log(conf, options)
end
end

Expand Down
16 changes: 12 additions & 4 deletions kong/plugins/opentelemetry/logs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ local encode_logs = otlp.encode_logs
local prepare_logs = otlp.prepare_logs


local function http_export_logs(conf, logs_batch)
local function http_export_logs(params, logs_batch)
local conf = params.conf
local resource_attributes = params.options.compiled_resource_attributes
samugi marked this conversation as resolved.
Show resolved Hide resolved
or conf.resource_attributes
local headers = get_headers(conf.headers)
local payload = encode_logs(logs_batch, conf.resource_attributes)

local payload = encode_logs(logs_batch, resource_attributes)

local ok, err = http_export_request({
connect_timeout = conf.connect_timeout,
Expand All @@ -42,7 +46,7 @@ local function http_export_logs(conf, logs_batch)
end


local function log(conf)
local function log(conf, options)
local worker_logs = o11y_logs.get_worker_logs()
local request_logs = o11y_logs.get_request_logs()

Expand All @@ -59,6 +63,10 @@ local function log(conf)
local flags = tracing_context.get_flags()
local worker_logs_ready = prepare_logs(worker_logs)
local request_logs_ready = prepare_logs(request_logs, raw_trace_id, flags)
local params = {
conf = conf,
options = options,
}

local queue_conf = clone(Queue.get_plugin_params("opentelemetry", conf))
queue_conf.name = queue_conf.name .. ":logs"
Expand All @@ -72,7 +80,7 @@ local function log(conf)
Queue.enqueue(
queue_conf,
http_export_logs,
conf,
params,
log
)
end
Expand Down
16 changes: 12 additions & 4 deletions kong/plugins/opentelemetry/traces.lua
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,13 @@ local function header_filter(conf)
end


local function http_export_traces(conf, spans)
local function http_export_traces(params, spans)
local conf = params.conf
local resource_attributes = params.options.compiled_resource_attributes
samugi marked this conversation as resolved.
Show resolved Hide resolved
or conf.resource_attributes
local headers = get_headers(conf.headers)
local payload = encode_traces(spans, conf.resource_attributes)

local payload = encode_traces(spans, resource_attributes)

local ok, err = http_export_request({
connect_timeout = conf.connect_timeout,
Expand All @@ -143,7 +147,7 @@ local function http_export_traces(conf, spans)
end


local function log(conf)
local function log(conf, options)
ngx_log(ngx_DEBUG, _log_prefix, "total spans in current request: ", ngx.ctx.KONG_SPANS and #ngx.ctx.KONG_SPANS)

kong.tracing.process_span(function (span)
Expand All @@ -158,13 +162,17 @@ local function log(conf)
span.trace_id = trace_id
end

local params = {
conf = conf,
options = options,
}
local queue_conf = clone(Queue.get_plugin_params("opentelemetry", conf))
queue_conf.name = queue_conf.name .. ":traces"

local ok, err = Queue.enqueue(
queue_conf,
http_export_traces,
conf,
params,
encode_span(span)
)
if not ok then
Expand Down
123 changes: 121 additions & 2 deletions kong/plugins/opentelemetry/utils.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
local http = require "resty.http"
local clone = require "table.clone"

local sandbox = require "kong.tools.sandbox"
local deep_copy = require("kong.tools.table").deep_copy
local pl_template = require "pl.template"
local lua_enabled = sandbox.configuration.enabled
local sandbox_enabled = sandbox.configuration.sandbox_enabled
local get_request_headers = kong.request.get_headers
local get_uri_args = kong.request.get_query
local rawset = rawset
local str_find = string.find
local tostring = tostring
local null = ngx.null

local EMPTY = require("kong.tools.table").EMPTY

local CONTENT_TYPE_HEADER_NAME = "Content-Type"
local DEFAULT_CONTENT_TYPE_HEADER = "application/x-protobuf"
Expand All @@ -13,6 +21,7 @@ local DEFAULT_HEADERS = {

local _log_prefix = "[otel] "


local function http_export_request(conf, pb_data, headers)
local httpc = http.new()
httpc:set_timeouts(conf.connect_timeout, conf.send_timeout, conf.read_timeout)
Expand Down Expand Up @@ -48,8 +57,118 @@ local function get_headers(conf_headers)
end


local compile_opts = {
escape = "\xff", -- disable '#' as a valid template escape
}

local template_cache = setmetatable( {}, { __mode = "k" })


local function param_value(source_template, resource_attributes, template_env)
if not source_template or source_template == "" then
return nil
end

if not lua_enabled then
-- Detect expressions in the source template
local expr = str_find(source_template, "%$%(.*%)")
if expr then
return nil, "loading of untrusted Lua code disabled because " ..
"'untrusted_lua' config option is set to 'off'"
end
-- Lua is disabled, no need to render the template
return source_template
end

-- find compiled templates for this plugin-configuration array
local compiled_templates = template_cache[resource_attributes]
if not compiled_templates then
compiled_templates = {}
-- store it by `resource_attributes` which is part of the plugin `conf` table
-- it will be GC'ed at the same time as `conf` and hence invalidate the
-- compiled templates here as well as the cache-table has weak-keys
template_cache[resource_attributes] = compiled_templates
end

-- Find or compile the specific template
local compiled_template = compiled_templates[source_template]
if not compiled_template then
local res
compiled_template, res = pl_template.compile(source_template, compile_opts)
if res then
return source_template
end
compiled_templates[source_template] = compiled_template
end

return compiled_template:render(template_env)
end

local function read_resource_attributes(resource_attributes)
if not resource_attributes then
return EMPTY
end
local __meta_environment = {
__index = function(self, key)
local lazy_loaders = {
headers = function(self)
return get_request_headers() or EMPTY
end,
query_params = function(self)
return get_uri_args() or EMPTY
end,
uri_captures = function(self)
return (ngx.ctx.router_matches or EMPTY).uri_captures or EMPTY
end,
shared = function(self)
return ((kong or EMPTY).ctx or EMPTY).shared or EMPTY
end,
}
local loader = lazy_loaders[key]
if not loader then
if lua_enabled and not sandbox_enabled then
return _G[key]
end
return
end
-- set the result on the table to not load again
local value = loader()
rawset(self, key, value)
return value
end,
__newindex = function(self)
error("This environment is read-only.")
end,
}

local template_env = {}
if lua_enabled and sandbox_enabled then
-- load the sandbox environment to be used to render the template
template_env = deep_copy(sandbox.configuration.environment)
-- here we can optionally add functions to expose to the sandbox, eg:
-- tostring = tostring,
-- because headers may contain array elements such as duplicated headers
-- type is a useful function in these cases. See issue #25.
template_env.type = type
end
setmetatable(template_env, __meta_environment)
local compiled_resource_attributes = {}
for current_name, current_value in pairs(resource_attributes) do
local res, err = param_value(current_value, resource_attributes, template_env)
if not res then
return nil, err
end

compiled_resource_attributes[current_name] = res
end
return compiled_resource_attributes
end



return {
http_export_request = http_export_request,
get_headers = get_headers,
_log_prefix = _log_prefix,
read_resource_attributes = read_resource_attributes,
}
12 changes: 12 additions & 0 deletions spec/03-plugins/37-opentelemetry/02-schema_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,16 @@ describe("Plugin: OpenTelemetry (schema)", function()
}
}, err)
end)

it("accepts variable values", function()
samugi marked this conversation as resolved.
Show resolved Hide resolved
local ok, err = validate_plugin_config_schema({
endpoint = "http://example.dev",
resource_attributes = {
foo = "$(headers.host)",
},
}, schema_def)

assert.truthy(ok)
assert.is_nil(err)
end)
end)
20 changes: 13 additions & 7 deletions spec/03-plugins/37-opentelemetry/04-exporter_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ for _, strategy in helpers.each_strategy() do
resource_attributes = {
["service.name"] = "kong_oss",
["os.version"] = "debian",
["host.name"] = "$(headers.host)",
["validstr"] = "$($@#)",
}
})
mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT })
Expand Down Expand Up @@ -608,13 +610,17 @@ for _, strategy in helpers.each_strategy() do
local res_attr = decoded.resource_spans[1].resource.attributes
sort_by_key(res_attr)
-- resource attributes
assert.same("os.version", res_attr[1].key)
assert.same({string_value = "debian", value = "string_value"}, res_attr[1].value)
assert.same("service.instance.id", res_attr[2].key)
assert.same("service.name", res_attr[3].key)
assert.same({string_value = "kong_oss", value = "string_value"}, res_attr[3].value)
assert.same("service.version", res_attr[4].key)
assert.same({string_value = kong.version, value = "string_value"}, res_attr[4].value)
assert.same("host.name", res_attr[1].key)
assert.same({string_value = "0.0.0.0:9000", value = "string_value"}, res_attr[1].value)
assert.same("os.version", res_attr[2].key)
assert.same({string_value = "debian", value = "string_value"}, res_attr[2].value)
assert.same("service.instance.id", res_attr[3].key)
assert.same("service.name", res_attr[4].key)
assert.same({string_value = "kong_oss", value = "string_value"}, res_attr[4].value)
assert.same("service.version", res_attr[5].key)
assert.same({string_value = kong.version, value = "string_value"}, res_attr[5].value)
assert.same("validstr", res_attr[6].key)
assert.same({string_value = "$($@#)", value = "string_value"}, res_attr[6].value)

local scope_spans = decoded.resource_spans[1].scope_spans
assert.is_true(#scope_spans > 0, scope_spans)
Expand Down
Loading