From 28c75bd3e6d9925c71acb8878c1d3786abfa0ba2 Mon Sep 17 00:00:00 2001 From: xiajun Date: Fri, 17 Jul 2015 11:59:28 +0800 Subject: [PATCH 1/2] STORM-946: We should remove Closed Client form cached-node+port->socket in worker --- storm-core/src/clj/backtype/storm/daemon/worker.clj | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 7157cf77ebf..0ebd4b6c2db 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -288,7 +288,18 @@ (filter-key (complement (-> worker :task-ids set)))) needed-connections (-> needed-assignment vals set) needed-tasks (-> needed-assignment keys) - + + closed-connections (into [] (for [[node+port socket] @(:cached-node+port->socket worker)] + (if (.isClosed socket) node+port))) + ;; If the node+port->scoket have closed Client, we must remove the closed Client first, else: + ;; 1. This will result in Client object leak; + ;; 2. If this worker will reconnect to this node+port, as the client is closed, we will never send message success; + - (log-warn "connections to " closed-connections "is closed, we will remove them from cached-node+port->socket") + - (apply swap! + (:cached-node+port->socket worker) + #(HashMap. (apply dissoc (into {} %1) %&)) + closed-connections) + current-connections (set (keys @(:cached-node+port->socket worker))) new-connections (set/difference needed-connections current-connections) remove-connections (set/difference current-connections needed-connections)] From 125ba05149293607cd368e7149c935a2943bb473 Mon Sep 17 00:00:00 2001 From: xiajun Date: Fri, 17 Jul 2015 17:16:43 +0800 Subject: [PATCH 2/2] Use status instead of isClosed --- storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 0ebd4b6c2db..129786069f0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -290,7 +290,7 @@ needed-tasks (-> needed-assignment keys) closed-connections (into [] (for [[node+port socket] @(:cached-node+port->socket worker)] - (if (.isClosed socket) node+port))) + (if (= ConnectionWithStatus$Status/Closed (.status socket)) node+port))) ;; If the node+port->scoket have closed Client, we must remove the closed Client first, else: ;; 1. This will result in Client object leak; ;; 2. If this worker will reconnect to this node+port, as the client is closed, we will never send message success;