From 92dfcc530b083b073820f24368b469a662d5157d Mon Sep 17 00:00:00 2001 From: pfreixes Date: Sun, 19 Jun 2016 20:40:44 +0200 Subject: [PATCH 1/4] Disconnect clients from pool safely This PR tries to solve the issues raised by #732 regarding the danger of disconnect clients from the `ConnectionPool.disconnect` method executed by a Thread different that those ones that are in charge of the connections. Instead of call the `Connection.disconnect` method it uses the syscall `shutdown` to leave the socket unusable. Once the connection tries to use the socket, even when it is already blocked such us the `PubSub` pattern, it gets a `socket.error` exception that will be cactched by the `Connection` class to then raise an `ConnectionError` and disconnect the socket in a clean and safe way. The `Client.execute_command` function catches the `ConnectionError` exception and tries to connect again and run the command that raised the error. Worth mentioning that in the case of the `Sentinel` environment, if some changes regarding the Redis pool of servers - perhaps the mater went down and a slave was promoted - the next command will be executed using an other server. --- redis/connection.py | 18 ++++++++++++++++-- tests/test_connection_pool.py | 26 ++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 004c7a6f78..6b6a8faed1 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -534,6 +534,20 @@ def disconnect(self): pass self._sock = None + def shutdown_socket(self): + """ + Shutdown the socket hold by the current connection, called from + the connection pool class u other manager to singal it that has to be + disconnected in a thread safe way. Later the connection instance + will get an error and will call `disconnect` by it self. + """ + try: + self._sock.shutdown(socket.SHUT_RDWR) + except AttributeError: + # either _sock attribute does not exist or + # connection thread removed it. + pass + def send_packed_command(self, command): "Send an already packed command to the Redis server" if not self._sock: @@ -953,7 +967,7 @@ def disconnect(self): all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: - connection.disconnect() + connection.shutdown_socket() class BlockingConnectionPool(ConnectionPool): @@ -1072,4 +1086,4 @@ def release(self, connection): def disconnect(self): "Disconnects all connections in the pool." for connection in self._connections: - connection.disconnect() + connection.shutdown_socket() diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 11c20080a9..2a764a0df4 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -1,4 +1,6 @@ from __future__ import with_statement +from mock import Mock + import os import pytest import redis @@ -69,6 +71,30 @@ def test_repr_contains_db_info_unix(self): expected = 'ConnectionPool>' assert repr(pool) == expected + def test_disconnect_active_connections(self): + + class MyConnection(redis.Connection): + + connect_calls = 0 + + def __init__(self, *args, **kwargs): + super(MyConnection, self).__init__(*args, **kwargs) + self.register_connect_callback(self.count_connect) + + def count_connect(self, connection): + MyConnection.connect_calls += 1 + + pool = self.get_pool(connection_class=MyConnection) + r = redis.StrictRedis(connection_pool=pool) + r.ping() + pool.disconnect() + r.ping() + + # If the connection is not disconnected by the pool the + # callback belonging to Connection will be called just + # one time. + assert MyConnection.connect_calls == 2 + class TestBlockingConnectionPool(object): def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20): From f07ed8d01b6508117137f41e75c2b7f1e04129a7 Mon Sep 17 00:00:00 2001 From: pfreixes Date: Sun, 19 Jun 2016 20:55:08 +0200 Subject: [PATCH 2/4] Removed unused mock module --- tests/test_connection_pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 2a764a0df4..6b01fdd7e7 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -1,5 +1,4 @@ from __future__ import with_statement -from mock import Mock import os import pytest From 7eca549375f2d8cfa1133e2b3f00b3bcb59c352f Mon Sep 17 00:00:00 2001 From: pfreixes Date: Mon, 20 Jun 2016 18:18:35 +0200 Subject: [PATCH 3/4] Make a shallow copy to avoid threading issues --- redis/connection.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 6b6a8faed1..8551004ec2 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -2,6 +2,8 @@ from distutils.version import StrictVersion from itertools import chain from select import select +from copy import copy + import os import socket import sys @@ -547,6 +549,12 @@ def shutdown_socket(self): # either _sock attribute does not exist or # connection thread removed it. pass + except OSError as e: + if e.errno == 107: + # Transport endpoint is not connected + pass + else: + raise def send_packed_command(self, command): "Send an already packed command to the Redis server" @@ -964,8 +972,8 @@ def release(self, connection): def disconnect(self): "Disconnects all connections in the pool" - all_conns = chain(self._available_connections, - self._in_use_connections) + all_conns = chain(copy(self._available_connections), + copy(self._in_use_connections.copy())) for connection in all_conns: connection.shutdown_socket() From a7d66d5378cf795a051c95d4419d966b88f39643 Mon Sep 17 00:00:00 2001 From: pfreixes Date: Mon, 20 Jun 2016 18:21:34 +0200 Subject: [PATCH 4/4] Removed invalid copy, already done explicitly --- redis/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/connection.py b/redis/connection.py index 8551004ec2..908382e3cc 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -973,7 +973,7 @@ def release(self, connection): def disconnect(self): "Disconnects all connections in the pool" all_conns = chain(copy(self._available_connections), - copy(self._in_use_connections.copy())) + copy(self._in_use_connections)) for connection in all_conns: connection.shutdown_socket()