From e6e68d675dded0069428ca5097edbfb2053bd396 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 16:52:03 -0500 Subject: [PATCH 01/11] Avoid starting connection ceil_timeout when a connection is already available --- aiohttp/client.py | 11 +-- aiohttp/connector.py | 157 +++++++++++++++++++++++++------------------ 2 files changed, 96 insertions(+), 72 deletions(-) diff --git a/aiohttp/client.py b/aiohttp/client.py index 32bca1db509..d73a2bd6161 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -94,7 +94,6 @@ _SENTINEL, BasicAuth, TimeoutHandle, - ceil_timeout, get_env_proxy_for_url, method_must_be_empty_body, sentinel, @@ -634,13 +633,9 @@ async def _request( # connection timeout try: - async with ceil_timeout( - real_timeout.connect, - ceil_threshold=real_timeout.ceil_threshold, - ): - conn = await self._connector.connect( - req, traces=traces, timeout=real_timeout - ) + conn = await self._connector.connect( + req, traces=traces, timeout=real_timeout + ) except asyncio.TimeoutError as exc: raise ConnectionTimeoutError( f"Connection timeout to host {url}" diff --git a/aiohttp/connector.py b/aiohttp/connector.py index b537da10b28..995e7145f31 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -493,83 +493,112 @@ async def connect( """Get from pool or create new connection.""" key = req.connection_key available = self._available_connections(key) - - # Wait if there are no available connections or if there are/were - # waiters (i.e. don't steal connection from a waiter about to wake up) - if available <= 0 or key in self._waiters: - fut: asyncio.Future[None] = self._loop.create_future() - - # This connection will now count towards the limit. - self._waiters[key].append(fut) - + wait_for_conn = available <= 0 or key in self._waiters + if not wait_for_conn and (proto := self._get(key)): + # If we do not have to wait and we can get a connection from the pool + # we can avoid the timeout ceil logic and directly return the connection if traces: - for trace in traces: - await trace.send_connection_queued_start() + await self._send_connect_reuseconn(key, traces) + return self._acquired_connection(proto, key) - try: - await fut - except BaseException as e: - if key in self._waiters: - # remove a waiter even if it was cancelled, normally it's - # removed when it's notified - try: - self._waiters[key].remove(fut) - except ValueError: # fut may no longer be in list - pass - - raise e - finally: - if key in self._waiters and not self._waiters[key]: - del self._waiters[key] + async with ceil_timeout( + timeout.connect, + ceil_threshold=timeout.ceil_threshold, + ): + # Wait if there are no available connections or if there are/were + # waiters (i.e. don't steal connection from a waiter about to wake up) + if wait_for_conn: + await self._wait_for_available_connection(key, traces) + proto = self._get(key) - if traces: - for trace in traces: - await trace.send_connection_queued_end() + if proto is None: + placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) + self._acquired.add(placeholder) + self._acquired_per_host[key].add(placeholder) - proto = self._get(key) - if proto is None: - placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) - self._acquired.add(placeholder) - self._acquired_per_host[key].add(placeholder) + if traces: + for trace in traces: + await trace.send_connection_create_start() - if traces: - for trace in traces: - await trace.send_connection_create_start() + try: + proto = await self._create_connection(req, traces, timeout) + if self._closed: + proto.close() + raise ClientConnectionError("Connector is closed.") + except BaseException: + if not self._closed: + self._acquired.remove(placeholder) + self._drop_acquired_per_host(key, placeholder) + self._release_waiter() + raise + else: + if not self._closed: + self._acquired.remove(placeholder) + self._drop_acquired_per_host(key, placeholder) - try: - proto = await self._create_connection(req, traces, timeout) - if self._closed: - proto.close() - raise ClientConnectionError("Connector is closed.") - except BaseException: - if not self._closed: - self._acquired.remove(placeholder) - self._drop_acquired_per_host(key, placeholder) - self._release_waiter() - raise + if traces: + for trace in traces: + await trace.send_connection_create_end() else: - if not self._closed: - self._acquired.remove(placeholder) - self._drop_acquired_per_host(key, placeholder) + if traces: + await self._send_connect_reuseconn(key, traces) - if traces: - for trace in traces: - await trace.send_connection_create_end() - else: - if traces: - # Acquire the connection to prevent race conditions with limits - placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) - self._acquired.add(placeholder) - self._acquired_per_host[key].add(placeholder) - for trace in traces: - await trace.send_connection_reuseconn() - self._acquired.remove(placeholder) - self._drop_acquired_per_host(key, placeholder) + return self._acquired_connection(proto, key) + def _acquired_connection( + self, proto: ResponseHandler, key: "ConnectionKey" + ) -> Connection: + """Mark proto as acquired and wrap it in a Connection object.""" self._acquired.add(proto) self._acquired_per_host[key].add(proto) return Connection(self, key, proto, self._loop) + async def _wait_for_available_connection( + self, key: "ConnectionKey", traces: List["Trace"] + ) -> None: + """Wait until there is an available connection.""" + fut: asyncio.Future[None] = self._loop.create_future() + + # This connection will now count towards the limit. + self._waiters[key].append(fut) + + if traces: + for trace in traces: + await trace.send_connection_queued_start() + + try: + await fut + except BaseException as e: + if key in self._waiters: + # remove a waiter even if it was cancelled, normally it's + # removed when it's notified + try: + self._waiters[key].remove(fut) + except ValueError: # fut may no longer be in list + pass + + raise e + finally: + if key in self._waiters and not self._waiters[key]: + del self._waiters[key] + + if traces: + for trace in traces: + await trace.send_connection_queued_end() + + async def _send_connect_reuseconn( + self, key: ConnectionKey, traces: List["Trace"] + ) -> None: + """Send tracing events for reusing a connection.""" + # Acquire the connection to prevent race conditions with limits + placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) + self._acquired.add(placeholder) + self._acquired_per_host[key].add(placeholder) + for trace in traces: + await trace.send_connection_reuseconn() + self._acquired.remove(placeholder) + self._drop_acquired_per_host(key, placeholder) + def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]: try: conns = self._conns[key] From 7509ffbc5275a8eb9c87491e4c93733acc907b8e Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 16:56:56 -0500 Subject: [PATCH 02/11] fix missing quotes --- aiohttp/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 995e7145f31..36388460566 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -587,7 +587,7 @@ async def _wait_for_available_connection( await trace.send_connection_queued_end() async def _send_connect_reuseconn( - self, key: ConnectionKey, traces: List["Trace"] + self, key: "ConnectionKey", traces: List["Trace"] ) -> None: """Send tracing events for reusing a connection.""" # Acquire the connection to prevent race conditions with limits From f6f0e000d01eeba726dfea09293c1f17c6dbcd3b Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 17:00:04 -0500 Subject: [PATCH 03/11] changelog --- CHANGES/9600.breaking.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 CHANGES/9600.breaking.rst diff --git a/CHANGES/9600.breaking.rst b/CHANGES/9600.breaking.rst new file mode 100644 index 00000000000..d8284318847 --- /dev/null +++ b/CHANGES/9600.breaking.rst @@ -0,0 +1,3 @@ +Improved performance of the connector when a connection can be reused -- by :user:`bdraco`. + +If ``BaseConnector.connect`` has sub-classed and replaced with custom logic, the ``ceil_timeout`` must be added. From 9a09b78e1e3514ad8dd13cda096ef7cbf03141d0 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 17:07:59 -0500 Subject: [PATCH 04/11] Update CHANGES/9600.breaking.rst --- CHANGES/9600.breaking.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES/9600.breaking.rst b/CHANGES/9600.breaking.rst index d8284318847..5997344e4cd 100644 --- a/CHANGES/9600.breaking.rst +++ b/CHANGES/9600.breaking.rst @@ -1,3 +1,3 @@ Improved performance of the connector when a connection can be reused -- by :user:`bdraco`. -If ``BaseConnector.connect`` has sub-classed and replaced with custom logic, the ``ceil_timeout`` must be added. +If ``BaseConnector.connect`` has been subclassed and replaced with custom logic, the ``ceil_timeout`` must be added. From 39d210c6c2ffd775c319885edc28df7ea89a7287 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 17:30:37 -0500 Subject: [PATCH 05/11] fix check --- aiohttp/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 36388460566..192666bcb41 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -494,7 +494,7 @@ async def connect( key = req.connection_key available = self._available_connections(key) wait_for_conn = available <= 0 or key in self._waiters - if not wait_for_conn and (proto := self._get(key)): + if not wait_for_conn and (proto := self._get(key)) is not None: # If we do not have to wait and we can get a connection from the pool # we can avoid the timeout ceil logic and directly return the connection if traces: From c8d0c35cec551d3a7bfcae49b9de501c0b2e52ba Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 21:35:18 -0500 Subject: [PATCH 06/11] preen --- aiohttp/connector.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 192666bcb41..c25d6fc8cc7 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -572,10 +572,9 @@ async def _wait_for_available_connection( if key in self._waiters: # remove a waiter even if it was cancelled, normally it's # removed when it's notified - try: + with suppress(ValueError): + # fut may no longer be in list self._waiters[key].remove(fut) - except ValueError: # fut may no longer be in list - pass raise e finally: From 245bdb1517af79e2d25166140fb3ac09db8d9613 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 21:36:16 -0500 Subject: [PATCH 07/11] preen --- aiohttp/connector.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index c25d6fc8cc7..7acfb421935 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -539,9 +539,8 @@ async def connect( if traces: for trace in traces: await trace.send_connection_create_end() - else: - if traces: - await self._send_connect_reuseconn(key, traces) + elif traces: + await self._send_connect_reuseconn(key, traces) return self._acquired_connection(proto, key) From 98438b43dc0b7599f6cd9465d0a007dd003013d5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 21:43:07 -0500 Subject: [PATCH 08/11] preen --- aiohttp/connector.py | 65 +++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 7acfb421935..3e07ec4bb5d 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -497,9 +497,7 @@ async def connect( if not wait_for_conn and (proto := self._get(key)) is not None: # If we do not have to wait and we can get a connection from the pool # we can avoid the timeout ceil logic and directly return the connection - if traces: - await self._send_connect_reuseconn(key, traces) - return self._acquired_connection(proto, key) + return await self._reused_connection(key, proto, traces) async with ceil_timeout( timeout.connect, @@ -509,39 +507,44 @@ async def connect( # waiters (i.e. don't steal connection from a waiter about to wake up) if wait_for_conn: await self._wait_for_available_connection(key, traces) - proto = self._get(key) + if (proto := self._get(key)) is not None: + return await self._reused_connection(key, proto, traces) - if proto is None: - placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) - self._acquired.add(placeholder) - self._acquired_per_host[key].add(placeholder) + placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) + self._acquired.add(placeholder) + self._acquired_per_host[key].add(placeholder) - if traces: - for trace in traces: - await trace.send_connection_create_start() + if traces: + for trace in traces: + await trace.send_connection_create_start() - try: - proto = await self._create_connection(req, traces, timeout) - if self._closed: - proto.close() - raise ClientConnectionError("Connector is closed.") - except BaseException: - if not self._closed: - self._acquired.remove(placeholder) - self._drop_acquired_per_host(key, placeholder) - self._release_waiter() - raise - else: - if not self._closed: - self._acquired.remove(placeholder) - self._drop_acquired_per_host(key, placeholder) + try: + proto = await self._create_connection(req, traces, timeout) + if self._closed: + proto.close() + raise ClientConnectionError("Connector is closed.") + except BaseException: + if not self._closed: + self._acquired.remove(placeholder) + self._drop_acquired_per_host(key, placeholder) + self._release_waiter() + raise + else: + if not self._closed: + self._acquired.remove(placeholder) + self._drop_acquired_per_host(key, placeholder) + + if traces: + for trace in traces: + await trace.send_connection_create_end() - if traces: - for trace in traces: - await trace.send_connection_create_end() - elif traces: - await self._send_connect_reuseconn(key, traces) + return self._acquired_connection(proto, key) + async def _reused_connection( + self, key: "ConnectionKey", proto: ResponseHandler, traces: List["Trace"] + ) -> Connection: + if traces: + await self._send_connect_reuseconn(key, traces) return self._acquired_connection(proto, key) def _acquired_connection( From 7ed527686574329a88f5d4e7cfe54f4cb1e33339 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 21:43:39 -0500 Subject: [PATCH 09/11] preen --- aiohttp/connector.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 3e07ec4bb5d..02c3f718b08 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -499,10 +499,7 @@ async def connect( # we can avoid the timeout ceil logic and directly return the connection return await self._reused_connection(key, proto, traces) - async with ceil_timeout( - timeout.connect, - ceil_threshold=timeout.ceil_threshold, - ): + async with ceil_timeout(timeout.connect, timeout.ceil_threshold): # Wait if there are no available connections or if there are/were # waiters (i.e. don't steal connection from a waiter about to wake up) if wait_for_conn: From 6c183e339e9c0e49b30cd30414d4fb1ea4a57945 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 21:44:26 -0500 Subject: [PATCH 10/11] preen --- aiohttp/connector.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 02c3f718b08..617ccd33cc9 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -541,7 +541,14 @@ async def _reused_connection( self, key: "ConnectionKey", proto: ResponseHandler, traces: List["Trace"] ) -> Connection: if traces: - await self._send_connect_reuseconn(key, traces) + # Acquire the connection to prevent race conditions with limits + placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) + self._acquired.add(placeholder) + self._acquired_per_host[key].add(placeholder) + for trace in traces: + await trace.send_connection_reuseconn() + self._acquired.remove(placeholder) + self._drop_acquired_per_host(key, placeholder) return self._acquired_connection(proto, key) def _acquired_connection( @@ -584,19 +591,6 @@ async def _wait_for_available_connection( for trace in traces: await trace.send_connection_queued_end() - async def _send_connect_reuseconn( - self, key: "ConnectionKey", traces: List["Trace"] - ) -> None: - """Send tracing events for reusing a connection.""" - # Acquire the connection to prevent race conditions with limits - placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) - self._acquired.add(placeholder) - self._acquired_per_host[key].add(placeholder) - for trace in traces: - await trace.send_connection_reuseconn() - self._acquired.remove(placeholder) - self._drop_acquired_per_host(key, placeholder) - def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]: try: conns = self._conns[key] From c7ed98cbeb5e9f4448b2cabfc04d5532549b3ab3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 31 Oct 2024 23:05:04 -0500 Subject: [PATCH 11/11] subclassed --- docs/spelling_wordlist.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 396c85cb0e5..3b80584e02b 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -295,6 +295,7 @@ ssl SSLContext startup subapplication +subclassed subclasses subdirectory submodules