Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #484 from onyx-platform/defensive-joining
Browse files Browse the repository at this point in the history
Defensive Peer Joining Procedures, fixes #423, #453, #437
  • Loading branch information
MichaelDrogalis committed Jan 14, 2016

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 0042715 + 732792e commit f7168a6
Showing 12 changed files with 161 additions and 148 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
[org.btrplace/scheduler-api "0.42"]
[org.btrplace/scheduler-choco "0.42"]
[com.stuartsierra/dependency "0.2.0"]
[com.stuartsierra/component "0.3.0"]
[com.stuartsierra/component "0.3.1"]
[com.taoensso/timbre "4.1.4"]
[com.taoensso/nippy "2.10.0"]
[uk.co.real-logic/aeron-all "0.2.3"]
24 changes: 15 additions & 9 deletions src/onyx/log/commands/abort_join_cluster.clj
Original file line number Diff line number Diff line change
@@ -4,15 +4,20 @@
[clojure.data :refer [diff]]
[schema.core :as s]
[onyx.schema :refer [Replica LogEntry Reactions ReplicaDiff State]]
[taoensso.timbre :as timbre]
[taoensso.timbre :refer [info] :as timbre]
[onyx.extensions :as extensions]))

(defn already-joined?
  "Returns the peer id found under [:args :id] of `entry` when that id is
  listed in the replica's :peers, otherwise nil."
  [replica entry]
  (let [peer-id (get-in entry [:args :id])
        joined-peers (:peers replica)]
    (some (hash-set peer-id) joined-peers)))

(s/defmethod extensions/apply-log-entry :abort-join-cluster :- Replica
[{:keys [args message-id]} :- LogEntry replica]
(-> replica
(update-in [:prepared] dissoc (get (map-invert (:prepared replica)) (:id args)))
(update-in [:accepted] dissoc (get (map-invert (:accepted replica)) (:id args)))
(update-in [:peer-sites] dissoc (:id args))))
[{:keys [args] :as entry} :- LogEntry replica]
(if-not (already-joined? replica entry)
(-> replica
(update-in [:prepared] dissoc (get (map-invert (:prepared replica)) (:id args)))
(update-in [:accepted] dissoc (get (map-invert (:accepted replica)) (:id args)))
(update-in [:peer-sites] dissoc (:id args)))
replica))

(s/defmethod extensions/replica-diff :abort-join-cluster :- ReplicaDiff
[entry :- LogEntry old new]
@@ -24,9 +29,10 @@
{:aborted (or (first prepared) (first accepted))})))

