Skip to content

Commit

Permalink
Merge pull request #589 from indrekj/fold-fix
Browse files Browse the repository at this point in the history
Fix folding over transactions
  • Loading branch information
zmstone authored Aug 12, 2024
2 parents 289e66a + a2860aa commit 17162a3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 18 deletions.
74 changes: 56 additions & 18 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
, epoch_ms/0
, fetch/4
, fetch/5
, fetch_one_batch/4
, fold/8
, fetch_committed_offsets/3
, fetch_committed_offsets/4
Expand Down Expand Up @@ -66,6 +67,8 @@
-type req_fun() :: fun((offset(), kpro:count()) -> kpro:req()).
-type fetch_fun() :: fun((offset()) -> {ok, {offset(), [brod:message()]}} |
{error, any()}).
-type fetch_fun2() :: fun((offset()) -> {ok, {offset(), offset(), [brod:message()]}} |
{error, any()}).
-type connection() :: kpro:connection().
-type conn_config() :: brod:conn_config().
-type topic() :: brod:topic().
Expand Down Expand Up @@ -331,7 +334,7 @@ fold(Client, Topic, Partition, Offset, Opts,
?BROD_FOLD_RET(Acc, Offset, {error, Reason})
end;
fold(Conn, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
Fetch = make_fetch_fun(Conn, Topic, Partition, Opts),
Fetch = make_fetch_fun2(Conn, Topic, Partition, Opts),
Infinity = 1 bsl 64,
EndOffset = maps:get(reach_offset, Limits, Infinity),
CountLimit = maps:get(message_count, Limits, Infinity),
Expand All @@ -346,13 +349,21 @@ fold(Conn, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
-spec make_fetch_fun(pid(), topic(), partition(), brod:fetch_opts()) ->
fetch_fun().
make_fetch_fun(Conn, Topic, Partition, FetchOpts) ->
make_fetch_fun(Conn, Topic, Partition, FetchOpts, fun fetch/4).

-spec make_fetch_fun2(pid(), topic(), partition(), brod:fetch_opts()) ->
fetch_fun2().
make_fetch_fun2(Conn, Topic, Partition, FetchOpts) ->
make_fetch_fun(Conn, Topic, Partition, FetchOpts, fun fetch_one_batch/4).

make_fetch_fun(Conn, Topic, Partition, FetchOpts, FetchFun) ->
WaitTime = maps:get(max_wait_time, FetchOpts, 1000),
MinBytes = maps:get(min_bytes, FetchOpts, 1),
MaxBytes = maps:get(max_bytes, FetchOpts, 1 bsl 20),
IsolationLevel = maps:get(isolation_level, FetchOpts, ?kpro_read_committed),
ReqFun = make_req_fun(Conn, Topic, Partition, WaitTime,
MinBytes, IsolationLevel),
fun(Offset) -> ?MODULE:fetch(Conn, ReqFun, Offset, MaxBytes) end.
fun(Offset) -> FetchFun(Conn, ReqFun, Offset, MaxBytes) end.

-spec make_part_fun(brod:partitioner()) -> brod:partition_fun().
make_part_fun(random) ->
Expand Down Expand Up @@ -445,12 +456,37 @@ do_fetch_committed_offsets(Conn, GroupId, Topics) when is_pid(Conn) ->
-spec fetch(connection(), req_fun(), offset(), kpro:count()) ->
{ok, {offset(), [brod:message()]}} | {error, any()}.
fetch(Conn, ReqFun, Offset, MaxBytes) ->
case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
{ok, {StableOffset, _NextOffset, Msgs}} ->
{ok, {StableOffset, Msgs}}; %% for backward compatibility
Other ->
Other
end.

%% @doc Fetch a message-set. If the given MaxBytes is not enough to fetch a
%% single message, expand it to fetch exactly one message
%% The fetch/4 may return an empty batch even if there can be more messages in
%% the topic. This function returns a non-empty batch unless the stable offset
%% is reached.
-spec fetch_one_batch(connection(), req_fun(), offset(), kpro:count()) ->
{ok, {offset(), offset(), [brod:message()]}} | {error, any()}.
fetch_one_batch(Conn, ReqFun, Offset, MaxBytes) ->
case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
{ok, {StableOffset, NextOffset, []}} when NextOffset < StableOffset ->
fetch_one_batch(Conn, ReqFun, NextOffset, MaxBytes);
Other ->
Other
end.

-spec do_fetch(connection(), req_fun(), offset(), kpro:count()) ->
{ok, {offset(), offset(), [brod:message()]}} | {error, any()}.
do_fetch(Conn, ReqFun, Offset, MaxBytes) ->
Request = ReqFun(Offset, MaxBytes),
case request_sync(Conn, Request, infinity) of
{ok, #{error_code := ErrorCode}} when ?IS_ERROR(ErrorCode) ->
{error, ErrorCode};
{ok, #{batches := ?incomplete_batch(Size)}} ->
fetch(Conn, ReqFun, Offset, Size);
do_fetch(Conn, ReqFun, Offset, Size);
{ok, #{header := Header, batches := Batches}} ->
StableOffset = get_stable_offset(Header),
{NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches),
Expand All @@ -472,9 +508,9 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
%% we can only bump begin_offset with +1 and try again.
NewBeginOffset0 + 1
end,
fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
do_fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
false ->
{ok, {StableOffset, Msgs}}
{ok, {StableOffset, NewBeginOffset0, Msgs}}
end;
{error, Reason} ->
{error, Reason}
Expand Down Expand Up @@ -636,32 +672,34 @@ do_fold(Spawn, {Pid, Mref}, Offset, Acc, Fun, End, Count) ->

handle_fetch_rsp(_Spawn, {error, Reason}, Offset, Acc, _Fun, _, _) ->
?BROD_FOLD_RET(Acc, Offset, {fetch_failure, Reason});
handle_fetch_rsp(_Spawn, {ok, {StableOffset, []}}, Offset, Acc, _Fun,
handle_fetch_rsp(_Spawn, {ok, {StableOffset, _NextFetchOffset, []}}, Offset, Acc, _Fun,
_End, _Count) when Offset >= StableOffset ->
?BROD_FOLD_RET(Acc, Offset, reached_end_of_partition);
handle_fetch_rsp(Spawn, {ok, {_StableOffset, Msgs}}, Offset, Acc, Fun,
handle_fetch_rsp(Spawn, {ok, {_StableOffset, NextFetchOffset, Msgs}}, Offset, Acc, Fun,
End, Count) ->
#kafka_message{offset = LastOffset} = lists:last(Msgs),
%% start fetching the next batch if not stopping at current
Fetcher = case LastOffset < End andalso length(Msgs) < Count of
true -> Spawn(LastOffset + 1);
Fetcher = case NextFetchOffset =< End andalso length(Msgs) < Count of
true -> Spawn(NextFetchOffset);
false -> undefined
end,
do_acc(Spawn, Fetcher, Offset, Acc, Fun, Msgs, End, Count).
do_acc(Spawn, Fetcher, NextFetchOffset, Offset, Acc, Fun, Msgs, End, Count).

do_acc(_Spawn, Fetcher, Offset, Acc, _Fun, _, _End, 0) ->
do_acc(_Spawn, Fetcher, _NextFetchOffset, Offset, Acc, _Fun, _, _End, 0) ->
undefined = Fetcher, %% assert
?BROD_FOLD_RET(Acc, Offset, reached_message_count_limit);
do_acc(_Spawn, Fetcher, Offset, Acc, _Fun, _, End, _Count) when Offset > End ->
do_acc(_Spawn, Fetcher, _NextFetchOffset, Offset, Acc, _Fun, _, End, _Count) when Offset > End ->
undefined = Fetcher, %% assert
?BROD_FOLD_RET(Acc, Offset, reached_target_offset);
do_acc(Spawn, Fetcher, Offset, Acc, Fun, [], End, Count) ->
do_fold(Spawn, Fetcher, Offset, Acc, Fun, End, Count);
do_acc(Spawn, Fetcher, Offset, Acc, Fun, [Msg | Rest], End, Count) ->
do_acc(_Spawn, Fetcher, NextFetchOffset, _Offset, Acc, _Fun, [], End, _Count)
when NextFetchOffset > End ->
undefined = Fetcher, %% assert
?BROD_FOLD_RET(Acc, NextFetchOffset, reached_target_offset);
do_acc(Spawn, Fetcher, NextFetchOffset, _Offset, Acc, Fun, [], End, Count) ->
do_fold(Spawn, Fetcher, NextFetchOffset, Acc, Fun, End, Count);
do_acc(Spawn, Fetcher, NextFetchOffset, Offset, Acc, Fun, [Msg | Rest], End, Count) ->
try Fun(Msg, Acc) of
{ok, NewAcc} ->
NextOffset = Msg#kafka_message.offset + 1,
do_acc(Spawn, Fetcher, NextOffset, NewAcc, Fun, Rest, End, Count - 1);
do_acc(Spawn, Fetcher, NextFetchOffset, NextOffset, NewAcc, Fun, Rest, End, Count - 1);
{error, Reason} ->
ok = kill_fetcher(Fetcher),
?BROD_FOLD_RET(Acc, Offset, Reason)
Expand Down
21 changes: 21 additions & 0 deletions test/brod_consumer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
, t_fetch_aborted_from_the_middle/1
, t_direct_fetch/1
, t_fold/1
, t_fold_transactions/1
, t_direct_fetch_with_small_max_bytes/1
, t_direct_fetch_expand_max_bytes/1
, t_resolve_offset/1
Expand Down Expand Up @@ -422,6 +423,26 @@ t_fold(Config) when is_list(Config) ->
0, ErrorFoldF, #{})),
ok.

t_fold_transactions(kafka_version_match) ->
has_txn();
t_fold_transactions(Config) when is_list(Config) ->
Client = ?config(client),
Topic = ?TOPIC,
Partition = 0,
Batch = [#{value => <<"one">>}, #{value => <<"two">>}],
{ok, Tx} = brod:transaction(Client, <<"some_transaction">>, []),
{ok, Offset} = brod:txn_produce(Tx, ?TOPIC, Partition, Batch),
ok = brod:commit(Tx),
FoldF =
fun F(#kafka_message{value = V}, Acc) -> {ok, F(V, Acc)};
F(V, Acc) -> [V | Acc]
end,
FetchOpts = #{max_bytes => 1},
?assertMatch({Result, O, reached_end_of_partition}
when O =:= Offset + length(Batch) + 1 andalso length(Result) =:= 2,
brod:fold(Client, Topic, Partition, Offset, FetchOpts, [], FoldF, #{})),
ok.

%% This test case does not work with Kafka 0.9, not sure aobut 0.10 and 0.11
%% since all 0.x versions are old enough, we only try to verify this against
%% 1.x or newer
Expand Down

0 comments on commit 17162a3

Please sign in to comment.