Skip to content

Commit

Permalink
Fix socket garbage collection (#2859)
Browse files Browse the repository at this point in the history
  • Loading branch information
kristjanvalur committed Jul 31, 2023
1 parent 2c2860d commit 8e5d5ce
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fix #2831, add auto_close_connection_pool=True arg to asyncio.Redis.from_url()
* Fix incorrect redis.asyncio.Cluster type hint for `retry_on_error`
* Fix dead weakref in sentinel connection causing ReferenceError (#2767)
* Fix #2768, Fix KeyError: 'first-entry' in parse_xinfo_stream.
Expand Down
13 changes: 10 additions & 3 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ class Redis(
response_callbacks: MutableMapping[Union[str, bytes], ResponseCallbackT]

@classmethod
def from_url(cls, url: str, **kwargs):
def from_url(
cls,
url: str,
single_connection_client: bool = False,
auto_close_connection_pool: bool = True,
**kwargs,
):
"""
Return a Redis client object configured from the given URL
Expand Down Expand Up @@ -144,12 +150,13 @@ class initializer. In the case of conflicting arguments, querystring
arguments always win.
"""
single_connection_client = kwargs.pop("single_connection_client", False)
connection_pool = ConnectionPool.from_url(url, **kwargs)
return cls(
redis = cls(
connection_pool=connection_pool,
single_connection_client=single_connection_client,
)
redis.auto_close_connection_pool = auto_close_connection_pool
return redis

def __init__(
self,
Expand Down
38 changes: 37 additions & 1 deletion tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
_AsyncRESPBase,
)
from redis.asyncio import Redis
from redis.asyncio.connection import Connection, UnixDomainSocketConnection
from redis.asyncio.connection import Connection, UnixDomainSocketConnection, parse_url
from redis.asyncio.retry import Retry
from redis.backoff import NoBackoff
from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
Expand Down Expand Up @@ -278,3 +278,39 @@ async def open_connection(*args, **kwargs):
def test_create_single_connection_client_from_url():
client = Redis.from_url("redis://localhost:6379/0?", single_connection_client=True)
assert client.single_connection_client is True


@pytest.mark.parametrize("from_url", (True, False))
async def test_pool_auto_close(request, from_url):
"""Verify that basic Redis instances have auto_close_connection_pool set to True"""

url: str = request.config.getoption("--redis-url")
url_args = parse_url(url)

async def get_redis_connection():
if from_url:
return Redis.from_url(url)
return Redis(**url_args)

r1 = await get_redis_connection()
assert r1.auto_close_connection_pool is True
await r1.close()


@pytest.mark.parametrize("from_url", (True, False))
async def test_pool_auto_close_disable(request, from_url):
"""Verify that auto_close_connection_pool can be disabled"""

url: str = request.config.getoption("--redis-url")
url_args = parse_url(url)

async def get_redis_connection():
if from_url:
return Redis.from_url(url, auto_close_connection_pool=False)
url_args["auto_close_connection_pool"] = False
return Redis(**url_args)

r1 = await get_redis_connection()
assert r1.auto_close_connection_pool is False
await r1.connection_pool.disconnect()
await r1.close()
35 changes: 21 additions & 14 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def test_tcp_ssl_connect(tcp_address):

def _assert_connect(conn, server_address, certfile=None, keyfile=None):
if isinstance(server_address, str):
if not _RedisUDSServer:
pytest.skip("Unix domain sockets are not supported on this platform")
server = _RedisUDSServer(server_address, _RedisRequestHandler)
else:
server = _RedisTCPServer(
Expand Down Expand Up @@ -113,24 +115,29 @@ def get_request(self):
return connstream, fromaddr


class _RedisUDSServer(socketserver.UnixStreamServer):
def __init__(self, *args, **kw) -> None:
self._ready_event = threading.Event()
self._stop_requested = False
super().__init__(*args, **kw)
if hasattr(socket, "UnixStreamServer"):

def service_actions(self):
self._ready_event.set()
class _RedisUDSServer(socketserver.UnixStreamServer):
def __init__(self, *args, **kw) -> None:
self._ready_event = threading.Event()
self._stop_requested = False
super().__init__(*args, **kw)

def wait_online(self):
self._ready_event.wait()
def service_actions(self):
self._ready_event.set()

def stop(self):
self._stop_requested = True
self.shutdown()
def wait_online(self):
self._ready_event.wait()

def is_serving(self):
return not self._stop_requested
def stop(self):
self._stop_requested = True
self.shutdown()

def is_serving(self):
return not self._stop_requested

else:
_RedisUDSServer = None


class _RedisRequestHandler(socketserver.StreamRequestHandler):
Expand Down

0 comments on commit 8e5d5ce

Please sign in to comment.