Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4923350

Browse files
committedApr 5, 2024·
Support cancellation during client connection establishment
With #714 we added support for cancelling in-flight HTTP requests by putting the response deferred into an error state. However, this only worked once the underlying TCP connection was established. With this patch, it is now possible to cancel requests even while the connection is still being established (possible since Netty 4.1.108.Final via netty/netty#13849). This also works for `aleph.tcp/client`.
1 parent 659f245 commit 4923350

File tree

7 files changed

+282
-162
lines changed

7 files changed

+282
-162
lines changed
 

‎src/aleph/http.clj

+20-14
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,23 @@
100100
will be errors, and a new connection must be created."
101101
[^URI uri options middleware on-closed]
102102
(let [scheme (.getScheme uri)
103-
ssl? (= "https" scheme)]
104-
(-> (client/http-connection
105-
(InetSocketAddress/createUnresolved
106-
(.getHost uri)
107-
(int
108-
(or
109-
(when (pos? (.getPort uri)) (.getPort uri))
110-
(if ssl? 443 80))))
111-
ssl?
112-
(if on-closed
113-
(assoc options :on-closed on-closed)
114-
options))
115-
116-
(d/chain' middleware))))
103+
ssl? (= "https" scheme)
104+
conn (client/http-connection
105+
(InetSocketAddress/createUnresolved
106+
(.getHost uri)
107+
(int
108+
(or
109+
(when (pos? (.getPort uri)) (.getPort uri))
110+
(if ssl? 443 80))))
111+
ssl?
112+
(if on-closed
113+
(assoc options :on-closed on-closed)
114+
options))]
115+
(doto (d/chain' conn middleware)
116+
(d/catch' (fn [e]
117+
(log/trace e "Terminating creation of HTTP connection")
118+
(d/error! conn e)
119+
(d/error-deferred e))))))
117120

118121
(def ^:private connection-stats-callbacks (atom #{}))
119122

@@ -389,6 +392,9 @@
389392
;; function.
390393
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))
391394

