Skip to content

Commit

Permalink
throw exceptions on HTTPTooManyRequests and HTTPServerError so Fluent…
Browse files Browse the repository at this point in the history
…D will retry (#1845)

* throw exceptions on HTTPTooManyRequests and HTTPServerError to trigger retry

* fixed variable naming

* Update fluentd/fluent-plugin-grafana-loki/spec/gems/fluent/plugin/loki_output_spec.rb

spelling fix

Co-Authored-By: Cyril Tovena <cyril.tovena@gmail.com>

* removed newline

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>

(cherry picked from commit 7413f11)
Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
2 people authored and slim-bean committed Apr 1, 2020
1 parent 3c479c8 commit 40e47bd
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 22 deletions.
8 changes: 7 additions & 1 deletion fluentd/fluent-plugin-grafana-loki/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Metrics/BlockLength:
Max: 60
Metrics/MethodLength:
Max: 50
Metrics/LineLength:
Layout/LineLength:
Max: 120
Metrics/ClassLength:
Max: 150
Expand All @@ -25,3 +25,9 @@ RSpec/ExampleLength:
Max: 100
RSpec/MultipleExpectations:
Max: 10
Style/HashEachMethods:
Enabled: true
Style/HashTransformKeys:
Enabled: true
Style/HashTransformValues:
Enabled: true
2 changes: 1 addition & 1 deletion fluentd/fluent-plugin-grafana-loki/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ source 'https://rubygems.org'

gemspec

gem 'fluentd', '1.9.0'
gem 'fluentd', '1.9.3'
gem 'rubocop-rspec'
gem 'simplecov', require: false, group: :test
gem 'test-unit'
46 changes: 26 additions & 20 deletions fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'fluent/env'
require 'fluent/plugin/output'
require 'net/http'
require 'yajl'
Expand All @@ -26,6 +27,8 @@ module Plugin
class LokiOutput < Fluent::Plugin::Output # rubocop:disable Metrics/ClassLength
Fluent::Plugin.register_output('loki', self)

class LogPostError < StandardError; end

helpers :compat_parameters, :record_accessor

attr_accessor :record_accessors
Expand Down Expand Up @@ -131,29 +134,16 @@ def write(chunk)
body = { 'streams' => payload }

# add ingest path to loki url
res = loki_http_request(body)

req = Net::HTTP::Post.new(
@uri.request_uri
)
req.add_field('Content-Type', 'application/json')
req.add_field('X-Scope-OrgID', @tenant) if @tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username

opts = ssl_opts(@uri)
return if res.is_a?(Net::HTTPSuccess)

log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(@uri.host, @uri.port, **opts) { |http| http.request(req) }
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
'res=nil'
end
log.warn "failed to #{req.method} #{@uri} (#{res_summary})"
log.warn Yajl.dump(body)
res_summary = "#{res.code} #{res.message} #{res.body}"
log.warn "failed to write post to #{@uri} (#{res_summary})"
log.debug Yajl.dump(body)

end
# Only retry 429 and 500s
raise(LogPostError, res_summary) if res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError)
end

def ssl_opts(uri)
Expand Down Expand Up @@ -186,6 +176,22 @@ def generic_to_loki(chunk)

private

def loki_http_request(body)
req = Net::HTTP::Post.new(
@uri.request_uri
)
req.add_field('Content-Type', 'application/json')
req.add_field('X-Scope-OrgID', @tenant) if @tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username

opts = ssl_opts(@uri)

log.debug "sending #{req.body.length} bytes to loki"

Net::HTTP.start(@uri.host, @uri.port, **opts) { |http| http.request(req) }
end

def numeric?(val)
!Float(val).nil?
rescue StandardError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,37 @@
expect(res[0]['stream']).to eq('stream' => 'stdout')
6.times { |i| expect(res[0]['values'][i][1]).to eq i.to_s }
end

it 'raises an LogPostError when http request is not successful' do
config = <<-CONF
url https://logs-us-west1.grafana.net
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
lines = [[Time.at(1_546_270_458), { 'message' => 'foobar', 'stream' => 'stdout' }]]

# 200
success = Net::HTTPSuccess.new(1.0, 200, 'OK')
allow(driver.instance).to receive(:loki_http_request) { success }
allow(success).to receive(:body).and_return('fake body')
expect { driver.instance.write(lines) }.not_to raise_error

# 205
success = Net::HTTPSuccess.new(1.0, 205, 'OK')
allow(driver.instance).to receive(:loki_http_request) { success }
allow(success).to receive(:body).and_return('fake body')
expect { driver.instance.write(lines) }.not_to raise_error

# 429
too_many_requests = Net::HTTPTooManyRequests.new(1.0, 429, 'OK')
allow(driver.instance).to receive(:loki_http_request) { too_many_requests }
allow(too_many_requests).to receive(:body).and_return('fake body')
expect { driver.instance.write(lines) }.to raise_error(described_class::LogPostError)

# 505
server_error = Net::HTTPServerError.new(1.0, 505, 'OK')
allow(driver.instance).to receive(:loki_http_request) { server_error }
allow(server_error).to receive(:body).and_return('fake body')
expect { driver.instance.write(lines) }.to raise_error(described_class::LogPostError)
end
end

0 comments on commit 40e47bd

Please sign in to comment.