Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation during client connection establishment #721

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
24 changes: 16 additions & 8 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
[aleph.http.websocket.common :as ws.common]
[aleph.http.websocket.server :as ws.server]
[aleph.netty :as netty]
[aleph.util :as util]
[clojure.string :as str]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
Expand Down Expand Up @@ -100,8 +101,8 @@
will be errors, and a new connection must be created."
[^URI uri options middleware on-closed]
(let [scheme (.getScheme uri)
ssl? (= "https" scheme)]
(-> (client/http-connection
ssl? (= "https" scheme)
conn (client/http-connection
(InetSocketAddress/createUnresolved
(.getHost uri)
(int
Expand All @@ -111,9 +112,11 @@
ssl?
(if on-closed
(assoc options :on-closed on-closed)
options))

(d/chain' middleware))))
options))]
(-> (d/chain' conn middleware)
(util/propagate-error conn
(fn [e]
(log/trace e "Terminated creation of HTTP connection"))))))

(def ^:private connection-stats-callbacks (atom #{}))

Expand Down Expand Up @@ -389,6 +392,12 @@
;; function.
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))

;; allow cancellation during connection establishment
(util/propagate-error result
(first conn)
(fn [e]
(log/trace e "Aborted connection acquisition")))

(if (realized? result)
;; to account for race condition between setting `dispose-conn!`
;; and putting `result` into error state for cancellation
Expand Down Expand Up @@ -456,11 +465,10 @@
(middleware/handle-redirects request req))))))))))))
req))]
(d/connect response result)
(d/catch' result
(util/on-error result
(fn [e]
(log/trace e "Request failed. Disposing of connection...")
(@dispose-conn!)
(d/error-deferred e)))
(@dispose-conn!)))
result)))

(defn cancel-request!
Expand Down
25 changes: 19 additions & 6 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[aleph.http.multipart :as multipart]
[aleph.http.websocket.client :as ws.client]
[aleph.netty :as netty]
[aleph.util :as util]
[clj-commons.byte-streams :as bs]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
Expand Down Expand Up @@ -814,22 +815,28 @@
:logger logger
:pipeline-transform pipeline-transform))

ch-d (netty/create-client-chan
ch-d (doto (netty/create-client-chan
{:pipeline-builder pipeline-builder
:bootstrap-transform bootstrap-transform
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:name-resolver name-resolver
:connect-timeout connect-timeout})]
:connect-timeout connect-timeout})
(attach-on-close-handler on-closed))

(attach-on-close-handler ch-d on-closed)
close-ch! (atom (fn []))
result (d/deferred)

(d/chain' ch-d
conn (d/chain' ch-d
(fn setup-client
[^Channel ch]
(log/debug "Channel:" ch)

(reset! close-ch! (fn [] @(-> (netty/close ch) (netty/wrap-future))))
(if (realized? result)
;; Account for race condition between setting `close-ch!` and putting
;; `result` into error state for cancellation
(@close-ch!)
;; We know the SSL handshake must be complete because create-client wraps the
;; future with maybe-ssl-handshake-future, so we can get the negotiated
;; protocol, falling back to HTTP/1.1 by default.
Expand Down Expand Up @@ -926,7 +933,13 @@
:raw-stream? raw-stream?
:req req
:response-buffer-size response-buffer-size
:t0 t0})))))))))))))
:t0 t0}))))))))))))]
(d/connect conn result)
(util/propagate-error result
ch-d
(fn [e]
(log/trace e "Closing HTTP connection channel")
(@close-ch!)))))



Expand Down
39 changes: 27 additions & 12 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns aleph.netty
(:refer-clojure :exclude [flush])
(:require
[aleph.util :as util]
[clj-commons.byte-streams :as bs]
[clj-commons.primitive-math :as p]
[clojure.string :as str]
Expand Down Expand Up @@ -1521,6 +1522,14 @@
(ssl-handler ch ssl-ctx))))
(pipeline-builder p))))

(defn- connect-client
^ChannelFuture [^Bootstrap bootstrap
^SocketAddress remote-address
^SocketAddress local-address]
(if local-address
(.connect bootstrap remote-address local-address)
(.connect bootstrap remote-address)))