395+
;; allow cancellation during connection establishment
396+
(d/connect result (first conn))
397+
392398
(if (realized? result)
393399
;; to account for race condition between setting `dispose-conn!`
394400
;; and putting `result` into error state for cancellation

‎src/aleph/http/client.clj

+120-106
Original file line numberDiff line numberDiff line change
@@ -821,112 +821,126 @@
821821
:local-address local-address
822822
:transport (netty/determine-transport transport epoll?)
823823
:name-resolver name-resolver
824-
:connect-timeout connect-timeout})]
825-
826-
(attach-on-close-handler ch-d on-closed)
827-
828-
(d/chain' ch-d
829-
(fn setup-client
830-
[^Channel ch]
831-
(log/debug "Channel:" ch)
832-
833-
;; We know the SSL handshake must be complete because create-client wraps the
834-
;; future with maybe-ssl-handshake-future, so we can get the negotiated
835-
;; protocol, falling back to HTTP/1.1 by default.
836-
(let [pipeline (.pipeline ch)
837-
protocol (cond
838-
ssl?
839-
(or (-> pipeline
840-
^SslHandler (.get ^Class SslHandler)
841-
(.applicationProtocol))
842-
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed
843-
844-
force-h2c?
845-
(do
846-
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
847-
ApplicationProtocolNames/HTTP_2)
848-
849-
:else
850-
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
851-
setup-opts (assoc opts
852-
:authority authority
853-
:ch ch
854-
:server? false
855-
:keep-alive? keep-alive?
856-
:keep-alive?' keep-alive?'
857-
:logger logger
858-
:non-tun-proxy? non-tun-proxy?
859-
:pipeline pipeline
860-
:pipeline-transform pipeline-transform
861-
:raw-stream? raw-stream?
862-
:remote-address remote-address
863-
:response-buffer-size response-buffer-size
864-
:ssl-context ssl-context
865-
:ssl? ssl?)]
866-
867-
(log/debug (str "Using HTTP protocol: " protocol)
868-
{:authority authority
869-
:ssl? ssl?
870-
:force-h2c? force-h2c?})
871-
872-
;; can't use ApnHandler, because we need to coordinate with Manifold code
873-
(let [http-req-handler
874-
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
875-
(setup-http1-client setup-opts)
876-
877-
(.equals ApplicationProtocolNames/HTTP_2 protocol)
878-
(do
879-
(http2/setup-conn-pipeline setup-opts)
880-
(http2-req-handler setup-opts))
881-
882-
:else
883-
(do
884-
(let [msg (str "Unknown protocol: " protocol)
885-
e (IllegalStateException. msg)]
886-
(log/error e msg)
887-
(netty/close ch)
888-
(throw e))))]
889-
890-
;; Both Netty and Aleph are set up, unpause the pipeline
891-
(when (.get pipeline "pause-handler")
892-
(log/debug "Unpausing pipeline")
893-
(.remove pipeline "pause-handler"))
894-
895-
(fn http-req-fn
896-
[req]
897-
(log/trace "http-req-fn fired")
898-
(log/debug "client request:" (pr-str req))
899-
900-
;; If :aleph/close is set in the req, closes the channel and
901-
;; returns a deferred containing the result.
902-
(if (or (contains? req :aleph/close)
903-
(contains? req ::close))
904-
(-> ch (netty/close) (netty/wrap-future))
905-
906-
(let [t0 (System/nanoTime)
907-
;; I suspect the below is an error for http1
908-
;; since the shared handler might not match.
909-
;; Should work for HTTP2, though
910-
raw-stream? (get req :raw-stream? raw-stream?)]
911-
912-
(if (or (not (.isActive ch))
913-
(not (.isOpen ch)))
914-
915-
(d/error-deferred
916-
(ex-info "Channel is inactive/closed."
917-
{:req req
918-
:ch ch
919-
:open? (.isOpen ch)
920-
:active? (.isActive ch)}))
921-
922-
(-> (http-req-handler req)
923-
(d/chain' (rsp-handler
924-
{:ch ch
925-
:keep-alive? keep-alive? ; why not keep-alive?'
926-
:raw-stream? raw-stream?
927-
:req req
928-
:response-buffer-size response-buffer-size
929-
:t0 t0})))))))))))))
824+
:connect-timeout connect-timeout})
825+
826+
_ (attach-on-close-handler ch-d on-closed)
827+
828+
close-ch! (atom (fn []))
829+
result (d/deferred)
830+
831+
conn (d/chain' ch-d
832+
(fn setup-client
833+
[^Channel ch]
834+
(log/debug "Channel:" ch)
835+
(reset! close-ch! (fn [] @(-> (netty/close ch) (netty/wrap-future))))
836+
(if (realized? result)
837+
;; Account for race condition between setting `close-ch!` and putting
838+
;; `result` into error state for cancellation
839+
(@close-ch!)
840+
;; We know the SSL handshake must be complete because create-client wraps the
841+
;; future with maybe-ssl-handshake-future, so we can get the negotiated
842+
;; protocol, falling back to HTTP/1.1 by default.
843+
(let [pipeline (.pipeline ch)
844+
protocol (cond
845+
ssl?
846+
(or (-> pipeline
847+
^SslHandler (.get ^Class SslHandler)
848+
(.applicationProtocol))
849+
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed
850+
851+
force-h2c?
852+
(do
853+
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
854+
ApplicationProtocolNames/HTTP_2)
855+
856+
:else
857+
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
858+
setup-opts (assoc opts
859+
:authority authority
860+
:ch ch
861+
:server? false
862+
:keep-alive? keep-alive?
863+
:keep-alive?' keep-alive?'
864+
:logger logger
865+
:non-tun-proxy? non-tun-proxy?
866+
:pipeline pipeline
867+
:pipeline-transform pipeline-transform
868+
:raw-stream? raw-stream?
869+
:remote-address remote-address
870+
:response-buffer-size response-buffer-size
871+
:ssl-context ssl-context
872+
:ssl? ssl?)]
873+
874+
(log/debug (str "Using HTTP protocol: " protocol)
875+
{:authority authority
876+
:ssl? ssl?
877+
:force-h2c? force-h2c?})
878+
879+
;; can't use ApnHandler, because we need to coordinate with Manifold code
880+
(let [http-req-handler
881+
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
882+
(setup-http1-client setup-opts)
883+
884+
(.equals ApplicationProtocolNames/HTTP_2 protocol)
885+
(do
886+
(http2/setup-conn-pipeline setup-opts)
887+
(http2-req-handler setup-opts))
888+
889+
:else
890+
(do
891+
(let [msg (str "Unknown protocol: " protocol)
892+
e (IllegalStateException. msg)]
893+
(log/error e msg)
894+
(netty/close ch)
895+
(throw e))))]
896+
897+
;; Both Netty and Aleph are set up, unpause the pipeline
898+
(when (.get pipeline "pause-handler")
899+
(log/debug "Unpausing pipeline")
900+
(.remove pipeline "pause-handler"))
901+
902+
(fn http-req-fn
903+
[req]
904+
(log/trace "http-req-fn fired")
905+
(log/debug "client request:" (pr-str req))
906+
907+
;; If :aleph/close is set in the req, closes the channel and
908+
;; returns a deferred containing the result.
909+
(if (or (contains? req :aleph/close)
910+
(contains? req ::close))
911+
(-> ch (netty/close) (netty/wrap-future))
912+
913+
(let [t0 (System/nanoTime)
914+
;; I suspect the below is an error for http1
915+
;; since the shared handler might not match.
916+
;; Should work for HTTP2, though
917+
raw-stream? (get req :raw-stream? raw-stream?)]
918+
919+
(if (or (not (.isActive ch))
920+
(not (.isOpen ch)))
921+
922+
(d/error-deferred
923+
(ex-info "Channel is inactive/closed."
924+
{:req req
925+
:ch ch
926+
:open? (.isOpen ch)
927+
:active? (.isActive ch)}))
928+
929+
(-> (http-req-handler req)
930+
(d/chain' (rsp-handler
931+
{:ch ch
932+
:keep-alive? keep-alive? ; why not keep-alive?'
933+
:raw-stream? raw-stream?
934+
:req req
935+
:response-buffer-size response-buffer-size
936+
:t0 t0}))))))))))))]
937+
(d/connect conn result)
938+
(d/catch' result (fn [e]
939+
(log/trace e "Closing HTTP connection channel")
940+
(d/error! ch-d e)
941+
(@close-ch!)
942+
(d/error-deferred e)))
943+
result))
930944

931945

932946

‎src/aleph/netty.clj

+44-29
Original file line numberDiff line numberDiff line change
@@ -1521,6 +1521,14 @@
15211521
(ssl-handler ch ssl-ctx))))
15221522
(pipeline-builder p))))
15231523

1524+
(defn- connect-client
1525+
^ChannelFuture [^Bootstrap bootstrap
1526+
^SocketAddress remote-address
1527+
^SocketAddress local-address]
1528+
(if local-address
1529+
(.connect bootstrap remote-address local-address)
1530+
(.connect bootstrap remote-address)))
1531+
15241532
(defn ^:no-doc create-client-chan
15251533
"Returns a deferred containing a new Channel.
15261534
@@ -1529,8 +1537,8 @@
15291537
complete."
15301538
[{:keys [pipeline-builder
15311539
bootstrap-transform
1532-
^SocketAddress remote-address
1533-
^SocketAddress local-address
1540+
remote-address
1541+
local-address
15341542
transport
15351543
name-resolver
15361544
connect-timeout]
@@ -1543,32 +1551,39 @@
15431551
(throw (IllegalArgumentException. "Can't use :ssl-context anymore.")))
15441552

15451553
(let [^Class chan-class (transport-channel-class transport)
1546-
initializer (pipeline-initializer pipeline-builder)]
1547-
(try
1548-
(let [client-event-loop-group @(transport-client-group transport)
1549-
resolver' (when (some? name-resolver)
1550-
(cond
1551-
(= :default name-resolver) nil
1552-
(= :noop name-resolver) NoopAddressResolverGroup/INSTANCE
1553-
(instance? AddressResolverGroup name-resolver) name-resolver))
1554-
bootstrap (doto (Bootstrap.)
1555-
(.option ChannelOption/SO_REUSEADDR true)
1556-
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
1557-
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
1558-
(.group client-event-loop-group)
1559-
(.channel chan-class)
1560-
(.handler initializer)
1561-
(.resolver resolver')
1562-
bootstrap-transform)
1563-
1564-
fut (if local-address
1565-
(.connect bootstrap remote-address local-address)
1566-
(.connect bootstrap remote-address))]
1567-
1568-
(d/chain' (wrap-future fut)
1569-
(fn [_]
1570-
(let [ch (.channel ^ChannelFuture fut)]
1571-
(maybe-ssl-handshake-future ch))))))))
1554+
initializer (pipeline-initializer pipeline-builder)
1555+
client-event-loop-group @(transport-client-group transport)
1556+
resolver' (when (some? name-resolver)
1557+
(cond
1558+
(= :default name-resolver) nil
1559+
(= :noop name-resolver) NoopAddressResolverGroup/INSTANCE
1560+
(instance? AddressResolverGroup name-resolver) name-resolver))
1561+
bootstrap (doto (Bootstrap.)
1562+
(.option ChannelOption/SO_REUSEADDR true)
1563+
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
1564+
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
1565+
(.group client-event-loop-group)
1566+
(.channel chan-class)
1567+
(.handler initializer)
1568+
(.resolver resolver')
1569+
bootstrap-transform)
1570+
1571+
fut (connect-client bootstrap remote-address local-address)]
1572+
(doto (-> (wrap-future fut)
1573+
(d/chain'
1574+
(fn [_]
1575+
(let [ch (.channel ^ChannelFuture fut)]
1576+
(maybe-ssl-handshake-future ch)))))
1577+
(d/catch' (fn [e]
1578+
(when-not (.isDone fut)
1579+
(log/trace e "Cancelling Bootstrap#connect future")
1580+
(when-not (.cancel fut true)
1581+
(when-not (.isDone fut)
1582+
(log/warn "Transport" transport "does not support cancellation of connection attempts."
1583+
"Instead, you have to wait for the connect timeout to expire for it to be terminated."
1584+
"Its current value is" connect-timeout "ms."
1585+
"It can be set via the `connect-timeout` option."))))
1586+
(d/error-deferred e))))))
15721587

