Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Failed BookKeeper writes now reboot the peer #390
Browse files Browse the repository at this point in the history
Also closes #500 by improving performance of write-take-batch
  • Loading branch information
lbradstreet committed Jan 25, 2016
1 parent cca4a8a commit 4d3684e
Showing 1 changed file with 60 additions and 57 deletions.
117 changes: 60 additions & 57 deletions src/onyx/state/log/bookkeeper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [onyx.log.curator :as curator]
[taoensso.timbre :refer [info error warn trace fatal] :as timbre]
[com.stuartsierra.component :as component]
[clojure.core.async :refer [chan timeout thread go >! <! <!! >!! alts!! close!]]
[clojure.core.async :refer [chan timeout thread go >! <! <!! >!! alts!! close! poll!]]
[clojure.core.async.impl.protocols :refer [closed?]]
[onyx.compression.nippy :as nippy]
[onyx.extensions :as extensions]
[onyx.monitoring.measurements :refer [emit-latency-value emit-latency]]
Expand Down Expand Up @@ -63,11 +64,11 @@

(def HandleWriteCallback
(reify AsyncCallback$AddCallback
(addComplete [this rc lh entry-id ack-fn]
(addComplete [this rc lh entry-id [success-fn fail-fn]]
(if (= rc (BKException$Code/OK))
(ack-fn)
;; TODO: should restart peer, see https://github.com/onyx-platform/onyx/issues/390
(warn "Unable to complete async write to bookkeeper. BookKeeper exception code:" rc)))))
(success-fn)
(do (warn "Unable to complete async write to bookkeeper. BookKeeper exception code:" rc)
(fail-fn))))))

(defn compaction-transition
"Transitions to a new compacted ledger, plus a newly created ledger created
Expand Down Expand Up @@ -107,56 +108,51 @@
(.asyncAddEntry compacted-ledger
compacted-serialized
HandleWriteCallback
(fn []
(close-handle previous-handle)
(emit-latency-value :window-log-compaction monitoring (- (System/currentTimeMillis) start-time))
(>!! outbox-ch
{:fn :compact-bookkeeper-log-ids
:args {:job-id job-id
:task-id task-id
:slot-id slot-id
:peer-id id
:prev-ledger-ids (vec (butlast current-ids))
:new-ledger-ids [compacted-ledger-id]}}))))))))
(list (fn []
(close-handle previous-handle)
(emit-latency-value :window-log-compaction monitoring (- (System/currentTimeMillis) start-time))
(>!! outbox-ch
{:fn :compact-bookkeeper-log-ids
:args {:job-id job-id
:task-id task-id
:slot-id slot-id
:peer-id id
:prev-ledger-ids (vec (butlast current-ids))
:new-ledger-ids [compacted-ledger-id]}}))
(fn []
(close! (:onyx.core/restart-ch event))))))))))

(defn ch->type [ch batch-ch timeout-ch kill-ch task-kill-ch]
(cond (= ch timeout-ch)
:timeout
(or (= ch kill-ch) (= ch task-kill-ch))
:shutdown
:else
:read))
(defn take-write-batch [batch-size batch-ch]
(loop [entries [] ack-fns [] i 0]
(if (< i batch-size)
(let [[entry ack-fn] (poll! batch-ch)]
(if entry
(recur (conj entries entry)
(conj ack-fns ack-fn)
(inc i))
[entries ack-fns]))
[entries ack-fns])))

(defn take-write-batch [peer-opts batch-ch kill-ch task-kill-ch]
(let [batch-size (arg-or-default :onyx.bookkeeper/write-batch-size peer-opts)
timeout-ms (arg-or-default :onyx.bookkeeper/write-batch-timeout peer-opts)
timeout-ch (timeout timeout-ms)]
(loop [entries [] ack-fns [] i 0]
(if (< i batch-size)
(let [[[entry ack-fn] ch] (alts!! [kill-ch task-kill-ch batch-ch timeout-ch] :priority true)]
(if entry
(recur (conj entries entry)
(conj ack-fns ack-fn)
(inc i))
(let [msg-type (ch->type ch batch-ch timeout-ch kill-ch task-kill-ch)]
[msg-type entries ack-fns])))
[:read entries ack-fns]))))

