
Commit

fluentd: Refactor label_keys and add extract_kubernetes_labels configuration (#1186)

* refactor label_keys to labels and add kubernetes label extract

* fix escaping
tarokkk authored and cyriltovena committed Oct 25, 2019
1 parent 38586f5 commit 51105f6
Showing 4 changed files with 169 additions and 40 deletions.
67 changes: 63 additions & 4 deletions fluentd/fluent-plugin-grafana-loki/README.md
@@ -6,7 +6,7 @@ This plugin offers two line formats and uses protobuf to send compressed data to

Key features:
* extra_labels - labels to be added to every line of a logfile, useful for designating environments
* label_keys - customizable list of keys for stream labels
* label - a section that lets you specify stream labels from fields of your log records

## Installation

@@ -29,6 +29,64 @@ In your Fluentd configuration, use `@type loki`. Additional configuration is optional.
</match>
```

### Using labels

A simple label taken from a top-level attribute:
```
<match mytag>
@type loki
# ...
<label>
fluentd_worker
</label>
# ...
</match>
```

You can also rename the label key, as in the following example:

```
<match mytag>
@type loki
# ...
<label>
worker fluentd_worker
</label>
# ...
</match>
```

You can use [record accessor syntax](https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor#syntax) for nested fields:

```
<match mytag>
@type loki
# ...
<label>
container $.kubernetes.container
</label>
# ...
</match>
```

### Extracting Kubernetes labels

Because Kubernetes labels are nested key-value pairs, there is a separate option to extract them.
Note that special characters such as `.`, `-` and `/` in label names are replaced with `_`.
Combine this with the `remove_keys kubernetes` option to strip the metadata from the log line.
```
<match mytag>
@type loki
# ...
extract_kubernetes_labels true
remove_keys kubernetes
<label>
container $.kubernetes.container
</label>
# ...
</match>
```
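The character replacement described above can be sketched in plain Ruby. The helper name below is illustrative, not part of the plugin's API; it mirrors the plugin's `gsub` on label names:

```ruby
# Loki (like Prometheus) restricts label names to [a-zA-Z_][a-zA-Z0-9_]*,
# so the plugin rewrites '.', '-' and '/' to '_' before building stream labels.
# Hypothetical helper mirroring that behavior:
def sanitize_label_key(key)
  key.gsub(%r{[.\-/]}, '_')
end

puts sanitize_label_key('app.kubernetes.io/name')
# => app_kubernetes_io_name
```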

### Multi-worker usage

Loki doesn't currently support out-of-order inserts - if you try to insert a log entry with an earlier timestamp after a log entry with identical labels but a later timestamp, the insert will fail with `HTTP status code: 500, message: rpc error: code = Unknown desc = Entry out of order`. Therefore, to use this plugin in a multi-worker Fluentd setup, you'll need to include the worker ID in the labels.
@@ -45,7 +103,9 @@ For example, using [fluent-plugin-record-modifier](https://github.com/repeatedly/fluent-plugin-record-modifier):
<match mytag>
@type loki
# ...
label_keys "fluentd_worker"
<label>
fluentd_worker
</label>
# ...
</match>
```
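To see why per-worker labels avoid the out-of-order error, here is a small Ruby sketch (not the plugin's actual code) of how entries with identical label sets fall into the same stream, which is where ordering is enforced:

```ruby
# Entries are grouped into streams by their exact label set; Loki enforces
# timestamp ordering within each stream. Tagging entries with the worker id
# gives each worker its own stream, so workers never race each other.
entries = [
  { labels: { 'fluentd_worker' => '0' }, ts: 2 },
  { labels: { 'fluentd_worker' => '1' }, ts: 1 },
  { labels: { 'fluentd_worker' => '0' }, ts: 3 }
]

streams = entries.group_by { |e| e[:labels] }
streams.each_value { |es| es.sort_by! { |e| e[:ts] } }

puts streams.size
# => 2
```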
@@ -112,8 +172,7 @@ Loki is intended to index and group log streams using only a small set of labels

There are a few configuration settings to control the output format.
- extra_labels: (default: nil) set of labels to include with every Loki stream. eg `{"env":"dev", "datacenter": "dc1"}`
- remove_keys: (default: nil) comma separated list of needless record keys to remove. All other keys will be placed into the log line
- label_keys: (default: "job,instance") comma separated list of keys to use as stream labels. All other keys will be placed into the log line
- remove_keys: (default: nil) comma separated list of needless record keys to remove. All other keys will be placed into the log line. You can use [record_accessor syntax](https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor#syntax).
- line_format: format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format `<key>=<value>`.
- drop_single_key: if set to true and a record has only a single key remaining after label extraction, the log line sent to Loki will be just the value of that key.
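A minimal Ruby sketch of the two line formats and `drop_single_key` (illustrative only; the plugin's real implementation lives in `record_to_line`):

```ruby
require 'json'

record = { 'message' => 'hello', 'level' => 'info' }

# line_format json: dump the remaining record as JSON
json_line = JSON.generate(record)

# line_format key_value: join the record as key="value" pairs
key_value_line = record.map { |k, v| %(#{k}="#{v}") }.join(' ')

# drop_single_key: with only one key left, send just its value
single = { 'message' => 'hello' }
line = single.size == 1 ? single.values.first : JSON.generate(single)

puts json_line       # => {"message":"hello","level":"info"}
puts key_value_line  # => message="hello" level="info"
puts line            # => hello
```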

@@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.0.2'
spec.version = '1.1.0'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']

@@ -25,7 +25,7 @@ Gem::Specification.new do |spec|
# spec.test_files = test_files
# spec.require_paths = ['lib']

spec.add_development_dependency 'bundler', '~> 1.15'
spec.add_development_dependency 'bundler'
spec.add_development_dependency 'rake', '~> 12.0'
spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2']
80 changes: 50 additions & 30 deletions fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb
@@ -24,36 +24,38 @@
module Fluent
module Plugin
# Subclass of Fluent Plugin Output
class LokiOutput < Fluent::Plugin::Output
class LokiOutput < Fluent::Plugin::Output # rubocop:disable Metrics/ClassLength
Fluent::Plugin.register_output('loki', self)

helpers :compat_parameters
helpers :compat_parameters, :record_accessor

attr_accessor :record_accessors

DEFAULT_BUFFER_TYPE = 'memory'

# url of loki server
desc 'url of loki server'
config_param :url, :string, default: 'https://logs-us-west1.grafana.net'

# BasicAuth credentials
desc 'BasicAuth credentials'
config_param :username, :string, default: nil
config_param :password, :string, default: nil, secret: true

# Loki tenant id
desc 'Loki tenant id'
config_param :tenant, :string, default: nil

# extra labels to add to all log streams
desc 'extra labels to add to all log streams'
config_param :extra_labels, :hash, default: {}

# format to use when flattening the record to a log line
desc 'format to use when flattening the record to a log line'
config_param :line_format, :enum, list: %i[json key_value], default: :key_value

# comma separated list of keys to use as stream lables. All other keys will be placed into the log line
config_param :label_keys, :string, default: 'job,instance'
desc 'extract kubernetes labels as loki labels'
config_param :extract_kubernetes_labels, :bool, default: false

# comma separated list of needless record keys to remove
config_param :remove_keys, :string, default: nil
desc 'comma separated list of needless record keys to remove'
config_param :remove_keys, :array, default: %w[], value_type: :string

# if a record only has 1 key, then just set the log line to the value and discard the key.
desc 'if a record only has 1 key, then just set the log line to the value and discard the key.'
config_param :drop_single_key, :bool, default: false

config_section :buffer do
@@ -64,9 +66,18 @@ class LokiOutput < Fluent::Plugin::Output
def configure(conf)
compat_parameters_convert(conf, :buffer)
super

@label_keys = @label_keys.split(/\s*,\s*/) if @label_keys
@remove_keys = @remove_keys.split(',').map(&:strip) if @remove_keys
@record_accessors = {}
conf.elements.select { |element| element.name == 'label' }.each do |element|
element.each_pair do |k, v|
element.key?(k) # to suppress unread configuration warning
v = k if v.empty?
@record_accessors[k] = record_accessor_create(v)
end
end
@remove_keys_accessors = []
@remove_keys.each do |key|
@remove_keys_accessors.push(record_accessor_create(key))
end
end

def multi_workers_ready?
@@ -84,7 +95,7 @@ def http_opts(uri)
def write(chunk)
# streams by label
payload = generic_to_loki(chunk)
body = { 'streams': payload }
body = { 'streams' => payload }

# add ingest path to loki url
uri = URI.parse(url + '/api/prom/push')
@@ -101,7 +112,7 @@ def write(chunk)
}
log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
unless res && res.is_a?(Net::HTTPSuccess)
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
@@ -136,7 +147,7 @@ def labels_to_protocol(data_labels)
data_labels = data_labels.merge(@extra_labels)

data_labels.each do |k, v|
formatted_labels.push("#{k}=\"#{v.gsub('"','\\"')}\"") if v
formatted_labels.push(%(#{k}="#{v.gsub('"', '\\"')}")) if v
end
'{' + formatted_labels.join(',') + '}'
end
@@ -145,7 +156,7 @@ def payload_builder(streams)
payload = []
streams.each do |k, v|
# create a stream for each label set.
# Additionally sort the entries by timestamp just incase we
# Additionally sort the entries by timestamp just in case we
# got them out of order.
# 'labels' => '{worker="0"}',
payload.push(
@@ -167,30 +178,39 @@ def record_to_line(record)
when :key_value
formatted_labels = []
record.each do |k, v|
formatted_labels.push("#{k}=\"#{v}\"")
formatted_labels.push(%(#{k}="#{v}"))
end
line = formatted_labels.join(' ')
end
end
line
end

#
# convert a line to loki line with labels
def line_to_loki(record)
chunk_labels = {}
line = ''
if record.is_a?(Hash)
# remove needless keys.
@remove_keys.each { |v|
record.delete(v)
} if @remove_keys
# extract white listed record keys into labels.
@label_keys.each do |k|
if record.key?(k)
chunk_labels[k] = record[k]
record.delete(k)
@record_accessors&.each do |name, accessor|
new_key = name.gsub(%r{[.\-\/]}, '_')
chunk_labels[new_key] = accessor.call(record)
accessor.delete(record)
end

if @extract_kubernetes_labels && record.key?('kubernetes')
kubernetes_labels = record['kubernetes']['labels']
kubernetes_labels.each_key do |l|
new_key = l.gsub(%r{[.\-\/]}, '_')
chunk_labels[new_key] = kubernetes_labels[l]
end
end if @label_keys
end

# remove needless keys.
@remove_keys_accessors&.each do |deleter|
deleter.delete(record)
end

line = record_to_line(record)
else
line = record.to_s
@@ -22,8 +22,12 @@
tenant 1234
extra_labels {}
line_format key_value
label_keys "job,instance"
drop_single_key true
remove_keys a, b
<label>
job
instance instance
</label>
CONF

expect(driver.instance.url).to eq 'https://logs-us-west1.grafana.net'
@@ -32,7 +36,8 @@
expect(driver.instance.tenant).to eq '1234'
expect(driver.instance.extra_labels).to eq({})
expect(driver.instance.line_format).to eq :key_value
expect(driver.instance.label_keys).to eq %w[job instance]
expect(driver.instance.record_accessors.keys).to eq %w[job instance]
expect(driver.instance.remove_keys).to eq %w[a b]
expect(driver.instance.drop_single_key).to eq true
end

@@ -139,7 +144,9 @@
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
label_keys "stream"
<label>
stream
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
@@ -153,12 +160,55 @@
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
end

it 'extracts nested record key as label' do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
<label>
pod $.kubernetes.pod
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0], 'kubernetes' => {})
end

it 'extracts nested record key as label and drop key after' do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
remove_keys kubernetes
<label>
pod $.kubernetes.pod
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
end

it 'formats as simple string when only 1 record key' do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
label_keys "stream"
drop_single_key true
<label>
stream
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