15731588

15741589
(defn ^:no-doc ^:deprecated create-client
@@ -1732,7 +1747,7 @@
17321747
(fn [shutdown-output]
17331748
(when (= shutdown-output ::timeout)
17341749
(log/error
1735-
(format "Timeout while waiting for requests to close (exceeded: %ss)"
1750+
(format "Timeout while waiting for connections to close (exceeded: %ss)"
17361751
shutdown-timeout)))))
17371752
(d/finally'
17381753
;; 3. At this stage, stop the EventLoopGroup, this will cancel any

‎src/aleph/tcp.clj

+17-9
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@
155155
"Given a host and port, returns a deferred which yields a duplex stream that can be used
156156
to communicate with the server.
157157
158+
Closing the stream will also close the underlying connection.
159+
160+
Putting the returned deferred into an error state before it yielded the stream will cancel an
161+
in-flight connection attempt.
162+
158163
Param key | Description
159164
| --- | ---
160165
| `host` | the hostname of the server.
@@ -204,13 +209,16 @@
204209
(netty/ssl-handler (.channel pipeline) ssl-context remote-address ssl-endpoint-id-alg)))
205210
(.addLast pipeline "handler" handler)
206211
(when pipeline-transform
207-
(pipeline-transform pipeline)))]
208-
(-> (netty/create-client-chan
209-
{:pipeline-builder pipeline-builder
210-
:bootstrap-transform bootstrap-transform
211-
:remote-address remote-address
212-
:local-address local-address
213-
:transport (netty/determine-transport transport epoll?)
214-
:connect-timeout connect-timeout})
215-
(d/catch' #(d/error! s %)))
212+
(pipeline-transform pipeline)))
213+
ch-d (netty/create-client-chan
214+
{:pipeline-builder pipeline-builder
215+
:bootstrap-transform bootstrap-transform
216+
:remote-address remote-address
217+
:local-address local-address
218+
:transport (netty/determine-transport transport epoll?)
219+
:connect-timeout connect-timeout})]
220+
(d/catch' ch-d #(d/error! s %))
221+
(d/catch' s (fn [e]
222+
(d/error! ch-d e)
223+
(d/error-deferred e)))
216224
s))

‎test/aleph/http_test.clj

+29-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
[aleph.resource-leak-detector]
88
[aleph.ssl :as test-ssl]
99
[aleph.tcp :as tcp]
10-
[aleph.testutils :refer [str=]]
10+
[aleph.testutils :refer [passive-tcp-server str=]]
1111
[clj-commons.byte-streams :as bs]
1212
[clojure.java.io :as io]
1313
[clojure.string :as str]
@@ -1451,6 +1451,34 @@
14511451
(is (instance? IllegalArgumentException result))
14521452
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))
14531453