(s/defmethod extensions/reactions :abort-join-cluster :- Reactions
[{:keys [args]} old new diff peer-args]
(when (and (= (:id args) (:id peer-args))
(not (:onyx.peer/try-join-once? (:peer-opts (:messenger peer-args)))))
[{:keys [args] :as entry} old new diff peer-args]
(when (and (not (:onyx.peer/try-join-once? (:peer-opts (:messenger peer-args))))
(not (already-joined? old entry))
(= (:id args) (:id peer-args)))
[{:fn :prepare-join-cluster
:args {:joiner (:id peer-args)
:peer-site (extensions/peer-site (:messenger peer-args))}}]))
2 changes: 1 addition & 1 deletion src/onyx/log/commands/accept_join_cluster.clj
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@
(let [{:keys [accepted-joiner accepted-observer]} args
target (or (get-in replica [:pairs accepted-observer])
accepted-observer)
accepted? (get-in replica [:accepted accepted-observer])
accepted? (= accepted-joiner (get-in replica [:accepted accepted-observer]))
already-joined? (some #{accepted-joiner} (:peers replica))
no-observer? (not (some #{target} (:peers replica)))]
(if (or already-joined? no-observer? (not accepted?))
53 changes: 30 additions & 23 deletions src/onyx/log/commands/leave_cluster.clj
Original file line number Diff line number Diff line change
@@ -48,34 +48,41 @@
:updated-watch {:observer observer
:subject subject}}))

(defn abort?
  "True when the id of the local peer (`(:id state)`) is the value stored in
  the replica's :prepared or :accepted map under the key given by the
  entry's `(:id args)`."
  [replica state {:keys [args]}]
  (let [local-id (:id state)
        subject-id (:id args)
        ;; Is the local peer the value recorded for subject-id in this table?
        recorded-in? (fn [table]
                       (= local-id (get (table replica) subject-id)))]
    (or (recorded-in? :prepared)
        (recorded-in? :accepted))))

(s/defmethod extensions/reactions :leave-cluster :- Reactions
[{:keys [args]} old new diff state]
(when (or (= (:id state) (get (:prepared old) (:id args)))
(= (:id state) (get (:accepted old) (:id args))))
[entry old new diff state]
(when (abort? old state entry)
[{:fn :abort-join-cluster
:args {:id (:id state)}}]))

(s/defmethod extensions/fire-side-effects! :leave-cluster :- State
[{:keys [message-id args]} old new {:keys [updated-watch] :as diff} state]
(let [job (:job (common/peer->allocated-job (:allocations new) (:id state)))]
(common/start-new-lifecycle
old new diff
(cond (common/should-seal? new job state message-id)
(>!! (:seal-ch (:task-state state)) true)
[{:keys [args message-id] :as entry} old new {:keys [updated-watch] :as diff} state]
(if (and (= (:id state) (:id args))
(not (abort? old state entry)))
(do (close! (:restart-ch state))
state)
(let [job (:job (common/peer->allocated-job (:allocations new) (:id state)))]
(common/start-new-lifecycle
old new diff
(cond (common/should-seal? new job state message-id)
(>!! (:seal-ch (:task-state state)) true)

(and (= (:id state) (:observer updated-watch))
(not= (:observer updated-watch) (:subject updated-watch)))
(and (= (:id state) (:observer updated-watch))
(not= (:observer updated-watch) (:subject updated-watch)))

(let [ch (chan 1)]
(extensions/on-delete (:log state) (:subject updated-watch) ch)
(go (when (<! ch)
(extensions/write-log-entry
(:log state)
{:fn :leave-cluster :args {:id (:subject updated-watch)}
:entry-parent message-id
:peer-parent (:id state)}))
(close! ch))
(close! (or (:watch-ch state) (chan)))
(assoc state :watch-ch ch))
(let [ch (chan 1)]
(extensions/on-delete (:log state) (:subject updated-watch) ch)
(go (when (<! ch)
(extensions/write-log-entry
(:log state)
{:fn :leave-cluster :args {:id (:subject updated-watch)}
:entry-parent message-id
:peer-parent (:id state)}))
(close! ch))
(close! (or (:watch-ch state) (chan)))
(assoc state :watch-ch ch))

:else state))))
:else state)))))
32 changes: 21 additions & 11 deletions src/onyx/log/commands/notify_join_cluster.clj
Original file line number Diff line number Diff line change
@@ -8,12 +8,19 @@
[clojure.data :refer [diff]]
[onyx.extensions :as extensions]))

(defn already-joined?
  "Boolean: is the peer named by [:args :observer] of `entry` present in the
  replica's :peers?"
  [replica entry]
  (let [observer (get-in entry [:args :observer])]
    (boolean (some #{observer} (:peers replica)))))

(s/defmethod extensions/apply-log-entry :notify-join-cluster :- Replica
[{:keys [args]} :- LogEntry replica]
[{:keys [args] :as entry} :- LogEntry replica]
(let [prepared (get (map-invert (:prepared replica)) (:observer args))]
(-> replica
(update-in [:accepted] merge {prepared (:observer args)})
(update-in [:prepared] dissoc prepared))))
(assert (not= prepared (:observer args)))
(if (and prepared (not (already-joined? replica entry)))
(-> replica
(update-in [:accepted] merge {prepared (:observer args)})
(update-in [:prepared] dissoc prepared))
replica)))

(s/defmethod extensions/replica-diff :notify-join-cluster :- ReplicaDiff
[entry old new]
@@ -27,13 +34,16 @@

(s/defmethod extensions/reactions :notify-join-cluster :- Reactions
[entry old new diff peer-args]
(cond (and (= (vals diff) (remove nil? (vals diff)))
(= (:id peer-args) (:observer diff)))
[{:fn :accept-join-cluster
:args diff}]
(= (:id peer-args) (:observer (:args entry)))
[{:fn :abort-join-cluster
:args {:id (:observer (:args entry))}}]))
(let [success? (and (= (vals diff) (remove nil? (vals diff)))
(= (:id peer-args) (:observer diff)))]
(cond success?
[{:fn :accept-join-cluster
:args diff}]
(already-joined? old entry)
[]
(= (:id peer-args) (:observer (:args entry)))
[{:fn :abort-join-cluster
:args {:id (:observer (:args entry))}}])))

(s/defmethod extensions/fire-side-effects! :notify-join-cluster :- State
[{:keys [args message-id]} old new diff {:keys [monitoring] :as state}]
58 changes: 36 additions & 22 deletions src/onyx/log/commands/prepare_join_cluster.clj
Original file line number Diff line number Diff line change
@@ -23,30 +23,42 @@
peer-site
(:peer-sites replica))))))

(defn still-joining?
  "Looks `joiner` up as a value in the replica's :prepared map, then in its
  :accepted map, and returns the key it is stored under (falsy when it is
  in neither)."
  [replica joiner]
  (let [key-holding-joiner (fn [table]
                             (-> (table replica)
                                 (map-invert)
                                 (get joiner)))]
    (or (key-holding-joiner :prepared)
        (key-holding-joiner :accepted))))