(defn ^:no-doc create-client-chan
"Returns a deferred containing a new Channel.

Expand All @@ -1529,8 +1538,8 @@
complete."
[{:keys [pipeline-builder
bootstrap-transform
^SocketAddress remote-address
^SocketAddress local-address
remote-address
local-address
transport
name-resolver
connect-timeout]
Expand All @@ -1543,9 +1552,8 @@
(throw (IllegalArgumentException. "Can't use :ssl-context anymore.")))

(let [^Class chan-class (transport-channel-class transport)
initializer (pipeline-initializer pipeline-builder)]
(try
(let [client-event-loop-group @(transport-client-group transport)
initializer (pipeline-initializer pipeline-builder)
client-event-loop-group @(transport-client-group transport)
resolver' (when (some? name-resolver)
(cond
(= :default name-resolver) nil
Expand All @@ -1561,14 +1569,21 @@
(.resolver resolver')
bootstrap-transform)

fut (if local-address
(.connect bootstrap remote-address local-address)
(.connect bootstrap remote-address))]

(d/chain' (wrap-future fut)
fut (connect-client bootstrap remote-address local-address)]
(-> (wrap-future fut)
(d/chain'
(fn [_]
(let [ch (.channel ^ChannelFuture fut)]
(maybe-ssl-handshake-future ch))))))))
(maybe-ssl-handshake-future ch))))
(util/on-error (fn [e]
(when-not (.isDone fut)
(log/trace e "Cancelling Bootstrap#connect future")
(when-not (.cancel fut true)
(when-not (.isDone fut)
(log/warn "Transport" transport "does not support cancellation of connection attempts."
"Instead, you have to wait for the connect timeout to expire for it to be terminated."
"Its current value is" connect-timeout "ms."
"It can be set via the `connect-timeout` option.")))))))))


(defn ^:no-doc ^:deprecated create-client
Expand Down Expand Up @@ -1732,7 +1747,7 @@
(fn [shutdown-output]
(when (= shutdown-output ::timeout)
(log/error
(format "Timeout while waiting for requests to close (exceeded: %ss)"
(format "Timeout while waiting for connections to close (exceeded: %ss)"
shutdown-timeout)))))
(d/finally'
;; 3. At this stage, stop the EventLoopGroup, this will cancel any
Expand Down
19 changes: 14 additions & 5 deletions src/aleph/tcp.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns aleph.tcp
(:require
[aleph.netty :as netty]
[aleph.util :as util]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
[manifold.stream :as s]
Expand Down Expand Up @@ -155,6 +156,11 @@
"Given a host and port, returns a deferred which yields a duplex stream that can be used
to communicate with the server.

Closing the stream will also close the underlying connection.

Putting the returned deferred into an error state before it yielded the stream will cancel an
in-flight connection attempt.

Param key | Description
| --- | ---
| `host` | the hostname of the server.
Expand Down Expand Up @@ -204,13 +210,16 @@
(netty/ssl-handler (.channel pipeline) ssl-context remote-address ssl-endpoint-id-alg)))
(.addLast pipeline "handler" handler)
(when pipeline-transform
(pipeline-transform pipeline)))]
(-> (netty/create-client-chan
(pipeline-transform pipeline)))
ch-d (netty/create-client-chan
{:pipeline-builder pipeline-builder
:bootstrap-transform bootstrap-transform
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:connect-timeout connect-timeout})
(d/catch' #(d/error! s %)))
s))
:connect-timeout connect-timeout})]
(util/propagate-error ch-d s)
(util/propagate-error s
ch-d
(fn [e]
(log/trace e "Closed TCP client channel")))))
20 changes: 20 additions & 0 deletions src/aleph/util.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(ns aleph.util
(:require [manifold.deferred :as d]))

(defn on-error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be documented why you would use this over chain or on-realized.

[d f]
(d/on-realized d identity f))
Comment on lines +4 to +6
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK for now, but this should be in Manifold


