Skip to content

Commit

Permalink
Fix for #21
Browse files Browse the repository at this point in the history
  • Loading branch information
Silviu Caragea committed May 27, 2020
1 parent ab72c61 commit e220c75
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/erlkaf_consumer_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,20 @@ handle_info({revoke_partitions, Partitions}, #state{
?LOG_INFO("revoke partitions: ~p", [Partitions]),
Pids = get_pids(ActiveTopicsMap, Partitions),
ok = stop_consumers(Pids),
?LOG_INFO("all existing consumers stopped for partitions: ~p", [Partitions]),
ok = erlkaf_nif:consumer_partition_revoke_completed(ClientRef),
{noreply, State#state{active_topics_map = #{}}};

handle_info({'EXIT', FromPid, Reason} , State) when Reason =/= normal ->
?LOG_WARNING("consumer ~p died with reason: ~p. restart consumer group ...", [FromPid, Reason]),
{stop, {error, Reason}, State};
handle_info({'EXIT', FromPid, Reason}, #state{active_topics_map = ActiveTopics} = State) when Reason =/= normal ->

case maps:size(ActiveTopics) of
0 ->
?LOG_WARNING("consumer ~p died with reason: ~p. no active topic (ignore message) ...", [FromPid, Reason]),
{noreply, State};
_ ->
?LOG_WARNING("consumer ~p died with reason: ~p. restart consumer group ...", [FromPid, Reason]),
{stop, {error, Reason}, State}
end;

handle_info(_Info, State) ->
{noreply, State}.
Expand Down

0 comments on commit e220c75

Please sign in to comment.