Skip to content

Commit

Permalink
feat: optimize fetch latency
Browse files Browse the repository at this point in the history
Introduced `share_leader_conn` consumer config option.
Set to `true' to consume less TCP connections towards Kafka,
but may lead to higher fetch latency. This is because Kafka can
ony accumulate messages for the oldest fetch request, later
requests behind it may get blocked until `max_wait_time' expires
for the oldest one
  • Loading branch information
zmstone committed Sep 21, 2024
1 parent 87b1296 commit 6ee6c85
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 38 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

- 4.2.0
- Optimize consumer fetch latency.
Introduced the `share_leader_conn` consumer configuration option (default: `false`).
This setting allows users to opt for the previous behavior if preferred (set to `true`).

- 4.1.1
- Upgrade `kafka_protocol` from version 4.1.5 to 4.1.9.

Expand Down
1 change: 1 addition & 0 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
| {offset_reset_policy, brod_consumer:offset_reset_policy()}
| {size_stat_window, non_neg_integer()}
| {isolation_level, brod_consumer:isolation_level()}
| {share_leader_conn, boolean()}
].
%% Consumer configuration.
%%
Expand Down
11 changes: 10 additions & 1 deletion src/brod_client.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2015-2021 Klarna Bank AB (publ)
%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
, get_group_coordinator/2
, get_transactional_coordinator/2
, get_leader_connection/3
, get_bootstrap/1
, get_metadata/2
, get_metadata_safe/2
, get_partitions_count/2
Expand Down Expand Up @@ -227,6 +228,10 @@ stop_consumer(Client, TopicName) ->
get_leader_connection(Client, Topic, Partition) ->
safe_gen_call(Client, {get_leader_connection, Topic, Partition}, infinity).

-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}.
get_bootstrap(Client) ->
safe_gen_call(Client, get_bootstrap, infinity).

%% @doc Get connection to a kafka broker.
%%
%% Return already established connection towards the broker,
Expand Down Expand Up @@ -388,6 +393,10 @@ handle_call({stop_consumer, Topic}, _From, State) ->
handle_call({get_leader_connection, Topic, Partition}, _From, State) ->
{Result, NewState} = do_get_leader_connection(State, Topic, Partition),
{reply, Result, NewState};
handle_call(get_bootstrap, _From, State) ->
#state{bootstrap_endpoints = Endpoints} = State,
ConnConfig = conn_config(State),
{reply, {ok, {Endpoints, ConnConfig}}, State};
handle_call({get_connection, Host, Port}, _From, State) ->
{Result, NewState} = maybe_connect(State, {Host, Port}),
{reply, Result, NewState};
Expand Down
104 changes: 76 additions & 28 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%%% Copyright (c) 2014-2021 Klarna Bank AB (publ)
%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,7 +94,10 @@
-type pending_acks() :: #pending_acks{}.
-type isolation_level() :: kpro:isolation_level().

