Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misleading handling of begin offset for Consumer groups #567

Open
gdwoolbert3 opened this issue Sep 26, 2023 · 0 comments
Open

Misleading handling of begin offset for Consumer groups #567

gdwoolbert3 opened this issue Sep 26, 2023 · 0 comments

Comments

@gdwoolbert3
Copy link

Hello brod maintainers! I noticed a slight inconsistency/oversight in how the begin offset is resolved for consumer groups.

The docs and type spec would lead me to believe that the begin_offset in the consumer config should be treated as a :brod.offset_time(). The atom values (:earliest and :latest) seem to be handled correctly. However, if you try to use a :brod.msg_ts() type value, it is handled as a resolved offset, not a timestamp.

Obviously, not using :earliest or :latest is a niche use case but, in that case, handling it as a timestamp seems to be the desired behavior.

This is fairly easy to recreate, using the following child_spec. Everything behaves as expected in this case but, if you attempt to use a timestamp for the begin_offset field of the consumer config, you'll notice it's treated as an offset and NOT a timestamp:

def child_spec(_arg) do
  config = %{
    client: :kafka_client,
    group_id: "consumer_group",
    topics: ["test_topic_1"],
    cb_module: __MODULE__,
    consumer_config: [begin_offset: :earliest],
    init_data: [foo: "bar"],
    message_type: :message_set,
    group_config: [
      offset_commit_policy: :consumer_managed,
      offset_commit_interval_seconds: 5,
      rejoin_delay_seconds: 60,
      reconnect_cool_down_seconds: 60
    ]
  }

  %{
    id: __MODULE__,
    start: {:brod_group_subscriber_v2, :start_link, [config]},
    type: :worker,
    restart: :temporary,
    shutdown: 5000
  }
end

Not a super big deal but it would be nice to have this functionality (or at least update the spec to reduce future confusion)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant