Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ With the introduction of the rdkafka-ruby based input plugin we hope to support

See also [rdkafka-ruby](https://github.com/appsignal/rdkafka-ruby) and [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for more detailed documentation about Kafka consumer options.

`topics` supports POSIX Extended Regular Expression pattern (not Ruby regex syntax) since v0.19.6. If you want to use regex pattern, use `/pattern/` like `/foo.*/`. <br>
**Note**: A caret (`^`) is automatically added to the beginning of the pattern.

Consuming topic name is used for event tag. So when the target topic name is `app_event`, the tag is `app_event`. If you want to modify tag, use `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag is `kafka.app_event`.

### Output plugin
Expand Down
16 changes: 13 additions & 3 deletions lib/fluent/plugin/in_rdkafka_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,29 @@ def initialize
end

def _config_to_array(config)
config_array = config.split(',').map {|k| k.strip }
config_array = config.split(',').map {|k| _config_regex_pattern(k.strip) }
if config_array.empty?
raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter"
end
config_array
end
private :_config_to_array

def _config_regex_pattern(topic)
if (m = /^\/(.+)\/$/.match(topic))
# librdkafka recognizes string as regex pattern if the topic name starts with '^'.
# https://github.com/confluentinc/librdkafka/blob/570c785e9e35812db8f50254bd2f7e0cf47def39/src/rdkafka.h#L4148
# https://github.com/confluentinc/librdkafka/blob/e1db7eaa517f0a6438bc846a9c49ede73b9ea211/src/rdkafka_topic.c#L2064
return "^#{m[1]}"
end
topic
end
private :_config_regex_pattern

def multi_workers_ready?
true
end

private :_config_to_array

def configure(conf)
compat_parameters_convert(conf, :parser)

Expand Down
123 changes: 123 additions & 0 deletions test/plugin/test_in_rdkafka_group.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
require 'helper'
require 'fluent/test/driver/input'
require 'securerandom'

class RdkafkaGroupInputTest < Test::Unit::TestCase

def have_rdkafka
begin
require 'fluent/plugin/in_rdkafka_group'
true
rescue LoadError
false
end
end

def setup
omit_unless(have_rdkafka, "rdkafka isn't installed")
Fluent::Test.setup
end

TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

CONFIG = %[
topics #{TOPIC_NAME}
kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
<parse>
@type none
</parse>
]

def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::Plugin::RdKafkaGroupInput).configure(conf)
end


def test_configure
d = create_driver
assert_equal [TOPIC_NAME], d.instance.topics
assert_equal 'localhost:9092', d.instance.kafka_configs['bootstrap.servers']
end

def test_multi_worker_support
d = create_driver
assert_true d.instance.multi_workers_ready?
end

class ConsumeTest < self
TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

def setup
@kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
@producer = @kafka.producer
@kafka.create_topic(TOPIC_NAME)
end

def teardown
@kafka.delete_topic(TOPIC_NAME)
@kafka.close
end

def test_consume
conf = %[
topics #{TOPIC_NAME}
kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
<parse>
@type none
</parse>
]

d = create_driver(conf)

d.run(expect_records: 1, timeout: 10) do
sleep 0.1
@producer.produce("Hello, fluent-plugin-kafka!", topic: TOPIC_NAME)
@producer.deliver_messages
end

expected = {'message' => 'Hello, fluent-plugin-kafka!'}
assert_equal expected, d.events[0][2]
end
end

class ConsumeTopicWithRegexpTest < self
TOPIC_NAME1 = "kafka-input-1-#{SecureRandom.uuid}"
TOPIC_NAME2 = "kafka-input-2-#{SecureRandom.uuid}"
TOPIC_NAME_REGEXP = "/kafka-input-(1|2)-.*/"

def setup
@kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
@producer = @kafka.producer
@kafka.create_topic(TOPIC_NAME1)
@kafka.create_topic(TOPIC_NAME2)
end

def teardown
@kafka.delete_topic(TOPIC_NAME1)
@kafka.delete_topic(TOPIC_NAME2)
@kafka.close
end

def test_consume_with_regexp
conf = %[
topics #{TOPIC_NAME_REGEXP}
kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
<parse>
@type none
</parse>
]
d = create_driver(conf)

d.run(expect_records: 2, timeout: 10) do
sleep 0.1
@producer.produce("Hello, fluent-plugin-kafka! in topic 1", topic: TOPIC_NAME1)
@producer.produce("Hello, fluent-plugin-kafka! in topic 2", topic: TOPIC_NAME2)
@producer.deliver_messages
end
expected_message_pattern = /Hello, fluent-plugin-kafka! in topic [12]/
assert_equal 2, d.events.size
assert_match(expected_message_pattern, d.events[0][2]['message'])
assert_match(expected_message_pattern, d.events[1][2]['message'])
end
end
end