feat: provide skywalking logger plugin #5478

Merged: 13 commits, Nov 12, 2021
233 changes: 233 additions & 0 deletions apisix/plugins/skywalking-logger.lua
@@ -0,0 +1,233 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local batch_processor = require("apisix.utils.batch-processor")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
local plugin = require("apisix.plugin")

local base64 = require("ngx.base64")
local ngx_re = require("ngx.re")

local ngx = ngx
local tostring = tostring
local tonumber = tonumber
local ipairs = ipairs
local timer_at = ngx.timer.at

local plugin_name = "skywalking-logger"
local stale_timer_running = false
local buffers = {}

local schema = {
type = "object",
properties = {
endpoint_addr = core.schema.uri_def,
service_name = {type = "string", default = "APISIX"},
service_instance_name = {type = "string", default = "APISIX Instance Name"},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "skywalking logger"},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
},
required = {"endpoint_addr"},
}


local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
},
}


local _M = {
version = 0.1,
priority = 408,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}


function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end


local function send_http_data(conf, log_message)
local err_msg
local res = true
local url_decoded = url.parse(conf.endpoint_addr)
local host = url_decoded.host
local port = url_decoded.port

core.log.info("sending a batch logs to ", conf.endpoint_addr)

local httpc = http.new()
httpc:set_timeout(conf.timeout * 1000)
local ok, err = httpc:connect(host, port)

if not ok then
return false, "failed to connect to host[" .. host .. "] port["
.. tostring(port) .. "] " .. err
end

local httpc_res, httpc_err = httpc:request({
method = "POST",
path = "/v3/logs",
body = log_message,
headers = {
["Host"] = url_decoded.host,
["Content-Type"] = "application/json",
}
})

if not httpc_res then
return false, "error while sending data to [" .. host .. "] port["
.. tostring(port) .. "] " .. httpc_err
end

-- an error occurred on the server side
if httpc_res.status >= 400 then
res = false
err_msg = "server returned status code[" .. httpc_res.status .. "] host["
.. host .. "] port[" .. tostring(port) .. "] "
.. "body[" .. httpc_res:read_body() .. "]"
end

return res, err_msg
end


-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
return
end

for key, batch in pairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
core.log.warn("removing batch processor stale object, conf: ",
core.json.delay_encode(key))
buffers[key] = nil
end
end

stale_timer_running = false
end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))

local log_body
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0
then
log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
else
log_body = log_util.get_full_log(ngx, conf)
end

local trace_context
local sw_header = ngx.req.get_headers()["sw8"]
if sw_header then
-- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
local ids = ngx_re.split(sw_header, '-')
if #ids == 8 then
trace_context = {
traceId = base64.decode_base64url(ids[2]),
traceSegmentId = base64.decode_base64url(ids[3]),
spanId = tonumber(ids[4])
}
else
core.log.warn("failed to parse trace_context header: ", sw_header)
end
end

local entry = {
traceContext = trace_context,
body = {
json = {
json = core.json.encode(log_body, true)
}
},
service = conf.service_name,
serviceInstance = conf.service_instance_name,
endpoint = ctx.var.uri,
}

if not stale_timer_running then
-- run the stale-buffer cleanup timer every 30 mins if any log is present
Member:
Every 30 mins? For?

Member:
I think this is a huge latency in low-traffic scenarios. No one is willing to wait 30 minutes for log collection.

Member (Author):
It sweeps the stale log data every 30 minutes. When the data transport fails, the data remains in memory, so regular cleaning is required.

Member (@wu-sheng, Nov 11, 2021):
I think this is not a very general way to process logs. Usually, unless you have a local cache or mmap (like SkyWalking Satellite), logs should be abandoned quickly once the retry threshold is reached. The reason behind this is that observability produces a large data set that isn't always very useful, especially for logs. Metrics are small and it is tolerable to keep only the latest values, but logs are different.

Member (Author):
Makes sense to me. @spacewander, what do you think?

Member:
Err, @dmsolr's explanation is incorrect. This timer doesn't clean up the stale logs; it cleans up the stale buffers once they are no longer holding any logs.

Member:
If it is only about cleaning up the stale buffers, then this is APISIX's call. It has nothing to do with SkyWalking or observability preferences.

timer_at(1800, remove_stale_objects)
stale_timer_running = true
end

local log_buffer = buffers[conf]

if log_buffer then
log_buffer:push(entry)
return
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err = core.json.encode(entries)
if not data then
return false, 'error occurred while encoding the data: ' .. err
end

return send_http_data(conf, data)
end

local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[conf] = log_buffer
log_buffer:push(entry)
end


return _M
1 change: 1 addition & 0 deletions conf/config-default.yaml
@@ -344,6 +344,7 @@ plugins: # plugin list (sorted by priority)
- datadog # priority: 495
- echo # priority: 412
- http-logger # priority: 410
- skywalking-logger # priority: 408
- sls-logger # priority: 406
- tcp-logger # priority: 405
- kafka-logger # priority: 403
1 change: 1 addition & 0 deletions docs/en/latest/config.json
@@ -113,6 +113,7 @@
"label": "Loggers",
"items": [
"plugins/http-logger",
"plugins/skywalking-logger",
"plugins/tcp-logger",
"plugins/kafka-logger",
"plugins/udp-logger",
117 changes: 117 additions & 0 deletions docs/en/latest/plugins/skywalking-logger.md
@@ -0,0 +1,117 @@
---
title: skywalking-logger
---

<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

## Summary

- [**Name**](#name)
- [**Attributes**](#attributes)
- [**How To Enable**](#how-to-enable)
- [**Test Plugin**](#test-plugin)
- [**Metadata**](#metadata)
- [**Disable Plugin**](#disable-plugin)

## Name

`skywalking-logger` is a plugin that pushes access log data to the `SkyWalking OAP` server over HTTP. If a tracing context exists, it sets up trace-log correlation automatically, relying on the [SkyWalking Cross Process Propagation Headers Protocol](https://skywalking.apache.org/docs/main/latest/en/protocols/skywalking-cross-process-propagation-headers-protocol-v3/).

This provides the ability to send access logs as JSON objects to the `SkyWalking OAP` server.

## Attributes

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| endpoint_addr | string | required | | | The URI of the `SkyWalking OAP` server. |
| service_name | string | optional | "APISIX" | | Service name for the SkyWalking reporter. |
| service_instance_name | string | optional | "APISIX Instance Name" | | Service instance name for the SkyWalking reporter. Set it to `$hostname` to use the local hostname directly. |
| timeout | integer | optional | 3 | [1,...] | Timeout in seconds for the connection to and request against the `SkyWalking OAP` server. |
| name | string | optional | "skywalking logger" | | A unique identifier to identify the logger. |
| batch_max_size | integer | optional | 1000 | [1,...] | The maximum number of logs sent in each batch. When the number of buffered logs reaches this maximum, they are all pushed to the `SkyWalking OAP` server. |
| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time in seconds to refresh the buffer. When this time is reached, all buffered logs are pushed to the `SkyWalking OAP` server regardless of whether the batch size has been reached. |
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed. |
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing the batch from the processing pipeline. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds to delay before retrying if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. `false`: the request body is not included; `true`: the request body is included. |

## How To Enable

The following is an example of how to enable the `skywalking-logger` plugin for a specific Route. Before doing so, make sure an available `SkyWalking OAP` server is running and accessible.

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"http-logger": {
"endpoint_addr": "http://127.0.0.1:12800"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
},
"uri": "/hello"
}'
```
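
If multiple APISIX instances are deployed, `service_instance_name` can be set to `$hostname` so that each instance reports under its own hostname. A sketch (the endpoint address and upstream node below are placeholders):

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
        "skywalking-logger": {
            "endpoint_addr": "http://127.0.0.1:12800",
            "service_instance_name": "$hostname"
        }
    },
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    },
    "uri": "/hello"
}'
```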

## Test Plugin

> success:

```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```
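
To see trace-log correlation in action, the request can carry an `sw8` header. In practice this header is injected by a SkyWalking agent on the client or upstream side; the hand-crafted value below is purely illustrative. The plugin splits it on `-`, base64url-decodes the second and third fields, and attaches them to the log entry:

```shell
curl -i http://127.0.0.1:9080/hello \
  -H 'sw8: 1-YTFiMmMzZDRlNWY2-c2VnMDAx-3-c3Zj-aW5z-L2hlbGxv-YXBpc2l4'
```

With this header, the decoded trace context would be `traceId=a1b2c3d4e5f6`, `traceSegmentId=seg001`, and `spanId=3`.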

After completing the steps above, you can find the log details in the `SkyWalking UI`.
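
For reference, each entry that this plugin pushes to the OAP `/v3/logs` endpoint has roughly the following shape (field values are illustrative; the access-log body depends on the configured log format, and `traceContext` is only present when an `sw8` header was received):

```json
[
  {
    "traceContext": {
      "traceId": "a1b2c3d4e5f6",
      "traceSegmentId": "seg001",
      "spanId": 3
    },
    "body": {
      "json": {
        "json": "{\"request\":{\"uri\":\"/hello\"},\"response\":{\"status\":200}}"
      }
    },
    "service": "APISIX",
    "serviceInstance": "APISIX Instance Name",
    "endpoint": "/hello"
  }
]
```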

## Metadata

`skywalking-logger` also supports customizing the log format, similar to [http-logger](./http-logger.md).

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key-value pairs in JSON format. Only strings are supported as values. If a value starts with `$`, it is interpreted as an `APISIX` variable or an [Nginx variable](http://nginx.org/en/docs/varindex.html). |

Note that **the metadata configuration is applied globally**, which means it takes effect on every Route or Service that uses the `skywalking-logger` plugin.
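
As a sketch (assuming the default Admin API address and key), a custom log format can be configured through the plugin metadata API:

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/skywalking-logger \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'
```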

## Disable Plugin

Remove the corresponding JSON configuration from the plugin configuration to disable `skywalking-logger`.
APISIX plugins are hot-reloaded, so there is no need to restart APISIX.

```shell
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'
```
1 change: 1 addition & 0 deletions docs/zh/latest/config.json
@@ -111,6 +111,7 @@
"label": "Loggers",
"items": [
"plugins/http-logger",
"plugins/skywalking-logger",
"plugins/tcp-logger",
"plugins/kafka-logger",
"plugins/udp-logger",