Skip to content

Commit

Permalink
chore(fluentd): resolve rubocop failures (#6359)
Browse files Browse the repository at this point in the history
Signed-off-by: Trevor Wood <Trevor.G.Wood@gmail.com>
  • Loading branch information
taharah authored Jun 10, 2022
1 parent 3f4a663 commit 23cc938
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 34 deletions.
1 change: 1 addition & 0 deletions clients/cmd/fluentd/.rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require: rubocop-rspec

AllCops:
TargetRubyVersion: 2.7
NewCops: disable
Exclude:
- 'bin/**'
Expand Down
2 changes: 1 addition & 1 deletion clients/cmd/fluentd/docker/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

source 'https://rubygems.org'

gem 'fluent-plugin-multi-format-parser', '~>1.0.0'
gem 'fluentd', '1.9.0'
gem 'fluent-plugin-multi-format-parser', '~>1.0.0'
4 changes: 3 additions & 1 deletion clients/cmd/fluentd/fluent-plugin-grafana-loki.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ Gem::Specification.new do |spec|
spec.homepage = 'https://github.com/grafana/loki/'
spec.license = 'Apache-2.0'

spec.required_ruby_version = '~> 2.7'

# test_files, files = `git ls-files -z`.split("\x0").partition do |f|
# f.match(%r{^(test|spec|features)/})
# end
spec.files = Dir.glob('{bin,lib}/**/*') + %w[LICENSE README.md]
spec.files = Dir['{bin,lib}/**/*'] + %w[LICENSE README.md]
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ['lib']

Expand Down
49 changes: 24 additions & 25 deletions clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class LogPostError < StandardError; end
config_set_default :chunk_keys, []
end

def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def configure(conf)
compat_parameters_convert(conf, :buffer)
super
@uri = URI.parse(@url + '/loki/api/v1/push')
@uri = URI.parse("#{@url}/loki/api/v1/push")
unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
end
Expand All @@ -108,25 +109,24 @@ def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
validate_client_cert_key
end

raise "bearer_token_file #{@bearer_token_file} not found" if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)
if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)
raise "bearer_token_file #{@bearer_token_file} not found"
end

@auth_token_bearer = nil
if !@bearer_token_file.nil?
if !File.exist?(@bearer_token_file)
raise "bearer_token_file #{@bearer_token_file} not found"
end
unless @bearer_token_file.nil?
raise "bearer_token_file #{@bearer_token_file} not found" unless File.exist?(@bearer_token_file)

# Read the file once, assume long-lived authentication token.
@auth_token_bearer = File.read(@bearer_token_file)
if @auth_token_bearer.empty?
raise "bearer_token_file #{@bearer_token_file} is empty"
end
raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
end


raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end
# rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

def client_cert_configured?
!@key.nil? && !@cert.nil?
Expand Down Expand Up @@ -204,8 +204,7 @@ def http_request_opts(uri)
def generic_to_loki(chunk)
# log.debug("GenericToLoki: converting #{chunk}")
streams = chunk_to_loki(chunk)
payload = payload_builder(streams)
payload
payload_builder(streams)
end

private
Expand All @@ -215,7 +214,7 @@ def loki_http_request(body, tenant)
@uri.request_uri
)
req.add_field('Content-Type', 'application/json')
req.add_field('Authorization', "Bearer #{@auth_token_bearer}") if !@auth_token_bearer.nil?
req.add_field('Authorization', "Bearer #{@auth_token_bearer}") unless @auth_token_bearer.nil?
req.add_field('X-Scope-OrgID', tenant) if tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username
Expand All @@ -241,7 +240,7 @@ def format_labels(data_labels)
data_labels = {} if data_labels.nil?
data_labels = data_labels.merge(@extra_labels)
# sanitize label values
data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') if v && v&.is_a?(String) }
data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') if v.is_a?(String) }
formatted_labels
end

Expand Down Expand Up @@ -270,6 +269,7 @@ def to_nano(time)
end
end

# rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def record_to_line(record)
line = ''
if @drop_single_key && record.keys.length == 1
Expand All @@ -282,11 +282,9 @@ def record_to_line(record)
formatted_labels = []
record.each do |k, v|
# Remove non UTF-8 characters by force-encoding the string
if v.is_a?(String)
v = v.encode('utf-8', invalid: :replace, undef: :replace, replace: '?')
end
v = v.encode('utf-8', invalid: :replace, undef: :replace, replace: '?') if v.is_a?(String)
# Escape double quotes and backslashes by prefixing them with a backslash
v = v.to_s.gsub(%r{(["\\])}, '\\\\\1')
v = v.to_s.gsub(/(["\\])/, '\\\\\1')
if v.include?(' ') || v.include?('=')
formatted_labels.push(%(#{k}="#{v}"))
else
Expand All @@ -298,25 +296,25 @@ def record_to_line(record)
end
line
end
# rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

# convert a line to loki line with labels
# rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def line_to_loki(record)
chunk_labels = {}
line = ''
if record.is_a?(Hash)
@record_accessors&.each do |name, accessor|
new_key = name.gsub(%r{[.\-\/]}, '_')
new_key = name.gsub(%r{[.\-/]}, '_')
chunk_labels[new_key] = accessor.call(record)
accessor.delete(record)
end

if @extract_kubernetes_labels && record.key?('kubernetes')
kubernetes_labels = record['kubernetes']['labels']
if !kubernetes_labels.nil?
kubernetes_labels.each_key do |l|
new_key = l.gsub(%r{[.\-\/]}, '_')
chunk_labels[new_key] = kubernetes_labels[l]
end
kubernetes_labels&.each_key do |l|
new_key = l.gsub(%r{[.\-/]}, '_')
chunk_labels[new_key] = kubernetes_labels[l]
end
end

Expand Down Expand Up @@ -345,6 +343,7 @@ def line_to_loki(record)
labels: chunk_labels
}
end
# rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

# iterate through each chunk and create a loki stream entry
def chunk_to_loki(chunk)
Expand Down
16 changes: 9 additions & 7 deletions clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/non_utf8.log')[0]
chunk = [Time.at(1_546_270_458), {'message'=>content, 'number': 1.2345, 'stream'=>'stdout'}]
chunk = [Time.at(1_546_270_458), { 'message' => content, 'number': 1.2345, 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq "1546270458000000000"
expect(payload[0]['values'][0][1]).to eq "message=\"? rest of line\" number=1.2345 stream=stdout"
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq 'message="? rest of line" number=1.2345 stream=stdout'
end

it 'handle non utf-8 characters from log lines in json format' do
Expand All @@ -134,12 +134,14 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/non_utf8.log')[0]
chunk = [Time.at(1_546_270_458), {'message'=>content, 'number': 1.2345, 'stream'=>'stdout'}]
chunk = [Time.at(1_546_270_458), { 'message' => content, 'number': 1.2345, 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq "1546270458000000000"
expect(payload[0]['values'][0][1]).to eq "{\"message\":\"\xC1 rest of line\",\"number\":1.2345,\"stream\":\"stdout\"}"
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq(
"{\"message\":\"\xC1 rest of line\",\"number\":1.2345,\"stream\":\"stdout\"}"
)
end

it 'formats record hash as key_value' do
Expand All @@ -155,7 +157,7 @@
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq 'message="' + content[0] + '" stream=stdout'
expect(body[:streams][0]['values'][0][1]).to eq "message=\"#{content[0]}\" stream=stdout"
end

it 'formats record hash as json' do
Expand Down

0 comments on commit 23cc938

Please sign in to comment.