Avoid multi-threaded auto-commit #36

Closed · wants to merge 12 commits
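
This patch removes the multi-threaded auto-commit machinery (the ReentrantTimer thread plus a commit lock) in favor of a single committer child process. Offsets move into an Offsets dict that mirrors its entries into a multiprocessing Array, the message counter becomes a shared Value, and commit() turns into a Queue/Event handshake with the committer, gaining block and timeout parameters. A rough usage sketch of the resulting API follows; the broker address, handler, and constructor arguments are assumptions for illustration, not part of this diff:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    kafka = KafkaClient("localhost", 9092)       # hypothetical broker
    consumer = SimpleConsumer(kafka, "my-group", "my-topic",
                              auto_commit=True,
                              auto_commit_every_n=100,   # messages
                              auto_commit_every_t=5000)  # milliseconds

    for message in consumer:
        handle(message)                          # hypothetical handler

    # commit() now merely enqueues a request; the committer process sends
    # the actual OffsetCommitRequest. block/timeout drive the Event wait.
    consumer.commit(block=True, timeout=10)
    consumer.stop()   # commits if auto_commit is set, then joins the process
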
kafka/conn.py (1 addition, 2 deletions)

@@ -2,15 +2,14 @@
 import logging
 import socket
 import struct
-from threading import local
 
 from kafka.common import BufferUnderflowError
 from kafka.common import ConnectionError
 
 log = logging.getLogger("kafka")
 
 
-class KafkaConnection(local):
+class KafkaConnection(object):
     """
     A socket connection to a single Kafka broker
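
A note on this conn.py change: KafkaConnection no longer inherits from threading.local, which previously gave each thread (in particular the auto-commit thread) a private socket. Isolation now comes from handing the committer process its own client: Consumer.__init__ passes client.copy() to _committer, which calls reinit() before use. A minimal sketch of that per-process pattern, assuming an existing KafkaClient instance named client:

    from multiprocessing import Process

    def worker(client):
        # A forked child must not reuse the parent's socket; reinit()
        # re-establishes the broker connection inside the child.
        client.reinit()
        # ... issue requests on the now-private connection ...

    # Parent side: pass a copy, exactly as Consumer.__init__ does.
    proc = Process(target=worker, args=(client.copy(),))
    proc.daemon = True
    proc.start()
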
kafka/consumer.py (141 additions, 52 deletions)

@@ -2,18 +2,16 @@
 from itertools import izip_longest, repeat
 import logging
 import time
-from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
 from Queue import Empty
+from multiprocessing import Process, Queue, Event, Value, Array, \
+    current_process
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
     OffsetRequest, OffsetCommitRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
 
-from kafka.util import ReentrantTimer
-
 log = logging.getLogger("kafka")
 
 AUTO_COMMIT_MSG_COUNT = 100
@@ -51,6 +49,98 @@ def __exit__(self, type, value, traceback):
         self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
 
 
+class Offsets(dict):
+    """
+    A dictionary of partition=>offset entries. The entries are mirrored
+    into a multiprocessing Array so that they are shared across processes
+    """
+    def __init__(self, *args, **kwargs):
+        super(Offsets, self).__init__(*args, **kwargs)
+        self.length = len(self) * 2
+        self.array = Array('i', self.length)
+        self.__syncup()
+
+    def __syncup(self):
+        i = 0
+        for k, v in self.items():
+            self.array[i] = k
+            self.array[i+1] = v
+            i += 2
+
+    def __setitem__(self, key, value):
+        super(Offsets, self).__setitem__(key, value)
+        self.__syncup()
+
+    def shareditems(self, keys=None):
+        if keys is None:
+            keys = self.keys()
+
+        for i in range(self.length):
+            if i % 2 == 0:
+                k = self.array[i]
+            else:
+                if k in keys:
+                    yield k, self.array[i]
+
+
+def _commit(client, group, topic, count, offsets, partitions=None):
+    """
+    Commit offsets for this consumer
+
+    partitions: list of partitions to commit, default is to commit
+                all of them
+    """
+
+    # short circuit if nothing happened.
+    if count.value == 0:
+        return
+
+    reqs = []
+    for partition, offset in offsets.shareditems(keys=partitions):
+        log.debug("Commit offset %d in SimpleConsumer: "
+                  "group=%s, topic=%s, partition=%s" %
+                  (offset, group, topic, partition))
+
+        reqs.append(OffsetCommitRequest(topic, partition, offset, None))
+
+    resps = client.send_offset_commit_request(group, reqs)
+    for resp in resps:
+        assert resp.error == 0
+
+    count.value = 0
+
+
+def _committer(client, group, topic, timeout, queue, event, count, offsets):
+    """
+    The child process which takes care of committing offsets
+
+    NOTE: Ideally, this should have been a method inside the Consumer
+    class. However, the multiprocessing module has issues on Windows:
+    the functionality breaks unless this function is kept outside of a class
+    """
+    client.reinit()
+
+    if timeout is not None:
+        timeout /= 1000.0
+
+    while True:
+        try:
+            partitions = queue.get(timeout=timeout)
+            if partitions == -1:
+                break
+            notify = True
+        except Empty:
+            # A timeout has happened. Do a commit
+            partitions = None
+            notify = False
+
+        # Try and commit the offsets
+        _commit(client, group, topic, count, offsets, partitions)
+
+        if notify:
+            event.set()
+
+
 class Consumer(object):
     """
     Base class to be used by other consumers. Not to be used directly
@@ -68,24 +158,24 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
         self.topic = topic
         self.group = group
         self.client._load_metadata_for_topics(topic)
-        self.offsets = {}
+        offsets = {}
 
         if not partitions:
             partitions = self.client.topic_partitions[topic]
 
         # Variables for handling offset commits
-        self.commit_lock = Lock()
+        self.commit_queue = Queue()
+        self.commit_event = Event()
         self.commit_timer = None
-        self.count_since_commit = 0
+        self.count_since_commit = Value('i', 0)
         self.auto_commit = auto_commit
-        self.auto_commit_every_n = auto_commit_every_n
-        self.auto_commit_every_t = auto_commit_every_t
-
-        # Set up the auto-commit timer
-        if auto_commit is True and auto_commit_every_t is not None:
-            self.commit_timer = ReentrantTimer(auto_commit_every_t,
-                                               self.commit)
-            self.commit_timer.start()
+        if auto_commit:
+            self.auto_commit_every_n = auto_commit_every_n
+            self.auto_commit_every_t = auto_commit_every_t
+        else:
+            self.auto_commit_every_n = None
+            self.auto_commit_every_t = None
 
         def get_or_init_offset_callback(resp):
             if resp.error == ErrorMapping.NO_ERROR:
@@ -104,48 +194,41 @@ def get_or_init_offset_callback(resp):
         # (offset,) = self.client.send_offset_fetch_request(group, [req],
         #                  callback=get_or_init_offset_callback,
         #                  fail_on_error=False)
-        # self.offsets[partition] = offset
+        # offsets[partition] = offset
 
         for partition in partitions:
-            self.offsets[partition] = 0
+            offsets[partition] = 0
 
+        # Set this as a shared object
+        self.offsets = Offsets(offsets)
+
+        # Start committer only in the master/controller
+        if not current_process().daemon:
+            args = (client.copy(), group, topic,
+                    self.auto_commit_every_t,
+                    self.commit_queue,
+                    self.commit_event,
+                    self.count_since_commit,
+                    self.offsets)
+
+            self.commit_timer = Process(target=_committer, args=args)
+            self.commit_timer.daemon = True
+            self.commit_timer.start()
+
-    def commit(self, partitions=None):
+    def commit(self, partitions=None, block=True, timeout=None):
         """
         Commit offsets for this consumer
 
         partitions: list of partitions to commit, default is to commit
                     all of them
+        block: If set, the call blocks until the commit completes
+        timeout: The maximum time in seconds to block for
         """
-
-        # short circuit if nothing happened. This check is kept outside
-        # to prevent un-necessarily acquiring a lock for checking the state
-        if self.count_since_commit == 0:
-            return
-
-        with self.commit_lock:
-            # Do this check again, just in case the state has changed
-            # during the lock acquiring timeout
-            if self.count_since_commit == 0:
-                return
-
-            reqs = []
-            if not partitions:  # commit all partitions
-                partitions = self.offsets.keys()
-
-            for partition in partitions:
-                offset = self.offsets[partition]
-                log.debug("Commit offset %d in SimpleConsumer: "
-                          "group=%s, topic=%s, partition=%s" %
-                          (offset, self.group, self.topic, partition))
-
-                reqs.append(OffsetCommitRequest(self.topic, partition,
-                                                offset, None))
-
-            resps = self.client.send_offset_commit_request(self.group, reqs)
-            for resp in resps:
-                assert resp.error == 0
-
-            self.count_since_commit = 0
+        self.commit_event.clear()
+        self.commit_queue.put(partitions)
+
+        if block:
+            self.commit_event.wait(timeout)
 
     def _auto_commit(self):
         """
@@ -156,13 +239,19 @@ def _auto_commit(self):
         if not self.auto_commit or self.auto_commit_every_n is None:
             return
 
-        if self.count_since_commit > self.auto_commit_every_n:
+        if self.count_since_commit.value >= self.auto_commit_every_n:
             self.commit()
 
     def stop(self):
-        if self.commit_timer is not None:
-            self.commit_timer.stop()
-            self.commit()
+        # We will do an auto-commit only if configured to do so.
+        # Otherwise, it is the responsibility of the caller to commit
+        # before stopping
+        if self.auto_commit:
+            self.commit()
+
+        self.commit_queue.put(-1)
+        self.commit_timer.join()
 
     def pending(self, partitions=None):
         """
@@ -336,7 +425,7 @@ def __iter__(self):
                     continue
 
                 # Count, check and commit messages if necessary
-                self.count_since_commit += 1
+                self.count_since_commit.value += 1
                 self._auto_commit()
 
     def __iter_partition__(self, partition, offset):
@@ -572,7 +661,7 @@ def __iter__(self):
             self.start.clear()
             yield message
 
-            self.count_since_commit += 1
+            self.count_since_commit.value += 1
             self._auto_commit()
 
         self.start.clear()
@@ -612,7 +701,7 @@ def get_messages(self, count=1, block=True, timeout=10):
 
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset
-            self.count_since_commit += 1
+            self.count_since_commit.value += 1
             self._auto_commit()
             count -= 1
 
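The heart of the consumer.py change is the Offsets helper: a plain dict whose entries are mirrored into a flat multiprocessing Array of alternating (partition, offset) integers, so the committer process observes offset updates made in other processes without any pickling. A standalone sketch of the same encoding, independent of the Kafka classes:

    from multiprocessing import Array

    offsets = {0: 10, 1: 25}               # partition => offset
    shared = Array('i', len(offsets) * 2)  # layout: [p0, o0, p1, o1, ...]

    i = 0
    for partition, offset in offsets.items():
        shared[i] = partition              # even slots: partition ids
        shared[i + 1] = offset             # odd slots: offsets
        i += 2

    # Reading pairs back, as Offsets.shareditems() does:
    pairs = [(shared[j], shared[j + 1]) for j in range(0, len(shared), 2)]
    print pairs                            # [(0, 10), (1, 25)]

Two limitations worth noting: the Array is sized once in __init__, so partitions added to the dict later would overrun it, and the 'i' typecode is a 32-bit signed integer, which sufficiently large Kafka offsets will overflow. Likewise, count_since_commit.value += 1 is a read-modify-write that does not take the Value's lock, though a lost increment only delays an auto-commit.
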
kafka/util.py (0 additions, 48 deletions)

@@ -1,6 +1,5 @@
 from collections import defaultdict
 import struct
-from threading import Thread, Event
 
 from kafka.common import BufferUnderflowError
 
@@ -67,50 +66,3 @@ def group_by_topic_and_partition(tuples):
     for t in tuples:
         out[t.topic][t.partition] = t
     return out
-
-
-class ReentrantTimer(object):
-    """
-    A timer that can be restarted, unlike threading.Timer
-    (although this uses threading.Timer)
-
-    t: timer interval in milliseconds
-    fn: a callable to invoke
-    args: tuple of args to be passed to function
-    kwargs: keyword arguments to be passed to function
-    """
-    def __init__(self, t, fn, *args, **kwargs):
-
-        if t <= 0:
-            raise ValueError('Invalid timeout value')
-
-        if not callable(fn):
-            raise ValueError('fn must be callable')
-
-        self.thread = None
-        self.t = t / 1000.0
-        self.fn = fn
-        self.args = args
-        self.kwargs = kwargs
-        self.active = None
-
-    def _timer(self, active):
-        while not active.wait(self.t):
-            self.fn(*self.args, **self.kwargs)
-
-    def start(self):
-        if self.thread is not None:
-            self.stop()
-
-        self.active = Event()
-        self.thread = Thread(target=self._timer, args=(self.active,))
-        self.thread.daemon = True  # So the app exits when main thread exits
-        self.thread.start()
-
-    def stop(self):
-        if self.thread is None:
-            return
-
-        self.active.set()
-        self.thread.join(self.t + 1)
-        self.timer = None
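
With ReentrantTimer gone, its periodic firing is recreated by _committer's queue.get(timeout=...): a timeout means the interval elapsed (commit everything), a partition list means an explicit commit() call, and -1 is the shutdown sentinel sent by stop(). A reduced sketch of the protocol, where do_commit stands in for the _commit() helper above:

    from Queue import Empty    # Python 2 stdlib, as used in this codebase

    def committer_loop(queue, event, interval, do_commit):
        while True:
            try:
                partitions = queue.get(timeout=interval)
                if partitions == -1:   # sentinel from stop(): exit
                    break
                notify = True          # a commit() caller may be blocked
            except Empty:
                partitions = None      # interval elapsed: commit all
                notify = False

            do_commit(partitions)
            if notify:
                event.set()            # unblocks commit(block=True)

    # Caller side, mirroring Consumer.commit() and Consumer.stop():
    #     event.clear(); queue.put([0, 1])   # commit partitions 0 and 1
    #     event.wait(10)                     # block, 10 second timeout
    #     queue.put(-1)                      # ask the committer to exit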