Skip to content

Commit

Permalink
ensure execution_context is propagated to additional_codecs (#152)
Browse files Browse the repository at this point in the history
* ensure execution_context is propagated to additional_codecs

* more closely specify the desired behaviour for pipeline.ecs_compatibility

* version bump

* restructure conditional

too complex to be a guard clause.

Co-authored-by: João Duarte <jsvd@users.noreply.github.com>

Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
  • Loading branch information
yaauie and jsvd authored Apr 12, 2022
1 parent d20a3e0 commit 0e37a1c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.5.1
- Fix: codecs provided with `additional_codecs` now correctly run in the pipeline's context, which means that they respect the `pipeline.ecs_compatibility` setting [#152](https://github.com/logstash-plugins/logstash-input-http/pull/152)

## 3.5.0
- Feat: TLSv1.3 support [#146](https://github.com/logstash-plugins/logstash-input-http/pull/146)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.5.0
3.5.1
13 changes: 12 additions & 1 deletion lib/logstash/inputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base
config :verify_mode, :validate => ['none', 'peer', 'force_peer'], :default => 'none',
:deprecated => "Set 'ssl_verify_mode' instead."

attr_reader :codecs

public
def register

Expand All @@ -140,7 +142,7 @@ def register
@codecs = Hash.new

@additional_codecs.each do |content_type, codec|
@codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
@codecs[content_type] = initialize_codec(codec)
end

require "logstash/inputs/http/message_handler"
Expand Down Expand Up @@ -333,4 +335,13 @@ def error_details(e, trace = false)
error_details
end

def initialize_codec(codec_name)
codec_klass = LogStash::Plugin.lookup("codec", codec_name)
if defined?(::LogStash::Plugins::Contextualizer)
::LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass)
else
codec_klass.new
end
end

end # class LogStash::Inputs::Http
64 changes: 64 additions & 0 deletions spec/inputs/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -656,4 +656,68 @@ def setup_server_client(url = self.url)

end
end
end if false

# If we have a setting called `pipeline.ecs_compatibility`, we need to
# ensure that our additional_codecs are instantiated with the proper
# execution context in order to ensure that the pipeline setting is
# respected.
if LogStash::SETTINGS.registered?('pipeline.ecs_compatibility')

def with_setting(name, value, &block)
setting = LogStash::SETTINGS.get_setting(name)
was_set, orignial_value = setting.set?, setting.value
setting.set(value)

yield(true)

ensure
was_set ? setting.set(orignial_value) : setting.reset
end

def setting_value_supported?(name, value)
with_setting(name, value) { true }
rescue
false
end

describe LogStash::Inputs::Http do
context 'additional_codecs' do
let(:port) { rand(1025...5000) }

%w(disabled v1 v8).each do |spec|
if setting_value_supported?('pipeline.ecs_compatibility', spec)
context "with `pipeline.ecs_compatibility: #{spec}`" do
around(:each) { |example| with_setting('pipeline.ecs_compatibility', spec, &example) }

it 'propagates the ecs_compatibility pipeline setting to the additional_codecs' do
input("input { http { port => #{port} additional_codecs => { 'application/json' => 'json' 'text/plain' => 'plain' } } }") do |pipeline, queue|
http_input = pipeline.inputs.first
expect(http_input).to be_a_kind_of(described_class) # precondition

http_input.codecs.each do |key, value|
aggregate_failures("Codec for `#{key}`") do
expect(value.ecs_compatibility).to eq(spec.to_sym)
end
end
end
end
end
end
end

it 'propagates the execution context from the input to the codecs' do
input("input { http { port => #{port} } }") do |pipeline, queue|
http_input = pipeline.inputs.first
expect(http_input).to be_a_kind_of(described_class) # precondition

http_input.codecs.each do |key, value|
aggregate_failures("Codec for `#{key}`") do
expect(value.execution_context).to be http_input.execution_context
end
end
end
end
end
end
end

0 comments on commit 0e37a1c

Please sign in to comment.