Skip to content

Commit

Permalink
Add consumer group integration tests (#33)
Browse files Browse the repository at this point in the history
* Add consumer group integration tests

* Fix credo
  • Loading branch information
Argonus committed Feb 24, 2024
1 parent 4eabcf1 commit 1204a83
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/kayrock/broker_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Kayrock.BrokerConnection do

def send(conn, data), do: Connection.call(conn, {:send, data})

def recv(conn, timeout \\ 3000) do
def recv(conn, timeout \\ 5000) do
Connection.call(conn, {:recv, timeout})
end

Expand Down
135 changes: 135 additions & 0 deletions test/integration/consumer_group_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
defmodule Kayrock.Integration.ConsumerGroupTest do
use Kayrock.IntegrationCase
use ExUnit.Case, async: true

import Kayrock.TestSupport
import Kayrock.RequestFactory

container(:kafka, KafkaContainer.new(), shared: true)

describe "Consumer Group API" do
for api_version <- [0, 1, 2] do
test "v#{api_version} - allows to manage consumer groups", %{kafka: kafka} do
api_version = unquote(api_version)
group_name = unique_string()

{:ok, client_pid} = build_client(kafka)
topic_name = create_topic(client_pid, api_version)

# [WHEN] No consumer groups exist
# [THEN] List consumer groups returns empty list
list_request = list_consumer_groups_request(api_version)
{:ok, list_response} = Kayrock.client_call(client_pid, list_request, :controller)

matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name))
assert list_response.error_code == 0
assert matching_groups == []

# [WHEN] We try to find a coordinator for a consumer group
coordinator_request = find_coordinator_request(group_name, api_version)

{:ok, coordinator_response} =
with_retry(fn ->
Kayrock.client_call(client_pid, coordinator_request, 1)
end)

# [THEN] We get a valid coordinator node
assert coordinator_response.error_code == 0
assert coordinator_response.coordinator.node_id > 0
node_id = coordinator_response.coordinator.node_id

# [WHEN] We join a group
member_data = %{group_id: group_name, topics: [topic_name]}
join_request = join_group_request(member_data, api_version)

{:ok, join_response} =
with_retry(fn ->
Kayrock.client_call(client_pid, join_request, node_id)
end)

assert join_response.error_code == 0
assert join_response.members != []
member_ids = Enum.map(join_response.members, & &1.member_id)
assert Enum.member?(member_ids, join_response.member_id)

# [THEN] We can list the consumer group
list_request = list_consumer_groups_request(api_version)
{:ok, list_response} = Kayrock.client_call(client_pid, list_request, node_id)

matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name))
assert list_response.error_code == 0
assert matching_groups == [%{group_id: group_name, protocol_type: "consumer"}]

# [WHEN] We sync the group
assignments = [
%{
member_id: join_response.member_id,
topic: topic_name,
partitions: [0, 1, 2]
}
]

sync_request =
sync_group_request(group_name, join_response.member_id, assignments, api_version)

{:ok, sync_response} = Kayrock.client_call(client_pid, sync_request, node_id)
assert sync_response.error_code == 0

# [THEN] We can describe the consumer group
describe_request = describe_groups_request([group_name], api_version)
{:ok, describe_response} = Kayrock.client_call(client_pid, describe_request, node_id)

[group_info] = describe_response.groups
assert group_info.error_code == 0
assert group_info.group_id == group_name
assert group_info.protocol_type == "consumer"
[member] = group_info.members
assert member.member_id == join_response.member_id

# [WHEN] We leave the group
leave_group_request =
leave_group_request(group_name, join_response.member_id, api_version)

{:ok, leave_group_response} =
Kayrock.client_call(client_pid, leave_group_request, node_id)

assert leave_group_response.error_code == 0

# [THEN] We can don't find member in the group
describe_request = describe_groups_request([group_name], api_version)
{:ok, describe_response} = Kayrock.client_call(client_pid, describe_request, node_id)

[group_info] = describe_response.groups
assert group_info.error_code == 0
assert group_info.group_id == group_name
assert group_info.protocol_type == "consumer"
assert group_info.members == []

# [WHEN] We delete consumer group
delete_request = delete_groups_request([group_name], api_version)
{:ok, delete_response} = Kayrock.client_call(client_pid, delete_request, node_id)

assert delete_response.group_error_codes == [%{group_id: group_name, error_code: 0}]

# [THEN] We can't find the group
list_request = list_consumer_groups_request(api_version)
{:ok, list_response} = Kayrock.client_call(client_pid, list_request, node_id)

matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name))
assert matching_groups == []
end
end
end

defp build_client(kafka) do
uris = [{"localhost", Container.mapped_port(kafka, 9092)}]

Check warning on line 125 in test/integration/consumer_group_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 24.3)

Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)

Check warning on line 125 in test/integration/consumer_group_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.12, 22.3)

Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)
Kayrock.Client.start_link(uris)
end