(defn already-joined?
  "Returns `joiner` when it appears in the replica's :peers, otherwise nil."
  [replica joiner]
  (let [joined-peers (:peers replica)]
    (some (hash-set joiner) joined-peers)))

(s/defmethod extensions/apply-log-entry :prepare-join-cluster :- Replica
[{:keys [args message-id]} :- LogEntry replica]
(let [peers (:peers replica)
joiner (:joiner args)
n (count peers)]
(if (> n 0)
(let [joining-peer (:joiner args)
all-joined-peers (set (into (keys (:pairs replica)) peers))
(let [all-joined-peers (set (into (keys (:pairs replica)) peers))
all-prepared-deps (set (keys (:prepared replica)))
prep-watches (set (map (fn [dep] (get (map-invert (:pairs replica)) dep)) all-prepared-deps))
accepting-deps (set (keys (:accepted replica)))
candidates (difference all-joined-peers all-prepared-deps accepting-deps prep-watches)
candidates (difference all-joined-peers all-prepared-deps accepting-deps prep-watches #{joiner})
sorted-candidates (sort (remove nil? candidates))]
(if (seq sorted-candidates)
(let [index (mod message-id (count sorted-candidates))
watcher (nth sorted-candidates index)]
(-> replica
(update-in [:prepared] merge {watcher joining-peer})
(add-site-acker args)
(reconfigure-cluster-workload)))
replica))
(cond (already-joined? replica joiner)
replica
(still-joining? replica joiner)
replica
(seq sorted-candidates)
(let [index (mod message-id (count sorted-candidates))
watcher (nth sorted-candidates index)]
(-> replica
(update-in [:prepared] merge {watcher joiner})
(add-site-acker args)
(reconfigure-cluster-workload)))
:else
replica))
(-> replica
(update-in [:peers] conj (:joiner args))
(update-in [:peers] conj joiner)
(update-in [:peers] vec)
(assoc-in [:peer-state (:joiner args)] :idle)
(assoc-in [:peer-state joiner] :idle)
(add-site-acker args)
(reconfigure-cluster-workload)))))

@@ -65,15 +77,17 @@

(s/defmethod extensions/reactions :prepare-join-cluster :- Reactions
[entry :- LogEntry old new diff peer-args]
(cond (and (= (:id peer-args) (:joiner (:args entry)))
(nil? diff))
[{:fn :abort-join-cluster
:args {:id (:id peer-args)}}]
(= (:id peer-args) (:observer diff))
[{:fn :notify-join-cluster
:args {:observer (:subject diff)
:subject (or (get (:pairs new) (:observer diff))
(:observer diff))}}]))
(let [joiner (:joiner (:args entry))]
(cond (already-joined? old joiner)
[]
(still-joining? old joiner)
[]
(and (= (:id peer-args) joiner) (nil? diff))
[{:fn :abort-join-cluster
:args {:id (:id peer-args)}}]
(= (:id peer-args) (:observer diff))
[{:fn :notify-join-cluster
:args {:observer (:subject diff)}}])))

(s/defmethod extensions/fire-side-effects! :prepare-join-cluster :- State
[{:keys [args message-id]} :- LogEntry old new diff {:keys [monitoring] :as state}]
3 changes: 2 additions & 1 deletion src/onyx/log/zookeeper.clj
Original file line number Diff line number Diff line change
@@ -273,7 +273,8 @@
(trace e)
(>!! ch e))
(catch Throwable e
(fatal e))))
(fatal e)
(>!! ch e))))
(<!! rets)))

(defmethod extensions/write-chunk [ZooKeeper :catalog]
2 changes: 1 addition & 1 deletion src/onyx/peer/task_lifecycle.clj
Original file line number Diff line number Diff line change
@@ -369,7 +369,7 @@
(let [data (ex-data e)]
(if (:onyx.core/lifecycle-restart? data)
(warn (:original-exception data) "Caught exception inside task lifecycle. Rebooting the task.")
(do (warn e "Uncaught exception throw inside task lifecycle.")
(do (warn e "Handling uncaught exception thrown inside task lifecycle.")
(if (restart-pred-fn e)
(>!! restart-ch true)
(let [entry (entry/create-log-entry :kill-job {:job job-id})]
48 changes: 47 additions & 1 deletion test/onyx/log/generative_peer_join.clj
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
[clojure.test.check.generators :as gen]
[clojure.test.check.properties :as prop]
[clojure.test :refer :all]
[taoensso.timbre :as timbre :refer [info]]
[onyx.log.replica-invariants :refer [standard-invariants]]
[com.gfredericks.test.chuck :refer [times]]
[com.gfredericks.test.chuck.clojure-test :refer [checking]]))
@@ -538,6 +539,8 @@
:args {:id :p1}}]}))
:log []
:peer-choices []}))]
(is (empty? (:accepted replica)))
(is (empty? (:prepared replica)))
(standard-invariants replica)
(is (= 3 (count (:peer-state replica))))
(is (= 3 (count (:peers replica))))))
@@ -559,11 +562,53 @@
:log []
:peer-choices []}))]
(standard-invariants replica)
(is (empty? (:accepted replica)))
(is (empty? (:prepared replica)))
(is (or (= 2 (count (:peer-state replica)))
(= 3 (count (:peer-state replica)))))
(is (or (= 2 (count (:peers replica)))
(= 3 (count (:peers replica)))))))

