From 4e1e74809235edc19e03edb79c97c80a3e4e9eca Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Thu, 31 Jan 2019 19:30:41 -0800 Subject: [PATCH] Improve how connection pools operate in forked/child proceeses. Sometimes a process with an active connection to Redis forks and creates child processes taht also want to talk to Redis. Prior to this change there were a number of potential conflicts that could cause this to fail. Retrieving a connection from the pool and releasing a connection back to the pool check the current proceeses PID. If it's different than the PID that created the pool, reset() is called to get a fresh set of connections for the current process. However in doing so, pool.disconnect() was caused which closes the file descriptors that the parent may still be using. Further when the available_connections and in_use_connections lists are reset, all of those connections inherited from the parent are GC'd and the connection's `__del__` was called, which also closed the socket and file descriptor. This change prevents pool.disconnect() from being called when a pid is changed. It also removes the `__del__` destructor from connections. Neither of these are necessary or practical. Child processes still reset() their copy of the pool when first accessed causing their own connections to be created. `ConnectionPool.disconnect()` now checks the current process ID so that a child or parent can't disconnect the other's connections. Additionally, `Connection.disconnect()` now checks the current process ID and only calls `socket.shutdown()` if `disconnect()` is called by the same process that created the connection. This allows for a child process that inherited a connection to call `Connection.disconnect()` and not shutdown the parent's copy of the socket. Fixes #863 Fixes #784 Fixes #732 Fixes #1085 Fixes #504 --- redis/connection.py | 12 +++---- tests/test_multiprocessing.py | 66 ++++++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index c81e4c1d3b..ee0b92ae72 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -471,12 +471,6 @@ def __init__(self, host='localhost', port=6379, db=0, password=None, def __repr__(self): return self.description_format % self._description_args - def __del__(self): - try: - self.disconnect() - except Exception: - pass - def register_connect_callback(self, callback): self._connect_callbacks.append(callback) @@ -580,7 +574,8 @@ def disconnect(self): if self._sock is None: return try: - self._sock.shutdown(socket.SHUT_RDWR) + if os.getpid() == self.pid: + self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: pass @@ -973,7 +968,6 @@ def _checkpid(self): # another thread already did the work while we waited # on the lock. return - self.disconnect() self.reset() def get_connection(self, command_name, *keys, **options): @@ -1012,6 +1006,7 @@ def release(self, connection): def disconnect(self): "Disconnects all connections in the pool" + self._checkpid() all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: @@ -1133,5 +1128,6 @@ def release(self, connection): def disconnect(self): "Disconnects all connections in the pool." + self._checkpid() for connection in self._connections: connection.disconnect() diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index dae35bcc5b..bb31a067e7 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -18,13 +18,17 @@ class TestMultiprocessing(object): # Test connection sharing between forks. # See issue #1085 for details. - def test_connection(self): + def test_close_connection_in_child(self): + """ + A connection owned by a parent and closed by a child doesn't + destroy the file descriptors so a parent can still use it. + """ conn = Connection() - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' def target(conn): - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' conn.disconnect() @@ -33,20 +37,29 @@ def target(conn): proc.join(3) assert proc.exitcode is 0 - # Check that connection is still alive after fork process has exited. - with pytest.raises(ConnectionError): - assert conn.send_command('ping') is None - assert conn.read_response() == b'PONG' + # The connection was created in the parent but disconnected in the + # child. The child called socket.close() but did not call + # socket.shutdown() because it wasn't the "owning" process. + # Therefore the connection still works in the parent. + conn.send_command('ping') + assert conn.read_response() == b'PONG' - def test_close_connection_in_main(self): + def test_close_connection_in_parent(self): + """ + A connection owned by a parent is unusable by a child if the parent + (the owning process) closes the connection. + """ conn = Connection() - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' def target(conn, ev): ev.wait() - assert conn.send_command('ping') is None - assert conn.read_response() == b'PONG' + # the parent closed the connection. because it also created the + # connection, the connection is shutdown and the child + # cannot use it. + with pytest.raises(ConnectionError): + conn.send_command('ping') ev = multiprocessing.Event() proc = multiprocessing.Process(target=target, args=(conn, ev)) @@ -56,21 +69,27 @@ def target(conn, ev): ev.set() proc.join(3) - assert proc.exitcode is 1 + assert proc.exitcode is 0 @pytest.mark.parametrize('max_connections', [1, 2, None]) def test_pool(self, max_connections): + """ + A child will create its own connections when using a pool created + by a parent. + """ pool = ConnectionPool.from_url('redis://localhost', max_connections=max_connections) conn = pool.get_connection('ping') + main_conn_pid = conn.pid with exit_callback(pool.release, conn): - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' def target(pool): with exit_callback(pool.disconnect): conn = pool.get_connection('ping') + assert conn.pid != main_conn_pid with exit_callback(pool.release, conn): assert conn.send_command('ping') is None assert conn.read_response() == b'PONG' @@ -80,15 +99,19 @@ def target(pool): proc.join(3) assert proc.exitcode is 0 - # Check that connection is still alive after fork process has exited. + # Check that connection is still alive after fork process has exited + # and disconnected the connections in its pool conn = pool.get_connection('ping') with exit_callback(pool.release, conn): - with pytest.raises(ConnectionError): - assert conn.send_command('ping') is None - assert conn.read_response() == b'PONG' + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' @pytest.mark.parametrize('max_connections', [1, 2, None]) def test_close_pool_in_main(self, max_connections): + """ + A child process that uses the same pool as its parent isn't affected + when the parent disconnects all connections within the pool. + """ pool = ConnectionPool.from_url('redis://localhost', max_connections=max_connections) @@ -115,12 +138,13 @@ def target(pool, disconnect_event): proc.join(3) assert proc.exitcode is 0 - def test_redis(self, r): + def test_redis_client(self, r): + "A redis client created in a parent can also be used in a child" assert r.ping() is True - def target(redis): - assert redis.ping() is True - del redis + def target(client): + assert client.ping() is True + del client proc = multiprocessing.Process(target=target, args=(r,)) proc.start()