Key Error: Single Broker #183


Closed
jshaw86 opened this issue Jul 18, 2014 · 7 comments

Comments

@jshaw86
Contributor

jshaw86 commented Jul 18, 2014

Hi, I think this is related to #113, #150, #174. I'm prototyping a kafka environment with a single broker and 3 node zookeeper cluster. I installed the master version since this was a known issue but I'm still getting a key error when producing messages with SimpleProducer.

This cluster just had the topic created with the command line tool and here is the describe:

bin/kafka-topics.sh --describe --topic test --zookeeper XXX.XXX.XXX.XXX:YYYY
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1       Replicas: 1     Isr: 1

Error:

client WARNING No partitions for test
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "/opt/logparser_deploy/logparser/core.py", line 247, in _run
    response = self._kafka_producer.send_messages(self.TOPIC, json.dumps(data))
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 230, in send_messages
    partition = self._next_partition(topic)
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 219, in _next_partition
    self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
KeyError: 'test'

Connect Code:

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

for i in range(0, self.KAFKA_CONNECT_RETRIES):
    try:
        self._kafka_client = KafkaClient(self._kafka_addr)
    except Exception as e:
        logging.error("KAFKA ERROR: %s %s" % (e, self._kafka_addr))
    else:
        # To send messages synchronously
        self._kafka_producer = SimpleProducer(self._kafka_client)
        return True

return False

Send Code:

self._kafka_producer.send_messages(self.TOPIC, json.dumps(data))
@dpkp
Owner

dpkp commented Jul 18, 2014

If you have already created the topic via the Kafka CLI (bin/kafka-topics.sh), then this shouldn't happen even without #174. Otherwise we definitely need to merge #174 to address this issue.
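The ensure_topic_exists idea referenced from #174 boils down to polling cluster metadata until the topic reports partitions, instead of letting the producer hit a bare KeyError. A minimal sketch of that idea, assuming a client object that exposes load_metadata_for_topics() and a topic_partitions dict (mirroring the kafka-python client surface discussed in this thread; the helper name here is made up):

```python
import time

def wait_for_partitions(client, topic, retries=5, delay=1.0):
    """Poll cluster metadata until `topic` reports at least one partition.

    `client` is assumed to expose load_metadata_for_topics() and a
    topic_partitions dict, as the kafka-python client of this era does.
    """
    for _ in range(retries):
        client.load_metadata_for_topics(topic)
        if client.topic_partitions.get(topic):
            return True  # metadata populated; producing should be safe
        time.sleep(delay)
    return False  # still no partitions: topic missing or no leader yet
```

Calling something like this after connecting, before constructing SimpleProducer, would turn the KeyError into an explicit "topic not ready" signal.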

@wizzat
Collaborator

wizzat commented Jul 18, 2014

How long was it between when you created the topic and when you ran the Python script?


@jshaw86
Contributor Author

jshaw86 commented Jul 18, 2014

Hi guys, thanks for the quick response.
I did pre-create the topic with this command:

bin/kafka-topics.sh --zookeeper XXX.XXX.XXX.XXX:YYYY --partition 1 --replication-factor=1 --create --topic test

@wizzat it was a couple of minutes initially, and I've restarted the script a few times over the past 24 hours to test different things.

@wizzat
Collaborator

wizzat commented Jul 18, 2014

Actually, I'm looking at your stack trace and I see gevent in place. Does the error happen without gevent? I remember we had a gevent pull request that the author could never get to pass the test suite.

@jshaw86
Contributor Author

jshaw86 commented Jul 18, 2014

@wizzat I tried it in the REPL and was able to replicate it:

$ python
Python 2.7.3 (default, Feb 27 2014, 19:58:35)
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka.client import KafkaClient
>>> from kafka.producer import SimpleProducer
>>> kafka_client = KafkaClient("XXX.XXX.XXX.XXX:YYYY")
>>> kafka_producer = SimpleProducer(kafka_client)
>>> kafka_producer.send_messages("test", "HI")
No handlers could be found for logger "kafka"
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 230, in send_messages
    partition = self._next_partition(topic)
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 219, in _next_partition
    self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
KeyError: 'test'

I also pulled in the ensure_topic_exists function from #174, and it fixed the exception, but I continuously get "client WARNING No partitions for test".

I also want to mention that this was working in a Vagrant environment on a local VM setup with a single broker and a single ZooKeeper, but I don't understand why the topology change would affect the code.

EDIT: I added debug logging around the load_metadata function:

conn DEBUG About to send 36 bytes to Kafka, request 197
conn DEBUG Reading response 197 from Kafka
conn DEBUG About to read 4 bytes from Kafka
conn DEBUG Read 4/4 bytes from Kafka
conn DEBUG About to read 24 bytes from Kafka
conn DEBUG Read 24/24 bytes from Kafka
client DEBUG Broker metadata: {}
client DEBUG Topic metadata: {'test': {}}
client WARNING No partitions for test

It looks like the client is getting an empty dictionary for partitions even though the kafka-topics.sh describe command says they exist?
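For what it's worth, the shape of that debug output can be classified mechanically. A small sketch (the dict literals mirror the logged metadata; the helper name and return strings are made up for illustration):

```python
def diagnose_metadata(broker_metadata, topic_metadata, topic):
    """Classify a metadata snapshot like the one the client logged above."""
    if not broker_metadata:
        return "no brokers registered"            # Broker metadata: {}
    if topic not in topic_metadata:
        return "topic unknown to cluster"
    if not topic_metadata[topic]:
        return "topic exists but has no partition leaders"
    return "ok"

# The logged state: the cluster knows the topic, but the broker map is
# empty and no partitions are reported, hence the warning.
print(diagnose_metadata({}, {'test': {}}, 'test'))
```

Here the empty broker map is the telling part: the topic name comes back, but no broker/leader information does.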

@wizzat
Collaborator

wizzat commented Jul 18, 2014

Are you absolutely sure you're connecting to the same Zookeeper that your Kafka cluster is connecting to?

@jshaw86
Contributor Author

jshaw86 commented Jul 18, 2014

@wizzat I double-checked the IPs and everything matches. However, I just tried running the Java producer that ships with Kafka and got some errors:

bin/kafka-console-producer.sh --broker-list XXX.XXX.XXX.XXX:YYYY --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
[2014-07-18 15:05:59,670] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)

So the Python client's behavior is in line with the Java one. I'll close this and figure out why I'm receiving LeaderNotAvailableException on the cluster.
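Since LeaderNotAvailableException is typically transient (leader election after topic creation, or after a broker restart, hasn't finished), a common workaround is to retry the send with backoff. A hedged sketch, using a stand-in exception class rather than the real kafka-python/Kafka one:

```python
import time

class LeaderNotAvailable(Exception):
    """Stand-in for the transient leader-election error seen above."""

def send_with_retry(send, topic, payload, retries=3, backoff=0.5):
    """Call send(topic, payload), retrying on LeaderNotAvailable
    with linearly increasing backoff; re-raise once retries run out."""
    last = None
    for attempt in range(retries):
        try:
            return send(topic, payload)
        except LeaderNotAvailable as e:
            last = e
            time.sleep(backoff * (attempt + 1))
    raise last
```

A wrapper like this masks the warmup window after topic creation but still surfaces a genuinely broken cluster once the retries are exhausted.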

@jshaw86 jshaw86 closed this as completed Jul 18, 2014