defp create_topic(client_pid, api_version) do
topic_name = unique_string()
create_request = create_topic_request(topic_name, api_version)
{:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
topic_name
end
end
122 changes: 122 additions & 0 deletions test/support/request_factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,126 @@ defmodule Kayrock.RequestFactory do

%{Map.merge(request, request_date) | replica_id: -1}
end

@doc """
Creates a request to join a consumer group
Uses min of api_version and max supported version
"""
def list_consumer_groups_request(api_version) do
api_version = min(Kayrock.ListGroups.max_vsn(), api_version)
Kayrock.ListGroups.get_request_struct(api_version)
end

@doc """
Create a request to find coordinator for a consumer group
Uses min of api_version and max supported version
"""
def find_coordinator_request(group_id, api_version) do
api_version = min(Kayrock.FindCoordinator.max_vsn(), api_version)
request = Kayrock.FindCoordinator.get_request_struct(api_version)
coordinator_key(request, api_version, group_id)
end

defp coordinator_key(request, 0, group_id), do: %{request | group_id: group_id}

defp coordinator_key(request, _, group_id) do
%{request | coordinator_key: group_id, coordinator_type: 0}
end

@doc """
Creates a request to join a consumer group
Uses min of api_version and max supported version
"""
def join_group_request(member_data, api_version) do
api_version = min(Kayrock.JoinGroup.max_vsn(), api_version)
request = Kayrock.JoinGroup.get_request_struct(api_version)
topics = Map.fetch!(member_data, :topics)

%{
request
| group_id: Map.fetch!(member_data, :group_id),
session_timeout: Map.get(member_data, :session_timeout, 10_000),
member_id: Map.get(member_data, :member_id, ""),
protocol_type: "consumer",
group_protocols: [
%{
protocol_metadata: %Kayrock.GroupProtocolMetadata{topics: topics},
protocol_name: Map.get(member_data, :protocol_name, "assign")
}
]
}
|> add_rebalance_timeout(api_version, member_data)
end

defp add_rebalance_timeout(request, 0, _), do: request

defp add_rebalance_timeout(request, _, member_data) do
%{
request
| rebalance_timeout: Map.get(member_data, :rebalance_timeout, 30_000)
}
end

@doc """
Creates a request to sync a consumer group
Uses min of api_version and max supported version
"""
def sync_group_request(group_id, member_id, assignments, api_version) do
api_version = min(Kayrock.SyncGroup.max_vsn(), api_version)
request = Kayrock.SyncGroup.get_request_struct(api_version)

%{
request
| group_id: group_id,
member_id: member_id,
generation_id: 1,
group_assignment: build_assignments(assignments)
}
end

defp build_assignments(assignments) do
Enum.map(assignments, fn assignment ->
%{
member_id: Map.fetch!(assignment, :member_id),
member_assignment: %Kayrock.MemberAssignment{
partition_assignments: [
%Kayrock.MemberAssignment.PartitionAssignment{
topic: Map.fetch!(assignment, :topic),
partitions: Map.fetch!(assignment, :partitions)
}
]
}
}
end)
end

@doc """
Creates a request to describe a consumer groups
Uses min of api_version and max supported version
"""
def describe_groups_request(group_ids, api_version) do
api_version = min(Kayrock.DescribeGroups.max_vsn(), api_version)
request = Kayrock.DescribeGroups.get_request_struct(api_version)
%{request | group_ids: group_ids}
end

@doc """
Creates a request to leave a consumer group
Uses min of api_version and max supported version
"""
def leave_group_request(group_id, member_id, api_version) do
api_version = min(Kayrock.LeaveGroup.max_vsn(), api_version)
request = Kayrock.LeaveGroup.get_request_struct(api_version)
%{request | group_id: group_id, member_id: member_id}
end

@doc """
Creates a request to delete a group
Uses min of api_version and max supported version
"""
def delete_groups_request(group_ids, api_version) do
api_version = min(Kayrock.DeleteGroups.max_vsn(), api_version)
request = Kayrock.DeleteGroups.get_request_struct(api_version)
%{request | groups: group_ids}
end
end
18 changes: 18 additions & 0 deletions test/support/test_support.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,22 @@ defmodule Kayrock.TestSupport do
defp pad_list(l, n, pad_with) do
l ++ List.duplicate(pad_with, n - length(l))
end

@doc """
Calls the given function up to 3 times, sleeping 1 second between each call.
"""
def with_retry(fun), do: do_with_retry(3, fun, nil)

defp do_with_retry(0, _fun, result), do: result

defp do_with_retry(n, fun, _result) do
case fun.() do
{:ok, response = %{error_code: 0}} ->
{:ok, response}

result ->
:timer.sleep(1000)
do_with_retry(n - 1, fun, result)
end
end
end

0 comments on commit 1204a83

Please sign in to comment.