Skip to content

Commit 26414d4

Browse files
committed
Add Snappy support
Fixes #2
1 parent 03abf98 commit 26414d4

File tree

6 files changed

+144
-21
lines changed

6 files changed

+144
-21
lines changed

README.md

+32-1
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,29 @@ cd kafka-python
3838
python setup.py install
3939
```
4040

41+
## Optional Snappy install
42+
43+
Download and build Snappy from http://code.google.com/p/snappy/downloads/list
44+
45+
```shell
46+
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
47+
tar xzvf snappy-1.0.5.tar.gz
48+
cd snappy-1.0.5
49+
./configure
50+
make
51+
sudo make install
52+
```
53+
54+
Install the `python-snappy` module
55+
```shell
56+
pip install python-snappy
57+
```
58+
4159
# Tests
4260

61+
Some of the tests will fail with `NotImplementedError` if Snappy is not installed. If you see other failures,
62+
they might be bugs - so please report them!
63+
4364
## Run the unit tests
4465

4566
```shell
@@ -137,5 +158,15 @@ for msg in kafka.iter_messages(FetchRequest("my-topic", 0, 0, 1024*1024), False)
137158
print(msg.payload)
138159
kafka.close()
139160
```
140-
141161
This iterates only through the messages in the byte range (0, 1024\*1024).
162+
163+
## Create some compressed messages
164+
165+
```python
166+
kafka = KafkaClient("localhost", 9092)
167+
messages = [kafka.create_snappy_message("testing 1"),
168+
kafka.create_snappy_message("testing 2")]
169+
req = ProduceRequest(topic, 1, messages)
170+
kafka.send_message_set(req)
171+
kafka.close()
172+
```

kafka/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@
99
Message, ProduceRequest, FetchRequest, OffsetRequest
1010
)
1111
from .codec import gzip_encode, gzip_decode
12+
from .codec import snappy_encode, snappy_decode

kafka/client.py

+23-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import zlib
66

77
from .codec import gzip_encode, gzip_decode
8+
from .codec import snappy_encode, snappy_decode
89

910
log = logging.getLogger("kafka")
1011

@@ -340,7 +341,10 @@ def decode_message(cls, data):
340341
yield msg
341342
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
342343
# Snappy encoded Message
343-
raise NotImplementedError("Snappy codec is not yet supported")
344+
snp = snappy_decode(payload)
345+
(msgs, _) = cls.read_message_set(snp)
346+
for msg in msgs:
347+
yield msg
344348
else:
345349
raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
346350

@@ -437,6 +441,24 @@ def create_gzip_message(cls, *payloads):
437441
gzipped = gzip_encode(message_set)
438442
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
439443

444+
@classmethod
445+
def create_snappy_message(cls, *payloads):
446+
"""
447+
Create a Snappy encoded Message
448+
449+
Params
450+
======
451+
payloads, list of messages (bytes) to be encoded
452+
453+
Returns
454+
=======
455+
A Message tuple
456+
"""
457+
messages = [cls.create_message(payload) for payload in payloads]
458+
message_set = cls.encode_message_set(messages)
459+
snapped = snappy_encode(message_set)
460+
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped)
461+
440462
def send_message_set(self, produceRequest):
441463
"""
442464
Send a ProduceRequest

kafka/codec.py

+17
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44

55
log = logging.getLogger("kafka.codec")
66

7+
try:
8+
import snappy
9+
hasSnappy=True
10+
except ImportError:
11+
log.warn("Snappy codec not available")
12+
hasSnappy=False
13+
714
def gzip_encode(payload):
815
buf = StringIO()
916
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
@@ -21,3 +28,13 @@ def gzip_decode(payload):
2128
f.close()
2229
buf.close()
2330
return out
31+
32+
def snappy_encode(payload):
33+
if not hasSnappy:
34+
raise NotImplementedError("Snappy codec not available")
35+
return snappy.compress(payload)
36+
37+
def snappy_decode(payload):
38+
if not hasSnappy:
39+
raise NotImplementedError("Snappy codec not available")
40+
return snappy.decompress(payload)

test/integration.py

+26-17
Original file line numberDiff line numberDiff line change
@@ -118,32 +118,41 @@ def test_produce(self):
118118
self.kafka.send_message_set(req)
119119
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
120120

121-
def test_produce_consume(self):
121+
def _test_produce_consume(self, topic, create_func):
122122
# Send two messages and consume them
123-
message1 = KafkaClient.create_message("testing 1")
124-
message2 = KafkaClient.create_message("testing 2")
125-
req = ProduceRequest("test-produce-consume", 0, [message1, message2])
123+
message1 = create_func("testing 1")
124+
message2 = create_func("testing 2")
125+
req = ProduceRequest(topic, 0, [message1, message2])
126126
self.kafka.send_message_set(req)
127-
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
128-
self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'"))
129-
req = FetchRequest("test-produce-consume", 0, 0, 1024)
127+
self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
128+
self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
129+
req = FetchRequest(topic, 0, 0, 1024)
130130
(messages, req) = self.kafka.get_message_set(req)
131131
self.assertEquals(len(messages), 2)
132-
self.assertEquals(messages[0], message1)
133-
self.assertEquals(messages[1], message2)
132+
self.assertEquals(messages[0].payload, "testing 1")
133+
self.assertEquals(messages[1].payload, "testing 2")
134134

135135
# Do the same, but for a different partition
136-
message3 = KafkaClient.create_message("testing 3")
137-
message4 = KafkaClient.create_message("testing 4")
138-
req = ProduceRequest("test-produce-consume", 1, [message3, message4])
136+
message3 = create_func("testing 3")
137+
message4 = create_func("testing 4")
138+
req = ProduceRequest(topic, 1, [message3, message4])
139139
self.kafka.send_message_set(req)
140-
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
141-
self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'"))
142-
req = FetchRequest("test-produce-consume", 1, 0, 1024)
140+
self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
141+
self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
142+
req = FetchRequest(topic, 1, 0, 1024)
143143
(messages, req) = self.kafka.get_message_set(req)
144144
self.assertEquals(len(messages), 2)
145-
self.assertEquals(messages[0], message3)
146-
self.assertEquals(messages[1], message4)
145+
self.assertEquals(messages[0].payload, "testing 3")
146+
self.assertEquals(messages[1].payload, "testing 4")
147+
148+
def test_produce_consume(self):
149+
self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
150+
151+
def test_produce_consume_snappy(self):
152+
self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
153+
154+
def test_produce_consume_gzip(self):
155+
self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
147156

148157
def test_check_offset(self):
149158
# Produce/consume a message, check that the next offset looks correct

test/unit.py

+45-2
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,20 @@
55

66
from kafka.client import KafkaClient, ProduceRequest, FetchRequest, length_prefix_message
77
from kafka.codec import gzip_encode, gzip_decode
8+
from kafka.codec import snappy_encode, snappy_decode
89

910
ITERATIONS = 1000
1011
STRLEN = 100
1112

1213
def random_string():
13-
return os.urandom(random.randint(0, STRLEN))
14+
return os.urandom(random.randint(1, STRLEN))
1415

1516
class TestPackage(unittest.TestCase):
1617
def test_top_level_namespace(self):
1718
import kafka as kafka1
1819
self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
1920
self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode")
21+
self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode")
2022
self.assertEquals(kafka1.client.__name__, "kafka.client")
2123
self.assertEquals(kafka1.codec.__name__, "kafka.codec")
2224

@@ -41,6 +43,9 @@ def test_submodule_namespace(self):
4143
from kafka import gzip_encode as gzip_encode2
4244
self.assertEquals(gzip_encode2.__name__, "gzip_encode")
4345

46+
from kafka import snappy_encode as snappy_encode2
47+
self.assertEquals(snappy_encode2.__name__, "snappy_encode")
48+
4449
class TestMisc(unittest.TestCase):
4550
def test_length_prefix(self):
4651
for i in xrange(ITERATIONS):
@@ -55,6 +60,12 @@ def test_gzip(self):
5560
s2 = gzip_decode(gzip_encode(s1))
5661
self.assertEquals(s1,s2)
5762

63+
def test_snappy(self):
64+
for i in xrange(ITERATIONS):
65+
s1 = random_string()
66+
s2 = snappy_decode(snappy_encode(s1))
67+
self.assertEquals(s1,s2)
68+
5869
class TestMessage(unittest.TestCase):
5970
def test_create(self):
6071
msg = KafkaClient.create_message("testing")
@@ -75,6 +86,18 @@ def test_create_gzip(self):
7586
self.assertEquals(inner.payload, "testing")
7687
self.assertEquals(inner.crc, -386704890)
7788

89+
def test_create_snappy(self):
90+
msg = KafkaClient.create_snappy_message("testing")
91+
self.assertEquals(msg.magic, 1)
92+
self.assertEquals(msg.attributes, 2)
93+
self.assertEquals(msg.crc, -62350868)
94+
(messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload))
95+
inner = messages[0]
96+
self.assertEquals(inner.magic, 1)
97+
self.assertEquals(inner.attributes, 0)
98+
self.assertEquals(inner.payload, "testing")
99+
self.assertEquals(inner.crc, -386704890)
100+
78101
def test_message_simple(self):
79102
msg = KafkaClient.create_message("testing")
80103
enc = KafkaClient.encode_message(msg)
@@ -110,6 +133,15 @@ def test_message_gzip(self):
110133
self.assertEquals(messages[1].payload, "two")
111134
self.assertEquals(messages[2].payload, "three")
112135

136+
def test_message_snappy(self):
137+
msg = KafkaClient.create_snappy_message("one", "two", "three")
138+
enc = KafkaClient.encode_message(msg)
139+
(messages, read) = KafkaClient.read_message_set(enc)
140+
self.assertEquals(len(messages), 3)
141+
self.assertEquals(messages[0].payload, "one")
142+
self.assertEquals(messages[1].payload, "two")
143+
self.assertEquals(messages[2].payload, "three")
144+
113145
def test_message_simple_random(self):
114146
for i in xrange(ITERATIONS):
115147
n = random.randint(0, 10)
@@ -122,7 +154,7 @@ def test_message_simple_random(self):
122154

123155
def test_message_gzip_random(self):
124156
for i in xrange(ITERATIONS):
125-
n = random.randint(0, 10)
157+
n = random.randint(1, 10)
126158
strings = [random_string() for j in range(n)]
127159
msg = KafkaClient.create_gzip_message(*strings)
128160
enc = KafkaClient.encode_message(msg)
@@ -131,6 +163,17 @@ def test_message_gzip_random(self):
131163
for j in range(n):
132164
self.assertEquals(messages[j].payload, strings[j])
133165

166+
def test_message_snappy_random(self):
167+
for i in xrange(ITERATIONS):
168+
n = random.randint(1, 10)
169+
strings = [random_string() for j in range(n)]
170+
msg = KafkaClient.create_snappy_message(*strings)
171+
enc = KafkaClient.encode_message(msg)
172+
(messages, read) = KafkaClient.read_message_set(enc)
173+
self.assertEquals(len(messages), n)
174+
for j in range(n):
175+
self.assertEquals(messages[j].payload, strings[j])
176+
134177
class TestRequests(unittest.TestCase):
135178
def test_produce_request(self):
136179
req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])

0 commit comments

Comments
 (0)