(defn propagate-error
"Registers an error callback with source which will attempt to propagate the error to destination.

If the error was propagated (i.e. destination wasn't yet realized), on-propagate is invoked with
the error value.

Returns source."
([source destination]
(propagate-error source destination identity))
([source destination on-propagate]
(on-error source (fn [e]
(when (d/error! destination e)
(on-propagate e))))))
Comment on lines +8 to +20
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source is stream terminology, not deferred terminology. We should call it something else to avoid confusion. I made that exact mistake on the first read-through.

46 changes: 39 additions & 7 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[aleph.resource-leak-detector]
[aleph.ssl :as test-ssl]
[aleph.tcp :as tcp]
[aleph.testutils :refer [str=]]
[aleph.testutils :refer [passive-tcp-server str=]]
[clj-commons.byte-streams :as bs]
[clojure.java.io :as io]
[clojure.string :as str]
Expand Down Expand Up @@ -1451,18 +1451,50 @@
(is (instance? IllegalArgumentException result))
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))

(deftest test-request-cancellation-during-connection-acquisition
(let [starved-pool (http/connection-pool
{:total-connections 0})]
(try
(let [rsp (http-get "/" {:pool starved-pool
:pool-timeout 500})]
(http/cancel-request! rsp)
(is (thrown? RequestCancellationException (deref rsp 0 :timeout))))
(finally
(.shutdown ^Pool starved-pool)))))

(deftest test-request-cancellation-during-connection-establishment
(let [connect-client @#'aleph.netty/connect-client
connect-future (promise)]
(with-redefs [aleph.netty/connect-client (fn [& args]
(let [fut (apply connect-client args)]
(deliver connect-future fut)
fut))]
(with-server (passive-tcp-server port)
(let [rsp (http-get "/")]
(is (some? (deref connect-future 1000 nil)))
(http/cancel-request! rsp)
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))
(some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS))
(is (some-> @connect-future .isSuccess false?))
(is (some-> @connect-future .isDone))
(is (some-> @connect-future .isCancelled)))))))

(deftest test-in-flight-request-cancellation
(let [conn-established (promise)
conn-closed (promise)]
(let [conn-established (atom nil)
conn-closed (atom nil)]
(with-raw-handler (fn [req]
(deliver conn-established true)
(deliver @conn-established true)
(s/on-closed (:body req)
(fn []
(deliver conn-closed true))))
(deliver @conn-closed true))))
;; NOTE: The atom indirection here is needed because `with-raw-handler` will run the body
;; twice (for HTTP1 and HTTP2), so we need a new promise for each run.
(reset! conn-established (promise))
(reset! conn-closed (promise))
(let [rsp (http-get "/")]
(is (= true (deref conn-established 1000 :timeout)))
(is (= true (deref @conn-established 1000 :timeout)))
(http/cancel-request! rsp)
(is (= true (deref conn-closed 1000 :timeout)))
(is (= true (deref @conn-closed 1000 :timeout)))
Comment on lines -1463 to +1497
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(deref @ ... looks weird. I suggest switching to unwrap/unwrap'.

(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

(deftest ^:leak test-leak-in-raw-stream-handler
Expand Down
26 changes: 24 additions & 2 deletions test/aleph/tcp_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
[aleph.netty :as netty]
[aleph.resource-leak-detector]
[aleph.tcp :as tcp]
[aleph.testutils :refer [passive-tcp-server]]
[clj-commons.byte-streams :as bs]
[clojure.test :refer [deftest testing is]]
[manifold.stream :as s]))
[clojure.test :refer [deftest is testing]]
[manifold.deferred :as d]
[manifold.stream :as s])
(:import
(java.util.concurrent TimeUnit)))

(defn echo-handler [s _]
(s/connect s s))
Expand Down Expand Up @@ -55,4 +59,22 @@
(catch Exception _
(is (not (netty/io-uring-available?)))))))

(deftest test-cancellation-during-connection-establishment
(let [connect-client @#'aleph.netty/connect-client
connect-future (promise)
server (passive-tcp-server 0)]
(with-redefs [aleph.netty/connect-client (fn [& args]
(let [fut (apply connect-client args)]
(deliver connect-future fut)
fut))]
(with-server server
(let [c (tcp/client {:host "localhost"
:port (netty/port server)})]
(is (some? (deref connect-future 1000 nil)))
(d/timeout! c 10)
(some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS))
(is (some-> @connect-future .isSuccess false?))
(is (some-> @connect-future .isDone))
(is (some-> @connect-future .isCancelled)))))))

(aleph.resource-leak-detector/instrument-tests!)
Loading