Skip to content

Commit

Permalink
Give system components child allocators
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Sep 15, 2023
1 parent 80236da commit ef213d7
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 36 deletions.
3 changes: 2 additions & 1 deletion core/src/main/clojure/xtdb/buffer_pool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@
(when cache-path
(util/delete-dir cache-path)
(util/mkdirs cache-path))
(->BufferPool allocator object-store (->buffer-cache cache-entries-size cache-bytes-size) (StampedLock.) cache-path))
(->BufferPool (util/->child-allocator allocator "buffer-pool-allocator") object-store
(->buffer-cache cache-entries-size cache-bytes-size) (StampedLock.) cache-path))

(defmethod ig/halt-key! ::buffer-pool [_ ^BufferPool buffer-pool]
(.close buffer-pool))
35 changes: 18 additions & 17 deletions core/src/main/clojure/xtdb/compactor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,24 @@
opts))

(defmethod ig/init-key :xtdb/compactor [_ {:keys [allocator ^ObjectStore obj-store metadata-mgr buffer-pool]}]
(reify ICompactor
(compactAll [_]
(log/info "compact-all")
(loop []
(let [jobs (for [table-name (->> (.listObjects obj-store "tables")
;; TODO should obj-store listObjects only return keys from the current level?
(into #{} (keep #(second (re-find #"^tables/([^/]+)" %)))))
job (compaction-jobs table-name (->> (trie/list-table-trie-files obj-store table-name)
(trie/current-table-tries)))]
job)
jobs? (boolean (seq jobs))]

(doseq [job jobs]
(exec-compaction-job! allocator obj-store metadata-mgr buffer-pool job))

(when jobs?
(recur)))))))
(let [allocator (util/->child-allocator allocator "compactor-allocator")]
(reify ICompactor
(compactAll [_]
(log/info "compact-all")
(loop []
(let [jobs (for [table-name (->> (.listObjects obj-store "tables")
;; TODO should obj-store listObjects only return keys from the current level?
(into #{} (keep #(second (re-find #"^tables/([^/]+)" %)))))
job (compaction-jobs table-name (->> (trie/list-table-trie-files obj-store table-name)
(trie/current-table-tries)))]
job)
jobs? (boolean (seq jobs))]

(doseq [job jobs]
(exec-compaction-job! allocator obj-store metadata-mgr buffer-pool job))

(when jobs?
(recur))))))))

(defmethod ig/halt-key! :xtdb/compactor [_ _compactor])

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@
[_ {:keys [allocator object-store metadata-mgr scan-emitter, ra-src, live-index, rows-per-chunk]}]

(let [{:keys [latest-completed-tx next-chunk-idx], :or {next-chunk-idx 0}} (meta/latest-chunk-metadata metadata-mgr)]
(->Indexer allocator object-store metadata-mgr scan-emitter ra-src live-index
(->Indexer (util/->child-allocator allocator "indexer-allocator") object-store metadata-mgr scan-emitter ra-src live-index

nil ; indexer-error

Expand Down
27 changes: 14 additions & 13 deletions core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,19 @@
:or {->live-trie (fn [iid-rdr]
(LiveHashTrie/emptyTrie iid-rdr))}}]

(util/with-close-on-catch [rel (trie/open-leaf-root allocator)]
(let [iid-wtr (.writerForName rel "xt$iid")
op-wtr (.writerForName rel "op")
put-wtr (.writerForTypeId op-wtr (byte 0))
delete-wtr (.writerForTypeId op-wtr (byte 1))]
(->LiveTable allocator object-store table-name rel
(->live-trie (vw/vec-wtr->rdr iid-wtr))
iid-wtr (.writerForName rel "xt$system_from")
put-wtr (.structKeyWriter put-wtr "xt$valid_from") (.structKeyWriter put-wtr "xt$valid_to")
(.structKeyWriter put-wtr "xt$doc") delete-wtr (.structKeyWriter delete-wtr "xt$valid_from")
(.structKeyWriter delete-wtr "xt$valid_to")
(.writerForTypeId op-wtr (byte 2)))))))
(let [allocator (util/->child-allocator allocator (str "live-table-" table-name "-allocator"))]
(util/with-close-on-catch [rel (trie/open-leaf-root allocator)]
(let [iid-wtr (.writerForName rel "xt$iid")
op-wtr (.writerForName rel "op")
put-wtr (.writerForTypeId op-wtr (byte 0))
delete-wtr (.writerForTypeId op-wtr (byte 1))]
(->LiveTable allocator object-store table-name rel
(->live-trie (vw/vec-wtr->rdr iid-wtr))
iid-wtr (.writerForName rel "xt$system_from")
put-wtr (.structKeyWriter put-wtr "xt$valid_from") (.structKeyWriter put-wtr "xt$valid_to")
(.structKeyWriter put-wtr "xt$doc") delete-wtr (.structKeyWriter delete-wtr "xt$valid_from")
(.structKeyWriter delete-wtr "xt$valid_to")
(.writerForTypeId op-wtr (byte 2))))))))

(defn ->live-trie [log-limit page-limit iid-rdr]
(-> (doto (LiveHashTrie/builder iid-rdr)
Expand Down Expand Up @@ -301,7 +302,7 @@

(defmethod ig/init-key :xtdb.indexer/live-index [_ {:keys [allocator object-store log-limit page-limit]
:or {log-limit 64 page-limit 1024}}]
(->LiveIndex allocator object-store (HashMap.) log-limit page-limit))
(->LiveIndex (util/->child-allocator allocator "live-index-allocator") object-store (HashMap.) log-limit page-limit))

(defmethod ig/halt-key! :xtdb.indexer/live-index [_ live-idx]
(util/close live-idx))
3 changes: 2 additions & 1 deletion core/src/main/clojure/xtdb/log/watcher.clj
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
!cancel-hook))

(defmethod ig/init-key :xtdb.log/watcher [_ deps]
(let [!watcher-cancel-hook (watch-log! deps)]
(let [!watcher-cancel-hook (watch-log! (-> deps
(update :allocator util/->child-allocator "watcher-allocator")))]


(reify
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/clojure/xtdb/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
(set! *unchecked-math* :warn-on-boxed)

(defmethod ig/init-key :xtdb/allocator [_ _] (RootAllocator.))
(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a] (.close a))
(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a]
(loop [allocators '() q [a]]
(if-let [^BufferAllocator cur (peek q)]
(recur (cons allocators cur) (into (pop q) (.getChildAllocators cur)))
(run! util/close allocators))))

(defmethod ig/prep-key :xtdb/default-tz [_ default-tz]
(cond
Expand Down Expand Up @@ -110,6 +114,7 @@

(defmethod ig/init-key :xtdb/node [_ deps]
(map->Node (-> deps
(update :allocator util/->child-allocator "node-allocator")
(assoc :!latest-submitted-tx (atom nil)))))

(defmethod ig/halt-key! :xtdb/node [_ node]
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/tx_producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -412,4 +412,4 @@
opts))

(defmethod ig/init-key ::tx-producer [_ {:keys [log allocator default-tz]}]
(TxProducer. allocator log default-tz))
(TxProducer. (util/->child-allocator allocator "tx-producer-allocator") log default-tz))
2 changes: 2 additions & 0 deletions core/src/main/clojure/xtdb/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -712,3 +712,5 @@
(defn normal-form-str->datalog-form-str ^String [^String s]
(NormalForm/datalogForm s))

(defn ->child-allocator [^BufferAllocator allocator name]
(.newChildAllocator allocator name (.getInitReservation allocator) (.getLimit allocator)))
2 changes: 1 addition & 1 deletion modules/flight-sql/src/main/clojure/xtdb/flight_sql.clj
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@
(let [fsql-txs (ConcurrentHashMap.)
stmts (ConcurrentHashMap.)
tickets (ConcurrentHashMap.)
server (doto (-> (FlightServer/builder allocator (Location/forGrpcInsecure host port)
server (doto (-> (FlightServer/builder (util/->child-allocator allocator "flight-sql-allocator") (Location/forGrpcInsecure host port)
(->fsql-producer {:allocator allocator, :node node, :idxer indexer, :ra-src ra-src, :wm-src wm-src
:fsql-txs fsql-txs, :stmts stmts, :tickets tickets}))

Expand Down

0 comments on commit ef213d7

Please sign in to comment.