Skip to content

Commit

Permalink
Support produce with Kafka record headers
Browse files: browse the repository at this point in the history
  • Loading branch information
hnousiainen committed Aug 29, 2018
1 parent 2941b76 commit 6ebdf28
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 18 deletions.
10 changes: 5 additions & 5 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def wait(self, timeout=None):


class FutureRecordMetadata(Future):
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)

Expand All @@ -42,7 +42,7 @@ def _produce_success(self, offset_and_timestamp):

# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size) = self.args
serialized_key_size, serialized_value_size, serialized_header_size) = self.args

# None is when Broker does not support the API (<0.10) and
# -1 is when the broker is configured for CREATE_TIME timestamps
Expand All @@ -53,7 +53,7 @@ def _produce_success(self, offset_and_timestamp):
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
checksum, serialized_key_size,
serialized_value_size)
serialized_value_size, serialized_header_size)
self.success(metadata)

def get(self, timeout=None):
Expand All @@ -68,4 +68,4 @@ def get(self, timeout=None):

RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
'checksum', 'serialized_key_size', 'serialized_value_size'])
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
18 changes: 13 additions & 5 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)

def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
Expand All @@ -530,6 +530,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
headers (optional): a list of header key value pairs. List items
are tuples of str key and bytes value.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
Expand Down Expand Up @@ -559,13 +561,18 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)

message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
if headers is None:
headers = []
assert type(headers) == list
assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)

message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
self._ensure_valid_record_size(message_size)

tp = TopicPartition(topic, partition)
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes,
key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
future, batch_is_full, new_batch_created = result
Expand All @@ -584,7 +591,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
len(key_bytes) if key_bytes is not None else -1,
len(value_bytes) if value_bytes is not None else -1
len(value_bytes) if value_bytes is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
).failure(e)

def flush(self, timeout=None):
Expand Down
16 changes: 9 additions & 7 deletions kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def __init__(self, tp, records, buffer):
def record_count(self):
return self.records.next_offset()

def try_append(self, timestamp_ms, key, value):
metadata = self.records.append(timestamp_ms, key, value)
def try_append(self, timestamp_ms, key, value, headers):
metadata = self.records.append(timestamp_ms, key, value, headers)
if metadata is None:
return None

Expand All @@ -65,7 +65,8 @@ def try_append(self, timestamp_ms, key, value):
future = FutureRecordMetadata(self.produce_future, metadata.offset,
metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
len(value) if value is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future

def done(self, base_offset=None, timestamp_ms=None, exception=None):
Expand Down Expand Up @@ -196,7 +197,7 @@ def __init__(self, **configs):
self.muted = set()
self._drain_index = 0

def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
estimated_size=0):
"""Add a record to the accumulator, return the append result.
Expand All @@ -209,6 +210,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
timestamp_ms (int): The timestamp of the record (epoch ms)
key (bytes): The key for the record
value (bytes): The value for the record
headers (List[Tuple[str, bytes]]): The header fields for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
Expand All @@ -231,7 +233,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
dq = self._batches[tp]
if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value)
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
Expand All @@ -246,7 +248,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,

if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value)
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
Expand All @@ -261,7 +263,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
)

batch = ProducerBatch(tp, records, buf)
future = batch.try_append(timestamp_ms, key, value)
future = batch.try_append(timestamp_ms, key, value, headers)
if not future:
raise Exception()

Expand Down
10 changes: 9 additions & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,16 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
compression_type=compression)
magic = producer._max_usable_produce_magic()

# record headers are supported in 0.11.0
if version() < (0, 11, 0):
headers = None
else:
headers = [("Header Key", b"Header Value")]

topic = random_string(5)
future = producer.send(
topic,
value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
partition=0)
record = future.get(timeout=5)
assert record is not None
Expand All @@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):

assert record.serialized_key_size == 10
assert record.serialized_value_size == 12
if headers:
assert record.serialized_header_size == 22

# generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
Expand Down

0 comments on commit 6ebdf28

Please sign in to comment.