Skip to content

Add an option to avoid releasing PubSub connection #275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,16 @@ def lock(self, name, timeout=None, sleep=0.1):
"""
return Lock(self, name, timeout=timeout, sleep=sleep)

def pubsub(self, shard_hint=None):
def pubsub(self, shard_hint=None, release_connection=True):
"""
Return a Publish/Subscribe object. With this object, you can
subscribe to channels and listen for messages that get published to
them.

``shard_hint`` and ``release_connection`` are passed to the
:class:`PubSub` constructor.
"""
return PubSub(self.connection_pool, shard_hint)
return PubSub(self.connection_pool, shard_hint, release_connection)

#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def execute_command(self, *args, **options):
Expand Down Expand Up @@ -1364,10 +1367,18 @@ class PubSub(object):
After subscribing to one or more channels, the listen() method will block
until a message arrives on one of the subscribed channels. That message
will be returned and it's safe to start listening again.

The default is to recycle the connection when parsing a response (usually
by calling ``listen()``) indicating that this connection is not subscribed
to any channel anymore. This can be changed by passing ``False`` to
``release_connection``, which will hold on the same connection regardless
of its subscriptions.
"""
def __init__(self, connection_pool, shard_hint=None):
def __init__(self, connection_pool, shard_hint=None,
release_connection=True):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
self.release_connection = release_connection
self.connection = None
self.channels = set()
self.patterns = set()
Expand Down Expand Up @@ -1431,7 +1442,7 @@ def parse_response(self):
self.subscription_count = response[2]
# if we've just unsubscribed from the remaining channels,
# release the connection back to the pool
if not self.subscription_count:
if self.release_connection and not self.subscription_count:
self.reset()
return response

Expand Down
24 changes: 24 additions & 0 deletions tests/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,30 @@ def test_pattern_subscribe(self):
}
)

def test_hold_connection(self):
# SUBSCRIBE => UNSUBSCRIBE => SUBSCRIBE
pubsub = self.client.pubsub(release_connection=False)
pubsub.subscribe('foo')
connection = pubsub.connection
pubsub.unsubscribe('foo')
pubsub.subscribe('bar')

# Get a message
self.client.publish('bar', 'baz')
for i in range(4):
message = next(pubsub.listen())
self.assertEquals(message,
{
'type': 'message',
'pattern': None,
'channel': 'bar',
'data': b('baz'),
}
)

# Verify the connection hasn't been recycled
self.assertEqual(id(connection), id(pubsub.connection))


class PubSubRedisDownTestCase(unittest.TestCase):
def setUp(self):
Expand Down