Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

panic: runtime error: send on closed channel #151

Closed
wkuranowski opened this issue Sep 23, 2014 · 10 comments
Closed

panic: runtime error: send on closed channel #151

wkuranowski opened this issue Sep 23, 2014 · 10 comments
Labels

Comments

@wkuranowski
Copy link

Hi

I get panic: runtime error: send on closed channel when I kill a broker in a Kafka cluster.

It looks like backPressureThreshold is exceeded and you want to flush messages but something bad is happening.

I also wonder why I get kafka: Dropped 13803 messages... What is the reason? I know that one of the brokers is dead, but what about new Leaders for partitions?

I use an asynchronous producer. My configuration:

producerConfig := sarama.NewProducerConfig()
producerConfig.RequiredAcks = sarama.NoResponse
producerConfig.MaxBufferedBytes = 1000000
producerConfig.BackPressureThresholdBytes = 10000000
producerConfig.MaxBufferTime = 30 * time.Second
producerConfig.Compression = sarama.CompressionSnappy

Logs:

[Sarama] 2014/09/23 18:19:24 Initializing new client
[Sarama] 2014/09/23 18:19:24 Fetching metadata from broker localhost:9092
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9092
[Sarama] 2014/09/23 18:19:24 Registered new broker #5 at localhost:9096
[Sarama] 2014/09/23 18:19:24 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:19:24 Registered new broker #2 at localhost:9093
[Sarama] 2014/09/23 18:19:24 Registered new broker #3 at localhost:9094
[Sarama] 2014/09/23 18:19:24 Registered new broker #4 at localhost:9095
[Sarama] 2014/09/23 18:19:24 Successfully initialized new client
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9096
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9095
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9094
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9093
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Fetching metadata from broker localhost:9093
[Sarama] 2014/09/23 18:21:56 Closed connection to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:21:56 Failed to connect to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 kafka: Dropped 13803 messages: dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Fetching metadata from broker localhost:9096
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 kafka: Dropped 4406 messages: dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:21:56 Failed to connect to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:58 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:58 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:58 kafka: broker: not connected
panic: runtime error: send on closed channel

Panic:

goroutine 35 [running]:
runtime.panic(0x57c3c0, 0x690fbe)
/usr/lib/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/Shopify/sarama.(_brokerProducer).flushIfOverCapacity(0xc20804e680, 0x989680000f4240)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:290 +0xa0
github.com/Shopify/sarama.(_brokerProducer).addMessage(0xc20804e680, 0xc214a13e00, 0x989680000f4240)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:280 +0x2ae
github.com/Shopify/sarama.(_Producer).addMessage(0xc208044120, 0xc214a13e00, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:190 +0xad
github.com/Shopify/sarama.(_produceMessage).enqueue(0xc214a13e00, 0xc208044120, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/produce_message.go:22 +0x61
github.com/Shopify/sarama.(_Producer).genericSendMessage(0xc208044120, 0x5aa3d0, 0x9, 0x7f1e203739d0, 0xc208bd4b60, 0x7f1e203739d0, 0xc208bd4b80, 0x7f1e201cdf00, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:182 +0x257
github.com/Shopify/sarama.(_Producer).QueueMessage(0xc208044120, 0x5aa3d0, 0x9, 0x7f1e203739d0, 0xc208bd4b60, 0x7f1e203739d0, 0xc208bd4b80, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:136 +0x8b
main.worker(0xc208082000, 0xc208040020, 0xc208044120)
/home/noxis/IdeaProjects/kafka-log-producer/src/kafka-log-producer.go:112 +0x263
created by main.main
/home/noxis/IdeaProjects/kafka-log-producer/src/kafka-log-producer.go:43 +0x120

@eapache
Copy link
Contributor

eapache commented Sep 23, 2014

I don't know why your cluster is not electing a new leader for that partition; as you can see from the logs, once the broker goes down we do fetch updated metadata (twice!) which should include the new leader. Perhaps it's just a timing issue (the new leader is not elected yet, so we have to drop messages while we wait to avoid overflowing)? I'm not sure.

The panic is due to a subtle race condition; if the brokerProducer shuts itself down because it loses its connection to the broker, it immediately closes all its channels even though there may be messages "on their way" to being added to it. The shutdown logic of the brokerProducer should wait for all messages to make their way to the queue before doing its final flush, but I'm not sure how to make it do that in a non-racey fashion. @burke you wrote this code, any ideas?

@eapache
Copy link
Contributor

eapache commented Sep 23, 2014

FWIW, the alternate producer design I played with earlier (#132) shouldn't have this bug; I'm thinking I should pursue that a little more seriously now.

@wvanbergen
Copy link
Contributor

+1, I think we should adopt the new design, after doing some distributed
error simulations

On Tue, Sep 23, 2014 at 1:49 PM, Evan Huus notifications@github.com wrote:

FWIW, the alternate producer design I played with earlier (#132
#132) shouldn't have this bug;
I'm thinking I should pursue that a little more seriously now.


Reply to this email directly or view it on GitHub
#151 (comment).

@wkuranowski
Copy link
Author

@eapache Does #132 solves also #150?

@eapache
Copy link
Contributor

eapache commented Sep 23, 2014

Not at the moment; I think that's a relatively easy fix on either design, I just haven't found the time to write it, sorry :/

@wkuranowski
Copy link
Author

"Close issue" is too close to "Comment" button :)

@eapache If you are ready with #132, let me know, I would love to test this producer.

@eapache
Copy link
Contributor

eapache commented Sep 23, 2014

I think it's "ready" in the sense that all the tests pass, and it should work in normal cases. The open questions about it are:

  • Does it perform well (worse or better than the current design)? It adds a lot of goroutines, so it's not clear to me it will be quite as fast, but I have no idea.
  • Does it correctly handle all the possible error cases?
  • Do other people agree with the design generally?

I definitely wouldn't use it for production right now, but if you want to load it locally to try it out (I assume you have some sort of local harness based on all the "localhost" addresses in your logs) then it should work. The API is slightly different, but not excessively so.

@wkuranowski
Copy link
Author

@eapache I try to understand why current producer drops messages when broker dies... Is it because brokerProducer shuts itself down and has no idea what to do with queued messages for that broker?

In large clusters with a lot of partitions, election can take some time. I wonder how to not lose those messages and wait for new leaders... Is #132 better in handling that case?

@eapache
Copy link
Contributor

eapache commented Sep 24, 2014

Both producers can wait for leader election if so configured (that's in the client config, see MetadataRetries and WaitForElection). The problem is that in this situation, the cluster doesn't even realize the broker is down yet, so we re-query the leader before the election even starts, and get the same broker back.

The default value for zookeeper.session.timeout.ms is 6 seconds, so I think under default configuration it takes 6 seconds for leader election to even start... before that any metadata request will just keep returning the old, dead broker.

eapache added a commit that referenced this issue Sep 24, 2014
This might help with part of #151 by triggering the WaitForElection timeout if
we're super-eager and request metadata before the cluster even realizes the
broker is gone.
@eapache
Copy link
Contributor

eapache commented Nov 15, 2014

The new producer design has been merged, so this should be fixed.

@eapache eapache closed this as completed Nov 15, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants