This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 204
/
Copy pathbookkeeper.clj
283 lines (258 loc) · 13.9 KB
/
bookkeeper.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
(ns ^:no-doc onyx.state.log.bookkeeper
(:require [onyx.log.curator :as curator]
[taoensso.timbre :refer [info error warn trace fatal] :as timbre]
[com.stuartsierra.component :as component]
[clojure.core.async :refer [chan timeout thread go >! <! <!! >!! alts!! close! poll!]]
[clojure.core.async.impl.protocols :refer [closed?]]
[onyx.compression.nippy :as nippy]
[onyx.extensions :as extensions]
[onyx.monitoring.measurements :refer [emit-latency-value emit-latency]]
[onyx.peer.operation :as operation]
[onyx.windowing.aggregation :as agg]
[onyx.state.state-extensions :as state-extensions]
[onyx.types :refer [inc-count! dec-count!]]
[onyx.log.replica]
[onyx.log.commands.common :refer [peer-slot-id]]
[onyx.log.zookeeper :as zk]
[onyx.static.default-vals :refer [arg-or-default defaults]])
(:import [org.apache.bookkeeper.client LedgerHandle LedgerEntry BookKeeper BookKeeper$DigestType
BKException BKException$Code AsyncCallback$AddCallback]
[org.apache.bookkeeper.conf ClientConfiguration]
[org.apache.curator.framework CuratorFramework CuratorFrameworkFactory]))
(defn event->ledger-ids
  "Looks up the ledger ids stored in the replica at
  [:state-logs job-id task-id slot-id] for the peer slot `event` runs in.
  Returns nil when nothing has been recorded at that path."
  [{:keys [onyx.core/replica onyx.core/job-id onyx.core/task-id] :as event}]
  (let [replica-value @replica
        slot (peer-slot-id event)]
    (get-in replica-value [:state-logs job-id task-id slot])))
;; State for one peer's BookKeeper-backed window log.
;;   client             - BookKeeper client (built by `bookkeeper`); closed by close-log.
;;   ledger-handle      - atom holding the LedgerHandle that process-batches writes to.
;;   next-ledger-handle - atom holding a newly published LedgerHandle that
;;                        process-batches switches to at its next safe point
;;                        (via compaction-transition), or nil when none is pending.
;;   batch-ch           - channel of (entry ack-fn) pairs enqueued by store-log-entry.
(defrecord BookKeeperLog [client ledger-handle next-ledger-handle batch-ch])
(defn open-ledger
  "Opens existing ledger `id` through BookKeeper#openLedger (the recovering
  variant, as opposed to open-ledger-no-recovery), using `digest-type` and the
  raw `password` bytes."
  ^LedgerHandle [^BookKeeper client id digest-type password]
  (. client openLedger id digest-type password))
(defn open-ledger-no-recovery
  "Opens existing ledger `id` through BookKeeper#openLedgerNoRecovery, using
  `digest-type` and the raw `password` bytes."
  ^LedgerHandle [^BookKeeper client id digest-type password]
  (. client openLedgerNoRecovery id digest-type password))
(defn create-ledger
  "Creates a new ledger via BookKeeper#createLedger with the given ensemble
  and quorum sizes, digest type, and password bytes."
  ^LedgerHandle [^BookKeeper client ensemble-size quorum-size digest-type password]
  (. client createLedger ensemble-size quorum-size digest-type password))
(defn close-handle
  "Closes `ledger-handle` by calling LedgerHandle#close."
  [^LedgerHandle ledger-handle]
  (. ledger-handle close))
(defn bookkeeper
  "Constructs a BookKeeper client.

  One-arg arity: reads the ZooKeeper address from :zookeeper/address, derives
  the ledgers root path from :onyx/id with zk/ledgers-path, and takes the
  timeout and throttle from :onyx.bookkeeper/client-timeout and
  :onyx.bookkeeper/client-throttle (or their defaults).

  Four-arg arity: applies the given values to a fresh ClientConfiguration and
  builds the client from it."
  ([opts]
   (let [zk-address (:zookeeper/address opts)
         ledgers-root (zk/ledgers-path (:onyx/id opts))
         zk-timeout (arg-or-default :onyx.bookkeeper/client-timeout opts)
         throttle-value (arg-or-default :onyx.bookkeeper/client-throttle opts)]
     (bookkeeper zk-address ledgers-root zk-timeout throttle-value)))
  ([zk-addr zk-root-path timeout throttle]
   (let [config (ClientConfiguration.)]
     (.setZkServers config zk-addr)
     (.setZkTimeout config timeout)
     (.setThrottleValue config throttle)
     (.setZkLedgersRootPath config zk-root-path)
     (BookKeeper. config))))
(def digest-type
  "BookKeeper digest type passed when ledgers are created (new-ledger) and
  opened for playback (playback-ledgers)."
  BookKeeper$DigestType/MAC)
(defn password
  "Returns the ledger password (:onyx.bookkeeper/ledger-password in
  `peer-opts`, or its default) as a byte array.

  The string is encoded explicitly as UTF-8 rather than with the JVM's default
  charset. Otherwise peers running with different default encodings would
  derive different password bytes for the same configured password and fail
  to open each other's ledgers.
  NOTE(review): for non-ASCII passwords on JVMs whose default charset was not
  UTF-8, this changes the derived bytes; ASCII passwords are unaffected."
  [peer-opts]
  (let [^String pwd (arg-or-default :onyx.bookkeeper/ledger-password peer-opts)]
    (.getBytes pwd java.nio.charset.StandardCharsets/UTF_8)))
(defn new-ledger
  "Creates a fresh ledger on `client`. Ensemble and quorum sizes come from
  :onyx.bookkeeper/ledger-ensemble-size and :onyx.bookkeeper/ledger-quorum-size
  in `peer-opts` (or their defaults); `digest-type` and the configured
  password are used for authentication."
  ^LedgerHandle [client peer-opts]
  (create-ledger client
                 (arg-or-default :onyx.bookkeeper/ledger-ensemble-size peer-opts)
                 (arg-or-default :onyx.bookkeeper/ledger-quorum-size peer-opts)
                 digest-type
                 (password peer-opts)))
(def HandleWriteCallback
  "Shared AsyncCallback$AddCallback for asyncAddEntry calls. The ctx argument
  must destructure as [success-fn fail-fn]. When BookKeeper reports
  BKException$Code/OK, success-fn is called with no arguments. Otherwise a
  warning including the return code is logged and fail-fn is called."
  (reify AsyncCallback$AddCallback
    (addComplete [_this rc _ledger _entry-id ctx]
      (let [[on-success on-failure] ctx]
        (if-not (= rc BKException$Code/OK)
          (do
            (warn "Unable to complete async write to bookkeeper. BookKeeper exception code:" rc)
            (on-failure))
          (on-success))))))
(defn compaction-transition
  "Transitions to a new compacted ledger, plus a newly created ledger created
  earlier. For example, if there were ledgers [1, 2, 3, 4], we've created a
  ledger id 5 to start writing to, making [1, 2, 3, 4, 5], then we create a compacted
  ledger 6, write the updated state to it, and swap [1, 2, 3, 4] in the replica
  for 6, leaving [6, 5].

  Called by process-batches at a safe point between writes.

  Synchronous part:
  - snapshots the window state;
  - makes the pending `next-ledger-handle` the current `ledger-handle`;
  - clears `next-ledger-handle`.

  If the new current ledger is not the last ledger id recorded in the replica,
  it logs a warning and skips compaction. Otherwise it returns a future that
  writes the snapshot as a single batch to a freshly created ledger.
  - On a successful write, the previous handle is closed, the compaction
    latency is emitted, and a :compact-bookkeeper-log-ids command is put on
    outbox-ch.
  - On a failed write, the event's restart channel is closed.
  Any exception thrown inside the future is logged and rethrown. Nothing
  derefs the future, so without the log such failures would vanish silently."
  [{:keys [client ledger-handle next-ledger-handle] :as log}
   {:keys [onyx.core/peer-opts onyx.core/job-id onyx.core/replica
           onyx.core/id onyx.core/task-id onyx.core/monitoring
           onyx.core/window-state onyx.core/outbox-ch]
    :as event}]
  (info "Transitioning to new handle after gc" (.getId ^LedgerHandle @next-ledger-handle))
  (let [previous-handle @ledger-handle
        start-time (System/currentTimeMillis)
        slot-id (peer-slot-id event)
        extent-snapshot (:state @window-state)
        ;; Deref future later. This way we can immediately return and continue processing
        filter-snapshot (state-extensions/snapshot-filter (:filter @window-state) event)
        current-ids (get-in @replica [:state-logs job-id task-id slot-id])]
    (reset! ledger-handle @next-ledger-handle)
    (reset! next-ledger-handle nil)
    ;; Don't throw an exception, maybe we can give the next GC a chance to succeed.
    ;; Log is still in a known good state: we have transitioned to a ledger that is in the replica.
    (if-not (= (last current-ids) (.getId ^LedgerHandle @ledger-handle))
      (warn "Could not swap compacted log. Next ledger handle is no longer the next published ledger"
            {:job-id job-id :task-id task-id :slot-id slot-id
             :ledger-handle (.getId ^LedgerHandle @ledger-handle) :current-ids current-ids})
      (future
        (try
          (let [;; Write compacted as a batch
                compacted [{:type :compacted
                            ;; TODO: add a timeout when derefing here.
                            :filter-snapshot @filter-snapshot
                            :extent-state extent-snapshot}]
                compacted-ledger (new-ledger client peer-opts)
                compacted-ledger-id (.getId compacted-ledger)
                ^bytes compacted-serialized (nippy/window-log-compress compacted)]
            ;; NOTE(review): compacted-ledger is never closed in this namespace;
            ;; confirm whether it should be (asynchronously) closed once the write completes.
            (.asyncAddEntry compacted-ledger
                            compacted-serialized
                            HandleWriteCallback
                            (list (fn []
                                    (close-handle previous-handle)
                                    (emit-latency-value :window-log-compaction monitoring (- (System/currentTimeMillis) start-time))
                                    ;; Every id except the trailing (current) ledger is replaced
                                    ;; by the single compacted ledger.
                                    (>!! outbox-ch
                                         {:fn :compact-bookkeeper-log-ids
                                          :args {:job-id job-id
                                                 :task-id task-id
                                                 :slot-id slot-id
                                                 :peer-id id
                                                 :prev-ledger-ids (vec (butlast current-ids))
                                                 :new-ledger-ids [compacted-ledger-id]}}))
                                  (fn []
                                    (close! (:onyx.core/restart-ch event))))))
          (catch Throwable t
            (error t "Unable to write compacted BookKeeper log."
                   {:job-id job-id :task-id task-id :slot-id slot-id})
            (throw t)))))))
(defn take-write-batch
  "Drains up to `batch-size` queued (entry ack-fn) pairs from `batch-ch`
  without blocking (poll!). Stops early when the channel has nothing ready or
  a polled entry is falsey. Returns [entries ack-fns], two vectors in
  dequeue order."
  [batch-size batch-ch]
  (loop [taken 0
         entries []
         ack-fns []]
    (let [[entry ack-fn] (when (< taken batch-size)
                           (poll! batch-ch))]
      (if-not entry
        [entries ack-fns]
        (recur (inc taken)
               (conj entries entry)
               (conj ack-fns ack-fn))))))
(defn process-batches
  "Starts the log writer on a core.async `thread` and returns that thread's
  channel.

  Each iteration:
  - drains up to :onyx.bookkeeper/write-batch-size queued entries
    (take-write-batch);
  - first switches to a pending `next-ledger-handle` via
    compaction-transition, if there is one;
  - writes a non-empty batch to the current ledger as one compressed entry
    with asyncAddEntry. The batch's ack fns run once BookKeeper confirms the
    write; a failed write closes the event's restart channel;
  - sleeps :onyx.bookkeeper/write-batch-backoff ms when the batch is empty.

  The loop continues until kill-ch or task-kill-ch is closed.

  If anything in the loop throws, the error is logged and the restart channel
  is closed. Otherwise the writer thread would die silently, and producers
  blocked on a full batch-ch would never be released."
  [{:keys [ledger-handle next-ledger-handle batch-ch] :as log}
   {:keys [onyx.core/kill-ch onyx.core/task-kill-ch onyx.core/peer-opts] :as event}]
  (thread
    (try
      (let [batch-size (arg-or-default :onyx.bookkeeper/write-batch-size peer-opts)
            batch-backoff (arg-or-default :onyx.bookkeeper/write-batch-backoff peer-opts)]
        (loop [[batch ack-fns] (take-write-batch batch-size batch-ch)]
          ;; Safe point to transition to the next ledger handle: this thread is
          ;; the only writer and is between writes here.
          (when @next-ledger-handle
            (compaction-transition log event))
          (if (empty? batch)
            (Thread/sleep batch-backoff)
            (.asyncAddEntry ^LedgerHandle @ledger-handle
                            ^bytes (nippy/window-log-compress batch)
                            HandleWriteCallback
                            (list (fn [] (run! (fn [f] (f)) ack-fns))
                                  (fn [] (close! (:onyx.core/restart-ch event))))))
          (when (and (not (closed? kill-ch))
                     (not (closed? task-kill-ch)))
            (recur (take-write-batch batch-size batch-ch)))))
      (catch Throwable t
        (error t "BookKeeper: batch processing failed. Restarting peer.")
        (close! (:onyx.core/restart-ch event))))
    (info "BookKeeper: shutting down batch processing")))
(defn assign-bookkeeper-log-id-spin
  "Asks for `new-ledger-id` to be recorded for this peer's slot by putting an
  :assign-bookkeeper-log-id command on outbox-ch. It then polls the replica
  until `new-ledger-id` is the last ledger id for the slot, sleeping
  :onyx.bookkeeper/ledger-id-written-back-off ms between checks.

  Gives up early, after logging, if kill-ch or task-kill-ch has been closed.
  Returns nil either way."
  [{:keys [onyx.core/peer-opts
           onyx.core/job-id onyx.core/task-id
           onyx.core/kill-ch onyx.core/task-kill-ch
           onyx.core/outbox-ch] :as event}
   new-ledger-id]
  (>!! outbox-ch {:fn :assign-bookkeeper-log-id
                  :args {:job-id job-id
                         :task-id task-id
                         :slot-id (peer-slot-id event)
                         :ledger-id new-ledger-id}})
  (loop []
    ;; Non-blocking check: a nil value means one of the kill channels is closed.
    (let [[signal _port] (alts!! [kill-ch task-kill-ch] :default true)]
      (cond
        (nil? signal)
        (info "Exiting assign-bookkeeper-log-id-spin early as peer has been reassigned.")

        (not= new-ledger-id (last (event->ledger-ids event)))
        (do
          (info "New ledger id has not been published yet. Backing off.")
          (Thread/sleep (arg-or-default :onyx.bookkeeper/ledger-id-written-back-off peer-opts))
          (recur))))))
(defmethod state-extensions/initialize-log :bookkeeper [log-type {:keys [onyx.core/peer-opts] :as event}]
  ;; Creates a client and a fresh ledger, waits until that ledger's id is
  ;; published for this peer's slot (or the peer is killed), then starts the
  ;; batch writer thread and returns the BookKeeperLog record.
  (let [client (bookkeeper peer-opts)
        handle (new-ledger client peer-opts)
        ledger-id (.getId handle)
        write-queue (chan (arg-or-default :onyx.bookkeeper/write-buffer-size peer-opts))]
    (assign-bookkeeper-log-id-spin event ledger-id)
    (info "Ledger id" ledger-id "published")
    ;; No compaction ledger is pending yet, so next-ledger-handle starts as nil.
    (let [log (->BookKeeperLog client (atom handle) (atom nil) write-queue)]
      (process-batches log event)
      log)))
(defn playback-batch-entry
  "Folds each entry of one decoded `batch` into `state`, left to right, as
  (apply-entry-fn state entry), and returns the resulting state."
  [state apply-entry-fn batch]
  (reduce (fn [acc entry] (apply-entry-fn acc entry)) state batch))
(defn playback-entries-chunk
  "Reads entries `start`..`end` from `lh` with LedgerHandle#readEntries.
  Each entry is decompressed into a batch, and every batch is folded into
  `state` with playback-batch-entry. Returns `state` unchanged when the read
  yields no entries. `event` is accepted but not used."
  [state apply-entry-fn ^LedgerHandle lh start end event]
  (let [entries (.readEntries lh start end)]
    (loop [acc state]
      (if-not (.hasMoreElements entries)
        acc
        (let [^LedgerEntry ledger-entry (.nextElement entries)
              batch (nippy/window-log-decompress (.getEntry ledger-entry))]
          (recur (playback-batch-entry acc apply-entry-fn batch)))))))
(defn check-abort-playback!
  "Check whether playback should be aborted if the peer is already rescheduled or killed.
  Does a non-blocking alts!! over kill-ch and task-kill-ch. A nil value means
  one of them is closed, in which case an ex-info with
  {:playback-aborted? true} is thrown. Otherwise returns nil."
  [{:keys [onyx.core/task-kill-ch onyx.core/kill-ch] :as event}]
  (let [[signal _port] (alts!! [kill-ch task-kill-ch] :default true)]
    (when (nil? signal)
      (throw (ex-info "Playback aborted as peer has been rescheduled during state-log playback. Restarting peer."
                      {:playback-aborted? true})))))
(defn playback-ledger
  "Replays entries 0..`last-confirmed` of `lh` into `state` in chunks sized by
  :onyx.bookkeeper/read-batch-size.
  - The first chunk spans 0..min(chunk-size, last-confirmed).
  - Each later chunk starts one past the previous end and extends by
    chunk-size, capped at last-confirmed.
  - check-abort-playback! runs before every chunk.
  Returns `state` untouched when `last-confirmed` is negative."
  [state apply-entry-fn ^LedgerHandle lh last-confirmed {:keys [onyx.core/peer-opts] :as event}]
  (let [chunk-size (arg-or-default :onyx.bookkeeper/read-batch-size peer-opts)]
    (if (neg? last-confirmed)
      state
      (loop [acc state
             from 0
             to (min chunk-size last-confirmed)]
        (check-abort-playback! event)
        (let [acc' (playback-entries-chunk acc apply-entry-fn lh from to event)]
          (if (= to last-confirmed)
            acc'
            (recur acc'
                   (inc to)
                   (min (+ chunk-size to) last-confirmed))))))))
(defn playback-ledgers
  "Replays each ledger in `ledger-ids`, in order, folding its entries into
  `state` via playback-ledger. Each ledger is opened with open-ledger and is
  always closed again, even on error.

  If playback is aborted (an ex-info whose data has :playback-aborted?), a
  warning is logged and the ORIGINAL `state` is returned. Any other ex-info,
  or any other exception, propagates."
  [bk-client state apply-entry-fn ledger-ids {:keys [onyx.core/peer-opts] :as event}]
  (try
    (let [pwd (password peer-opts)
          replay-ledger (fn [acc ledger-id]
                          (let [lh (open-ledger bk-client ledger-id digest-type pwd)]
                            (try
                              (let [last-confirmed (.getLastAddConfirmed lh)]
                                (info "Opened ledger:" ledger-id "last confirmed:" last-confirmed)
                                (playback-ledger acc apply-entry-fn lh last-confirmed event))
                              (finally
                                (close-handle lh)))))]
      (reduce replay-ledger state ledger-ids))
    (catch clojure.lang.ExceptionInfo e
      ;; An aborted playback is safe to swallow: the peer will no longer be
      ;; allocated to this task, so the partially rebuilt state is discarded.
      (let [data (ex-data e)]
        (when-not (:playback-aborted? data)
          (throw e))
        (warn "Playback aborted as task or peer was killed." data)
        state))))
(defmethod state-extensions/playback-log-entries onyx.state.log.bookkeeper.BookKeeperLog
  [{:keys [client] :as log}
   {:keys [onyx.core/monitoring onyx.core/task-id] :as event}
   state
   apply-entry-fn]
  ;; Replays every recorded ledger except the last, timing the whole replay
  ;; under :window-log-playback.
  (letfn [(replay []
            (let [recorded-ids (event->ledger-ids event)
                  ;; The final ledger id is the one this peer just created, so it is skipped.
                  to-replay (butlast recorded-ids)]
              (info "Playing back ledgers for" task-id "ledger-ids" to-replay)
              (playback-ledgers client state apply-entry-fn to-replay event)))]
    (emit-latency :window-log-playback monitoring replay)))
(defmethod state-extensions/compact-log onyx.state.log.bookkeeper.BookKeeperLog
  [{:keys [client ledger-handle next-ledger-handle]}
   {:keys [onyx.core/peer-opts] :as event}
   _]
  ;; In a future:
  ;;   - create a new ledger;
  ;;   - wait for its id to be published for this slot (or the peer to be killed);
  ;;   - store it in next-ledger-handle, where process-batches picks it up via
  ;;     compaction-transition.
  ;; A failure is logged before being rethrown. Otherwise it would be lost
  ;; unless someone derefs the returned future.
  (future
    (try
      (let [new-ledger-handle (new-ledger client peer-opts)
            new-ledger-id (.getId new-ledger-handle)]
        (assign-bookkeeper-log-id-spin event new-ledger-id)
        (reset! next-ledger-handle new-ledger-handle))
      (catch Throwable t
        (error t "Unable to create and publish a new BookKeeper ledger for log compaction.")
        (throw t)))))
(defmethod state-extensions/close-log onyx.state.log.bookkeeper.BookKeeperLog
  [{:keys [client ledger-handle next-ledger-handle]} event]
  ;; Best-effort close of the current and pending ledger handles, then close the
  ;; client. Each handle gets its own try so a failure closing one does not
  ;; leave the other open. Failures are logged, not thrown. Closing the client
  ;; itself is not guarded and may throw.
  (doseq [handle-atom [ledger-handle next-ledger-handle]]
    (try
      (when-let [handle @handle-atom]
        (close-handle handle))
      (catch Throwable t
        (warn t "Error closing BookKeeper handle"))))
  (.close ^BookKeeper client))
(defmethod state-extensions/store-log-entry onyx.state.log.bookkeeper.BookKeeperLog
  [{:keys [ledger-handle next-ledger-handle batch-ch] :as log} event ack-fn entry]
  ;; Queues (entry ack-fn) for the writer thread started by process-batches,
  ;; which calls ack-fn once the batch containing entry is confirmed.
  ;; Blocks (>!!) while batch-ch's buffer is full.
  (let [pending (list entry ack-fn)]
    (>!! batch-ch pending)))