From 532c4986ac9f8e5029f4e8e75ab9264a076e8b27 Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 7 Feb 2017 22:31:06 +0300 Subject: [PATCH 01/10] Fixed couple of "leaks" when gc is disabled Since I'm using kafka-python in an environment with disabled gc these "leaks" hit me hard. --- kafka/protocol/struct.py | 6 +++++- kafka/vendor/selectors34.py | 3 ++- kafka/vendor/six.py | 4 +++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index 602cfb8d4..5fcc4eb67 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -3,6 +3,8 @@ #from collections import namedtuple from io import BytesIO +from kafka.util import WeakMethod + from .abstract import AbstractType from .types import Schema @@ -20,7 +22,9 @@ def __init__(self, *args, **kwargs): self.__dict__.update(kwargs) # overloading encode() to support both class and instance - self.encode = self._encode_self + # Without WeakMethod() this creates circular ref, which + # causes instances to "leak" to garbage + self.encode = WeakMethod(self._encode_self) @classmethod def encode(cls, item): # pylint: disable=E0202 diff --git a/kafka/vendor/selectors34.py b/kafka/vendor/selectors34.py index 2a6e55628..73d556b62 100644 --- a/kafka/vendor/selectors34.py +++ b/kafka/vendor/selectors34.py @@ -93,7 +93,8 @@ def __iter__(self): return iter(self._selector._fd_to_key) -class BaseSelector(six.with_metaclass(ABCMeta)): +@six.add_metaclass(ABCMeta) +class BaseSelector(object): """Selector abstract base class. 
A selector supports registering file objects to be monitored for specific diff --git a/kafka/vendor/six.py b/kafka/vendor/six.py index 808e6510e..a949b9539 100644 --- a/kafka/vendor/six.py +++ b/kafka/vendor/six.py @@ -70,7 +70,9 @@ def __len__(self): else: # 64-bit MAXSIZE = int((1 << 63) - 1) - del X + + # Don't del it here, cause with gc disabled this "leaks" to garbage + # del X def _add_doc(func, doc): From 8cba2240c5a6bcfab019813e4d3813f86e16054e Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Mon, 13 Feb 2017 18:27:31 +0300 Subject: [PATCH 02/10] One more circular ref addressed with WeakMethod --- kafka/protocol/message.py | 4 ++-- kafka/protocol/struct.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 36f03ca92..b65061ed7 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -11,7 +11,7 @@ from .types import ( Int8, Int32, Int64, Bytes, Schema, AbstractType ) -from ..util import crc32 +from ..util import crc32, WeakMethod class Message(Struct): @@ -53,7 +53,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, self.attributes = attributes self.key = key self.value = value - self.encode = self._encode_self + self.encode = WeakMethod(self._encode_self) @property def timestamp_type(self): diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index 5fcc4eb67..65b8c5814 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -3,11 +3,11 @@ #from collections import namedtuple from io import BytesIO -from kafka.util import WeakMethod - from .abstract import AbstractType from .types import Schema +from ..util import WeakMethod + class Struct(AbstractType): SCHEMA = Schema() From 8b27802bfbe1023721a09318deff71e55062b202 Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 11:15:07 +0300 Subject: [PATCH 03/10] _encode_self in Message loses self --- kafka/protocol/message.py | 4 ++-- 1 file changed, 
2 insertions(+), 2 deletions(-) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index ec5ee6c1b..b5795c48c 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -52,7 +52,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, self.attributes = attributes self.key = key self.value = value - self.encode = WeakMethod(self._encode_self) + self.encode = WeakMethod(self._encode_self_message) @property def timestamp_type(self): @@ -68,7 +68,7 @@ def timestamp_type(self): else: return 0 - def _encode_self(self, recalc_crc=True): + def _encode_self_message(self, recalc_crc=True): version = self.magic if version == 1: fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value) From 0b82c46079835f0cb9f15121ae31cb554850e65f Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 11:30:32 +0300 Subject: [PATCH 04/10] A bit different implementation of WeakMethod --- kafka/protocol/message.py | 4 +- kafka/util.py | 127 +++++++++++++++++++++++++++----------- 2 files changed, 93 insertions(+), 38 deletions(-) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index b5795c48c..ec5ee6c1b 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -52,7 +52,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, self.attributes = attributes self.key = key self.value = value - self.encode = WeakMethod(self._encode_self_message) + self.encode = WeakMethod(self._encode_self) @property def timestamp_type(self): @@ -68,7 +68,7 @@ def timestamp_type(self): else: return 0 - def _encode_self_message(self, recalc_crc=True): + def _encode_self(self, recalc_crc=True): version = self.magic if version == 1: fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value) diff --git a/kafka/util.py b/kafka/util.py index bc011540a..42dd83820 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -157,46 +157,101 @@ def __del__(self): 
self.stop() -class WeakMethod(object): +class WeakMethod(ref): + """ + A custom `weakref.ref` subclass which simulates a weak reference to + a bound method, working around the lifetime problem of bound methods. """ - Callable that weakly references a method and the object it is bound to. It - is based on http://stackoverflow.com/a/24287465. - Arguments: + __slots__ = "_func_ref", "_meth_type", "_alive", "__weakref__" - object_dot_method: A bound instance method (i.e. 'object.method'). - """ - def __init__(self, object_dot_method): - try: - self.target = weakref.ref(object_dot_method.__self__) - except AttributeError: - self.target = weakref.ref(object_dot_method.im_self) - self._target_id = id(self.target()) + def __new__(cls, meth, callback=None): try: - self.method = weakref.ref(object_dot_method.__func__) + obj = meth.__self__ + func = meth.__func__ except AttributeError: - self.method = weakref.ref(object_dot_method.im_func) - self._method_id = id(self.method()) - - def __call__(self, *args, **kwargs): - """ - Calls the method on target with args and kwargs. - """ - return self.method()(self.target(), *args, **kwargs) - - def __hash__(self): - return hash(self.target) ^ hash(self.method) + raise TypeError("argument should be a bound method") + + def _cb(arg): + # The self-weakref trick is needed to avoid creating a reference + # cycle. 
+ self = self_wr() + if self._alive: + self._alive = False + if callback is not None: + callback(self) + + self = ref.__new__(cls, obj, _cb) + self._func_ref = ref(func, _cb) + self._meth_type = type(meth) + self._alive = True + self_wr = ref(self) + return self + + def __call__(self): + obj = super(WeakMethod, self).__call__() + func = self._func_ref() + if obj is None or func is None: + return None + return self._meth_type(func, obj) def __eq__(self, other): - if not isinstance(other, WeakMethod): - return False - return self._target_id == other._target_id and self._method_id == other._method_id - - -def try_method_on_system_exit(obj, method, *args, **kwargs): - def wrapper(_obj, _meth, *args, **kwargs): - try: - getattr(_obj, _meth)(*args, **kwargs) - except (ReferenceError, AttributeError): - pass - atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs) + if isinstance(other, WeakMethod): + if not self._alive or not other._alive: + return self is other + return ref.__eq__(self, other) and self._func_ref == other._func_ref + return False + + def __ne__(self, other): + if isinstance(other, WeakMethod): + if not self._alive or not other._alive: + return self is not other + return ref.__ne__(self, other) or self._func_ref != other._func_ref + return True + + __hash__ = ref.__hash__ + + +# class WeakMethod(object): +# """ +# Callable that weakly references a method and the object it is bound to. It +# is based on http://stackoverflow.com/a/24287465. + +# Arguments: + +# object_dot_method: A bound instance method (i.e. 'object.method'). 
+# """ +# def __init__(self, object_dot_method): +# try: +# self.target = weakref.ref(object_dot_method.__self__) +# except AttributeError: +# self.target = weakref.ref(object_dot_method.im_self) +# self._target_id = id(self.target()) +# try: +# self.method = weakref.ref(object_dot_method.__func__) +# except AttributeError: +# self.method = weakref.ref(object_dot_method.im_func) +# self._method_id = id(self.method()) + +# def __call__(self, *args, **kwargs): +# """ +# Calls the method on target with args and kwargs. +# """ +# return self.method()(self.target(), *args, **kwargs) + +# def __hash__(self): +# return hash(self.target) ^ hash(self.method) + +# def __eq__(self, other): +# if not isinstance(other, WeakMethod): +# return False +# return self._target_id == other._target_id and self._method_id == other._method_id + + +# def try_method_on_system_exit(obj, method, *args, **kwargs): +# def wrapper(_obj, _meth, *args, **kwargs): +# try: +# getattr(_obj, _meth)(*args, **kwargs) +# except (ReferenceError, AttributeError): +# pass +# atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs) From 95dd5111275b8305db6b2af143dde477537c5f2e Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 11:33:18 +0300 Subject: [PATCH 05/10] Missed import --- kafka/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/util.py b/kafka/util.py index 42dd83820..1c20de9ad 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -6,6 +6,7 @@ import struct import sys from threading import Thread, Event +from weakref import ref import weakref from kafka.vendor import six From 9e4913f6698aac4c809863b29336149770a7036d Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 11:39:34 +0300 Subject: [PATCH 06/10] One more shot on WeakMethod --- kafka/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/util.py b/kafka/util.py index 1c20de9ad..a6405acb1 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -189,12 +189,12 
@@ def _cb(arg): self_wr = ref(self) return self - def __call__(self): + def __call__(self, *args, **kwargs): obj = super(WeakMethod, self).__call__() func = self._func_ref() if obj is None or func is None: return None - return self._meth_type(func, obj) + return self._meth_type(func, obj)(*args, **kwargs) def __eq__(self, other): if isinstance(other, WeakMethod): From ded09981565a594f93a4774eaf0bde6cbb55499f Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 11:44:43 +0300 Subject: [PATCH 07/10] Reverted WeakMethod --- kafka/protocol/message.py | 1 + kafka/util.py | 124 +++++++++++--------------------------- 2 files changed, 35 insertions(+), 90 deletions(-) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index ec5ee6c1b..eb24c6a5a 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -52,6 +52,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, self.attributes = attributes self.key = key self.value = value + self.encode = None self.encode = WeakMethod(self._encode_self) @property diff --git a/kafka/util.py b/kafka/util.py index a6405acb1..bc011540a 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -6,7 +6,6 @@ import struct import sys from threading import Thread, Event -from weakref import ref import weakref from kafka.vendor import six @@ -158,101 +157,46 @@ def __del__(self): self.stop() -class WeakMethod(ref): - """ - A custom `weakref.ref` subclass which simulates a weak reference to - a bound method, working around the lifetime problem of bound methods. +class WeakMethod(object): """ + Callable that weakly references a method and the object it is bound to. It + is based on http://stackoverflow.com/a/24287465. - __slots__ = "_func_ref", "_meth_type", "_alive", "__weakref__" + Arguments: - def __new__(cls, meth, callback=None): + object_dot_method: A bound instance method (i.e. 'object.method'). 
+ """ + def __init__(self, object_dot_method): + try: + self.target = weakref.ref(object_dot_method.__self__) + except AttributeError: + self.target = weakref.ref(object_dot_method.im_self) + self._target_id = id(self.target()) try: - obj = meth.__self__ - func = meth.__func__ + self.method = weakref.ref(object_dot_method.__func__) except AttributeError: - raise TypeError("argument should be a bound method") - - def _cb(arg): - # The self-weakref trick is needed to avoid creating a reference - # cycle. - self = self_wr() - if self._alive: - self._alive = False - if callback is not None: - callback(self) - - self = ref.__new__(cls, obj, _cb) - self._func_ref = ref(func, _cb) - self._meth_type = type(meth) - self._alive = True - self_wr = ref(self) - return self + self.method = weakref.ref(object_dot_method.im_func) + self._method_id = id(self.method()) def __call__(self, *args, **kwargs): - obj = super(WeakMethod, self).__call__() - func = self._func_ref() - if obj is None or func is None: - return None - return self._meth_type(func, obj)(*args, **kwargs) + """ + Calls the method on target with args and kwargs. + """ + return self.method()(self.target(), *args, **kwargs) + + def __hash__(self): + return hash(self.target) ^ hash(self.method) def __eq__(self, other): - if isinstance(other, WeakMethod): - if not self._alive or not other._alive: - return self is other - return ref.__eq__(self, other) and self._func_ref == other._func_ref - return False - - def __ne__(self, other): - if isinstance(other, WeakMethod): - if not self._alive or not other._alive: - return self is not other - return ref.__ne__(self, other) or self._func_ref != other._func_ref - return True - - __hash__ = ref.__hash__ - - -# class WeakMethod(object): -# """ -# Callable that weakly references a method and the object it is bound to. It -# is based on http://stackoverflow.com/a/24287465. - -# Arguments: - -# object_dot_method: A bound instance method (i.e. 'object.method'). 
-# """ -# def __init__(self, object_dot_method): -# try: -# self.target = weakref.ref(object_dot_method.__self__) -# except AttributeError: -# self.target = weakref.ref(object_dot_method.im_self) -# self._target_id = id(self.target()) -# try: -# self.method = weakref.ref(object_dot_method.__func__) -# except AttributeError: -# self.method = weakref.ref(object_dot_method.im_func) -# self._method_id = id(self.method()) - -# def __call__(self, *args, **kwargs): -# """ -# Calls the method on target with args and kwargs. -# """ -# return self.method()(self.target(), *args, **kwargs) - -# def __hash__(self): -# return hash(self.target) ^ hash(self.method) - -# def __eq__(self, other): -# if not isinstance(other, WeakMethod): -# return False -# return self._target_id == other._target_id and self._method_id == other._method_id - - -# def try_method_on_system_exit(obj, method, *args, **kwargs): -# def wrapper(_obj, _meth, *args, **kwargs): -# try: -# getattr(_obj, _meth)(*args, **kwargs) -# except (ReferenceError, AttributeError): -# pass -# atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs) + if not isinstance(other, WeakMethod): + return False + return self._target_id == other._target_id and self._method_id == other._method_id + + +def try_method_on_system_exit(obj, method, *args, **kwargs): + def wrapper(_obj, _meth, *args, **kwargs): + try: + getattr(_obj, _meth)(*args, **kwargs) + except (ReferenceError, AttributeError): + pass + atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs) From d9fce4adf72278351580fbc2477f5b392d7dc2c2 Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 12:14:44 +0300 Subject: [PATCH 08/10] Chain call caused WeakMethod target disappear by invoke time --- kafka/protocol/legacy.py | 11 ++++++----- kafka/protocol/message.py | 1 - 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 6d9329d1a..11683c1ae 100644 --- 
a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -133,6 +133,11 @@ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000): if acks not in (1, 0, -1): raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) + msg = kafka.protocol.message.Message( + msg.value, key=msg.key, + magic=msg.magic, attributes=msg.attributes + ); + return kafka.protocol.produce.ProduceRequest[0]( required_acks=acks, timeout=timeout, @@ -140,11 +145,7 @@ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000): topic, [( partition, - [(0, - kafka.protocol.message.Message( - msg.value, key=msg.key, - magic=msg.magic, attributes=msg.attributes - ).encode()) + [(0, msg.encode()) for msg in payload.messages]) for partition, payload in topic_payloads.items()]) for topic, topic_payloads in group_by_topic_and_partition(payloads).items()]) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index eb24c6a5a..ec5ee6c1b 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -52,7 +52,6 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, self.attributes = attributes self.key = key self.value = value - self.encode = None self.encode = WeakMethod(self._encode_self) @property From f1df0cd84739369f539f441158a231a44d538f3c Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 12:24:40 +0300 Subject: [PATCH 09/10] Unwound comprehensions and generators in order to prevent the interpreter from cleaning up the WeakMethod target prematurely --- kafka/protocol/legacy.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 11683c1ae..80cc910fa 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -133,22 +133,26 @@ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000): if acks not in (1, 0, -1): raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) - msg = 
kafka.protocol.message.Message( + topics = [] + for topic, topic_payloads in group_by_topic_and_partition(payloads).items(): + topic_msgs = [] + for partition, payload in topic_payloads.items(): + partition_msgs = [] + for msg in payload.messages: + m = kafka.protocol.message.Message( msg.value, key=msg.key, magic=msg.magic, attributes=msg.attributes - ); + ) + partition_msgs.append(0, m.encode()) + topic_msgs.append(partition, partition_msgs) + topics.append(topic, topic_msgs) + return kafka.protocol.produce.ProduceRequest[0]( required_acks=acks, timeout=timeout, - topics=[( - topic, - [( - partition, - [(0, msg.encode()) - for msg in payload.messages]) - for partition, payload in topic_payloads.items()]) - for topic, topic_payloads in group_by_topic_and_partition(payloads).items()]) + topics=topics + ) @classmethod def decode_produce_response(cls, response): From 067e632737847ae06e3fe58d7c5ece9c67873714 Mon Sep 17 00:00:00 2001 From: Max Baryshnikov Date: Tue, 14 Feb 2017 12:29:30 +0300 Subject: [PATCH 10/10] Fixed mistype --- kafka/protocol/legacy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 80cc910fa..89b266d84 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -143,9 +143,9 @@ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000): msg.value, key=msg.key, magic=msg.magic, attributes=msg.attributes ) - partition_msgs.append(0, m.encode()) - topic_msgs.append(partition, partition_msgs) - topics.append(topic, topic_msgs) + partition_msgs.append((0, m.encode())) + topic_msgs.append((partition, partition_msgs)) + topics.append((topic, topic_msgs)) return kafka.protocol.produce.ProduceRequest[0](