Skip to content

Commit

Permalink
Merge pull request #174 from scrapinghub/fix-import-crash
Browse files Browse the repository at this point in the history
fixing crash on settings import when kafka-python isn't installed
  • Loading branch information
sibiryakov authored Jul 18, 2016
2 parents 62d9e49 + c674254 commit 7dd6165
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
3 changes: 1 addition & 2 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ KAFKA_CODEC_LEGACY

Default: ``KAFKA_CODEC_LEGACY``

Kafka-python 0.x version codec, could be one of ``CODEC_NONE``, ``CODEC_SNAPPY`` or ``CODEC_GZIP``,
imported from ``kafka.protocol``.
Kafka-python 0.x version compression codec to use, is a string and could be one of ``none``, ``snappy`` or ``gzip``.


.. setting:: LOGGING_CONFIG
Expand Down
4 changes: 4 additions & 0 deletions frontera/contrib/backends/remote/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,7 @@ def __init__(self, manager):

def get_next_requests(self, max_n_requests, **kwargs):
return self._buffer.get_next_requests(max_n_requests, **kwargs)


raise DeprecationWarning("KafkaBackend and KafkaOverusedBackend is deprecated, and will be removed soon please use "
"MessageBusBackend instead")
15 changes: 14 additions & 1 deletion frontera/contrib/messagebus/kafkabus.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,20 @@ def __init__(self, settings):
self.spider_partition_id = settings.get('SPIDER_PARTITION_ID')
self.max_next_requests = settings.MAX_NEXT_REQUESTS
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
self.codec = settings.get('KAFKA_CODEC_LEGACY')

self.codec = None
codec = settings.get('KAFKA_CODEC_LEGACY')
if codec == 'none':
from kafka.protocol import CODEC_NONE
self.codec = CODEC_NONE
if codec == 'snappy':
from kafka.protocol import CODEC_SNAPPY
self.codec = CODEC_SNAPPY
if codec == 'gzip':
from kafka.protocol import CODEC_GZIP
self.codec = CODEC_GZIP
if not self.codec:
raise NameError("Non-existent Kafka compression codec.")

self.conn = KafkaClient(server)

Expand Down
3 changes: 1 addition & 2 deletions frontera/settings/default_settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import timedelta
from kafka.protocol import CODEC_NONE


AUTO_START = True
Expand All @@ -19,7 +18,7 @@
HBASE_STATE_CACHE_SIZE_LIMIT = 3000000
HBASE_QUEUE_TABLE = 'queue'
KAFKA_GET_TIMEOUT = 5.0
KAFKA_CODEC_LEGACY = CODEC_NONE
KAFKA_CODEC_LEGACY = "none"
MAX_NEXT_REQUESTS = 64
MAX_REQUESTS = 0
MESSAGE_BUS = 'frontera.contrib.messagebus.zeromq.MessageBus'
Expand Down

0 comments on commit 7dd6165

Please sign in to comment.