Need help with AsyncProducer and test #862

Closed
shinji62 opened this issue Apr 6, 2017 · 7 comments

shinji62 commented Apr 6, 2017

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version: Version 1.11.0 (2016-12-20)
Kafka Version: None (unit tests only, using the mock broker), but the Sarama config version is set to "sarama.V0_10_1_0"
Go Version: 1.8

Configuration

sarama.V0_10_1_0

Logs

When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.

7:57:15 client.go:107: Initializing new client
17:57:15 config.go:326: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
17:57:15 config.go:326: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
17:57:15 client.go:588: client/metadata fetching metadata for all topics from broker 127.0.0.1:52222
17:57:15 mockbroker.go:181: *** mockbroker/2/0: connection opened
17:57:15 broker.go:146: Connected to broker at 127.0.0.1:52222 (unregistered)
17:57:15 mockbroker.go:216: *** mockbroker/2/0: served &{0 sarama 0xc4200f5400} -> &{[0xc4201aa000 0xc4201aa160] [0xc42015a540]}
17:57:15 client.go:384: client/brokers registered new broker #0 at 127.0.0.1:52220
17:57:15 client.go:384: client/brokers registered new broker #1 at 127.0.0.1:52221
17:57:15 client.go:151: Successfully initialized new client
17:57:15 kafka.go:131: [INFO] Start to watching producer error for re-partition
17:57:15 kafka.go:179: [INFO] Start to sub input (buffer for sarama input)
17:57:15 kafka.go:188: [INFO] Start loop to watch events
17:57:15 config.go:326: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
17:57:15 config.go:326: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
17:57:15 async_producer.go:585: producer/broker/1 starting up
17:57:15 async_producer.go:597: producer/broker/1 state change to [open] on LogMessage/1
17:57:15 async_producer.go:585: producer/broker/0 starting up
17:57:15 async_producer.go:597: producer/broker/0 state change to [open] on LogMessage/0
17:57:15 mockbroker.go:181: *** mockbroker/1/0: connection opened
17:57:15 mockbroker.go:181: *** mockbroker/0/0: connection opened
17:57:15 broker.go:144: Connected to broker at 127.0.0.1:52221 (registered as #1)
17:57:15 broker.go:144: Connected to broker at 127.0.0.1:52220 (registered as #0)
17:57:15 mockbroker.go:216: *** mockbroker/1/0: served &{0 sarama 0xc42022c300} -> &{map[LogMessage:map[1:0xc42015a660]] 0 0s}
17:57:15 mockbroker.go:216: *** mockbroker/0/0: served &{0 sarama 0xc4202304a0} -> &{map[LogMessage:map[0:0xc42015a5d0]] 0 0s}
17:57:15 async_producer.go:763: producer/broker/1 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected
17:57:15 async_producer.go:763: producer/broker/0 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected
17:57:15 broker.go:182: Closed connection to broker 127.0.0.1:52221
17:57:15 broker.go:182: Closed connection to broker 127.0.0.1:52220
17:57:15 async_producer.go:459: producer/leader/LogMessage/1 state change to [retrying-1]
17:57:15 async_producer.go:469: producer/leader/LogMessage/1 abandoning broker 1
17:57:15 async_producer.go:664: producer/broker/1 shut down
17:57:15 async_producer.go:459: producer/leader/LogMessage/0 state change to [retrying-1]
17:57:15 mockbroker.go:198: *** mockbroker/0/0: invalid request: err=EOF, (*sarama.request)(<nil>)
17:57:15 mockbroker.go:249: *** mockbroker/0/0: connection closed, err=<nil>
17:57:15 mockbroker.go:198: *** mockbroker/1/0: invalid request: err=EOF, (*sarama.request)(<nil>)
17:57:15 mockbroker.go:249: *** mockbroker/1/0: connection closed, err=<nil>
17:57:15 async_producer.go:469: producer/leader/LogMessage/0 abandoning broker 0
17:57:15 async_producer.go:664: producer/broker/0 shut down
17:57:15 client.go:586: client/metadata fetching metadata for [LogMessage] from broker 127.0.0.1:52222
17:57:15 client.go:586: client/metadata fetching metadata for [LogMessage] from broker 127.0.0.1:52222
17:57:16 mockbroker.go:213: *** mockbroker/2/0: ignored (*sarama.request)(0xc420110ab0)({
 correlationID: (int32) 1,
 clientID: (string) (len=6) "sarama",
 body: (*sarama.MetadataRequest)(0xc4200f5740)({
  Topics: ([]string) (len=1 cap=1) {
   (string) (len=10) "LogMessage"
  }
 })
})
17:57:16 mockbroker.go:213: *** mockbroker/2/0: ignored (*sarama.request)(0xc420110b70)({
 correlationID: (int32) 2,
 clientID: (string) (len=6) "sarama",
 body: (*sarama.MetadataRequest)(0xc4200f5960)({
  Topics: ([]string) (len=1 cap=1) {
   (string) (len=10) "LogMessage"
  }
 })
})
17:57:45 client.go:607: client/metadata got error from broker while fetching metadata: read tcp 127.0.0.1:52223->127.0.0.1:52222: i/o timeout
17:57:45 client.go:607: client/metadata got error from broker while fetching metadata: read tcp 127.0.0.1:52223->127.0.0.1:52222: i/o timeout
17:57:45 broker.go:182: Closed connection to broker 127.0.0.1:52222
17:57:45 config.go:326: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
17:57:45 client.go:586: client/metadata fetching metadata for [LogMessage] from broker 127.0.0.1:52220
17:57:45 client.go:406: client/brokers deregistered broker #-1 at 127.0.0.1:52222
17:57:45 client.go:586: client/metadata fetching metadata for [LogMessage] from broker 127.0.0.1:52220
17:57:45 mockbroker.go:198: *** mockbroker/2/0: invalid request: err=EOF, (*sarama.request)(<nil>)
17:57:45 mockbroker.go:249: *** mockbroker/2/0: connection closed, err=<nil>
17:57:45 broker.go:144: Connected to broker at 127.0.0.1:52220 (registered as #0)
17:57:45 mockbroker.go:181: *** mockbroker/0/1: connection opened
17:57:46 mockbroker.go:213: *** mockbroker/0/1: ignored (*sarama.request)(0xc4201cc270)({
 correlationID: (int32) 1,
 clientID: (string) (len=6) "sarama",
 body: (*sarama.MetadataRequest)(0xc42014d880)({
  Topics: ([]string) (len=1 cap=1) {
   (string) (len=10) "LogMessage"
  }
 })
})
17:57:46 mockbroker.go:213: *** mockbroker/0/1: ignored (*sarama.request)(0xc4201cc300)({
 correlationID: (int32) 2,
 clientID: (string) (len=6) "sarama",
 body: (*sarama.MetadataRequest)(0xc42014da00)({
  Topics: ([]string) (len=1 cap=1) {
   (string) (len=10) "LogMessage"
  }
 })
})
Problem Description

Since I updated the library to use the gopkg one, my tests no longer work.

My testing code is here https://github.com/shinji62/kafka-firehose-nozzle/blob/master/kafka_test.go#L127

My AsyncProducer declaration is here https://github.com/shinji62/kafka-firehose-nozzle/blob/master/kafka.go#L32

My test just times out; I never receive anything on producer.Errors() or producer.Successes().

And as you can see in the logs I got the error

17:57:15 async_producer.go:763: producer/broker/1 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected
17:57:15 async_producer.go:763: producer/broker/0 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected

I have no idea why I am getting this error, so I would appreciate any help.

Thanks

@shinji62
Author

Still looking for help.
thanks

@eapache
Contributor

eapache commented Apr 24, 2017

Did you change versions at the same time, or did you just switch from github to gopkg while keeping the same version?

My initial suspicion would be something related to #497, but I'm surprised it even builds in that case.

@shinji62
Author

I changed versions at the same time as switching from github to gopkg.

Only the test fails; the program itself works fine.

@shinji62
Author

@eapache I changed everything back to use the github package, but I get the same error:

5:17:15 client.go:110: Initializing new client
15:17:15 config.go:329: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
15:17:15 config.go:329: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
15:17:15 client.go:601: client/metadata fetching metadata for all topics from broker 127.0.0.1:53896
15:17:15 mockbroker.go:181: *** mockbroker/0/0: connection opened
15:17:15 broker.go:146: Connected to broker at 127.0.0.1:53896 (unregistered)
15:17:15 mockbroker.go:216: *** mockbroker/0/0: served &{0 sarama 0xc420160d60} -> &{[0xc42001fce0] [0xc420170420]}
15:17:15 client.go:397: client/brokers registered new broker #1 at 127.0.0.1:53895
15:17:15 client.go:154: Successfully initialized new client
[{0xc420160d60 0xc4201703f0}]
15:17:15 kafka.go:133: [INFO] Start to watching producer error for re-partition
15:17:15 kafka.go:179: [INFO] Start to sub input (buffer for sarama input)
15:17:15 kafka.go:191: [INFO] Start loop to watch events
15:17:15 config.go:329: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
15:17:15 async_producer.go:584: producer/broker/1 starting up
15:17:15 async_producer.go:596: producer/broker/1 state change to [open] on LogMessage/0
15:17:15 mockbroker.go:181: *** mockbroker/1/0: connection opened
15:17:15 broker.go:144: Connected to broker at 127.0.0.1:53895 (registered as #1)
15:17:15 mockbroker.go:216: *** mockbroker/1/0: served &{0 sarama 0xc420160ec0} -> &{map[LogMessage:map[0:0xc4201703c0]] 0 0s}
15:17:15 async_producer.go:762: producer/broker/1 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected
15:17:15 broker.go:182: Closed connection to broker 127.0.0.1:53895
15:17:15 async_producer.go:458: producer/leader/LogMessage/0 state change to [retrying-1]
15:17:15 mockbroker.go:198: *** mockbroker/1/0: invalid request: err=EOF, (*sarama.request)(<nil>)
15:17:15 mockbroker.go:249: *** mockbroker/1/0: connection closed, err=<nil>
15:17:15 async_producer.go:468: producer/leader/LogMessage/0 abandoning broker 1
15:17:15 async_producer.go:663: producer/broker/1 shut down
15:17:15 client.go:599: client/metadata fetching metadata for [LogMessage] from broker 127.0.0.1:53896
15:17:15 mockbroker.go:213: *** mockbroker/0/0: ignored (*sarama.request)(0xc420152a80)({
 correlationID: (int32) 1,
 clientID: (string) (len=6) "sarama",
 body: (*sarama.MetadataRequest)(0xc42015a4e0)({
  Topics: ([]string) (len=1 cap=1) {
   (string) (len=10) "LogMessage"
  }
 })
})

@eapache
Contributor

eapache commented Apr 25, 2017

Hmm, this is an interesting one:

  • you are setting producerConfig.Version = sarama.V0_10_1_0
  • this is causing the (real) producer to use V1 messages, and to expect V1 responses
  • the mockbroker isn't picking up on this and is sending V0 responses, which the producer doesn't understand

As an immediate workaround, you can set ProduceResponse.Version = 1 in your test method.
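
To illustrate the mismatch described above: in the Kafka protocol, version 1 of the produce response appends a 4-byte throttle time that version 0 lacks, so a decoder expecting V1 runs out of bytes on a V0 payload. The following is a toy sketch, not Sarama's code. The type produceResponse, the functions encode and decode, and the flattened single-partition layout are simplifications invented for illustration.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// errInsufficientData echoes the wording in the logs; it is a local
// stand-in, not Sarama's error value.
var errInsufficientData = errors.New("insufficient data to decode packet, more bytes expected")

// produceResponse is a simplified, single-partition response (hypothetical).
type produceResponse struct {
	Partition  int32
	ErrorCode  int16
	Offset     int64
	ThrottleMs int32 // on the wire only for version >= 1
}

// encode writes the fields that exist in the given response version.
func encode(r produceResponse, version int16) []byte {
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, r.Partition)
	binary.Write(&buf, binary.BigEndian, r.ErrorCode)
	binary.Write(&buf, binary.BigEndian, r.Offset)
	if version >= 1 {
		binary.Write(&buf, binary.BigEndian, r.ThrottleMs)
	}
	return buf.Bytes()
}

// decode reads the fields that the given response version is expected to carry.
func decode(data []byte, version int16) (produceResponse, error) {
	var r produceResponse
	rd := bytes.NewReader(data)
	fields := []interface{}{&r.Partition, &r.ErrorCode, &r.Offset}
	if version >= 1 {
		fields = append(fields, &r.ThrottleMs)
	}
	for _, f := range fields {
		if err := binary.Read(rd, binary.BigEndian, f); err != nil {
			return r, errInsufficientData
		}
	}
	return r, nil
}

func main() {
	resp := produceResponse{Partition: 0, Offset: 42}

	// The mock sends V0 bytes while the producer decodes them as V1.
	_, err := decode(encode(resp, 0), 1)
	fmt.Println("v0 bytes read as v1:", err)

	// When both sides agree on V1, decoding succeeds.
	_, err = decode(encode(resp, 1), 1)
	fmt.Println("v1 bytes read as v1:", err)
}
```

This is why making the mock's response version match what the producer expects is the suggested workaround.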

@shinji62
Author

hi,

Still the same issue.
I tried your fix but still get the same error, although I can see that the mock now produces a V1 response:

11:38:18 mockbroker.go:216: *** mockbroker/2/0: served &{0 sarama 0xc42017a540} -> &{map[LogMessage:map[0:0xc420156390]] 1 0s}
11:38:18 async_producer.go:762: producer/broker/2 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected

@eapache
Contributor

eapache commented Apr 26, 2017

Hmm... without more information I really don't know what the issue could be. In the log line mockbroker/%d/%d: served... could you please add some more information about the request that is being responded to? Maybe it's the wrong kind of message entirely, or it's got an extra partition or something...

eapache closed this as completed Jul 27, 2017