-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix folding over transactions #589
Conversation
Hi @indrekj
|
@zmstone I pushed the changes. I'm unsure if I understood everything, but if you want any other changes, please let me know. |
@@ -443,7 +460,7 @@ do_fetch_committed_offsets(Conn, GroupId, Topics) when is_pid(Conn) -> | |||
%% @doc Fetch a message-set. If the given MaxBytes is not enough to fetch a | |||
%% single message, expand it to fetch exactly one message | |||
-spec fetch(connection(), req_fun(), offset(), kpro:count()) -> | |||
{ok, {offset(), [brod:message()]}} | {error, any()}. | |||
{ok, {offset(), offset(), [brod:message()]}} | {error, any()}. | |||
fetch(Conn, ReqFun, Offset, MaxBytes) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean something like this:
fetch(Conn, ReqFun, Offset, MaxBytes, AllowEmptyBatch) ->
case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
{ok, {StableOffset, NextOffset, []}} when not AllowEmptyBatch andalso NextOffset < StableOffset ->
fetch(Conn, ReqFun, NextOffset, MaxBytes, AllowEmptyBatch);
Other ->
Other
end.
fetch(Conn, ReqFun, Offset, MaxBytes) ->
case do_fetch(Conn, ReqFun, Offset, MaxBytes) of
{ok, {StableOffset, _NextOffset, Msgs}} ->
{ok, {StableOffset, Msgs}}; %% for backward compatibility
Other ->
Other
end.
so the caller will not have to deal with empty batches
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, there already is brod_utils:fetch/5 that is also exported:
fetch(Hosts, Topic, Partition, Offset, Opts) when is_list(Hosts) ->
fetch({Hosts, ConnCfg}, Topic, Partition, Offset, Opts) ->
fetch(Client, Topic, Partition, Offset, Opts) when is_atom(Client) ->
fetch(Conn, Topic, Partition, Offset, Opts) ->
Under the hood, it uses make_fetch_fun, which uses fetch/4.
Is it okay to add a new fetch/5 with quite a different signature? I want to be sure, as I don't write erlang daily. Also, do you think AllowEmptyBatch should be replaced with a list of options instead of a boolean flag? I'm okay with doing it either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, sorry for missing that.
maybe fetch_one_batch
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zmstone I got distracted, but finally addressed your comments
15bcc4a
to
30874a9
Compare
The brod:fold/8 did not work correctly when the last message in the message set a transaction commit. The output looked like this: ``` %%% brod_consumer_SUITE ==> t_fold_transactions: FAILED %%% brod_consumer_SUITE ==> {function_clause, [{lists,last,[[]],[{file,"lists.erl"},{line,228}]}, {brod_utils,handle_fetch_rsp,7, [{file,"/home/indrek/gems/brod/src/brod_utils.erl"},{line,661}]}, {brod_consumer_SUITE,t_fold_transactions,1, [{file,"/home/indrek/gems/brod/test/brod_consumer_SUITE.erl"}, {line,443}]}, {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1783}]}, {test_server,run_test_case_eval1,6, [{file,"test_server.erl"},{line,1292}]}, {test_server,run_test_case_eval,9, [{file,"test_server.erl"},{line,1224}]}]} ``` The issue was that `#kafka_message{offset = LastOffset} = lists:last(Msgs),` was used, when the Msgs list was empty. Now instead of trying to infer next offset from the kafka_message, we pass the NextFetchOffset from the brod_utils:fetch_one_batch/4 function. Fixes kafka4beam#588
thank you. published 4.1.0 |
The brod:fold/8 did not work correctly when the last message in the message set a transaction commit.
The output looked like this:
The issue was that
#kafka_message{offset = LastOffset} = lists:last(Msgs),
was used, when the Msgs list was empty.Now instead of trying to infer next offset from the kafka_message, we pass the NextFetchOffset from the brod_utils:fetch/4 function.
Fixes #588