Skip to content

Commit 2ef7730

Browse files
committed
added an option to avoid recycling a PubSub connection when it's not subscribed to any channel anymore
1 parent b063b18 commit 2ef7730

File tree

2 files changed

+39
-4
lines changed

2 files changed

+39
-4
lines changed

redis/client.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,16 @@ def lock(self, name, timeout=None, sleep=0.1):
327327
"""
328328
return Lock(self, name, timeout=timeout, sleep=sleep)
329329

330-
def pubsub(self, shard_hint=None):
330+
def pubsub(self, shard_hint=None, release_connection=True):
331331
"""
332332
Return a Publish/Subscribe object. With this object, you can
333333
subscribe to channels and listen for messages that get published to
334334
them.
335+
336+
``shard_hint`` and ``release_connection`` are passed to the
337+
:class:`PubSub` constructor.
335338
"""
336-
return PubSub(self.connection_pool, shard_hint)
339+
return PubSub(self.connection_pool, shard_hint, release_connection)
337340

338341
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
339342
def execute_command(self, *args, **options):
@@ -1364,10 +1367,18 @@ class PubSub(object):
13641367
After subscribing to one or more channels, the listen() method will block
13651368
until a message arrives on one of the subscribed channels. That message
13661369
will be returned and it's safe to start listening again.
1370+
1371+
The default is to recycle the connection when parsing a response (usually
1372+
by calling ``listen()``) indicating that this connection is not subscribed
1373+
to any channel anymore. This can be changed by passing ``False`` to
1374+
``release_connection``, which will hold on the same connection regardless
1375+
of its subscriptions.
13671376
"""
1368-
def __init__(self, connection_pool, shard_hint=None):
1377+
def __init__(self, connection_pool, shard_hint=None,
1378+
release_connection=True):
13691379
self.connection_pool = connection_pool
13701380
self.shard_hint = shard_hint
1381+
self.release_connection = release_connection
13711382
self.connection = None
13721383
self.channels = set()
13731384
self.patterns = set()
@@ -1431,7 +1442,7 @@ def parse_response(self):
14311442
self.subscription_count = response[2]
14321443
# if we've just unsubscribed from the remaining channels,
14331444
# release the connection back to the pool
1434-
if not self.subscription_count:
1445+
if self.release_connection and not self.subscription_count:
14351446
self.reset()
14361447
return response
14371448

tests/pubsub.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,30 @@ def test_pattern_subscribe(self):
104104
}
105105
)
106106

107+
def test_hold_connection(self):
108+
# SUBSCRIBE => UNSUBSCRIBE => SUBSCRIBE
109+
pubsub = self.client.pubsub(release_connection=False)
110+
pubsub.subscribe('foo')
111+
connection = pubsub.connection
112+
pubsub.unsubscribe('foo')
113+
pubsub.subscribe('bar')
114+
115+
# Get a message
116+
self.client.publish('bar', 'baz')
117+
for i in range(4):
118+
message = next(pubsub.listen())
119+
self.assertEquals(message,
120+
{
121+
'type': 'message',
122+
'pattern': None,
123+
'channel': 'bar',
124+
'data': b('baz'),
125+
}
126+
)
127+
128+
# Verify the connection hasn't been recycled
129+
self.assertEqual(id(connection), id(pubsub.connection))
130+
107131

108132
class PubSubRedisDownTestCase(unittest.TestCase):
109133
def setUp(self):

0 commit comments

Comments
 (0)