diff --git a/lib/logstash/inputs/gelf.rb b/lib/logstash/inputs/gelf.rb index 17b56c9..a50eb3a 100644 --- a/lib/logstash/inputs/gelf.rb +++ b/lib/logstash/inputs/gelf.rb @@ -1,9 +1,10 @@ # encoding: utf-8 -require "date" require "logstash/inputs/base" require "logstash/namespace" require "logstash/json" require "logstash/timestamp" +require "stud/interval" +require "date" require "socket" # This input will read GELF messages as events over the network, @@ -42,12 +43,12 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base # config :strip_leading_underscore, :validate => :boolean, :default => true + RECONNECT_BACKOFF_SLEEP = 5 + public def initialize(params) super BasicSocket.do_not_reverse_lookup = true - @shutdown_requested = false - @udp = nil end # def initialize public @@ -60,42 +61,31 @@ def run(output_queue) begin # udp server udp_listener(output_queue) - rescue LogStash::ShutdownSignal - @shutdown_requested = true rescue => e - unless @shutdown_requested + unless stop? @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace) - sleep(5) - retry + Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? } + retry unless stop? end end # begin end # def run public - def teardown - @shutdown_requested = true - if @udp - @udp.close_read rescue nil - @udp.close_write rescue nil - @udp = nil - end - finished + def stop + @udp.close + rescue IOError # the plugin is currently shutting down, so its safe to ignore theses errors end private def udp_listener(output_queue) @logger.info("Starting gelf listener", :address => "#{@host}:#{@port}") - if @udp - @udp.close_read rescue nil - @udp.close_write rescue nil - end - @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) - while !@shutdown_requested + while !stop? line, client = @udp.recvfrom(8192) + begin data = Gelfd::Parser.parse(line) rescue => ex @@ -113,6 +103,7 @@ def udp_listener(output_queue) event.timestamp = LogStash::Timestamp.at(event["timestamp"]) event.remove("timestamp") end + remap_gelf(event) if @remap strip_leading_underscore(event) if @strip_leading_underscore decorate(event) diff --git a/logstash-input-gelf.gemspec b/logstash-input-gelf.gemspec index 9ae7929..14e695a 100644 --- a/logstash-input-gelf.gemspec +++ b/logstash-input-gelf.gemspec @@ -23,9 +23,11 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0' s.add_runtime_dependency "gelfd", ["0.2.0"] #(Apache 2.0 license) - s.add_runtime_dependency "gelf", ["1.3.2"] #(MIT license) s.add_runtime_dependency 'logstash-codec-plain' + s.add_runtime_dependency "stud", "~> 0.0.22" s.add_development_dependency 'logstash-devutils' + s.add_development_dependency "gelf", ["1.3.2"] #(MIT license) + s.add_development_dependency "flores" end diff --git a/spec/inputs/gelf_spec.rb b/spec/inputs/gelf_spec.rb index eae41fe..dde7563 100644 --- a/spec/inputs/gelf_spec.rb +++ b/spec/inputs/gelf_spec.rb @@ -1,8 +1,24 @@ +# encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/gelf" +require_relative "../support/helpers" require "gelf" +require "flores/random" -describe "inputs/gelf" do +describe LogStash::Inputs::Gelf do + context "when interrupting the plugin" do + let(:port) { Flores::Random.integer(1024..65535) } + let(:host) { "127.0.0.1" } + let(:chunksize) { 1420 } + let(:producer) { InfiniteGelfProducer.new(host, port, chunksize) } + let(:config) { { "host" => host, "port" => port } } + + before { producer.run } + after { producer.stop } + + + it_behaves_like "an interruptible input plugin" + end it "reads chunked gelf messages " do port = 12209 diff --git a/spec/support/helpers.rb b/spec/support/helpers.rb new file mode 100644 index 0000000..cd177d2 --- /dev/null +++ b/spec/support/helpers.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 +class InfiniteGelfProducer + def initialize(host, port, chunksize) + @client = GELF::Notifier.new(host, port, chunksize) + end + + def run + @producer = Thread.new do + while true + @client.notify!("short_message" => "hello world") + end + end + end + + def stop + @producer.kill + end +end