-record(state, { bootstrap :: pid() | brod:bootstrap()
-define(GET_FROM_CLIENT, get).
-define(IGNORE, ignore).
-record(state, { client_pid :: ?IGNORE | pid()
, bootstrap :: ?IGNORE | ?GET_FROM_CLIENT | brod:bootstrap()
, connection :: ?undef | pid()
, topic :: binary()
, partition :: integer()
Expand Down Expand Up @@ -136,6 +140,7 @@
-define(INIT_CONNECTION, init_connection).
-define(DEFAULT_AVG_WINDOW, 5).
-define(DEFAULT_ISOLATION_LEVEL, ?kpro_read_committed).
-define(DEFAULT_SHARE_LEADER_CONN, false).

%%%_* APIs =====================================================================
%% @equiv start_link(ClientPid, Topic, Partition, Config, [])
Expand Down Expand Up @@ -220,6 +225,16 @@ start_link(Bootstrap, Topic, Partition, Config) ->
%% and `read_committed' to get only the records from committed
%% transactions</li>
%%
%% <li>`share_leader_conn': (optional, default = `false')
%%
%% Whether or not share the partition leader connection with
%% other producers or consumers.
%% Set to `true' to consume less TCP connections towards Kafka,
%% but may lead to higher fetch latency. This is because Kafka can
%% ony accumulate messages for the oldest fetch request, later
%% requests behind it may get blocked until `max_wait_time' expires
%% for the oldest one</li>
%%
%% </ul>
%% @end
-spec start_link(pid() | brod:bootstrap(),
Expand Down Expand Up @@ -286,7 +301,7 @@ get_connection(Pid) ->

%%%_* gen_server callbacks =====================================================

init({Bootstrap, Topic, Partition, Config}) ->
init({Bootstrap0, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
Cfg = fun(Name, Default) ->
proplists:get_value(Name, Config, Default)
Expand All @@ -300,15 +315,33 @@ init({Bootstrap, Topic, Partition, Config}) ->
BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET),
OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY),
IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL),

%% If bootstrap is a client pid, register self to the client
case is_shared_conn(Bootstrap) of
IsShareConn = Cfg(share_leader_conn, ?DEFAULT_SHARE_LEADER_CONN),

%% resolve connection bootstrap args
{ClientPid, Bootstrap} =
case is_pid(Bootstrap0) of
true when IsShareConn ->
%% share leader connection with other producers/consumers
%% the connection is to be managed by brod_client
{Bootstrap0, ?IGNORE};
true ->
%% not sharing leader connection with other producers/consumers
%% the bootstrap args will be resolved later when it's
%% time to establish a connection to partition leader
{Bootstrap0, ?GET_FROM_CLIENT};
false ->
%% this consumer process is not started from `brod' APIs
%% maybe managed by other supervisors.
{?IGNORE, Bootstrap0}
end,
case is_pid(ClientPid) of
true ->
ok = brod_client:register_consumer(Bootstrap, Topic, Partition);
false ->
ok
end,
{ok, #state{ bootstrap = Bootstrap
{ok, #state{ client_pid = ClientPid
, bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, begin_offset = BeginOffset
Expand Down Expand Up @@ -418,20 +451,26 @@ handle_cast(Cast, State) ->
{noreply, State}.

%% @private
terminate(Reason, #state{ bootstrap = Bootstrap
terminate(Reason, #state{ client_pid = ClientPid
, topic = Topic
, partition = Partition
, connection = Connection
, connection_mref = Mref
}) ->
IsShared = is_shared_conn(Bootstrap),
IsNormal = brod_utils:is_normal_reason(Reason),
%% deregister consumer if it's shared connection and normal shutdown
IsShared andalso IsNormal andalso
brod_client:deregister_consumer(Bootstrap, Topic, Partition),
%% close connection if it's working standalone
case not IsShared andalso is_pid(Connection) of
true -> kpro:close_connection(Connection);
false -> ok
case is_pid(ClientPid) andalso IsNormal of
true ->
brod_client:deregister_consumer(ClientPid, Topic, Partition);
false ->
ok
end,
%% close connection if it's owned by this consumer
case Mref =:= ?undef andalso is_pid(Connection) andalso is_process_alive(Connection) of
true ->
kpro:close_connection(Connection);
false ->
ok
end,
%% write a log if it's not a normal reason
IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p",
Expand Down Expand Up @@ -858,17 +897,19 @@ safe_gen_call(Server, Call, Timeout) ->
-spec maybe_init_connection(state()) ->
{ok, state()} | {{error, any()}, state()}.
maybe_init_connection(
#state{ bootstrap = Bootstrap
#state{ client_pid = ClientPid
, bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, connection = ?undef
} = State0) ->
%% Lookup, or maybe (re-)establish a connection to partition leader
case connect_leader(Bootstrap, Topic, Partition) of
{MonitorOrLink, Result} = connect_leader(ClientPid, Bootstrap, Topic, Partition),
case Result of
{ok, Connection} ->
Mref = case is_shared_conn(Bootstrap) of
true -> erlang:monitor(process, Connection);
false -> ?undef %% linked
Mref = case MonitorOrLink of
monitor -> erlang:monitor(process, Connection);
linked -> ?undef
end,
%% Switching to a new connection
%% the response for last_req_ref will be lost forever
Expand All @@ -883,13 +924,23 @@ maybe_init_connection(
maybe_init_connection(State) ->
{ok, State}.

connect_leader(ClientPid, Topic, Partition) when is_pid(ClientPid) ->
brod_client:get_leader_connection(ClientPid, Topic, Partition);
connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
connect_leader({Endpoints, []}, Topic, Partition);
connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
connect_leader(ClientPid, ?IGNORE, Topic, Partition) when is_pid(ClientPid) ->
{monitor, brod_client:get_leader_connection(ClientPid, Topic, Partition)};
connect_leader(ClientPid, ?GET_FROM_CLIENT, Topic, Partition) when is_pid(ClientPid) ->
case brod_client:get_bootstrap(ClientPid) of
{ok, Bootstrap} ->
link_connect_leader(Bootstrap, Topic, Partition);
{error, Reason} ->
{linked, {error, Reason}}
end;
connect_leader(?IGNORE, Bootstrap, Topic, Partition) ->
link_connect_leader(Bootstrap, Topic, Partition).

link_connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
link_connect_leader({Endpoints, []}, Topic, Partition);
link_connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
%% connection pid is linked to self()
kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition).
{linked, kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition)}.

%% Send a ?INIT_CONNECTION delayed loopback message to re-init.
-spec maybe_send_init_connection(state()) -> ok.
Expand All @@ -900,9 +951,6 @@ maybe_send_init_connection(#state{subscriber = Subscriber}) ->
erlang:send_after(Timeout, self(), ?INIT_CONNECTION),
ok.

%% In case 'bootstrap' is a client pid, connection is shared with other workers.
is_shared_conn(Bootstrap) -> is_pid(Bootstrap).

%%%_* Tests ====================================================================

-ifdef(TEST).
Expand Down
51 changes: 42 additions & 9 deletions test/brod_consumer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
, t_direct_fetch_expand_max_bytes/1
, t_resolve_offset/1
, t_consumer_max_bytes_too_small/1
, t_consumer_connection_restart/1
, t_consumer_connection_restart_0/1
, t_consumer_connection_restart_1/1
, t_consumer_connection_restart_2/1
, t_consumer_resubscribe/1
, t_consumer_resubscribe_earliest/1
Expand Down Expand Up @@ -536,8 +537,20 @@ t_consumer_max_bytes_too_small(Config) ->
end).

%% @doc Consumer should auto recover from connection down, subscriber should not
%% notice a thing except for a few seconds of break in data streaming
t_consumer_connection_restart(Config) ->
%% notice a thing except for a few seconds of break in data streaming.
%% Covers the case when connection is shared with other partition leaders
t_consumer_connection_restart_0(Config) ->
ConsumerConfig = [{share_leader_conn, true} | consumer_config()],
consumer_connection_restart(Config, ConsumerConfig).

%% @doc Consumer should auto recover from connection down, subscriber should not
%% notice a thing except for a few seconds of break in data streaming.
%% Covers the case when connection is NOT shared with other partition leaders
t_consumer_connection_restart_1(Config) ->
ConsumerConfig = [{share_leader_conn, false} | consumer_config()],
consumer_connection_restart(Config, ConsumerConfig).

consumer_connection_restart(Config, ConsumerConfig) ->
Client = ?config(client),
Topic = ?TOPIC,
Partition = 0,
Expand All @@ -546,7 +559,7 @@ t_consumer_connection_restart(Config) ->
, {prefetch_bytes, 0}
, {min_bytes, 1}
, {max_bytes, 12} %% ensure fetch exactly one message at a time
| consumer_config()
| ConsumerConfig
],
{ok, ConsumerPid} =
brod_consumer:start_link(whereis(Client), Topic, Partition, ConsumerCfg),
Expand Down Expand Up @@ -586,11 +599,31 @@ t_consumer_connection_restart(Config) ->
Nums2 = Receive(Nums1, 5000),
?assertError(timeout, Receive(Nums2, 100)),
?assertEqual(NumsCnt - 2, length(Nums2)),
?assertEqual({ok, NewConnPid},
brod_client:get_leader_connection(Client, Topic, Partition)),
ok = brod_consumer:stop(ConsumerPid),
?assertNot(is_process_alive(ConsumerPid)),
?assert(is_process_alive(NewConnPid)), %% managed by brod_client
case proplists:get_bool(share_leader_conn, ConsumerConfig) of
true ->
?assertEqual({ok, NewConnPid},
brod_client:get_leader_connection(Client, Topic, Partition)),
ok = brod_consumer:stop(ConsumerPid),
?assertNot(is_process_alive(ConsumerPid)),
?assert(is_process_alive(NewConnPid));
false ->
%% assert normal shutdown
Ref1 = erlang:monitor(process, NewConnPid),
Ref2 = erlang:monitor(process, ConsumerPid),
%% assert connection linked to consumer
{links, Links} = process_info(ConsumerPid, links),
?assert(lists:member(NewConnPid, Links)),
ok = brod_consumer:stop(ConsumerPid),
Wait = fun() ->
?WAIT_ONLY({'DOWN', Ref, process, _, Reason},
begin
?assertEqual(normal, Reason),
?assert(Ref =:= Ref1 orelse Ref =:= Ref2)
end)
end,
Wait(),
Wait()
end,
ok.

%% @doc same as t_consumer_connection_restart,
Expand Down

0 comments on commit 6ee6c85

Please sign in to comment.