(defn process-batches [{:keys [ledger-handle next-ledger-handle batch-ch] :as log}
(defn process-batches [{:keys [ledger-handle next-ledger-handle batch-ch] :as log}
{:keys [onyx.core/kill-ch onyx.core/task-kill-ch onyx.core/peer-opts] :as event}]
(thread
(loop [[result batch ack-fns] (take-write-batch peer-opts batch-ch kill-ch task-kill-ch)]
;; Safe point to transition to the next ledger handle
(when @next-ledger-handle
(compaction-transition log event))
(when-not (empty? batch)
(.asyncAddEntry ^LedgerHandle @ledger-handle
^bytes (nippy/window-log-compress batch)
HandleWriteCallback
(fn [] (run! (fn [f] (f)) ack-fns))))
(if-not (= :shutdown result)
(recur (take-write-batch peer-opts batch-ch kill-ch task-kill-ch))))
(info "BookKeeper: shutting down batch processing")))
(thread
(let [batch-size (arg-or-default :onyx.bookkeeper/write-batch-size peer-opts)
batch-backoff (arg-or-default :onyx.bookkeeper/write-batch-backoff peer-opts)]
(loop [[batch ack-fns] (take-write-batch batch-size batch-ch)]
;; Safe point to transition to the next ledger handle
(when @next-ledger-handle
(compaction-transition log event))
(if (empty? batch)
(Thread/sleep batch-backoff)
(.asyncAddEntry ^LedgerHandle @ledger-handle
^bytes (nippy/window-log-compress batch)
HandleWriteCallback
(list (fn [] (run! (fn [f] (f)) ack-fns))
(fn [] (close! (:onyx.core/restart-ch event))))))
(if (and (not (closed? kill-ch))
(not (closed? task-kill-ch)))
(recur (take-write-batch batch-size batch-ch)))))
(info "BookKeeper: shutting down batch processing")))

(defn assign-bookkeeper-log-id-spin [{:keys [onyx.core/peer-opts
onyx.core/job-id onyx.core/task-id
Expand All @@ -170,10 +166,16 @@
:task-id task-id
:slot-id slot-id
:ledger-id new-ledger-id}})
(while (and (first (alts!! [kill-ch task-kill-ch] :default true))
(not= new-ledger-id (last (event->ledger-ids event))))
(info "New ledger id has not been published yet. Backing off.")
(Thread/sleep (arg-or-default :onyx.bookkeeper/ledger-id-written-back-off peer-opts)))))
(loop []
(let [exit? (nil? (first (alts!! [kill-ch task-kill-ch] :default true)))
not-added? (not= new-ledger-id (last (event->ledger-ids event)))]
(cond exit?
(info "Exiting assign-bookkeeper-log-id-spin early as peer has been reassigned.")
not-added?
(do
(info "New ledger id has not been published yet. Backing off.")
(Thread/sleep (arg-or-default :onyx.bookkeeper/ledger-id-written-back-off peer-opts))
(recur)))))))

(defmethod state-extensions/initialize-log :bookkeeper [log-type {:keys [onyx.core/peer-opts] :as event}]
(let [bk-client (bookkeeper peer-opts)
Expand Down Expand Up @@ -268,7 +270,8 @@
(defmethod state-extensions/close-log onyx.state.log.bookkeeper.BookKeeperLog
[{:keys [client ledger-handle next-ledger-handle]} event]
(try
(close-handle @ledger-handle)
(when @ledger-handle
(close-handle @ledger-handle))
(when @next-ledger-handle
(close-handle @next-ledger-handle))
(catch Throwable t
Expand Down

0 comments on commit 4d3684e

Please sign in to comment.