No reconnect after "payload connection down :shutdown, :tcp_closed}" #520

Closed

anoskov opened this issue Jul 25, 2022 · 9 comments

anoskov commented Jul 25, 2022

Hello!
We have an issue with brod in Kubernetes. When the service starts, all consumers join the group and work fine. But after some time we get:

2022-07-24 16:05:04.207 pid=<0.7117.24> [info] Group member (admin,coor=#PID<0.7117.24>,cb=#PID<0.20534.22>,generation=172):
re-joining group, reason:{:connection_down, {:shutdown, :tcp_closed}}
2022-07-24 16:05:04.207 pid=<0.7119.24> [info] client Client: payload connection down kafka-host:9092
reason:{:shutdown, :tcp_closed}

and after this brod doesn't reconnect, and kafka-consumer-groups.sh reports that the consumer group has no active members.

We have been using brod for quite a long time and it usually reconnected after network issues. Now we use broadway_kafka, which is built on brod.

What could be wrong?

anoskov commented Jul 25, 2022

I noticed that the disconnect occurs 10 minutes after the connection is established. This value matches connections.max.idle.ms, so it looks like Kafka disconnects idle connections. I know that some Kafka clients support a reconnect-on-idle feature. Does brod support it?

UPD:
The brod_client process is alive, and :brod_client.metadata/2 and :brod.fetch/4 return data. But in the state we see dead_since:

iex(admin@35> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
 #PID<0.31736.27>,
 [
   {:conn, {"kafka-0", 9092},
    {:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
 ], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
 Admin.Kafka.Consumer.Broadway.Producer_0.Client}

UPD2:

I checked the Producer state and got a timeout on :sys.get_state, because the process is stuck in its handle_info callback in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:

iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
  registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 4,
  links: [#PID<0.3880.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {48, []},
    {:"$ancestors",
     [Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
      Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
      Admin.Supervisor, #PID<0.3569.0>]},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.3568.0>,
  total_heap_size: 20338,
  heap_size: 2586,
  stack_size: 29,
  reductions: 20656517,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 2858
  ],
  suspending: []
]

Any suggestions why :brod_group_coordinator.stop is blocked?

zmstone commented Aug 20, 2022

The stop call waits for the pid's DOWN message before it returns.
My guess is that the fetched batch is too large, causing the workers to be blocked for too long and not react to the assignment revocation; hence the coordinator cannot react to group rejoin events, so Kafka just marks the member as inactive.
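
For reference, a rough Elixir paraphrase of how such a stop call is shaped (a sketch from memory, not the actual brod source; the exact stop request may differ): monitor the coordinator, ask it to stop, then wait for the 'DOWN' message with no timeout, so the caller blocks for as long as the coordinator stays alive.

defmodule StopSketch do
  # Sketch of a stop/1 that waits for the target's 'DOWN' message, in the
  # spirit of :brod_group_coordinator.stop/1 (module name is made up).
  def stop(pid) do
    ref = Process.monitor(pid)
    :ok = GenServer.cast(pid, :stop)

    # No `after` clause: if the coordinator never terminates, the caller
    # (here, the Broadway producer) waits on this receive forever.
    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    end
  end
end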

Maybe try these (a config sketch follows the list):

  • Fetch smaller batches
  • Increase the group rebalance timeout
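
A sketch of where both knobs would go in a broadway_kafka setup (module, group, and topic names are placeholders; this assumes :fetch_config is forwarded to the brod consumer and :group_config to the brod group coordinator, so double-check the option names against your brod/broadway_kafka versions):

defmodule MyConsumer do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayKafka.Producer,
          hosts: [{"kafka", 9092}],
          group_id: "my_group",
          topics: ["my_topic"],
          # smaller fetch batches, so one batch cannot keep a worker busy for long
          fetch_config: [max_bytes: 524_288],
          # more time to finish a rebalance before Kafka kicks the member out
          group_config: [rebalance_timeout_seconds: 60, session_timeout_seconds: 30]},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  def handle_message(_processor, message, _context), do: message
end

A smaller max_bytes keeps a single fetch from occupying a worker for long, and a larger rebalance_timeout_seconds gives the group more time to revoke assignments before Kafka removes the member.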

anoskov commented Aug 20, 2022

@zmstone thanks for the reply.
It happens only if the consumer receives no messages for 10+ minutes (connections.max.idle.ms), so at that point it isn't processing any batches. The connection is closed permanently and does not recover even after a few hours, until we restart the pod.

anoskov commented Aug 22, 2022

@zmstone hi!
I found out that brod_group_coordinator is stuck on some call and therefore cannot process the terminate message.
Here is the coordinator's process info:

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 2,
  links: [],
  dictionary: [
    rand_seed: {%{
       bits: 58,
       jump: #Function<3.92093067/1 in :rand."-fun.exsplus_jump/1-">,
       next: #Function<0.92093067/1 in :rand."-fun.exsss_next/1-">,
       type: :exsss,
       uniform: #Function<1.92093067/1 in :rand."-fun.exsss_uniform/1-">,
       uniform_n: #Function<2.92093067/2 in :rand."-fun.exsss_uniform/2-">
     }, [76455265455526698 | 144843847514796377]},
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4609.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4608.0>,
  total_heap_size: 2589,
  heap_size: 1598,
  stack_size: 48,
  reductions: 780336,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 5
  ],
  suspending: []
]

robsonpeixoto commented Aug 25, 2022

I'm having a similar problem, but now with the producer connections.

The first message I try to send always fails with the error: {:connection_down, {:shutdown, :ssl_closed}}

Any tips on how to solve it?

Here is the config:

  kafka_hosts = parse_kafka_hosts.()
  config :brod,
    clients: [
      kafka_producer: [
        endpoints: kafka_hosts,
        restart_delay_seconds: 10,
        auto_start_producers: true,
        allow_topic_auto_creation: false,
        default_producer_config: [
          retry_backoff_ms: 50,
          max_retries: 3,
          required_acks: -1,
          ack_timeout: 300,
          max_linger_ms: 10,
          max_linger_count: 5
        ],
        client_id: "producer-#{System.fetch_env!("POD_NAME")}",
        ssl: [
          verify: :verify_peer,
          cacertfile: "/etc/ssl/certs/ca-certificates.crt",
          depth: 3,
          customize_hostname_check: [
            match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
          ]
        ],
        sasl: {
          "KAFKA_sasl_mechanisms" |> System.fetch_env!() |> String.downcase() |> String.to_atom(),
          System.fetch_env!("KAFKA_sasl_username"),
          System.fetch_env!("KAFKA_sasl_password")
        }
      ]
    ]

Thanks =D

zmstone commented Aug 26, 2022

Hi @anoskov
My guess is the assignment revocation callback.
Maybe you can try to get the current stacktrace of the coordinator process?
In iex, it's Process.info(pid, :current_stacktrace).

zmstone commented Aug 26, 2022

Hi @robsonpeixoto
Let's move the discussion to the other issue I created for you.

anoskov commented Aug 30, 2022

@zmstone Hello!
You're right

iex(messaging@10.194.133.121)5> Process.info pid("0.3283.44"), :current_stacktrace
{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 214]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1027]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 525]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 376
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 695]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 771]}
 ]}

anoskov commented Aug 30, 2022

Finally got the pieces together. This is a deadlock caused by a race between a) the assignments_revoked callback invoked inside brod_group_coordinator, which makes an infinity-timeout drain_after_revoke call to the Producer in the Broadway implementation, and b) the Producer handling the 'DOWN' message, which calls :brod_group_coordinator.stop and waits for the result of a).
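
For illustration only (module and message names are made up, and the real stop waits for a 'DOWN' message rather than a call reply, but the effect is the same), the shape of the deadlock is two processes each blocked in a synchronous call into the other:

defmodule CoordinatorSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, nil, opts)
  def init(state), do: {:ok, state}

  # Analogue of a): the coordinator calls into the producer to revoke
  # assignments and waits forever for the reply.
  def handle_info({:revoke, producer}, state) do
    GenServer.call(producer, :drain_after_revoke, :infinity)
    {:noreply, state}
  end

  # Would let the producer's stop request return, but never runs because the
  # coordinator is stuck in the call above.
  def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state}
end

defmodule ProducerSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, nil, opts)
  def init(state), do: {:ok, state}

  # Analogue of b): on the connection 'DOWN', the producer synchronously stops
  # the coordinator, which is already waiting on :drain_after_revoke -> deadlock.
  def handle_info({:connection_down, coordinator}, state) do
    GenServer.call(coordinator, :stop, :infinity)
    {:noreply, state}
  end

  # Would answer the coordinator's revoke call, but never runs because the
  # producer is stuck in the call above.
  def handle_call(:drain_after_revoke, _from, state), do: {:reply, :ok, state}
end

Sending {:revoke, producer_pid} to the coordinator and {:connection_down, coordinator_pid} to the producer at roughly the same time reproduces the hang: each process sits in GenServer.call while the other can never reach its handle_call clause.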

I think it is a BroadwayKafka issue, so I'm closing this one. Thanks for the help!

anoskov closed this as completed Aug 30, 2022