Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure execution_context is propagated to additional_codecs #152

Merged
merged 4 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.5.1
- Fix: codecs provided with `additional_codecs` now correctly run in the pipeline's context, which means that they respect the `pipeline.ecs_compatibility` setting [#152](https://github.com/logstash-plugins/logstash-input-http/pull/152)

## 3.5.0
- Feat: TLSv1.3 support [#146](https://github.com/logstash-plugins/logstash-input-http/pull/146)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.5.0
3.5.1
13 changes: 12 additions & 1 deletion lib/logstash/inputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base
config :verify_mode, :validate => ['none', 'peer', 'force_peer'], :default => 'none',
:deprecated => "Set 'ssl_verify_mode' instead."

attr_reader :codecs

public
def register

Expand All @@ -140,7 +142,7 @@ def register
@codecs = Hash.new

@additional_codecs.each do |content_type, codec|
@codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
@codecs[content_type] = initialize_codec(codec)
end

require "logstash/inputs/http/message_handler"
Expand Down Expand Up @@ -333,4 +335,13 @@ def error_details(e, trace = false)
error_details
end

def initialize_codec(codec_name)
codec_klass = LogStash::Plugin.lookup("codec", codec_name)
if defined?(::LogStash::Plugins::Contextualizer)
::LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass)
else
codec_klass.new
end
end

end # class LogStash::Inputs::Http
64 changes: 64 additions & 0 deletions spec/inputs/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -656,4 +656,68 @@ def setup_server_client(url = self.url)

end
end
end if false

# If we have a setting called `pipeline.ecs_compatibility`, we need to
# ensure that our additional_codecs are instantiated with the proper
# execution context in order to ensure that the pipeline setting is
# respected.
if LogStash::SETTINGS.registered?('pipeline.ecs_compatibility')

def with_setting(name, value, &block)
setting = LogStash::SETTINGS.get_setting(name)
was_set, orignial_value = setting.set?, setting.value
setting.set(value)

yield(true)

ensure
was_set ? setting.set(orignial_value) : setting.reset
end

def setting_value_supported?(name, value)
with_setting(name, value) { true }
rescue
false
end

describe LogStash::Inputs::Http do
context 'additional_codecs' do
let(:port) { rand(1025...5000) }

%w(disabled v1 v8).each do |spec|
if setting_value_supported?('pipeline.ecs_compatibility', spec)
context "with `pipeline.ecs_compatibility: #{spec}`" do
around(:each) { |example| with_setting('pipeline.ecs_compatibility', spec, &example) }

it 'propagates the ecs_compatibility pipeline setting to the additional_codecs' do
input("input { http { port => #{port} additional_codecs => { 'application/json' => 'json' 'text/plain' => 'plain' } } }") do |pipeline, queue|
http_input = pipeline.inputs.first
expect(http_input).to be_a_kind_of(described_class) # precondition

http_input.codecs.each do |key, value|
aggregate_failures("Codec for `#{key}`") do
expect(value.ecs_compatibility).to eq(spec.to_sym)
end
end
end
end
end
end
end

it 'propagates the execution context from the input to the codecs' do
input("input { http { port => #{port} } }") do |pipeline, queue|
http_input = pipeline.inputs.first
expect(http_input).to be_a_kind_of(described_class) # precondition

http_input.codecs.each do |key, value|
aggregate_failures("Codec for `#{key}`") do
expect(value.execution_context).to be http_input.execution_context
end
end
end
end
end
end
end