From 66bf0d4c3714931b73740f3dd61e08b2bc3a0d6f Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Thu, 19 Mar 2020 19:00:11 +0100
Subject: [PATCH 01/10] Docs: make sure NOTE: gets properly rendered

---
 docs/input-kafka.asciidoc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index 3a2fdf79..d6fde577 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -71,8 +71,7 @@ inserted into your original event, you'll have to use the `mutate` filter to man
 This plugin supports these configuration options plus the <<plugins-{type}s-{plugin}-common-options>> described later.
 
-NOTE: Some of these options map to a Kafka option. See the
-https://kafka.apache.org/documentation for more details.
+NOTE: Some of these options map to a Kafka option. See the https://kafka.apache.org/documentation for more details.
 
 [cols="<,<,<",options="header",]
 |=======================================================================
@@ -587,8 +586,9 @@ Java Class used to deserialize the record's value
 A rack identifier for the Kafka consumer.
 Used to select the physically closest rack for the consumer to read from.
-The setting corresponds with the broker config 'broker.rack'.
-Only available for Kafka 2.4.0 and higher; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica[KIP-392].
+The setting corresponds with Kafka's `broker.rack` configuration.
+
+NOTE: Only available for Kafka 2.4.0 and higher; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica[KIP-392].
 
 [id="plugins-{type}s-{plugin}-common-options"]
 include::{include_path}/{type}.asciidoc[]
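The second hunk above documents the `client_rack` option this series introduces for follower fetching (KIP-392). For orientation, a minimal sketch of an input using it, assuming a Logstash environment with this plugin installed; the broker address, topic, group, and rack ID are illustrative, not taken from the patches:

```ruby
require "logstash/inputs/kafka"

# Follower fetching: the consumer prefers the replica whose broker.rack
# matches client_rack (brokers must run Kafka 2.4+). All values here are
# hypothetical placeholders.
input = LogStash::Inputs::Kafka.new(
  "bootstrap_servers" => "broker1.example.com:9092", # assumed address
  "topics"            => ["example_topic"],          # hypothetical topic
  "group_id"          => "example_group",            # hypothetical group
  "client_rack"       => "us-east-1a"                # should match a broker's broker.rack
)
```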
[id="plugins-{type}s-{plugin}-poll_timeout_ms"] ===== `poll_timeout_ms` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 14f4fd46..42200b85 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -128,8 +128,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # The period of time in milliseconds after which we force a refresh of metadata even if # we haven't seen any partition leadership changes to proactively discover any new brokers or partitions config :metadata_max_age_ms, :validate => :string - # The class name of the partition assignment strategy that the client will use to distribute - # partition ownership amongst consumer instances + # The name of the partition assignment strategy that the client uses to distribute + # partition ownership amongst consumer instances, supported options are `range`, + # `round_robin`, `sticky` and `cooperative_sticky` + # (for backwards compatibility setting the class name directly is supported). config :partition_assignment_strategy, :validate => :string # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. config :receive_buffer_bytes, :validate => :string @@ -310,7 +312,7 @@ def create_consumer(client_id) props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil? props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms) unless max_poll_interval_ms.nil? props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil? - props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil? + props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy_class) unless partition_assignment_strategy.nil? props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil? props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil? @@ -340,6 +342,24 @@ def create_consumer(client_id) end end + def partition_assignment_strategy_class(partition_assignment_strategy = self.partition_assignment_strategy) + case partition_assignment_strategy + when 'range' + 'org.apache.kafka.clients.consumer.RangeAssignor' + when 'round_robin' + 'org.apache.kafka.clients.consumer.RoundRobinAssignor' + when 'sticky' + 'org.apache.kafka.clients.consumer.StickyAssignor' + when 'cooperative_sticky' + 'org.apache.kafka.clients.consumer.CooperativeStickyAssignor' + else + unless partition_assignment_strategy.index('.') + raise LogStash::ConfigurationError, "unsupported partition_assignment_strategy: #{partition_assignment_strategy.inspect}" + end + partition_assignment_strategy # assume a fully qualified class-name + end + end + def set_trustore_keystore_config(props) props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil? props.put("ssl.truststore.location", ssl_truststore_location) unless ssl_truststore_location.nil? diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 5b668467..819da47f 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -1,7 +1,6 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/kafka" -require "digest" require "rspec/wait" # Please run kafka_test_setup.sh prior to executing this integration test. 
@@ -12,159 +11,133 @@
   let(:group_id_3) {rand(36**8).to_s(36)}
   let(:group_id_4) {rand(36**8).to_s(36)}
   let(:group_id_5) {rand(36**8).to_s(36)}
-  let(:plain_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) }
-  let(:snappy_config) { { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:lz4_config) { { 'topics' => ['logstash_integration_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:pattern_config) { { 'topics_pattern' => 'logstash_integration_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
-  let(:decorate_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} }
-  let(:manual_commit_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} }
+  let(:group_id_6) {rand(36**8).to_s(36)}
+  let(:plain_config) do
+    { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1,
+      'auto_offset_reset' => 'earliest' }
+  end
+  let(:multi_consumer_config) do
+    plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3})
+  end
+  let(:snappy_config) do
+    { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1,
+      'auto_offset_reset' => 'earliest' }
+  end
+  let(:lz4_config) do
+    { 'topics' => ['logstash_integration_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1,
+      'auto_offset_reset' => 'earliest' }
+  end
+  let(:pattern_config) do
+    { 'topics_pattern' => 'logstash_integration_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain',
+      'auto_offset_reset' => 'earliest' }
+  end
+  let(:decorate_config) do
+    { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3,
+      'auto_offset_reset' => 'earliest', 'decorate_events' => true }
+  end
+  let(:manual_commit_config) do
+    { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5,
+      'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false' }
+  end
   let(:timeout_seconds) { 30 }
   let(:num_events) { 103 }
 
   describe "#kafka-topics" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
-        end
-      end
-    end
 
     it "should consume all messages from plain 3-partition topic" do
-      kafka_input = LogStash::Inputs::Kafka.new(plain_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
+      queue = consume_messages(plain_config, timeout: timeout_seconds, event_count: num_events)
+      expect(queue.length).to eq(num_events)
     end
 
     it "should consume all messages from snappy 3-partition topic" do
-      kafka_input = LogStash::Inputs::Kafka.new(snappy_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
+      queue = consume_messages(snappy_config, timeout: timeout_seconds, event_count: num_events)
+      expect(queue.length).to eq(num_events)
     end
 
     it "should consume all messages from lz4 3-partition topic" do
-      kafka_input = LogStash::Inputs::Kafka.new(lz4_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
+      queue = consume_messages(lz4_config, timeout: timeout_seconds, event_count: num_events)
+      expect(queue.length).to eq(num_events)
     end
 
     it "should consumer all messages with multiple consumers" do
-      kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
+      consume_messages(multi_consumer_config, timeout: timeout_seconds, event_count: num_events) do |queue, kafka_input|
         expect(queue.length).to eq(num_events)
         kafka_input.kafka_consumers.each_with_index do |consumer, i|
           expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}")
         end
-      ensure
-        t.kill
-        t.join(30_000)
       end
     end
   end
 
-  describe "#kafka-topics-pattern" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
-        end
-      end
-    end
-
+  context "#kafka-topics-pattern" do
     it "should consume all messages from all 3 topics" do
-      kafka_input = LogStash::Inputs::Kafka.new(pattern_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(3*num_events)
-        expect(queue.length).to eq(3*num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
+      total_events = num_events * 3
+      queue = consume_messages(pattern_config, timeout: timeout_seconds, event_count: total_events)
+      expect(queue.length).to eq(total_events)
     end
   end
 
-  describe "#kafka-decorate" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
-        end
-      end
-    end
-
+  context "#kafka-decorate" do
     it "should show the right topic and group name in decorated kafka section" do
       start = LogStash::Timestamp.now.time.to_i
-      kafka_input = LogStash::Inputs::Kafka.new(decorate_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
+      consume_messages(decorate_config, timeout: timeout_seconds, event_count: num_events) do |queue, _|
         expect(queue.length).to eq(num_events)
         event = queue.shift
         expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_integration_topic_plain")
         expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3)
         expect(event.get("[@metadata][kafka][timestamp]")).to be >= start
-      ensure
-        t.kill
-        t.join(30_000)
       end
     end
   end
 
-  describe "#kafka-offset-commit" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
+  context "#kafka-offset-commit" do
+    it "should manually commit offsets" do
+      queue = consume_messages(manual_commit_config, timeout: timeout_seconds, event_count: num_events)
+      expect(queue.length).to eq(num_events)
+    end
+  end
+
+  context 'setting partition_assignment_strategy' do
+    let(:test_topic) { 'logstash_integration_partitioner_topic' }
+    let(:consumer_config) do
+      plain_config.merge(
+        "topics" => [test_topic],
+        'group_id' => group_id_6,
+        "client_id" => "partition_assignment_strategy-spec",
+        "consumer_threads" => 2,
+        "partition_assignment_strategy" => partition_assignment_strategy
+      )
+    end
+    let(:partition_assignment_strategy) { nil }
+
+    # NOTE: just verify setting works, as its a bit cumbersome to do in a unit spec
+    [ 'range', 'round_robin', 'sticky', 'org.apache.kafka.clients.consumer.CooperativeStickyAssignor' ].each do |partition_assignment_strategy|
+      describe partition_assignment_strategy do
+        let(:partition_assignment_strategy) { partition_assignment_strategy }
+        it 'consumes data' do
+          consume_messages(consumer_config, timeout: nil, event_count: 0)
         end
       end
     end
+  end
 
-    it "should manually commit offsets" do
-      kafka_input = LogStash::Inputs::Kafka.new(manual_commit_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
+  private
+
+  def consume_messages(config, queue: Queue.new, timeout:, event_count:)
+    kafka_input = LogStash::Inputs::Kafka.new(config)
+    t = Thread.new { kafka_input.run(queue) }
+    begin
+      t.run
+      wait(timeout).for { queue.length }.to eq(event_count) unless timeout.nil?
+      block_given? ? yield(queue, kafka_input) : queue
+    ensure
+      t.kill
+      t.join(30_000)
     end
   end
+
+  def thread_it(kafka_input, queue)
+    Thread.new { kafka_input.run(queue) }
+  end
+
 end

From fac21fe90da2b603b5ab76d0bab1e7a0aae75f9c Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Mon, 23 Mar 2020 08:30:47 +0100
Subject: [PATCH 03/10] Refactor: clearer intention with timeout: false

---
 spec/integration/inputs/kafka_spec.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index 819da47f..9f871ca6 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -115,7 +115,7 @@
       describe partition_assignment_strategy do
         let(:partition_assignment_strategy) { partition_assignment_strategy }
         it 'consumes data' do
-          consume_messages(consumer_config, timeout: nil, event_count: 0)
+          consume_messages(consumer_config, timeout: false, event_count: 0)
         end
       end
     end
@@ -128,7 +128,7 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
     t = Thread.new { kafka_input.run(queue) }
     begin
       t.run
-      wait(timeout).for { queue.length }.to eq(event_count) unless timeout.nil?
+      wait(timeout).for { queue.length }.to eq(event_count) unless timeout.eql?(false)
       block_given? ? yield(queue, kafka_input) : queue
     ensure
       t.kill
       t.join(30_000)

From e914f27ea0d1ee00e351f1c909dcc03bb5ec42e6 Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Mon, 23 Mar 2020 08:31:13 +0100
Subject: [PATCH 04/10] Chore: fill-in changelog with links to PRs

---
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e97ca31..d6bf1344 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,9 @@
 ## 10.1.0
-  - updated kafka client (and its dependencies) to version 2.4.1
+  - updated kafka client (and its dependencies) to version 2.4.1 ([#16](https://github.com/logstash-plugins/logstash-integration-kafka/pull/16))
   - added the input `client_rack` parameter to enable support for follower fetching
   - added the output `partitioner` parameter for tuning partitioning strategy
   - Refactor: normalized error logging a bit - make sure exception type is logged
+  - Refactor: partition_assignment_strategy to accept enumerated values ([#25](https://github.com/logstash-plugins/logstash-integration-kafka/pull/25))
 
 ## 10.0.1
 - Fix links in changelog pointing to stand-alone plugin changelogs.
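At this point in the series the enumerated `partition_assignment_strategy` values are in place and exercised by the integration specs. A minimal sketch of how the new helper resolves a short name, using the same construction idiom as the specs; the broker address, topic, and group names below are illustrative, not taken from the patches:

```ruby
require "logstash/inputs/kafka"

# Hypothetical settings -- only partition_assignment_strategy matters here.
config = {
  "bootstrap_servers"             => "localhost:9092",  # assumed local broker
  "topics"                        => ["example_topic"], # hypothetical topic
  "group_id"                      => "example_group",   # hypothetical group
  "partition_assignment_strategy" => "cooperative_sticky"
}

input = LogStash::Inputs::Kafka.new(config)

# The helper added in PATCH 02 expands the short name before it is put into
# the consumer properties (send is used in case the method is private):
input.send(:partition_assignment_strategy_class)
# => "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"

# A dotted value is passed through untouched (backwards compatibility),
# while an unknown bare name raises LogStash::ConfigurationError.
```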
From a762a5f187209b87dd01560935b647a7def3818e Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Tue, 24 Mar 2020 08:05:55 +0100
Subject: [PATCH 05/10] Update CHANGELOG.md

Co-Authored-By: Ry Biesemeyer
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9af3203c..b51da9e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,7 @@
   - added the output `partitioner` parameter for tuning partitioning strategy
   - Refactor: normalized error logging a bit - make sure exception type is logged
   - Fix: properly handle empty ssl_endpoint_identification_algorithm [#8](https://github.com/logstash-plugins/logstash-integration-kafka/pull/8)
-  - Refactor: partition_assignment_strategy to accept enumerated values ([#25](https://github.com/logstash-plugins/logstash-integration-kafka/pull/25))
+  - Refactor : made `partition_assignment_strategy` option easier to configure by accepting simple values from an enumerated set instead of requiring lengthy class paths ([#25](https://github.com/logstash-plugins/logstash-integration-kafka/pull/25))
 
 ## 10.0.1
 - Fix links in changelog pointing to stand-alone plugin changelogs.

From 8cb2328bccf08a40a4e0b825f698f179c145deeb Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Tue, 24 Mar 2020 08:12:04 +0100
Subject: [PATCH 06/10] Review: delete no longer used helper method

---
 spec/integration/inputs/kafka_spec.rb | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index 9f871ca6..d7ce7363 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -136,8 +136,4 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
     end
   end
 
-  def thread_it(kafka_input, queue)
-    Thread.new { kafka_input.run(queue) }
-  end
-
 end

From a7ad95aa7fcc29888130d8d4cf222af9e9304bd7 Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Tue, 24 Mar 2020 09:58:14 +0100
Subject: [PATCH 07/10] Docs: list options as suggested in review

---
 docs/input-kafka.asciidoc | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index 5c510d18..7fefef1d 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -375,9 +375,14 @@ we haven't seen any partition leadership changes to proactively discover any new
 * There is no default value for this setting.
 
 The name of the partition assignment strategy that the client uses to distribute
-partition ownership amongst consumer instances, supported options are `range`,
-`round_robin`, `sticky` and `cooperative_sticky`. These map to Kafka's corresponding
-https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html[`ConsumerPartitionAssignor`]
+partition ownership amongst consumer instances, supported options are:
+
+* `range`
+* `round_robin`
+* `sticky`
+* `cooperative_sticky`
+
+These map to Kafka's corresponding https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html[`ConsumerPartitionAssignor`]
 implementations.
[id="plugins-{type}s-{plugin}-poll_timeout_ms"] From d42b1b3f5ffa10d4bf5e508a346ea3772b0461ed Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Tue, 24 Mar 2020 09:58:41 +0100 Subject: [PATCH 08/10] Review: do not mention class names in docs --- docs/output-kafka.asciidoc | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc index 8f5dc53b..d903429e 100644 --- a/docs/output-kafka.asciidoc +++ b/docs/output-kafka.asciidoc @@ -264,9 +264,6 @@ Available options for choosing a partitioning strategy are as follows: * `round_robin` distributes writes to all partitions equally, regardless of `message_key` * `uniform_sticky` sticks to a partition for the duration of a batch than randomly picks a new one -The configuration also allows setting a partitioner class name -e.g. `org.apache.kafka.clients.producer.UniformStickyPartitioner`. - [id="plugins-{type}s-{plugin}-receive_buffer_bytes"] ===== `receive_buffer_bytes` From 8bda832f09abd142d578ee2b9a25e77f60319bd0 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 25 Mar 2020 08:42:39 +0100 Subject: [PATCH 09/10] Refactor: avoid optional method param --- lib/logstash/inputs/kafka.rb | 2 +- lib/logstash/outputs/kafka.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 42200b85..37107964 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -342,7 +342,7 @@ def create_consumer(client_id) end end - def partition_assignment_strategy_class(partition_assignment_strategy = self.partition_assignment_strategy) + def partition_assignment_strategy_class case partition_assignment_strategy when 'range' 'org.apache.kafka.clients.consumer.RangeAssignor' diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 43968f81..c341e939 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -356,7 +356,7 @@ def create_producer end end - def partitioner_class_name(partitioner = self.partitioner) + def partitioner_class_name case partitioner when 'round_robin' 'org.apache.kafka.clients.producer.RoundRobinPartitioner' From 8b5ad31a76866451d20b5afd8915206014569367 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 25 Mar 2020 08:45:04 +0100 Subject: [PATCH 10/10] Refactor: align method naming in input/output --- lib/logstash/outputs/kafka.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index c341e939..fc417552 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -324,8 +324,8 @@ def create_producer props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s) props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil? unless partitioner.nil? - props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner_class = partitioner_class_name) - logger.debug('producer configured using partitioner', :partitioner_class => partitioner_class) + props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner = partitioner_class) + logger.debug('producer configured using partitioner', :partitioner_class => partitioner) end props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil? props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? 
@@ -356,7 +356,7 @@ def create_producer
     end
   end
 
-  def partitioner_class_name
+  def partitioner_class
     case partitioner
     when 'round_robin'
       'org.apache.kafka.clients.producer.RoundRobinPartitioner'
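The series ends mid-hunk, so the tail of the renamed `partitioner_class` helper is not shown above. For orientation, a sketch of how it plausibly continues, mirroring the input's `partition_assignment_strategy_class` from PATCH 02; every branch after `round_robin` is an assumption based on the partitioner options documented in PATCH 08's context, not verbatim patch content:

```ruby
# Sketch only -- branches beyond 'round_robin' are assumed from the output
# docs (the PATCH 08 context lists `round_robin` and `uniform_sticky`).
def partitioner_class
  case partitioner
  when 'round_robin'
    'org.apache.kafka.clients.producer.RoundRobinPartitioner'
  when 'uniform_sticky' # assumed branch
    'org.apache.kafka.clients.producer.UniformStickyPartitioner'
  else
    unless partitioner.index('.')
      raise LogStash::ConfigurationError, "unsupported partitioner: #{partitioner.inspect}"
    end
    partitioner # assumed fall-through: treat a dotted value as a class name
  end
end
```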