;; Generative test (50 runs): builds an entry set from join queues for 9
;; generated peer ids, one job submission, two leave-cluster entries and four
;; hand-written join-protocol entries (prepare / notify / abort / accept), then
;; checks the resulting replica.
;; NOTE(review): how apply-entries-gen orders or interleaves these queues is
;; not visible here -- confirm against onyx.log.generators.
(deftest peer-spurious-notify
(checking
"Checking a spurious notify is handled correctly"
(times 50)
[{:keys [replica log peer-choices]}
(log-gen/apply-entries-gen
(gen/return
{:replica {:job-scheduler :onyx.job-scheduler/balanced
:messaging {:onyx.messaging/impl :dummy-messenger}}
:message-id 0
:entries
(-> (log-gen/generate-join-queues (log-gen/generate-peer-ids 9))
(assoc :job-1 {:queue [(api/create-submit-job-entry
job-1-id
peer-config
job-1
(planning/discover-tasks (:catalog job-1) (:workflow job-1)))]})
;; TODO: generate the spurious entries instead of hard-coding them.
(assoc :spurious-prepare {:queue [{:fn :prepare-join-cluster
:args {:joiner :p6}}]})
(assoc :spurious-notify {:queue [{:fn :notify-join-cluster
:args {:observer :p5}}]})
;; NOTE(review): the :abort-join-cluster handlers read (:id args), but this
;; entry supplies :observer -- confirm whether that is intentional.
(assoc :spurious-abort {:queue [{:fn :abort-join-cluster
:args {:observer :p1}}]})
(assoc :spurious-accept {:queue [{:fn :accept-join-cluster
:args {:observer :p2
:subject :p8
:accepted-observer :p6
:accepted-joiner :p2}}]})
(assoc :leave-1 {:queue [{:fn :leave-cluster :args {:id :p1}}]})
(assoc :leave-2 {:queue [{:fn :leave-cluster :args {:id :p2}}]}))
:log []
:peer-choices []}))]
(standard-invariants replica)
;; No join may be left half-finished once every entry has been applied.
(is (empty? (:accepted replica)))
(is (empty? (:prepared replica)))
;; Peers may have left before they joined, so there should be at LEAST 7 peers allocated.
;; Since there are enough peers to handle 2 peers leaving without a task being deallocated,
;; the job must be able to go on.
(is (>= (apply + (map count (vals (get (:allocations replica) job-1-id)))) 7))))

(deftest peer-leave-still-running
(checking
@@ -587,12 +632,13 @@
:log []
:peer-choices []}))]
(standard-invariants replica)
(is (empty? (:accepted replica)))
(is (empty? (:prepared replica)))
;; Peers may have left before they joined, so there should be at LEAST 7 peers allocated.
;; Since there are enough peers to handle 2 peers leaving without a task being deallocated,
;; the job must be able to go on.
(is (>= (apply + (map count (vals (get (:allocations replica) job-1-id)))) 7))))


;; Reproduce onyx-test scheduler issue
(def inner-job-id #uuid "f55c14f0-a847-42eb-81bb-0c0390a88608")

4 changes: 4 additions & 0 deletions test/onyx/log/generators.clj
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
[onyx.log.commands.common :refer [peer->allocated-job]]
[onyx.extensions :as extensions]
[onyx.api :as api]
[taoensso.timbre :as timbre :refer [info]]
[clojure.set :refer [intersection]]
[clojure.test.check :as tc]
[clojure.test.check.generators :as gen]
@@ -23,6 +24,9 @@
(vals (or (:accepted replica) {}))))
;; joining peer's prepared/accepted may have been removed
;; by a leave cluster but the joining peer is still there
(and (not (peerless-entry? entry))
(:observer (:args entry)))
(conj (:observer (:args entry)))
(and (not (peerless-entry? entry))
(:id (:args entry)))
(conj (:id (:args entry)))
Loading

0 comments on commit f7168a6

Please sign in to comment.