Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion storm-core/src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,18 @@
(filter-key (complement (-> worker :task-ids set))))
needed-connections (-> needed-assignment vals set)
needed-tasks (-> needed-assignment keys)


closed-connections (into [] (for [[node+port socket] @(:cached-node+port->socket worker)]
(if (= ConnectionWithStatus$Status/Closed (.status socket)) node+port)))
;; If the node+port->scoket have closed Client, we must remove the closed Client first, else:
;; 1. This will result in Client object leak;
;; 2. If this worker will reconnect to this node+port, as the client is closed, we will never send message success;
- (log-warn "connections to " closed-connections "is closed, we will remove them from cached-node+port->socket")
- (apply swap!
(:cached-node+port->socket worker)
#(HashMap. (apply dissoc (into {} %1) %&))
closed-connections)

current-connections (set (keys @(:cached-node+port->socket worker)))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
Expand Down