Skip to content

Commit

Permalink
Refactor the shutdown
Browse files Browse the repository at this point in the history
This PR introduces the changes needed to follow the new shutdown
semantic, it also remove the gelf as a runtime dependencies and move it
as a development dependency since its only used in the tests.

Fixes #17
  • Loading branch information
ph committed Sep 21, 2015
1 parent c768132 commit 134965d
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 24 deletions.
35 changes: 13 additions & 22 deletions lib/logstash/inputs/gelf.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# encoding: utf-8
require "date"
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/json"
require "logstash/timestamp"
require "stud/interval"
require "date"
require "socket"

# This input will read GELF messages as events over the network,
Expand Down Expand Up @@ -42,12 +43,12 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
#
config :strip_leading_underscore, :validate => :boolean, :default => true

RECONNECT_BACKOFF_SLEEP = 5

public
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
@shutdown_requested = false
@udp = nil
end # def initialize

public
Expand All @@ -60,42 +61,31 @@ def run(output_queue)
begin
# udp server
udp_listener(output_queue)
rescue LogStash::ShutdownSignal
@shutdown_requested = true
rescue => e
unless @shutdown_requested
unless stop?
@logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
sleep(5)
retry
Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? }
retry unless stop?
end
end # begin
end # def run

public
def teardown
@shutdown_requested = true
if @udp
@udp.close_read rescue nil
@udp.close_write rescue nil
@udp = nil
end
finished
def stop
@udp.close
rescue IOError # the plugin is currently shutting down, so its safe to ignore theses errors
end

private
def udp_listener(output_queue)
@logger.info("Starting gelf listener", :address => "#{@host}:#{@port}")

if @udp
@udp.close_read rescue nil
@udp.close_write rescue nil
end

@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port)

while !@shutdown_requested
while !stop?
line, client = @udp.recvfrom(8192)

begin
data = Gelfd::Parser.parse(line)
rescue => ex
Expand All @@ -113,6 +103,7 @@ def udp_listener(output_queue)
event.timestamp = LogStash::Timestamp.at(event["timestamp"])
event.remove("timestamp")
end

remap_gelf(event) if @remap
strip_leading_underscore(event) if @strip_leading_underscore
decorate(event)
Expand Down
4 changes: 3 additions & 1 deletion logstash-input-gelf.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'

s.add_runtime_dependency "gelfd", ["0.2.0"] #(Apache 2.0 license)
s.add_runtime_dependency "gelf", ["1.3.2"] #(MIT license)
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency "stud", "~> 0.0.22"

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency "gelf", ["1.3.2"] #(MIT license)
s.add_development_dependency "flores"
end

18 changes: 17 additions & 1 deletion spec/inputs/gelf_spec.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/gelf"
require_relative "../support/helpers"
require "gelf"
require "flores/random"

describe "inputs/gelf" do
describe LogStash::Inputs::Gelf do
context "when interrupting the plugin" do
let(:port) { Flores::Random.integer(1024..65535) }
let(:host) { "127.0.0.1" }
let(:chunksize) { 1420 }
let(:producer) { InfiniteGelfProducer.new(host, port, chunksize) }
let(:config) { { "host" => host, "port" => port } }

before { producer.run }
after { producer.stop }


it_behaves_like "an interruptible input plugin"
end

it "reads chunked gelf messages " do
port = 12209
Expand Down
18 changes: 18 additions & 0 deletions spec/support/helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# encoding: utf-8
class InfiniteGelfProducer
def initialize(host, port, chunksize)
@client = GELF::Notifier.new(host, port, chunksize)
end

def run
@producer = Thread.new do
while true
@client.notify!("short_message" => "hello world")
end
end
end

def stop
@producer.kill
end
end

0 comments on commit 134965d

Please sign in to comment.