diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 7157cf77ebf..129786069f0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -288,7 +288,18 @@ (filter-key (complement (-> worker :task-ids set)))) needed-connections (-> needed-assignment vals set) needed-tasks (-> needed-assignment keys) - + + closed-connections (into [] (for [[node+port socket] @(:cached-node+port->socket worker)] + (if (= ConnectionWithStatus$Status/Closed (.status socket)) node+port))) + ;; If the node+port->scoket have closed Client, we must remove the closed Client first, else: + ;; 1. This will result in Client object leak; + ;; 2. If this worker will reconnect to this node+port, as the client is closed, we will never send message success; + - (log-warn "connections to " closed-connections "is closed, we will remove them from cached-node+port->socket") + - (apply swap! + (:cached-node+port->socket worker) + #(HashMap. (apply dissoc (into {} %1) %&)) + closed-connections) + current-connections (set (keys @(:cached-node+port->socket worker))) new-connections (set/difference needed-connections current-connections) remove-connections (set/difference current-connections needed-connections)]