Skip to content

Commit

Permalink
Merge pull request #598 from zmstone/0921-consumer-exclusive-own-conn…
Browse files Browse the repository at this point in the history
…ection

feat: add boolean flag 'share_leader_conn' in consumer config
  • Loading branch information
zmstone committed Sep 21, 2024
2 parents 87b1296 + 6ee6c85 commit 494a0cc
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 38 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

- 4.2.0
- Optimize consumer fetch latency.
Introduced the `share_leader_conn` consumer configuration option (default: `false`).
This setting allows users to opt for the previous behavior if preferred (set to `true`).

- 4.1.1
- Upgrade `kafka_protocol` from version 4.1.5 to 4.1.9.

Expand Down
1 change: 1 addition & 0 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
| {offset_reset_policy, brod_consumer:offset_reset_policy()}
| {size_stat_window, non_neg_integer()}
| {isolation_level, brod_consumer:isolation_level()}
| {share_leader_conn, boolean()}
].
%% Consumer configuration.
%%
Expand Down
11 changes: 10 additions & 1 deletion src/brod_client.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2015-2021 Klarna Bank AB (publ)
%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
, get_group_coordinator/2
, get_transactional_coordinator/2
, get_leader_connection/3
, get_bootstrap/1
, get_metadata/2
, get_metadata_safe/2
, get_partitions_count/2
Expand Down Expand Up @@ -227,6 +228,10 @@ stop_consumer(Client, TopicName) ->
get_leader_connection(Client, Topic, Partition) ->
safe_gen_call(Client, {get_leader_connection, Topic, Partition}, infinity).

-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}.
get_bootstrap(Client) ->
safe_gen_call(Client, get_bootstrap, infinity).

%% @doc Get connection to a kafka broker.
%%
%% Return already established connection towards the broker,
Expand Down Expand Up @@ -388,6 +393,10 @@ handle_call({stop_consumer, Topic}, _From, State) ->
handle_call({get_leader_connection, Topic, Partition}, _From, State) ->
{Result, NewState} = do_get_leader_connection(State, Topic, Partition),
{reply, Result, NewState};
handle_call(get_bootstrap, _From, State) ->
#state{bootstrap_endpoints = Endpoints} = State,
ConnConfig = conn_config(State),
{reply, {ok, {Endpoints, ConnConfig}}, State};
handle_call({get_connection, Host, Port}, _From, State) ->
{Result, NewState} = maybe_connect(State, {Host, Port}),
{reply, Result, NewState};
Expand Down
104 changes: 76 additions & 28 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%%% Copyright (c) 2014-2021 Klarna Bank AB (publ)
%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,7 +94,10 @@
-type pending_acks() :: #pending_acks{}.
-type isolation_level() :: kpro:isolation_level().

