Skip to content

Commit

Permalink
Allow producer channel to pass a :crash signal to read-batch
Browse files Browse the repository at this point in the history
If we do not allow for this, there is no way for a producer thread to
force the plugin to crash. This is a common issue in all plugins.
  • Loading branch information
lbradstreet committed Dec 10, 2015
1 parent 318f09e commit ee5c96a
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/onyx/plugin/bookkeeper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@

(defn close-read-ledgers-resources
[{:keys [bookkeeper/producer-ch bookkeeper/commit-ch bookkeeper/read-ch bookkeeper/shutdown-ch] :as event} lifecycle]
;; Before we waited for producer ch to return. We should probably still do so by checking the shutdown-ch
(close! shutdown-ch)
(close! read-ch)
(close! commit-ch)
(close! shutdown-ch)
{})

(defn set-starting-offset! [log task-map checkpoint-key start]
Expand Down Expand Up @@ -171,10 +172,11 @@
(let [exit (loop [last-acked (inc (:largest checkpointed))]
(read-fn client ledger-id digest password read-ch deserializer-fn backoff-period last-acked max-id)
:finished)]
(if-not (= exit :shutdown)
(if (= exit :finished)
(>!! read-ch (t/input (random-uuid) :done))))
(catch Exception e
(fatal e))))]
(>!! read-ch (t/input (random-uuid) :crash))
(fatal "BookKeeper plugin: error reading." e))))]
{:bookkeeper/read-ch read-ch
:bookkeeper/shutdown-ch shutdown-ch
:bookkeeper/commit-ch commit-ch
Expand Down Expand Up @@ -212,6 +214,9 @@
(keep (fn [_] (first (alts!! [read-ch timeout-ch] :priority true))))))]
(doseq [m batch]
(let [message (:message m)]
(when (= message :crash)
(throw (Exception. "Plugin crashed. Crash read.")))

(when-not (= message :done)
(swap! top-index max (:entry-id message))
(swap! pending-indexes conj (:entry-id message))))
Expand Down

0 comments on commit ee5c96a

Please sign in to comment.