diff --git a/kafka/client.py b/kafka/client.py index d0e07d072..9cb4b48d8 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,11 +1,11 @@ +import collections import copy +import functools +import itertools import logging -import collections - +import time import kafka.common -from functools import partial -from itertools import count from kafka.common import (TopicAndPartition, ConnectionError, FailedPayloadsError, PartitionUnavailableError, @@ -21,7 +21,7 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" - ID_GEN = count() + ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a @@ -213,6 +213,16 @@ def reset_all_metadata(self): def has_metadata_for_topic(self, topic): return topic in self.topic_partitions + def ensure_topic_exists(self, topic, timeout = 30): + start_time = time.time() + + self.load_metadata_for_topics(topic) + while not self.has_metadata_for_topic(topic): + if time.time() > start_time + timeout: + raise KafkaTimeoutError("Unable to create topic {}".format(topic)) + self.load_metadata_for_topics(topic) + time.sleep(.5) + def close(self): for conn in self.conns.values(): conn.close() @@ -289,7 +299,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, order of input payloads """ - encoder = partial( + encoder = functools.partial( KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) @@ -321,7 +331,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, to the same brokers. """ - encoder = partial(KafkaProtocol.encode_fetch_request, + encoder = functools.partial(KafkaProtocol.encode_fetch_request, max_wait_time=max_wait_time, min_bytes=min_bytes) @@ -359,7 +369,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True, def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = partial(KafkaProtocol.encode_offset_commit_request, + encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, group=group) decoder = KafkaProtocol.decode_offset_commit_response resps = self._send_broker_aware_request(payloads, encoder, decoder) @@ -378,7 +388,7 @@ def send_offset_commit_request(self, group, payloads=[], def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = partial(KafkaProtocol.encode_offset_fetch_request, + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, group=group) decoder = KafkaProtocol.decode_offset_fetch_response resps = self._send_broker_aware_request(payloads, encoder, decoder) diff --git a/kafka/producer.py b/kafka/producer.py index 8e40be5b1..95c75c452 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -10,7 +10,7 @@ from multiprocessing import Queue, Process from kafka.common import ( - ProduceRequest, TopicAndPartition, UnsupportedCodecError + ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError ) from kafka.partitioner import HashedPartitioner from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set @@ -216,7 +216,10 @@ def _next_partition(self, topic): if topic not in self.partition_cycles: if topic not in self.client.topic_partitions: self.client.load_metadata_for_topics(topic) - self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + try: + self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + except KeyError: + raise UnknownTopicOrPartitionError(topic) # Randomize the initial partition that is returned if self.random_start: diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c69e1178b..7d3a1805d 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -142,6 +142,15 @@ def test_simple_producer(self): producer.stop() + @kafka_versions("all") + def test_produce__new_topic_fails_with_reasonable_error(self): + new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())) + producer = SimpleProducer(self.client) + + # At first it doesn't exist + with self.assertRaises(UnknownTopicOrPartitionError): + resp = producer.send_messages(new_topic, self.msg("one")) + @kafka_versions("all") def test_producer_random_order(self): producer = SimpleProducer(self.client, random_start = True) diff --git a/test/testutil.py b/test/testutil.py index 78e6f7d93..4f5f6eead 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -13,7 +13,6 @@ __all__ = [ 'random_string', - 'ensure_topic_creation', 'get_open_port', 'kafka_versions', 'KafkaIntegrationTestCase', @@ -39,16 +38,6 @@ def wrapper(self): return wrapper return kafka_versions -def ensure_topic_creation(client, topic_name, timeout = 30): - start_time = time.time() - - client.load_metadata_for_topics(topic_name) - while not client.has_metadata_for_topic(topic_name): - if time.time() > start_time + timeout: - raise Exception("Unable to create topic %s" % topic_name) - client.load_metadata_for_topics(topic_name) - time.sleep(1) - def get_open_port(): sock = socket.socket() sock.bind(("", 0)) @@ -71,7 +60,7 @@ def setUp(self): if self.create_client: self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port)) - ensure_topic_creation(self.client, self.topic) + self.client.ensure_topic_exists(self.topic) self._messages = {}