-record(state, { bootstrap :: pid() | brod:bootstrap()
-define(GET_FROM_CLIENT, get).
-define(IGNORE, ignore).
-record(state, { client_pid :: ?IGNORE | pid()
, bootstrap :: ?IGNORE | ?GET_FROM_CLIENT | brod:bootstrap()
, connection :: ?undef | pid()
, topic :: binary()
, partition :: integer()
Expand Down Expand Up @@ -136,6 +140,7 @@
-define(INIT_CONNECTION, init_connection).
-define(DEFAULT_AVG_WINDOW, 5).
-define(DEFAULT_ISOLATION_LEVEL, ?kpro_read_committed).
-define(DEFAULT_SHARE_LEADER_CONN, false).

%%%_* APIs =====================================================================
%% @equiv start_link(ClientPid, Topic, Partition, Config, [])
Expand Down Expand Up @@ -220,6 +225,16 @@ start_link(Bootstrap, Topic, Partition, Config) ->
%% and `read_committed' to get only the records from committed
%% transactions</li>
%%
%% <li>`share_leader_conn': (optional, default = `false')
%%
%% Whether or not share the partition leader connection with
%% other producers or consumers.
%% Set to `true' to consume less TCP connections towards Kafka,
%% but may lead to higher fetch latency. This is because Kafka can
%% ony accumulate messages for the oldest fetch request, later
%% requests behind it may get blocked until `max_wait_time' expires
%% for the oldest one</li>
%%
%% </ul>
%% @end
-spec start_link(pid() | brod:bootstrap(),
Expand Down Expand Up @@ -286,7 +301,7 @@ get_connection(Pid) ->

%%%_* gen_server callbacks =====================================================

init({Bootstrap, Topic, Partition, Config}) ->
init({Bootstrap0, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
Cfg = fun(Name, Default) ->
proplists:get_value(Name, Config, Default)
Expand All @@ -300,15 +315,33 @@ init({Bootstrap, Topic, Partition, Config}) ->
BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET),
OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY),
IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL),

%% If bootstrap is a client pid, register self to the client
case is_shared_conn(Bootstrap) of
IsShareConn = Cfg(share_leader_conn, ?DEFAULT_SHARE_LEADER_CONN),

%% resolve connection bootstrap args
{ClientPid, Bootstrap} =
case is_pid(Bootstrap0) of
true when IsShareConn ->
%% share leader connection with other producers/consumers
%% the connection is to be managed by brod_client
{Bootstrap0, ?IGNORE};
true ->
%% not sharing leader connection with other producers/consumers
%% the bootstrap args will be resolved later when it's
%% time to establish a connection to partition leader
{Bootstrap0, ?GET_FROM_CLIENT};
false ->
%% this consumer process is not started from `brod' APIs
%% maybe managed by other supervisors.
{?IGNORE, Bootstrap0}
end,
case is_pid(ClientPid) of
true ->
ok = brod_client:register_consumer(Bootstrap, Topic, Partition);
false ->
ok
end,
{ok, #state{ bootstrap = Bootstrap
{ok, #state{ client_pid = ClientPid
, bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, begin_offset = BeginOffset
Expand Down Expand Up @@ -418,20 +451,26 @@ handle_cast(Cast, State) ->
{noreply, State}.

%% @private
terminate(Reason, #state{ bootstrap = Bootstrap
terminate(Reason, #state{ client_pid = ClientPid
, topic = Topic
, partition = Partition
, connection = Connection
, connection_mref = Mref
}) ->
IsShared = is_shared_conn(Bootstrap),
IsNormal = brod_utils:is_normal_reason(Reason),
%% deregister consumer if it's shared connection and normal shutdown
IsShared andalso IsNormal andalso
brod_client:deregister_consumer(Bootstrap, Topic, Partition),
%% close connection if it's working standalone
case not IsShared andalso is_pid(Connection) of
true -> kpro:close_connection(Connection);
false -> ok
case is_pid(ClientPid) andalso IsNormal of
true ->
brod_client:deregister_consumer(ClientPid, Topic, Partition);
false ->
ok
end,
%% close connection if it's owned by this consumer
case Mref =:= ?undef andalso is_pid(Connection) andalso is_process_alive(Connection) of
true ->
kpro:close_connection(Connection);
false ->
ok
end,
%% write a log if it's not a normal reason
IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p",
Expand Down Expand Up @@ -858,17 +897,19 @@ safe_gen_call(Server, Call, Timeout) ->
-spec maybe_init_connection(state()) ->
{ok, state()} | {{error, any()}, state()}.
maybe_init_connection(
#state{ bootstrap = Bootstrap
#state{ client_pid = ClientPid
, bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, connection = ?undef
} = State0) ->
%% Lookup, or maybe (re-)establish a connection to partition leader
case connect_leader(Bootstrap, Topic, Partition) of
{MonitorOrLink, Result} = connect_leader(ClientPid, Bootstrap, Topic, Partition),
case Result of
{ok, Connection} ->
Mref = case is_shared_conn(Bootstrap) of
true -> erlang:monitor(process, Connection);
false -> ?undef %% linked
Mref = case MonitorOrLink of
monitor -> erlang:monitor(process, Connection);
linked -> ?undef
end,
%% Switching to a new connection
%% the response for last_req_ref will be lost forever
Expand All @@ -883,13 +924,23 @@ maybe_init_connection(
maybe_init_connection(State) ->
{ok, State}.

connect_leader(ClientPid, Topic, Partition) when is_pid(ClientPid) ->
brod_client:get_leader_connection(ClientPid, Topic, Partition);
connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
connect_leader({Endpoints, []}, Topic, Partition);
connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
connect_leader(ClientPid, ?IGNORE, Topic, Partition) when is_pid(ClientPid) ->
{monitor, brod_client:get_leader_connection(ClientPid, Topic, Partition)};
connect_leader(ClientPid, ?GET_FROM_CLIENT, Topic, Partition) when is_pid(ClientPid) ->
case brod_client:get_bootstrap(ClientPid) of
{ok, Bootstrap} ->
link_connect_leader(Bootstrap, Topic, Partition);
{error, Reason} ->
{linked, {error, Reason}}
end;
connect_leader(?IGNORE, Bootstrap, Topic, Partition) ->
link_connect_leader(Bootstrap, Topic, Partition).

link_connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
link_connect_leader({Endpoints, []}, Topic, Partition);
link_connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
%% connection pid is linked to self()
kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition).
{linked, kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition)}.

%% Send a ?INIT_CONNECTION delayed loopback message to re-init.
-spec maybe_send_init_connection(state()) -> ok.
Expand All @@ -900,9 +951,6 @@ maybe_send_init_connection(#state{subscriber = Subscriber}) ->
erlang:send_after(Timeout, self(), ?INIT_CONNECTION),
ok.

%% In case 'bootstrap' is a client pid, connection is shared with other workers.
is_shared_conn(Bootstrap) -> is_pid(Bootstrap).

%%%_* Tests ====================================================================

-ifdef(TEST).
Expand Down
51 changes: 42 additions & 9 deletions test/brod_consumer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
, t_direct_fetch_expand_max_bytes/1
, t_resolve_offset/1
, t_consumer_max_bytes_too_small/1
, t_consumer_connection_restart/1
, t_consumer_connection_restart_0/1
, t_consumer_connection_restart_1/1
, t_consumer_connection_restart_2/1
, t_consumer_resubscribe/1
, t_consumer_resubscribe_earliest/1
Expand Down Expand Up @@ -536,8 +537,20 @@ t_consumer_max_bytes_too_small(Config) ->
end).

%% @doc Consumer should auto recover from connection down, subscriber should not
%% notice a thing except for a few seconds of break in data streaming
t_consumer_connection_restart(Config) ->
%% notice a thing except for a few seconds of break in data streaming.
%% Covers the case when connection is shared with other partition leaders
t_consumer_connection_restart_0(Config) ->
ConsumerConfig = [{share_leader_conn, true} | consumer_config()],
consumer_connection_restart(Config, ConsumerConfig).

%% @doc Consumer should auto recover from connection down, subscriber should not
%% notice a thing except for a few seconds of break in data streaming.
%% Covers the case when connection is NOT shared with other partition leaders
t_consumer_connection_restart_1(Config) ->
ConsumerConfig = [{share_leader_conn, false} | consumer_config()],
consumer_connection_restart(Config, ConsumerConfig).

consumer_connection_restart(Config, ConsumerConfig) ->
Client = ?config(client),
Topic = ?TOPIC,
Partition = 0,
Expand All @@ -546,7 +559,7 @@ t_consumer_connection_restart(Config) ->
, {prefetch_bytes, 0}
, {min_bytes, 1}
, {max_bytes, 12} %% ensure fetch exactly one message at a time
| consumer_config()
| ConsumerConfig
],
{ok, ConsumerPid} =
brod_consumer:start_link(whereis(Client), Topic, Partition, ConsumerCfg),
Expand Down Expand Up @@ -586,11 +599,31 @@ t_consumer_connection_restart(Config) ->
Nums2 = Receive(Nums1, 5000),
?assertError(timeout, Receive(Nums2, 100)),
?assertEqual(NumsCnt - 2, length(Nums2)),
?assertEqual({ok, NewConnPid},
brod_client:get_leader_connection(Client, Topic, Partition)),
ok = brod_consumer:stop(ConsumerPid),
?assertNot(is_process_alive(ConsumerPid)),
?assert(is_process_alive(NewConnPid)), %% managed by brod_client
case proplists:get_bool(share_leader_conn, ConsumerConfig) of
true ->
?assertEqual({ok, NewConnPid},
brod_client:get_leader_connection(Client, Topic, Partition)),
ok = brod_consumer:stop(ConsumerPid),
?assertNot(is_process_alive(ConsumerPid)),
?assert(is_process_alive(NewConnPid));
false ->
%% assert normal shutdown
Ref1 = erlang:monitor(process, NewConnPid),
Ref2 = erlang:monitor(process, ConsumerPid),
%% assert connection linked to consumer
{links, Links} = process_info(ConsumerPid, links),
?assert(lists:member(NewConnPid, Links)),
ok = brod_consumer:stop(ConsumerPid),
Wait = fun() ->
?WAIT_ONLY({'DOWN', Ref, process, _, Reason},
begin
?assertEqual(normal, Reason),
?assert(Ref =:= Ref1 orelse Ref =:= Ref2)
end)
end,
Wait(),
Wait()
end,
ok.

%% @doc same as t_consumer_connection_restart,
Expand Down

0 comments on commit 494a0cc

Please sign in to comment.