From 99b9759670cc23baee1a0b33210471e96986e786 Mon Sep 17 00:00:00 2001 From: Luper Rouch Date: Thu, 30 Aug 2012 18:03:23 +0200 Subject: [PATCH 1/5] pubsub: connection is no longer released when PubSub.parse_response() sees 0 subscriptions --- redis/client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/redis/client.py b/redis/client.py index 7739cbfd03..4fa0400290 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1429,10 +1429,6 @@ def parse_response(self): response = self.connection.read_response() if nativestr(response[0]) in self.subscribe_commands: self.subscription_count = response[2] - # if we've just unsubscribed from the remaining channels, - # release the connection back to the pool - if not self.subscription_count: - self.reset() return response def psubscribe(self, patterns): From b063b1864f94ec0726bb801a4019e5f3e5a1f3d5 Mon Sep 17 00:00:00 2001 From: Luper Rouch Date: Thu, 30 Aug 2012 18:12:39 +0200 Subject: [PATCH 2/5] Revert "pubsub: connection is no longer released when PubSub.parse_response() sees 0 subscriptions" This reverts commit 99b9759670cc23baee1a0b33210471e96986e786. --- redis/client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/redis/client.py b/redis/client.py index 4fa0400290..7739cbfd03 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1429,6 +1429,10 @@ def parse_response(self): response = self.connection.read_response() if nativestr(response[0]) in self.subscribe_commands: self.subscription_count = response[2] + # if we've just unsubscribed from the remaining channels, + # release the connection back to the pool + if not self.subscription_count: + self.reset() return response def psubscribe(self, patterns): From 2ef7730d897eb82fc4aae624ea398e2924ef2d78 Mon Sep 17 00:00:00 2001 From: Luper Rouch Date: Thu, 30 Aug 2012 18:33:31 +0200 Subject: [PATCH 3/5] added an option to avoid recycling a PubSub connection when it's not subscribed to any channel anymore --- redis/client.py | 19 +++++++++++++++---- tests/pubsub.py | 24 ++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/redis/client.py b/redis/client.py index 7739cbfd03..ecb0fb39a8 100644 --- a/redis/client.py +++ b/redis/client.py @@ -327,13 +327,16 @@ def lock(self, name, timeout=None, sleep=0.1): """ return Lock(self, name, timeout=timeout, sleep=sleep) - def pubsub(self, shard_hint=None): + def pubsub(self, shard_hint=None, release_connection=True): """ Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them. + + ``shard_hint`` and ``release_connection`` are passed to the + :class:`PubSub` constructor. """ - return PubSub(self.connection_pool, shard_hint) + return PubSub(self.connection_pool, shard_hint, release_connection) #### COMMAND EXECUTION AND PROTOCOL PARSING #### def execute_command(self, *args, **options): @@ -1364,10 +1367,18 @@ class PubSub(object): After subscribing to one or more channels, the listen() method will block until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again. + + The default is to recycle the connection when parsing a response (usually + by calling ``listen()``) indicating that this connection is not subscribed + to any channel anymore. This can be changed by passing ``False`` to + ``release_connection``, which will hold on the same connection regardless + of its subscriptions. """ - def __init__(self, connection_pool, shard_hint=None): + def __init__(self, connection_pool, shard_hint=None, + release_connection=True): self.connection_pool = connection_pool self.shard_hint = shard_hint + self.release_connection = release_connection self.connection = None self.channels = set() self.patterns = set() @@ -1431,7 +1442,7 @@ def parse_response(self): self.subscription_count = response[2] # if we've just unsubscribed from the remaining channels, # release the connection back to the pool - if not self.subscription_count: + if self.release_connection and not self.subscription_count: self.reset() return response diff --git a/tests/pubsub.py b/tests/pubsub.py index 26b8c0df97..94cf943aa3 100644 --- a/tests/pubsub.py +++ b/tests/pubsub.py @@ -104,6 +104,30 @@ def test_pattern_subscribe(self): } ) + def test_hold_connection(self): + # SUBSCRIBE => UNSUBSCRIBE => SUBSCRIBE + pubsub = self.client.pubsub(release_connection=False) + pubsub.subscribe('foo') + connection = pubsub.connection + pubsub.unsubscribe('foo') + pubsub.subscribe('bar') + + # Get a message + self.client.publish('bar', 'baz') + for i in range(4): + message = next(pubsub.listen()) + self.assertEquals(message, + { + 'type': 'message', + 'pattern': None, + 'channel': 'bar', + 'data': b('baz'), + } + ) + + # Verify the connection hasn't been recycled + self.assertEqual(id(connection), id(pubsub.connection)) + class PubSubRedisDownTestCase(unittest.TestCase): def setUp(self): From 65fd683593fb7642dc1c6c99b98a55ee29cdf212 Mon Sep 17 00:00:00 2001 From: Luper Rouch Date: Wed, 12 Sep 2012 17:09:59 +0200 Subject: [PATCH 4/5] added PEXPIRE/PTTL support (redis 2.6.0) --- redis/client.py | 14 ++++++++++++++ tests/server_commands.py | 15 +++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/redis/client.py b/redis/client.py index ecb0fb39a8..45be7dafcc 100644 --- a/redis/client.py +++ b/redis/client.py @@ -515,6 +515,16 @@ def expire(self, name, time): time = int(time.total_seconds()) return self.execute_command('EXPIRE', name, time) + def pexpire(self, name, time): + """ + Set an expire flag on key ``name`` for ``time`` milliseconds. + ``time`` can be represented by an integer or a Python timedelta + object. + """ + if isinstance(time, datetime.timedelta): + time = int(time.total_seconds()) * 1000 + return self.execute_command('PEXPIRE', name, time) + def expireat(self, name, when): """ Set an expire flag on key ``name``. ``when`` can be represented @@ -663,6 +673,10 @@ def ttl(self, name): "Returns the number of seconds until the key ``name`` will expire" return self.execute_command('TTL', name) + def pttl(self, name): + "Returns the number of milliseconds until the key ``name`` will expire" + return self.execute_command('PTTL', name) + def type(self, name): "Returns the type of key ``name``" return self.execute_command('TYPE', name) diff --git a/tests/server_commands.py b/tests/server_commands.py index df92bfb935..b7dcf53f27 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -168,6 +168,21 @@ def test_expire(self): self.assertEquals(self.client.persist('a'), True) self.assertEquals(self.client.ttl('a'), None) + def test_pexpire(self): + version = self.client.info()['redis_version'] + if StrictVersion(version) < StrictVersion('2.6.0'): + try: + raise unittest.SkipTest() + except AttributeError: + return + + self.assertEquals(self.client.pexpire('a', 10000), False) + self.client['a'] = 'foo' + self.assertEquals(self.client.expire('a', 10000), True) + self.assertEquals(self.client.pttl('a'), 10000) + self.assertEquals(self.client.persist('a'), True) + self.assertEquals(self.client.pttl('a'), None) + def test_expireat(self): expire_at = datetime.datetime.now() + datetime.timedelta(minutes=1) self.assertEquals(self.client.expireat('a', expire_at), False) From 910f4daa817f2c8bf7414a8823b91b4b4d82180f Mon Sep 17 00:00:00 2001 From: Luper Rouch Date: Wed, 12 Sep 2012 17:11:28 +0200 Subject: [PATCH 5/5] Revert "added PEXPIRE/PTTL support (redis 2.6.0)" This reverts commit 65fd683593fb7642dc1c6c99b98a55ee29cdf212. --- redis/client.py | 14 -------------- tests/server_commands.py | 15 --------------- 2 files changed, 29 deletions(-) diff --git a/redis/client.py b/redis/client.py index 45be7dafcc..ecb0fb39a8 100644 --- a/redis/client.py +++ b/redis/client.py @@ -515,16 +515,6 @@ def expire(self, name, time): time = int(time.total_seconds()) return self.execute_command('EXPIRE', name, time) - def pexpire(self, name, time): - """ - Set an expire flag on key ``name`` for ``time`` milliseconds. - ``time`` can be represented by an integer or a Python timedelta - object. - """ - if isinstance(time, datetime.timedelta): - time = int(time.total_seconds()) * 1000 - return self.execute_command('PEXPIRE', name, time) - def expireat(self, name, when): """ Set an expire flag on key ``name``. ``when`` can be represented @@ -673,10 +663,6 @@ def ttl(self, name): "Returns the number of seconds until the key ``name`` will expire" return self.execute_command('TTL', name) - def pttl(self, name): - "Returns the number of milliseconds until the key ``name`` will expire" - return self.execute_command('PTTL', name) - def type(self, name): "Returns the type of key ``name``" return self.execute_command('TYPE', name) diff --git a/tests/server_commands.py b/tests/server_commands.py index b7dcf53f27..df92bfb935 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -168,21 +168,6 @@ def test_expire(self): self.assertEquals(self.client.persist('a'), True) self.assertEquals(self.client.ttl('a'), None) - def test_pexpire(self): - version = self.client.info()['redis_version'] - if StrictVersion(version) < StrictVersion('2.6.0'): - try: - raise unittest.SkipTest() - except AttributeError: - return - - self.assertEquals(self.client.pexpire('a', 10000), False) - self.client['a'] = 'foo' - self.assertEquals(self.client.expire('a', 10000), True) - self.assertEquals(self.client.pttl('a'), 10000) - self.assertEquals(self.client.persist('a'), True) - self.assertEquals(self.client.pttl('a'), None) - def test_expireat(self): expire_at = datetime.datetime.now() + datetime.timedelta(minutes=1) self.assertEquals(self.client.expireat('a', expire_at), False)