Merge pull request #62 from thalesmg/fix-client-down-producers-restart
fix: restart `pulsar_producers` if it encounters problems reaching `pulsar_client`
thalesmg authored Oct 8, 2024
2 parents 03f85b9 + c3b07e6 commit 4b75f6b
Showing 3 changed files with 93 additions and 3 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
# 0.8.5

## Bug fixes

- Fixed an issue where `pulsar_producers` would stop and not restart if it encountered
problems when trying to reach `pulsar_client`.

# 0.8.4

## Bug fixes

- Fixed an issue where a producer process might end up stuck with a closed connection
while believing it was in the `connected` state.

# 0.8.3

## Bug fixes
6 changes: 3 additions & 3 deletions src/pulsar_producers.erl
@@ -173,10 +173,10 @@ handle_info(timeout, State = #state{client_id = ClientId, topic = Topic}) ->
{noreply, NewState#state{partitions = length(PartitionTopics)}};
{error, Reason} ->
log_error("get topic metatdata failed: ~p", [Reason]),
{stop, {shutdown, Reason}, State}
{stop, {failed_to_get_metadata, Reason}, State}
end;
{error, Reason} ->
{stop, {shutdown, Reason}, State}
{stop, {client_not_found, Reason}, State}
end;
handle_info({'EXIT', Pid, Error}, State = #state{workers = Workers, producers = Producers}) ->
log_error("Received EXIT from ~p, error: ~p", [Pid, Error]),
@@ -195,7 +195,7 @@ handle_info({restart_producer, Partition, PartitionTopic}, State = #state{client
{ok, Pid} ->
{noreply, start_producer(Pid, Partition, PartitionTopic, State)};
{error, Reason} ->
{stop, {shutdown, Reason}, State}
{stop, {client_not_found, Reason}, State}
end;
handle_info({producer_state_change, ProducerPid, ProducerState},
State = #state{producers = Producers, workers = WorkersTable})
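Why the new stop reasons matter: an OTP supervisor restarts a `transient` child only if it terminates abnormally; the exit reasons `normal`, `shutdown`, and `{shutdown, Term}` all count as normal termination and do not trigger a restart. Replacing the `{shutdown, Reason}` stop reasons with `{failed_to_get_metadata, Reason}` and `{client_not_found, Reason}` therefore lets the supervisor bring `pulsar_producers` back up. A minimal sketch of that behaviour, assuming a `transient` child spec; `producers_child_spec/3` below is hypothetical and not the library's actual supervisor code:

%% Illustrative sketch only; not taken from pulsar's supervisor modules.
producers_child_spec(ClientId, Topic, Opts) ->
    #{id => {ClientId, Topic},
      start => {pulsar_producers, start_link, [ClientId, Topic, Opts]},
      %% `transient': restarted only on abnormal exits.  `shutdown' and
      %% `{shutdown, _}' count as normal termination, so the old stop reason
      %% left the producers down, while the new reasons trigger a restart.
      restart => transient,
      shutdown => 5000,
      type => worker,
      modules => [pulsar_producers]}.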
76 changes: 76 additions & 0 deletions test/pulsar_SUITE.erl
@@ -56,6 +56,7 @@ resilience_multi_failure_tests() ->
resilience_single_failure_tests() ->
[ t_pulsar_drop_expired_batch_resend_inflight
, t_overflow
, t_client_down_producers_restart
].

groups() ->
@@ -1081,6 +1082,81 @@ t_per_request_callbacks(Config) ->

ok.

%% Checks that we restart producers if the client is down while producers attempt to use
%% it.
t_client_down_producers_restart(Config) ->
PulsarHost = ?config(pulsar_host, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
{ok, _} = pulsar_test_utils:reset_proxy(ProxyHost, ProxyPort),
{ok, _} = application:ensure_all_started(pulsar),
{ok, _ClientPid} = pulsar:ensure_supervised_client(?TEST_SUIT_CLIENT, [PulsarHost], #{}),
Topic = "persistent://public/default/" ++ atom_to_list(?FUNCTION_NAME),
ProducersName = ?FUNCTION_NAME,
ProducerOpts = #{
name => ProducersName,
batch_size => ?BATCH_SIZE,
strategy => random,
retention_period => infinity
},
{ok, Producers} = pulsar:ensure_supervised_producers(
?TEST_SUIT_CLIENT,
Topic,
ProducerOpts),
Batch = [#{key => <<"k">>, value => <<"v">>}],
%% pre-condition: everything is fine initially
?assertMatch({_, P} when is_pid(P), pulsar_producers:pick_producer(Producers, Batch)),
%% Now, pulsar becomes unresponsive while `pulsar_producers' is trying to get topic
%% metadata from it. It'll time out and make the producers shut down.
pulsar_test_utils:enable_failure(down, ProxyHost, ProxyPort),
ProducersPid0 = whereis(ProducersName),
?assert(is_pid(ProducersPid0)),
MRef0 = monitor(process, ProducersPid0),
ProducersPid0 ! timeout,
CallTimeout = 30_000,
receive
{'DOWN', MRef0, process, ProducersPid0, Reason0} ->
ct:pal("shutdown reason: ~p", [Reason0]),
ok
after
CallTimeout + 3_000 ->
ct:fail("producers didn't shut down")
end,
%% ... then, they should eventually restart.
pulsar_test_utils:heal_failure(down, ProxyHost, ProxyPort),
ct:sleep(500),
ProducersPid1 = whereis(ProducersName),
?assert(is_pid(ProducersPid1)),
?assertNotEqual(ProducersPid0, ProducersPid1),
?assert(is_process_alive(ProducersPid1)),
?assertMatch({_, P} when is_pid(P), pulsar_producers:pick_producer(Producers, Batch)),

%% Alternatively: if the client happens to be restarting while `pulsar_producers' tries
%% to reach it, the producers process also shuts down.
pulsar_test_utils:with_mock(pulsar_client_sup, find_client,
fun(_ClientId) -> {error, undefined} end,
fun() ->
MRef1 = monitor(process, ProducersPid1),
ProducersPid1 ! timeout,
receive
{'DOWN', MRef1, process, ProducersPid1, Reason1} ->
ct:pal("shutdown reason: ~p", [Reason1]),
ok
after
CallTimeout + 3_000 ->
ct:fail("producers didn't shut down")
end
end
),
ct:sleep(500),
ProducersPid2 = whereis(ProducersName),
?assert(is_pid(ProducersPid2)),
?assertNotEqual(ProducersPid1, ProducersPid2),
?assert(is_process_alive(ProducersPid2)),
?assertMatch({_, P} when is_pid(P), pulsar_producers:pick_producer(Producers, Batch)),

ok.

t_producers_all_connected(Config) ->
PulsarHost = ?config(pulsar_host, Config),
{ok, _} = application:ensure_all_started(pulsar),
