Skip to content

Refactor tests #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
*.egg-info
*.pyc
.tox
build
dist
MANIFEST
18 changes: 4 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,12 @@ pip install python-snappy

# Tests

Some of the tests will fail if Snappy is not installed. These tests will throw
NotImplementedError. If you see other failures, they might be bugs - so please
report them!

## Run the unit tests

_These are broken at the moment_

```shell
python -m test.unit
tox ./test/test_unit.py
```

## Run the integration tests
Expand All @@ -125,15 +121,9 @@ cd kafka-src
./sbt package
```

Next start up a ZooKeeper server on localhost:2181

```shell
/opt/zookeeper/bin/zkServer.sh start
```

And finally run the tests. This will actually start up real Kafka brokers and send messages in using the
client.
And then run the tests. This will actually start up real local Zookeeper
instance and Kafka brokers, and send messages in using the client.

```shell
python -m test.integration
tox ./test/test_integration.py
```
52 changes: 28 additions & 24 deletions kafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
from cStringIO import StringIO
import gzip
import logging

log = logging.getLogger("kafka.codec")

try:
import snappy
hasSnappy = True
_has_snappy = True
except ImportError:
log.warn("Snappy codec not available")
hasSnappy = False
_has_snappy = False


def has_gzip():
return True


def has_snappy():
return _has_snappy


def gzip_encode(payload):
buf = StringIO()
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
f.write(payload)
f.close()
buf.seek(0)
out = buf.read()
buf.close()
return out
buffer = StringIO()
handle = gzip.GzipFile(fileobj=buffer, mode="w")
handle.write(payload)
handle.close()
buffer.seek(0)
result = buffer.read()
buffer.close()
return result


def gzip_decode(payload):
buf = StringIO(payload)
f = gzip.GzipFile(fileobj=buf, mode='r')
out = f.read()
f.close()
buf.close()
return out
buffer = StringIO(payload)
handle = gzip.GzipFile(fileobj=buffer, mode='r')
result = handle.read()
handle.close()
buffer.close()
return result


def snappy_encode(payload):
if not hasSnappy:
raise NotImplementedError("Snappy codec not available")
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
return snappy.compress(payload)


def snappy_decode(payload):
if not hasSnappy:
raise NotImplementedError("Snappy codec not available")
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
return snappy.decompress(payload)
5 changes: 5 additions & 0 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def get_or_init_offset_callback(resp):
for partition in self.client.topic_partitions[topic]:
self.offsets[partition] = 0

def stop(self):
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()

def seek(self, offset, whence):
"""
Alter the current offset in the consumer, similar to fseek
Expand Down
34 changes: 31 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,41 @@
from distutils.core import setup
import os.path
import sys

from distutils.core import setup, Command


class Tox(Command):
user_options = []
def initialize_options(self):
pass

def finalize_options(self):
pass

def run(self):
import tox
sys.exit(tox.cmdline([]))


setup(
name="kafka-python",
version="0.8.1-1",

install_requires=["distribute"],
tests_require=["tox"],
cmdclass={"test": Tox},

packages=["kafka"],

author="David Arthur",
author_email="mumrah@gmail.com",
url="https://github.com/mumrah/kafka-python",
packages=["kafka"],
license="Copyright 2012, David Arthur under Apache License, v2.0",
description="Pure Python client for Apache Kafka",
long_description=open("README.md").read(),
long_description="""
This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
"""
)
Loading