Skip to content

Commit

Permalink
Merge pull request #413 from TheRealReal/add-ssl-options-consumers
Browse files Browse the repository at this point in the history
Add `ssl_options` to consumers
  • Loading branch information
joshuawscott authored Oct 16, 2020
2 parents d0c3a6d + 4c379b5 commit a1bd360
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
]
)

worker_opts = Keyword.take(opts, [:uris])
worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options])

{:ok, worker_name} =
KafkaEx.create_worker(
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ defmodule KafkaEx.GenConsumer do
{:ok, consumer_state} =
consumer_module.init(topic, partition, extra_consumer_args)

worker_opts = Keyword.take(opts, [:uris])
worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options])

{:ok, worker_name} =
KafkaEx.create_worker(
Expand Down
3 changes: 2 additions & 1 deletion test/integration/consumer_group_implementation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do

setup do
ports_before = num_open_ports()
{:ok, _} = TestPartitioner.start_link()
{:ok, test_partitioner_pid} = TestPartitioner.start_link()

{:ok, consumer_group_pid1} =
ConsumerGroup.start_link(
Expand Down Expand Up @@ -163,6 +163,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do
on_exit(fn ->
sync_stop(consumer_group_pid1)
sync_stop(consumer_group_pid2)
sync_stop(test_partitioner_pid)
end)

{
Expand Down
63 changes: 63 additions & 0 deletions test/integration/consumer_group_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,69 @@ defmodule KafkaEx.ConsumerGroup.Test do
assert consumer_group == :no_consumer_group
end

describe "custom ssl options" do
setup do
# reset application env after each test
env_before = Application.get_all_env(:kafka_ex)

ssl_option_filenames = ["ca-cert", "cert.pem", "key.pem"]
{:ok, cwd} = File.cwd()

original_filenames =
ssl_option_filenames
|> Enum.map(fn filename -> Path.join([cwd, "ssl", filename]) end)

target_filenames =
ssl_option_filenames
|> Enum.map(fn filename ->
Path.rootname(filename) <> "-custom" <> Path.extname(filename)
end)
|> Enum.map(fn filename -> Path.join([cwd, "ssl", filename]) end)

List.zip([original_filenames, target_filenames])
|> Enum.map(fn {original, target} -> File.copy(original, target) end)

on_exit(fn ->
# this is basically Application.put_all_env
for {k, v} <- env_before do
Application.put_env(:kafka_ex, k, v)
end

target_filenames
|> Enum.map(fn filename -> File.rm(filename) end)

:ok
end)

:ok
end

test "create_worker allows us to pass in use_ssl and ssl_options options" do
Application.put_env(:kafka_ex, :use_ssl, true)
ssl_options = Application.get_env(:kafka_ex, :ssl_options)
assert ssl_options == Config.ssl_options()

## These reference symbolic links to the original files in order to validate
## that custom SSL filepaths can specified
custom_ssl_options = [
cacertfile: File.cwd!() <> "/ssl/ca-cert-custom",
certfile: File.cwd!() <> "/ssl/cert-custom.pem",
keyfile: File.cwd!() <> "/ssl/key-custom.pem"
]

{:ok, pid} =
KafkaEx.create_worker(:real,
use_ssl: true,
ssl_options: custom_ssl_options
)

consumer_group = :sys.get_state(pid)
assert consumer_group.ssl_options == custom_ssl_options
refute consumer_group.ssl_options == ssl_options
assert consumer_group.use_ssl == true
end
end

test "create_worker allows us to provide a consumer group" do
{:ok, pid} =
KafkaEx.create_worker(:bah, consumer_group: "my_consumer_group")
Expand Down
14 changes: 8 additions & 6 deletions test/integration/server0_p_10_and_later_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
Enum.member?(existing_topics(), name)
end)

assert @num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
KafkaEx.metadata(),
name
)
|> Enum.count()
wait_for(fn ->
@num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
KafkaEx.metadata(),
name
)
|> Enum.count()
end)
end

@tag :delete_topic
Expand Down

0 comments on commit a1bd360

Please sign in to comment.