
brod_topic_subscriber throws away #kafka_fetch_error{} #441

Open
dszoboszlay opened this issue Mar 4, 2021 · 4 comments

@dszoboszlay (Contributor)

I encountered an issue where a group subscriber stopped making any progress. These are the logs from the incident, before it went silent:

2021-03-02T10:10:11.139033+00:00 notice:
Group member (mysubscribergroup,coor=<0.27477.6987>,cb=<0.8782.6987>,generation=141):
assignments received:
  mytopic:
    partition=0 begin_offset=undefined
    partition=1 begin_offset=undefined
    partition=2 begin_offset=undefined
    partition=3 begin_offset=undefined
    partition=4 begin_offset=undefined
    partition=5 begin_offset=undefined

2021-03-02T10:10:11.139676+00:00 notice:
Starting group_subscriber_worker: #{commit_fun =>
                                        #Fun<brod_group_subscriber_v2.3.129769404>,
                                    group_id => <<"mysubscribergroup">>,
                                    partition => 1,
                                    topic =>
                                        <<"mytopic">>}
Offset: undefined
Pid: 'pid.group_subscriber_worker.1'

2021-03-02T10:10:11.139740+00:00 notice:
    supervisor: {<0.27084.6987>,brod_consumers_sup}
    started: [{pid,'pid.brod_consumers_sup.mytopic'},
              {id,<<"mytopic">>},
              {mfargs,
                  {supervisor3,start_link,
                      [brod_consumers_sup,
                       {brod_consumers_sup2,'pid.brod_client.kflow_default_client',
                           <<"mytopic">>,
                           [{begin_offset,earliest}]}]}},
              {restart_type,{permanent,10}},
              {shutdown,infinity},
              {child_type,supervisor}]

2021-03-02T10:10:11.140214+00:00 notice:
    supervisor: {'pid.brod_consumers_sup.mytopic',brod_consumers_sup}
    started: [{pid,'pid.brod_consumer.1'},
              {id,1},
              {mfargs,{brod_consumer,start_link,
                                     ['pid.brod_client.kflow_default_client',
                                      <<"mytopic">>,1,
                                      [{begin_offset,earliest}]]}},
              {restart_type,{transient,2}},
              {shutdown,5000},
              {child_type,worker}]

2021-03-02T10:12:07.121191+00:00 notice:
brod_consumer 'pid.brod_consumer.1' consumer is suspended, waiting for subscriber 'pid.group_subscriber_worker.1' to resubscribe with new begin_offset

I replaced most of the pids with 'pid.module.id' atoms to make the logs more readable.

My understanding is that the last message is coming from here:

brod/src/brod_consumer.erl

Lines 609 to 617 in 1c9144b

handle_reset_offset(#state{ subscriber = Subscriber
                          , offset_reset_policy = reset_by_subscriber
                          } = State, Error) ->
  ok = cast_to_subscriber(Subscriber, Error),
  %% Suspend, no more fetch request until the subscriber re-subscribes
  ?BROD_LOG_INFO("~p ~p consumer is suspended, "
                 "waiting for subscriber ~p to resubscribe with "
                 "new begin_offset", [?MODULE, self(), Subscriber]),
  {noreply, State#state{is_suspended = true}};

The brod_consumer sent a {self(), #kafka_fetch_error{}} message to the brod_topic_subscriber, but the latter only expects #kafka_message_set{}-s here:

handle_info({_ConsumerPid, #kafka_message_set{} = MsgSet}, State0) ->
  State = handle_consumer_delivery(MsgSet, State0),
  {noreply, State};

So the error is silently discarded, and thus the whole subscriber stops making progress. This looks like a problem to me, but to be honest, I don't know how to handle these errors. Maybe just log the error and crash?
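For illustration, here is a minimal sketch of what a log-and-crash clause in brod_topic_subscriber's handle_info/2 could look like. This clause does not exist in brod; the record fields are those of #kafka_fetch_error{} from brod.hrl, and the ?BROD_LOG_ERROR macro is assumed to be available like ?BROD_LOG_INFO is in brod_consumer:

%% Hypothetical extra clause for brod_topic_subscriber:handle_info/2.
%% Instead of silently dropping the fetch error, log it and stop the
%% process so a supervisor (or an operator) notices the stuck partition.
handle_info({_ConsumerPid, #kafka_fetch_error{ topic      = Topic
                                             , partition  = Partition
                                             , error_code = ErrorCode
                                             } = Error}, State) ->
  ?BROD_LOG_ERROR("Fetch error on ~s:~p: ~p",
                  [Topic, Partition, ErrorCode]),
  {stop, {fetch_error, Error}, State};

Crashing here is a policy choice: it surfaces the error via supervisor restart logs instead of leaving the partition silently frozen.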

By the way, I don't understand how the consumer got that error in the first place, because it should have started from the earliest offset - which then apparently did not exist? But this happened after a big Kafka hiccup, during which the previous 40 minutes were spent receiving nothing but coordinator load in progress responses from Kafka, so I'm not particularly surprised.

@dszoboszlay changed the title from "brod_topic_subscriber throws away #kafka_fetch_error{} from" to "brod_topic_subscriber throws away #kafka_fetch_error{}" on Mar 4, 2021
@SerikDM (Contributor) commented Sep 8, 2022

I've faced the same problem. Starting brod_group_subscriber_v2 and populating a topic (with 3 partitions), every few runs one of the partitions freezes without any output. After some debugging I found that the reason is the kafka_fetch_error mentioned above, with an offset_out_of_range reason. brod_consumer sends this error message, but brod_topic_subscriber ignores it.
Adding a handler to brod_topic_subscriber that calls error("msg") helps restart the subscribers. I don't think it's a proper solution, but at least the subscribers can continue consuming messages.
IMO the ideal solution would be for the consumer to use the get_committed_offset callback and resubscribe.

@zmstone (Contributor) commented Sep 9, 2022

I have to admit it's not nice that neither brod_group_subscriber nor brod_topic_subscriber handles the said fetch error (there is not even a log at a proper severity level).
This was a miss. If I remember correctly, we were using a per-partition group subscriber implemented outside of brod, and its handling of such errors was to raise a critical-level alert or log to get the operators' attention, because there must be absolutely NO gap in that data stream.

Maybe the least we can do is handle the error message in handle_info and crash, in both brod_group_subscriber and brod_topic_subscriber.

===================

If you want the offset to be reset in case of an offset_out_of_range fetch error, you can provide the offset_reset_policy config.
The default offset_reset_policy was set to reset_by_subscriber to be on the very defensive side:

  • Setting it to earliest would imply replaying a huge amount of data.
  • Setting it to latest would imply a gap in the processed stream.

IMO, the safest approach is to wait for a manual reset, which is the reason behind the default value.
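For reference, a sketch of how the policy is passed in the consumer config; the client name and topic here are placeholders, and the accepted atoms are reset_by_subscriber (the default), reset_to_earliest, and reset_to_latest:

%% Consumer config fragment passed to brod:start_consumer/3
%% (the same options also apply when a subscriber starts consumers).
ok = brod:start_consumer(my_client, <<"mytopic">>,
                         [ {begin_offset, earliest}
                         , {offset_reset_policy, reset_to_earliest}
                         ]).

With reset_to_earliest or reset_to_latest the consumer resets itself on an out-of-range offset instead of suspending and waiting for a re-subscribe.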

@SerikDM (Contributor) commented Sep 9, 2022

I agree that an auto reset to earliest or latest is not a good option. Logging and crashing sounds good; that should bring enough attention to figure out the real cause of the data gap.
Update: I can confirm that in my case the issue is a data gap. The consumer starts with some non-zero offset. Populating the topic before starting the consumer works fine, while populating the topic 1 second after starting the consumer causes it to freeze without progressing further, even after the topic was populated up to the expected offset.

@zmstone (Contributor) commented Sep 10, 2022

Hi @SerikDM,
It's no surprise that it did not resume even after the out-of-range offset became in-range a while later, because the consumer process is waiting for a re-subscribe from the subscriber process.

I'll try to send a PR to add a shutdown.
