Added a test for MemoryRecordsBuilder and fixed some problems there #1448

Closed · wants to merge 3 commits
4 changes: 2 additions & 2 deletions kafka/record/__init__.py
@@ -1,3 +1,3 @@
-from kafka.record.memory_records import MemoryRecords
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
 
-__all__ = ["MemoryRecords"]
+__all__ = ["MemoryRecords", "MemoryRecordsBuilder"]
Contributor:
I'm not very familiar with this code, but I don't understand why this import is needed? Reading the bug description, it looks like the problems were all due to the code fixed in kafka/record/memory_records.py below... If the import were just forgotten, I would have thought it would have thrown problems before?

Collaborator (Author):
No particular reason, just used in tests. We can remove it, yea
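(For context: the new test file imports the builder through the package root — from kafka.record import MemoryRecords, MemoryRecordsBuilder — and the __all__ update keeps the package's declared public names in sync for star-imports.)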

Contributor:
I'm fine leaving it in if you think it should be there, I just didn't understand if there was something I was missing.

2 changes: 1 addition & 1 deletion kafka/record/default_records.py
@@ -237,7 +237,7 @@ def _read_msg(

         # validate whether we have read all header bytes in the current record
         if pos - start_pos != length:
-            CorruptRecordException(
+            raise CorruptRecordException(
                 "Invalid record size: expected to read {} bytes in record "
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos
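For readers skimming the diff: the old line constructed the exception object and immediately discarded it, so a size mismatch was silently ignored. A minimal standalone sketch of that failure mode (with a stand-in exception class, not the real kafka.errors one):

class CorruptRecordException(Exception):
    pass

def validate(length_matches):
    if not length_matches:
        # Old behavior: the exception is built, never raised, then garbage-collected
        CorruptRecordException("Invalid record size")
    # Control falls through; the caller never learns the record was corrupt

validate(False)  # completes without error -- exactly the bug the added `raise` fixes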
4 changes: 2 additions & 2 deletions kafka/record/memory_records.py
@@ -135,7 +135,7 @@ def append(self, timestamp, key, value, headers=[]):
(int, int): checksum and bytes written
Comment:

Return type in docstring is incorrect.

"""
if self._closed:
return None, 0
return None

offset = self._next_offset
metadata = self._builder.append(offset, timestamp, key, value, headers)
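The corrected contract, exercised by the new test at the bottom of this PR: append returns a metadata object while the builder is open, and plain None (rather than the old (None, 0) tuple) once it is closed. A minimal usage sketch, assuming the constructor arguments used in the test:

builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=1024)
meta = builder.append(timestamp=None, key=b"key", value=b"value")
assert meta is not None   # open builder: metadata describing the append
builder.close()
# closed builder: the append is refused and returns None
assert builder.append(timestamp=None, key=b"key", value=b"value") is None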
@@ -166,7 +166,7 @@ def size_in_bytes(self):

     def compression_rate(self):
         assert self._closed
-        return self.size_in_bytes() / self._bytes_written
+        return self.size_in_bytes() / float(self._bytes_written)
Owner:
This is only required for Python 2; Python 3 division returns a float by default. I think a cleaner way to handle this compatibility is with __future__:

from __future__ import division

With that you shouldn't need to cast to float.
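A quick illustration of the difference (standalone sketch, not code from this PR):

from __future__ import division  # no-op on Python 3; enables true division on Python 2

print(10 / 4)   # 2.5 on both interpreters once the import is in place
print(10 // 4)  # 2 -- floor division remains available via //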


     def is_full(self):
         if self._closed:
69 changes: 68 additions & 1 deletion test/record/test_records.py
@@ -1,5 +1,7 @@
 # -*- coding: utf-8 -*-
+from __future__ import unicode_literals
 import pytest
-from kafka.record import MemoryRecords
+from kafka.record import MemoryRecords, MemoryRecordsBuilder
 from kafka.errors import CorruptRecordException

# This is real live data from Kafka 11 broker
@@ -152,3 +154,68 @@ def test_memory_records_corrupt():
     )
     with pytest.raises(CorruptRecordException):
         records.next_batch()


@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
@pytest.mark.parametrize("magic", [0, 1, 2])
def test_memory_records_builder(magic, compression_type):
    builder = MemoryRecordsBuilder(
        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
    base_size = builder.size_in_bytes()  # V2 has a header before

    msg_sizes = []
    for offset in range(10):
        metadata = builder.append(
            timestamp=10000 + offset, key=b"test", value=b"Super")
        msg_sizes.append(metadata.size)
        assert metadata.offset == offset
        if magic > 0:
            assert metadata.timestamp == 10000 + offset
        else:
            assert metadata.timestamp == -1
        assert builder.next_offset() == offset + 1

    # Failed appends should not leave junk behind, like null bytes or something
    with pytest.raises(TypeError):
        builder.append(
            timestamp=None, key="test", value="Super")  # Not bytes, but str

    assert not builder.is_full()
    size_before_close = builder.size_in_bytes()
    assert size_before_close == sum(msg_sizes) + base_size

    # Size should remain the same after closing. No trailing bytes
    builder.close()
    assert builder.compression_rate() > 0
    expected_size = size_before_close * builder.compression_rate()
    assert builder.is_full()
    assert builder.size_in_bytes() == expected_size
    buffer = builder.buffer()
    assert len(buffer) == expected_size

    # We can close a second time, as in a retry
    builder.close()
    assert builder.size_in_bytes() == expected_size
    assert builder.buffer() == buffer

    # Can't append after close
    meta = builder.append(timestamp=None, key=b"test", value=b"Super")
    assert meta is None


@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
@pytest.mark.parametrize("magic", [0, 1, 2])
def test_memory_records_builder_full(magic, compression_type):
    builder = MemoryRecordsBuilder(
        magic=magic, compression_type=compression_type, batch_size=1024 * 10)

    # 1 message should always be appended, even if it pushes the batch past batch_size
    metadata = builder.append(
        key=None, timestamp=None, value=b"M" * 10240)
    assert metadata is not None
    assert builder.is_full()

    metadata = builder.append(
        key=None, timestamp=None, value=b"M")
    assert metadata is None
    assert builder.next_offset() == 1