Refactor: partition_assignment_strategy to accept enumerated values #25

Merged
merged 11 commits on Mar 26, 2020
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,9 +1,10 @@
## 10.1.0
- updated kafka client (and its dependencies) to version 2.4.1
- updated kafka client (and its dependencies) to version 2.4.1 ([#16](https://github.com/logstash-plugins/logstash-integration-kafka/pull/16))
- added the input `client_rack` parameter to enable support for follower fetching
- added the output `partitioner` parameter for tuning partitioning strategy
- Refactor: normalized error logging a bit - make sure exception type is logged
- Fix: properly handle empty ssl_endpoint_identification_algorithm [#8](https://github.com/logstash-plugins/logstash-integration-kafka/pull/8)
- Refactor: made the `partition_assignment_strategy` option easier to configure by accepting simple values from an enumerated set instead of requiring lengthy class paths ([#25](https://github.com/logstash-plugins/logstash-integration-kafka/pull/25))

## 10.0.1
- Fix links in changelog pointing to stand-alone plugin changelogs.
22 changes: 14 additions & 8 deletions docs/input-kafka.asciidoc
@@ -71,8 +71,7 @@ inserted into your original event, you'll have to use the `mutate` filter to man

This plugin supports these configuration options plus the <<plugins-{type}s-{plugin}-common-options>> described later.

NOTE: Some of these options map to a Kafka option. See the
https://kafka.apache.org/documentation for more details.
NOTE: Some of these options map to a Kafka option. See the https://kafka.apache.org/documentation for more details.

[cols="<,<,<",options="header",]
|=======================================================================
@@ -375,10 +374,16 @@ we haven't seen any partition leadership changes to proactively discover any new
* Value type is <<string,string>>
* There is no default value for this setting.

The class name of the partition assignment strategy that the client uses to
distribute partition ownership amongst consumer instances. Maps to
the Kafka `partition.assignment.strategy` setting, which defaults to
`org.apache.kafka.clients.consumer.RangeAssignor`.
The name of the partition assignment strategy that the client uses to distribute
partition ownership amongst consumer instances. Supported options are:

* `range`
* `round_robin`
* `sticky`
* `cooperative_sticky`

These map to Kafka's corresponding https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html[`ConsumerPartitionAssignor`]
implementations.
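
For example, a minimal sketch of an input pipeline using one of these values (the server, topic, and group names are placeholders):

[source,ruby]
----
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["example_topic"]
    group_id => "example_group"
    partition_assignment_strategy => "cooperative_sticky"
  }
}
----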

[id="plugins-{type}s-{plugin}-poll_timeout_ms"]
===== `poll_timeout_ms`
@@ -587,8 +592,9 @@ Java Class used to deserialize the record's value

A rack identifier for the Kafka consumer.
Used to select the physically closest rack for the consumer to read from.
The setting corresponds with the broker config 'broker.rack'.
Only available for Kafka 2.4.0 and higher; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica[KIP-392].
The setting corresponds with Kafka's `broker.rack` configuration.

NOTE: Only available for Kafka 2.4.0 and higher; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica[KIP-392].
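
For example, a sketch of a consumer pinned to a rack (the topic and rack identifier are placeholders):

[source,ruby]
----
input {
  kafka {
    topics => ["example_topic"]
    client_rack => "us-east-1a"
  }
}
----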

[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]
3 changes: 0 additions & 3 deletions docs/output-kafka.asciidoc
@@ -264,9 +264,6 @@ Available options for choosing a partitioning strategy are as follows:
* `round_robin` distributes writes to all partitions equally, regardless of `message_key`
* `uniform_sticky` sticks to a partition for the duration of a batch, then randomly picks a new one

The configuration also allows setting a partitioner class name
e.g. `org.apache.kafka.clients.producer.UniformStickyPartitioner`.
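
For example, a minimal sketch of an output using one of these strategies (the server and topic names are placeholders):

[source,ruby]
----
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "example_topic"
    partitioner => "round_robin"
  }
}
----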

[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
===== `receive_buffer_bytes`

26 changes: 23 additions & 3 deletions lib/logstash/inputs/kafka.rb
@@ -128,8 +128,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# The period of time in milliseconds after which we force a refresh of metadata even if
# we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
config :metadata_max_age_ms, :validate => :string
# The class name of the partition assignment strategy that the client will use to distribute
# partition ownership amongst consumer instances
# The name of the partition assignment strategy that the client uses to distribute
# partition ownership amongst consumer instances. Supported options are `range`,
# `round_robin`, `sticky` and `cooperative_sticky`
# (for backwards compatibility, setting the class name directly is also supported).
config :partition_assignment_strategy, :validate => :string
# The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
config :receive_buffer_bytes, :validate => :string
@@ -310,7 +312,7 @@ def create_consumer(client_id)
props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil?
props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms) unless max_poll_interval_ms.nil?
props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil?
props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil?
props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy_class) unless partition_assignment_strategy.nil?
props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil?
props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
@@ -340,6 +342,24 @@ def create_consumer(client_id)
end
end

def partition_assignment_strategy_class
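# Maps the friendly setting values ('range', 'round_robin', 'sticky',
# 'cooperative_sticky') to Kafka's assignor class names; a value containing
# a '.' is passed through unchanged as a fully qualified class name, for
# backwards compatibility, and anything else raises a configuration error.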
case partition_assignment_strategy
when 'range'
'org.apache.kafka.clients.consumer.RangeAssignor'
when 'round_robin'
'org.apache.kafka.clients.consumer.RoundRobinAssignor'
when 'sticky'
'org.apache.kafka.clients.consumer.StickyAssignor'
when 'cooperative_sticky'
'org.apache.kafka.clients.consumer.CooperativeStickyAssignor'
else
unless partition_assignment_strategy.index('.')
raise LogStash::ConfigurationError, "unsupported partition_assignment_strategy: #{partition_assignment_strategy.inspect}"
end
partition_assignment_strategy # assume a fully qualified class-name
end
end

def set_trustore_keystore_config(props)
props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil?
props.put("ssl.truststore.location", ssl_truststore_location) unless ssl_truststore_location.nil?
Expand Down
6 changes: 3 additions & 3 deletions lib/logstash/outputs/kafka.rb
@@ -324,8 +324,8 @@ def create_producer
props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil?
unless partitioner.nil?
props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner_class = partitioner_class_name)
logger.debug('producer configured using partitioner', :partitioner_class => partitioner_class)
props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner = partitioner_class)
logger.debug('producer configured using partitioner', :partitioner_class => partitioner)
end
props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil?
props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
@@ -356,7 +356,7 @@ def create_producer
end
end

def partitioner_class_name(partitioner = self.partitioner)
def partitioner_class
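# Maps the plugin's partitioner setting (e.g. 'round_robin') to the
# corresponding Kafka partitioner class name.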
case partitioner
when 'round_robin'
'org.apache.kafka.clients.producer.RoundRobinPartitioner'
193 changes: 81 additions & 112 deletions spec/integration/inputs/kafka_spec.rb
@@ -1,7 +1,6 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "digest"
require "rspec/wait"

# Please run kafka_test_setup.sh prior to executing this integration test.
@@ -12,159 +11,129 @@
let(:group_id_3) {rand(36**8).to_s(36)}
let(:group_id_4) {rand(36**8).to_s(36)}
let(:group_id_5) {rand(36**8).to_s(36)}
let(:plain_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) }
let(:snappy_config) { { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
let(:lz4_config) { { 'topics' => ['logstash_integration_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
let(:pattern_config) { { 'topics_pattern' => 'logstash_integration_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
let(:decorate_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} }
let(:manual_commit_config) { { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} }
let(:group_id_6) {rand(36**8).to_s(36)}
let(:plain_config) do
{ 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1,
'auto_offset_reset' => 'earliest' }
end
let(:multi_consumer_config) do
plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3})
end
let(:snappy_config) do
{ 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1,
'auto_offset_reset' => 'earliest' }
end
let(:lz4_config) do
{ 'topics' => ['logstash_integration_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1,
'auto_offset_reset' => 'earliest' }
end
let(:pattern_config) do
{ 'topics_pattern' => 'logstash_integration_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain',
'auto_offset_reset' => 'earliest' }
end
let(:decorate_config) do
{ 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3,
'auto_offset_reset' => 'earliest', 'decorate_events' => true }
end
let(:manual_commit_config) do
{ 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5,
'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false' }
end
let(:timeout_seconds) { 30 }
let(:num_events) { 103 }

describe "#kafka-topics" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
end
end
end

it "should consume all messages from plain 3-partition topic" do
kafka_input = LogStash::Inputs::Kafka.new(plain_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
queue = consume_messages(plain_config, timeout: timeout_seconds, event_count: num_events)
expect(queue.length).to eq(num_events)
end

it "should consume all messages from snappy 3-partition topic" do
kafka_input = LogStash::Inputs::Kafka.new(snappy_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
queue = consume_messages(snappy_config, timeout: timeout_seconds, event_count: num_events)
expect(queue.length).to eq(num_events)
end

it "should consume all messages from lz4 3-partition topic" do
kafka_input = LogStash::Inputs::Kafka.new(lz4_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
queue = consume_messages(lz4_config, timeout: timeout_seconds, event_count: num_events)
expect(queue.length).to eq(num_events)
end

it "should consumer all messages with multiple consumers" do
kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
consume_messages(multi_consumer_config, timeout: timeout_seconds, event_count: num_events) do |queue, kafka_input|
expect(queue.length).to eq(num_events)
kafka_input.kafka_consumers.each_with_index do |consumer, i|
expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}")
end
ensure
t.kill
t.join(30_000)
end
end
end

describe "#kafka-topics-pattern" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
end
end
end

context "#kafka-topics-pattern" do
it "should consume all messages from all 3 topics" do
kafka_input = LogStash::Inputs::Kafka.new(pattern_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(3*num_events)
expect(queue.length).to eq(3*num_events)
ensure
t.kill
t.join(30_000)
end
total_events = num_events * 3
queue = consume_messages(pattern_config, timeout: timeout_seconds, event_count: total_events)
expect(queue.length).to eq(total_events)
end
end

describe "#kafka-decorate" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
end
end
end

context "#kafka-decorate" do
it "should show the right topic and group name in decorated kafka section" do
start = LogStash::Timestamp.now.time.to_i
kafka_input = LogStash::Inputs::Kafka.new(decorate_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
consume_messages(decorate_config, timeout: timeout_seconds, event_count: num_events) do |queue, _|
expect(queue.length).to eq(num_events)
event = queue.shift
expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_integration_topic_plain")
expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3)
expect(event.get("[@metadata][kafka][timestamp]")).to be >= start
ensure
t.kill
t.join(30_000)
end
end
end

describe "#kafka-offset-commit" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
context "#kafka-offset-commit" do
it "should manually commit offsets" do
queue = consume_messages(manual_commit_config, timeout: timeout_seconds, event_count: num_events)
expect(queue.length).to eq(num_events)
end
end

context 'setting partition_assignment_strategy' do
let(:test_topic) { 'logstash_integration_partitioner_topic' }
let(:consumer_config) do
plain_config.merge(
"topics" => [test_topic],
'group_id' => group_id_6,
"client_id" => "partition_assignment_strategy-spec",
"consumer_threads" => 2,
"partition_assignment_strategy" => partition_assignment_strategy
)
end
let(:partition_assignment_strategy) { nil }

# NOTE: just verify the setting works, as it's a bit cumbersome to do in a unit spec
[ 'range', 'round_robin', 'sticky', 'org.apache.kafka.clients.consumer.CooperativeStickyAssignor' ].each do |strategy|
describe strategy do
let(:partition_assignment_strategy) { strategy }
it 'consumes data' do
consume_messages(consumer_config, timeout: false, event_count: 0)
end
end
end
end

it "should manually commit offsets" do
kafka_input = LogStash::Inputs::Kafka.new(manual_commit_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
private

def consume_messages(config, queue: Queue.new, timeout:, event_count:)
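# Runs a Kafka input with the given config on a background thread; unless
# `timeout` is false, waits up to `timeout` seconds for `event_count` events
# to accumulate on the queue. Yields the queue and the input when a block is
# given, otherwise returns the queue.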
kafka_input = LogStash::Inputs::Kafka.new(config)
t = Thread.new { kafka_input.run(queue) }
begin
t.run
wait(timeout).for { queue.length }.to eq(event_count) unless timeout.eql?(false)
block_given? ? yield(queue, kafka_input) : queue
ensure
t.kill
t.join(30_000)
end
end

end