Skip to content

Commit

Permalink
Merge pull request #601 from thalesmg/20241025-fix-stop-consumer-crash
Browse files Browse the repository at this point in the history
fix: don't crash client if stopping an unknown consumer, stop `brod_consumer` processes when terminating `brod_group_subscriber_v2`
  • Loading branch information
thalesmg authored Oct 25, 2024
2 parents 0a49142 + 55b9ed5 commit 3e44cd4
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

- 4.3.1
- Fixed `brod_client:stop_consumer` so that it doesn't crash the client process if an unknown consumer is given as argument.
- Previously, `brod_group_subscriber_v2` could leave `brod_consumer` processes lingering even after its shutdown. Now, those processes are terminated.

- 4.3.0
- Split brod-cli out to a separate project [kafka4beam/brod-cli](https://github.com/kafka4beam/brod-cli)

Expand Down
4 changes: 2 additions & 2 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ handle_call({stop_producer, Topic}, _From, State) ->
ok = brod_producers_sup:stop_producer(State#state.producers_sup, Topic),
{reply, ok, State};
handle_call({stop_consumer, Topic}, _From, State) ->
ok = brod_consumers_sup:stop_consumer(State#state.consumers_sup, Topic),
{reply, ok, State};
Reply = brod_consumers_sup:stop_consumer(State#state.consumers_sup, Topic),
{reply, Reply, State};
handle_call({get_leader_connection, Topic, Partition}, _From, State) ->
{Result, NewState} = do_get_leader_connection(State, Topic, Partition),
{reply, Result, NewState};
Expand Down
17 changes: 15 additions & 2 deletions src/brod_group_subscriber_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,15 @@ handle_info(_Info, State) ->
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
terminate(_Reason, #state{workers = Workers,
terminate(_Reason, #state{config = Config,
workers = Workers,
coordinator = Coordinator,
group_id = GroupId
}) ->
ok = terminate_all_workers(Workers),
ok = flush_offset_commits(GroupId, Coordinator).
ok = flush_offset_commits(GroupId, Coordinator),
ok = stop_consumers(Config),
ok.

%%%===================================================================
%%% Internal functions
Expand Down Expand Up @@ -529,6 +532,16 @@ do_ack(Topic, Partition, Offset, #state{ workers = Workers
{error, unknown_topic_or_partition}
end.

stop_consumers(Config) ->
#{ client := Client
, topics := Topics
} = Config,
lists:foreach(
fun(Topic) ->
_ = brod_client:stop_consumer(Client, Topic)
end,
Topics).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
22 changes: 19 additions & 3 deletions test/brod_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@
, t_sasl_callback/1
, t_magic_version/1
, t_get_partitions_count_safe/1
, t_double_stop_consumer/1
]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include("brod_int.hrl").

-define(HOST, "localhost").
-define(HOSTS, [{?HOST, 9092}]).
-define(HOSTS_SSL, [{?HOST, 9093}]).
-define(HOSTS_SASL_SSL, [{?HOST, 9094}]).
-define(HOSTS, [{?HOST, 9192}]).
-define(HOSTS_SSL, [{?HOST, 9193}]).
-define(HOSTS_SASL_SSL, [{?HOST, 9194}]).
-define(TOPIC, <<"brod-client-SUITE-topic">>).

-define(WAIT(PATTERN, RESULT, TIMEOUT),
Expand Down Expand Up @@ -379,6 +380,21 @@ t_magic_version(Config) when is_list(Config) ->
?assert(is_integer(Ts))
end.

t_double_stop_consumer({init, Config}) -> Config;
t_double_stop_consumer({'end', Config}) ->
brod:stop_client(?FUNCTION_NAME),
Config;
t_double_stop_consumer(Config) when is_list(Config) ->
Client = ?FUNCTION_NAME,
ClientConfig = [{get_metadata_timeout_seconds, 10}],
ok = start_client(?HOSTS, Client, ClientConfig),
ok = brod:start_consumer(Client, ?TOPIC, []),
?assertMatch({ok, _}, brod_client:get_consumer(Client, ?TOPIC, 0)),
?assertMatch(ok, brod_client:stop_consumer(Client, ?TOPIC)),
?assertMatch({error, {consumer_not_found, _}}, brod_client:get_consumer(Client, ?TOPIC, 0)),
?assertMatch({error, not_found}, brod_client:stop_consumer(Client, ?TOPIC)),
ok.

%%%_* Help functions ===========================================================

%% mocked callback
Expand Down
27 changes: 27 additions & 0 deletions test/brod_group_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
, t_assign_partitions_handles_updating_state/1
, t_get_workers/1
, v2_coordinator_crash/1
, v2_consumer_cleanup/1
, v2_subscriber_shutdown/1
, v2_subscriber_assignments_revoked/1
]).
Expand Down Expand Up @@ -97,6 +98,7 @@ groups() ->
, t_assign_partitions_handles_updating_state
, t_get_workers
, v2_coordinator_crash
, v2_consumer_cleanup
, v2_subscriber_shutdown
, v2_subscriber_assignments_revoked
]}
Expand Down Expand Up @@ -391,6 +393,31 @@ v2_coordinator_crash(Config) when is_list(Config) ->
ok
end).

%% Checks that we don't leave `brod_consumer' processes lingering after we stop a group
%% subscriber v2.
v2_consumer_cleanup(Config) when is_list(Config) ->
InitArgs = #{},
Topic = ?topic,
Partition = 0,
Client = ?CLIENT_ID,
?check_trace(
#{timetrap => 5_000},
begin
{ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], InitArgs),
%% Send a message to the topic and wait until it's received to make sure
%% the subscriber is stable:
produce({Topic, Partition}, <<0>>),
{ok, _} = ?wait_message(Topic, Partition, <<0>>, _),
?assertMatch({ok, _}, brod_client:get_consumer(Client, Topic, Partition)),
ok = stop_subscriber(Config, SubscriberPid),
?assertMatch({error, {consumer_not_found, _}},
brod_client:get_consumer(Client, Topic, Partition)),
ok
end,
[]
),
ok.

v2_subscriber_shutdown(Config) when is_list(Config) ->
%% Test graceful shutdown of the group subscriber:
InitArgs = #{async_ack => true},
Expand Down

0 comments on commit 3e44cd4

Please sign in to comment.