Skip to content

Handle New Topic Creation #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions kafka/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import collections
import copy
import functools
import itertools
import logging
import collections

import time
import kafka.common

from functools import partial
from itertools import count
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
Expand All @@ -21,7 +21,7 @@
class KafkaClient(object):

CLIENT_ID = "kafka-python"
ID_GEN = count()
ID_GEN = itertools.count()

# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
Expand Down Expand Up @@ -213,6 +213,16 @@ def reset_all_metadata(self):
def has_metadata_for_topic(self, topic):
    """Return whether partition metadata for *topic* has been loaded."""
    known_topics = self.topic_partitions
    return topic in known_topics

def ensure_topic_exists(self, topic, timeout=30, sleep_interval=0.5):
    """Block until metadata for *topic* is available, up to *timeout* seconds.

    Requesting metadata for an unknown topic prompts brokers with
    auto-creation enabled to create it, so we poll until it shows up.

    Arguments:
        topic: name of the topic to wait for
        timeout: maximum number of seconds to wait (default 30)
        sleep_interval: seconds to sleep between metadata reloads
            (default 0.5; previously hard-coded)

    Raises:
        KafkaTimeoutError: if the topic is still unknown after *timeout*
            seconds.  NOTE(review): confirm KafkaTimeoutError is actually
            importable here -- presumably defined in kafka.common.
    """
    start_time = time.time()

    self.load_metadata_for_topics(topic)
    while not self.has_metadata_for_topic(topic):
        if time.time() > start_time + timeout:
            raise KafkaTimeoutError("Unable to create topic {}".format(topic))
        # Re-request metadata; the broker may auto-create the topic.
        self.load_metadata_for_topics(topic)
        time.sleep(sleep_interval)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sleep time should probably also be a param that users can override

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great idea. I'll get back to this based on the style review in the other conversation as soon as the world isn't literally blowing up at work.


def close(self):
for conn in self.conns.values():
conn.close()
Expand Down Expand Up @@ -289,7 +299,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
order of input payloads
"""

encoder = partial(
encoder = functools.partial(
KafkaProtocol.encode_produce_request,
acks=acks,
timeout=timeout)
Expand Down Expand Up @@ -321,7 +331,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
to the same brokers.
"""

encoder = partial(KafkaProtocol.encode_fetch_request,
encoder = functools.partial(KafkaProtocol.encode_fetch_request,
max_wait_time=max_wait_time,
min_bytes=min_bytes)

Expand Down Expand Up @@ -359,7 +369,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True,

def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = partial(KafkaProtocol.encode_offset_commit_request,
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
group=group)
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
Expand All @@ -378,7 +388,7 @@ def send_offset_commit_request(self, group, payloads=[],
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):

encoder = partial(KafkaProtocol.encode_offset_fetch_request,
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
Expand Down
7 changes: 5 additions & 2 deletions kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from multiprocessing import Queue, Process

from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
from kafka.partitioner import HashedPartitioner
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
Expand Down Expand Up @@ -216,7 +216,10 @@ def _next_partition(self, topic):
if topic not in self.partition_cycles:
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
try:
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
except KeyError:
raise UnknownTopicOrPartitionError(topic)

# Randomize the initial partition that is returned
if self.random_start:
Expand Down
9 changes: 9 additions & 0 deletions test/test_producer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ def test_simple_producer(self):

producer.stop()

@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
    """Sending to a topic with no metadata raises a specific, catchable error."""
    # Random suffix guarantees the topic cannot already exist on the broker.
    new_topic = 'new_topic_{guid}'.format(guid=str(uuid.uuid4()))
    producer = SimpleProducer(self.client)

    # The topic does not exist yet, so the client cannot resolve a
    # partition for it and must surface UnknownTopicOrPartitionError
    # rather than an opaque KeyError.
    with self.assertRaises(UnknownTopicOrPartitionError):
        producer.send_messages(new_topic, self.msg("one"))

@kafka_versions("all")
def test_producer_random_order(self):
producer = SimpleProducer(self.client, random_start = True)
Expand Down
13 changes: 1 addition & 12 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

__all__ = [
'random_string',
'ensure_topic_creation',
'get_open_port',
'kafka_versions',
'KafkaIntegrationTestCase',
Expand All @@ -39,16 +38,6 @@ def wrapper(self):
return wrapper
return kafka_versions

def ensure_topic_creation(client, topic_name, timeout=30):
    """Poll *client* until metadata for *topic_name* is available.

    Requesting metadata for an unknown topic triggers broker-side
    auto-creation when enabled; keep polling until the topic appears.

    Arguments:
        client: KafkaClient-like object exposing load_metadata_for_topics
            and has_metadata_for_topic
        topic_name: topic to wait for
        timeout: seconds to wait before giving up (default 30)

    Raises:
        Exception: if the topic is still unknown after *timeout* seconds.
    """
    # Compute the deadline once instead of re-deriving it every iteration.
    deadline = time.time() + timeout

    client.load_metadata_for_topics(topic_name)
    while not client.has_metadata_for_topic(topic_name):
        if time.time() > deadline:
            raise Exception("Unable to create topic %s" % topic_name)
        client.load_metadata_for_topics(topic_name)
        time.sleep(1)

def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
Expand All @@ -71,7 +60,7 @@ def setUp(self):
if self.create_client:
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))

ensure_topic_creation(self.client, self.topic)
self.client.ensure_topic_exists(self.topic)

self._messages = {}

Expand Down