diff --git a/CHANGELOG.md b/CHANGELOG.md index 98ef4386..b51da9e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,10 @@ ## 10.1.0 - - updated kafka client (and its dependencies) to version 2.4.1 + - updated kafka client (and its dependencies) to version 2.4.1 ([#16](https://github.com/logstash-plugins/logstash-integration-kafka/pull/16)) - added the input `client_rack` parameter to enable support for follower fetching - added the output `partitioner` parameter for tuning partitioning strategy - Refactor: normalized error logging a bit - make sure exception type is logged - Fix: properly handle empty ssl_endpoint_identification_algorithm [#8](https://github.com/logstash-plugins/logstash-integration-kafka/pull/8) + - Refactor: made `partition_assignment_strategy` option easier to configure by accepting simple values from an enumerated set instead of requiring lengthy class paths ([#25](https://github.com/logstash-plugins/logstash-integration-kafka/pull/25)) ## 10.0.1 - Fix links in changelog pointing to stand-alone plugin changelogs. diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc index 3a2fdf79..7fefef1d 100644 --- a/docs/input-kafka.asciidoc +++ b/docs/input-kafka.asciidoc @@ -71,8 +71,7 @@ inserted into your original event, you'll have to use the `mutate` filter to man This plugin supports these configuration options plus the <> described later. -NOTE: Some of these options map to a Kafka option. See the -https://kafka.apache.org/documentation for more details. +NOTE: Some of these options map to a Kafka option. See the https://kafka.apache.org/documentation for more details. [cols="<,<,<",options="header",] |======================================================================= @@ -375,10 +374,16 @@ we haven't seen any partition leadership changes to proactively discover any new * Value type is <> * There is no default value for this setting. 
-The class name of the partition assignment strategy that the client uses to -distribute partition ownership amongst consumer instances. Maps to -the Kafka `partition.assignment.strategy` setting, which defaults to -`org.apache.kafka.clients.consumer.RangeAssignor`. +The name of the partition assignment strategy that the client uses to distribute +partition ownership amongst consumer instances, supported options are: + +* `range` +* `round_robin` +* `sticky` +* `cooperative_sticky` + +These map to Kafka's corresponding https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html[`ConsumerPartitionAssignor`] +implementations. [id="plugins-{type}s-{plugin}-poll_timeout_ms"] ===== `poll_timeout_ms` @@ -587,8 +592,9 @@ Java Class used to deserialize the record's value A rack identifier for the Kafka consumer. Used to select the physically closest rack for the consumer to read from. -The setting corresponds with the broker config 'broker.rack'. -Only available for Kafka 2.4.0 and higher; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica[KIP-392]. +The setting corresponds with Kafka's `broker.rack` configuration. + +NOTE: Only available for Kafka 2.4.0 and higher; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica[KIP-392]. 
[id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc index 8f5dc53b..d903429e 100644 --- a/docs/output-kafka.asciidoc +++ b/docs/output-kafka.asciidoc @@ -264,9 +264,6 @@ Available options for choosing a partitioning strategy are as follows: * `round_robin` distributes writes to all partitions equally, regardless of `message_key` * `uniform_sticky` sticks to a partition for the duration of a batch than randomly picks a new one -The configuration also allows setting a partitioner class name -e.g. `org.apache.kafka.clients.producer.UniformStickyPartitioner`. - [id="plugins-{type}s-{plugin}-receive_buffer_bytes"] ===== `receive_buffer_bytes` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 14f4fd46..37107964 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -128,8 +128,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # The period of time in milliseconds after which we force a refresh of metadata even if # we haven't seen any partition leadership changes to proactively discover any new brokers or partitions config :metadata_max_age_ms, :validate => :string - # The class name of the partition assignment strategy that the client will use to distribute - # partition ownership amongst consumer instances + # The name of the partition assignment strategy that the client uses to distribute + # partition ownership amongst consumer instances, supported options are `range`, + # `round_robin`, `sticky` and `cooperative_sticky` + # (for backwards compatibility setting the class name directly is supported). config :partition_assignment_strategy, :validate => :string # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. 
config :receive_buffer_bytes, :validate => :string @@ -310,7 +312,7 @@ def create_consumer(client_id) props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil? props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms) unless max_poll_interval_ms.nil? props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil? - props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil? + props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy_class) unless partition_assignment_strategy.nil? props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil? props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil? @@ -340,6 +342,24 @@ def create_consumer(client_id) end end + def partition_assignment_strategy_class + case partition_assignment_strategy + when 'range' + 'org.apache.kafka.clients.consumer.RangeAssignor' + when 'round_robin' + 'org.apache.kafka.clients.consumer.RoundRobinAssignor' + when 'sticky' + 'org.apache.kafka.clients.consumer.StickyAssignor' + when 'cooperative_sticky' + 'org.apache.kafka.clients.consumer.CooperativeStickyAssignor' + else + unless partition_assignment_strategy.index('.') + raise LogStash::ConfigurationError, "unsupported partition_assignment_strategy: #{partition_assignment_strategy.inspect}" + end + partition_assignment_strategy # assume a fully qualified class-name + end + end + def set_trustore_keystore_config(props) props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil? props.put("ssl.truststore.location", ssl_truststore_location) unless ssl_truststore_location.nil? 
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 43968f81..fc417552 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -324,8 +324,8 @@ def create_producer props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s) props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil? unless partitioner.nil? - props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner_class = partitioner_class_name) - logger.debug('producer configured using partitioner', :partitioner_class => partitioner_class) + props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner = partitioner_class) + logger.debug('producer configured using partitioner', :partitioner_class => partitioner) end props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil? props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? @@ -356,7 +356,7 @@ def create_producer end end - def partitioner_class_name(partitioner = self.partitioner) + def partitioner_class case partitioner when 'round_robin' 'org.apache.kafka.clients.producer.RoundRobinPartitioner' diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 5b668467..d7ce7363 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -1,7 +1,6 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/kafka" -require "digest" require "rspec/wait" # Please run kafka_test_setup.sh prior to executing this integration test. 
@@ -12,159 +11,129 @@ let(:group_id_3) {rand(36**8).to_s(36)} let(:group_id_4) {rand(36**8).to_s(36)} let(:group_id_5) {rand(36**8).to_s(36)} - let(:plain_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) } - let(:snappy_config) { { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:lz4_config) { { 'topics' => ['logstash_integration_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:pattern_config) { { 'topics_pattern' => 'logstash_integration_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} } - let(:decorate_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} } - let(:manual_commit_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} } + let(:group_id_6) {rand(36**8).to_s(36)} + let(:plain_config) do + { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, + 'auto_offset_reset' => 'earliest' } + end + let(:multi_consumer_config) do + plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) + end + let(:snappy_config) do + { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, + 'auto_offset_reset' => 'earliest' } + end + let(:lz4_config) do + { 'topics' => ['logstash_integration_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, + 'auto_offset_reset' => 'earliest' } + end + let(:pattern_config) do 
+ { 'topics_pattern' => 'logstash_integration_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', + 'auto_offset_reset' => 'earliest' } + end + let(:decorate_config) do + { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, + 'auto_offset_reset' => 'earliest', 'decorate_events' => true } + end + let(:manual_commit_config) do + { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, + 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false' } + end let(:timeout_seconds) { 30 } let(:num_events) { 103 } describe "#kafka-topics" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) - end - end - end it "should consume all messages from plain 3-partition topic" do - kafka_input = LogStash::Inputs::Kafka.new(plain_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - t.kill - t.join(30_000) - end + queue = consume_messages(plain_config, timeout: timeout_seconds, event_count: num_events) + expect(queue.length).to eq(num_events) end it "should consume all messages from snappy 3-partition topic" do - kafka_input = LogStash::Inputs::Kafka.new(snappy_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - t.kill - t.join(30_000) - end + queue = consume_messages(snappy_config, timeout: timeout_seconds, event_count: num_events) + expect(queue.length).to eq(num_events) end it "should consume all messages from lz4 3-partition topic" do - kafka_input = LogStash::Inputs::Kafka.new(lz4_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - 
t.kill - t.join(30_000) - end + queue = consume_messages(lz4_config, timeout: timeout_seconds, event_count: num_events) + expect(queue.length).to eq(num_events) end it "should consumer all messages with multiple consumers" do - kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) + consume_messages(multi_consumer_config, timeout: timeout_seconds, event_count: num_events) do |queue, kafka_input| expect(queue.length).to eq(num_events) kafka_input.kafka_consumers.each_with_index do |consumer, i| expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}") end - ensure - t.kill - t.join(30_000) end end end - describe "#kafka-topics-pattern" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) - end - end - end - + context "#kafka-topics-pattern" do it "should consume all messages from all 3 topics" do - kafka_input = LogStash::Inputs::Kafka.new(pattern_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(3*num_events) - expect(queue.length).to eq(3*num_events) - ensure - t.kill - t.join(30_000) - end + total_events = num_events * 3 + queue = consume_messages(pattern_config, timeout: timeout_seconds, event_count: total_events) + expect(queue.length).to eq(total_events) end end - describe "#kafka-decorate" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) - end - end - end - + context "#kafka-decorate" do it "should show the right topic and group name in decorated kafka section" do start = LogStash::Timestamp.now.time.to_i - kafka_input = LogStash::Inputs::Kafka.new(decorate_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) + consume_messages(decorate_config, timeout: 
timeout_seconds, event_count: num_events) do |queue, _| expect(queue.length).to eq(num_events) event = queue.shift expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_integration_topic_plain") expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3) expect(event.get("[@metadata][kafka][timestamp]")).to be >= start - ensure - t.kill - t.join(30_000) end end end - describe "#kafka-offset-commit" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) + context "#kafka-offset-commit" do + it "should manually commit offsets" do + queue = consume_messages(manual_commit_config, timeout: timeout_seconds, event_count: num_events) + expect(queue.length).to eq(num_events) + end + end + + context 'setting partition_assignment_strategy' do + let(:test_topic) { 'logstash_integration_partitioner_topic' } + let(:consumer_config) do + plain_config.merge( + "topics" => [test_topic], + 'group_id' => group_id_6, + "client_id" => "partition_assignment_strategy-spec", + "consumer_threads" => 2, + "partition_assignment_strategy" => partition_assignment_strategy + ) + end + let(:partition_assignment_strategy) { nil } + + # NOTE: just verify setting works, as its a bit cumbersome to do in a unit spec + [ 'range', 'round_robin', 'sticky', 'org.apache.kafka.clients.consumer.CooperativeStickyAssignor' ].each do |partition_assignment_strategy| + describe partition_assignment_strategy do + let(:partition_assignment_strategy) { partition_assignment_strategy } + it 'consumes data' do + consume_messages(consumer_config, timeout: false, event_count: 0) end end end + end - it "should manually commit offsets" do - kafka_input = LogStash::Inputs::Kafka.new(manual_commit_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - t.kill - t.join(30_000) - end + private + + def consume_messages(config, queue: 
Queue.new, timeout:, event_count:) + kafka_input = LogStash::Inputs::Kafka.new(config) + t = Thread.new { kafka_input.run(queue) } + begin + t.run + wait(timeout).for { queue.length }.to eq(event_count) unless timeout.eql?(false) + block_given? ? yield(queue, kafka_input) : queue + ensure + t.kill + t.join(30_000) end end + end