1454+
(deftest test-request-cancellation-during-connection-acquisition
1455+
(let [starved-pool (http/connection-pool
1456+
{:total-connections 0})]
1457+
(try
1458+
(let [rsp (http-get "/" {:pool starved-pool
1459+
:pool-timeout 500})]
1460+
(http/cancel-request! rsp)
1461+
(is (thrown? RequestCancellationException (deref rsp 0 :timeout))))
1462+
(finally
1463+
(.shutdown ^Pool starved-pool)))))
1464+
1465+
(deftest test-request-cancellation-during-connection-establishment
1466+
(let [connect-client @#'aleph.netty/connect-client
1467+
connect-future (promise)]
1468+
(with-redefs [aleph.netty/connect-client (fn [& args]
1469+
(let [fut (apply connect-client args)]
1470+
(deliver connect-future fut)
1471+
fut))]
1472+
(with-server (passive-tcp-server port)
1473+
(let [rsp (http-get "/")]
1474+
(is (some? (deref connect-future 1000 nil)))
1475+
(http/cancel-request! rsp)
1476+
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))
1477+
(some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS))
1478+
(is (some-> @connect-future .isSuccess false?))
1479+
(is (some-> @connect-future .isDone))
1480+
(is (some-> @connect-future .isCancelled)))))))
1481+
14541482
(deftest test-in-flight-request-cancellation
14551483
(let [conn-established (promise)
14561484
conn-closed (promise)]

‎test/aleph/tcp_test.clj

+24-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
[aleph.netty :as netty]
44
[aleph.resource-leak-detector]
55
[aleph.tcp :as tcp]
6+
[aleph.testutils :refer [passive-tcp-server]]
67
[clj-commons.byte-streams :as bs]
7-
[clojure.test :refer [deftest testing is]]
8-
[manifold.stream :as s]))
8+
[clojure.test :refer [deftest is testing]]
9+
[manifold.deferred :as d]
10+
[manifold.stream :as s])
11+
(:import
12+
(java.util.concurrent TimeUnit)))
913

1014
(defn echo-handler [s _]
1115
(s/connect s s))
@@ -55,4 +59,22 @@
5559
(catch Exception _
5660
(is (not (netty/io-uring-available?)))))))
5761

62+
(deftest test-cancellation-during-connection-establishment
63+
(let [connect-client @#'aleph.netty/connect-client
64+
connect-future (promise)
65+
server (passive-tcp-server 0)]
66+
(with-redefs [aleph.netty/connect-client (fn [& args]
67+
(let [fut (apply connect-client args)]
68+
(deliver connect-future fut)
69+
fut))]
70+
(with-server server
71+
(let [c (tcp/client {:host "localhost"
72+
:port (netty/port server)})]
73+
(is (some? (deref connect-future 1000 nil)))
74+
(d/timeout! c 10)
75+
(some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS))
76+
(is (some-> @connect-future .isSuccess false?))
77+
(is (some-> @connect-future .isDone))
78+
(is (some-> @connect-future .isCancelled)))))))
79+
5880
(aleph.resource-leak-detector/instrument-tests!)

‎test/aleph/testutils.clj

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,35 @@
11
(ns aleph.testutils
2-
(:import (io.netty.util AsciiString)))
2+
(:require
3+
[aleph.netty :as netty])
4+
(:import
5+
(io.netty.util AsciiString)
6+
(java.io Closeable)
7+
(java.net ServerSocket Socket)))
38

49
(defn str=
510
"AsciiString-aware equals"
611
[^CharSequence x ^CharSequence y]
712
(AsciiString/contentEquals x y))
813

14+
(defn passive-tcp-server
15+
"Starts a TCP server which never accepts a connection."
16+
[port]
17+
(let [;; A backlog of 0 would be ideal for this purpose but: "The value provided should be greater
18+
;; than 0. If it is less than or equal to 0, then an implementation specific default will be
19+
;; used." Source:
20+
;; https://docs.oracle.com/en%2Fjava%2Fjavase%2F21%2Fdocs%2Fapi%2F%2F/java.base/java/net/ServerSocket.html#%3Cinit%3E(int,int)
21+
backlog 1
22+
server (ServerSocket. port backlog)
23+
port (.getLocalPort server)
24+
;; Fill up the backlog with pending connection attempts. For some reason, the backlog length
25+
;; is off by one, thus the `inc`.
26+
pending-connects (doall (repeatedly (inc backlog) #(Socket. "localhost" (int port))))]
27+
(reify
28+
netty/AlephServer
29+
(port [_] port)
30+
(wait-for-close [_]
31+
true)
32+
Closeable
33+
(close [_]
34+
(run! #(.close %) pending-connects)
35+
(.close server)))))

0 commit comments

Comments
 (0)
Please sign in to comment.