Skip to content

Commit

Permalink
Give system components child allocators
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Sep 18, 2023
1 parent 500ebd0 commit 03e8e6f
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 74 deletions.
7 changes: 5 additions & 2 deletions core/src/main/clojure/xtdb/buffer_pool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@
(util/close (.next i))
(.remove i)))
(finally
(.unlock buffers-lock stamp))))))
(.unlock buffers-lock stamp)))
(util/close allocator))))

(defn- buffer-cache-bytes-size ^long [^Map buffers]
(long (reduce + (for [^ArrowBuf buffer (vals buffers)]
Expand Down Expand Up @@ -142,7 +143,9 @@
(when cache-path
(util/delete-dir cache-path)
(util/mkdirs cache-path))
(->BufferPool allocator object-store (->buffer-cache cache-entries-size cache-bytes-size) (StampedLock.) cache-path))
(util/with-close-on-catch [allocator (util/->child-allocator allocator "buffer-pool")]
(->BufferPool allocator object-store
(->buffer-cache cache-entries-size cache-bytes-size) (StampedLock.) cache-path)))

(defmethod ig/halt-key! ::buffer-pool [_ ^BufferPool buffer-pool]
(.close buffer-pool))
44 changes: 25 additions & 19 deletions core/src/main/clojure/xtdb/compactor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
[xtdb.vector.reader :as vr]
[xtdb.vector.writer :as vw])
(:import (java.util.function IntPredicate)
(java.lang AutoCloseable)
[org.apache.arrow.memory BufferAllocator]
org.apache.arrow.vector.types.pojo.Field
org.apache.arrow.vector.VectorSchemaRoot
Expand Down Expand Up @@ -123,25 +124,30 @@
opts))

(defmethod ig/init-key :xtdb/compactor [_ {:keys [allocator ^ObjectStore obj-store metadata-mgr buffer-pool]}]
(reify ICompactor
(compactAll [_]
(log/info "compact-all")
(loop []
(let [jobs (for [table-name (->> (.listObjects obj-store "tables")
;; TODO should obj-store listObjects only return keys from the current level?
(into #{} (keep #(second (re-find #"^tables/([^/]+)" %)))))
job (compaction-jobs table-name (->> (trie/list-table-trie-files obj-store table-name)
(trie/current-table-tries)))]
job)
jobs? (boolean (seq jobs))]

(doseq [job jobs]
(exec-compaction-job! allocator obj-store metadata-mgr buffer-pool job))

(when jobs?
(recur)))))))

(defmethod ig/halt-key! :xtdb/compactor [_ _compactor])
(util/with-close-on-catch [allocator (util/->child-allocator allocator "compactor")]
(reify ICompactor
(compactAll [_]
(log/info "compact-all")
(loop []
(let [jobs (for [table-name (->> (.listObjects obj-store "tables")
;; TODO should obj-store listObjects only return keys from the current level?
(into #{} (keep #(second (re-find #"^tables/([^/]+)" %)))))
job (compaction-jobs table-name (->> (trie/list-table-trie-files obj-store table-name)
(trie/current-table-tries)))]
job)
jobs? (boolean (seq jobs))]

(doseq [job jobs]
(exec-compaction-job! allocator obj-store metadata-mgr buffer-pool job))

(when jobs?
(recur)))))
AutoCloseable
(close [_]
(util/close allocator)))))

(defmethod ig/halt-key! :xtdb/compactor [_ compactor]
(util/close compactor))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn compact-all! [node]
Expand Down
20 changes: 11 additions & 9 deletions core/src/main/clojure/xtdb/indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@

Closeable
(close [_]
(util/close allocator)
(some-> shared-wm .close)))

(defmethod ig/prep-key :xtdb/indexer [_ opts]
Expand All @@ -644,19 +645,20 @@
[_ {:keys [allocator object-store metadata-mgr scan-emitter, ra-src, live-index, rows-per-chunk]}]

(let [{:keys [latest-completed-tx next-chunk-idx], :or {next-chunk-idx 0}} (meta/latest-chunk-metadata metadata-mgr)]
(->Indexer allocator object-store metadata-mgr scan-emitter ra-src live-index
(util/with-close-on-catch [allocator (util/->child-allocator allocator "indexer")]
(->Indexer allocator object-store metadata-mgr scan-emitter ra-src live-index

nil ; indexer-error
nil ;; indexer-error

latest-completed-tx
latest-completed-tx
(PriorityBlockingQueue.)
latest-completed-tx
latest-completed-tx
(PriorityBlockingQueue.)

(RowCounter. next-chunk-idx)
rows-per-chunk
(RowCounter. next-chunk-idx)
rows-per-chunk

nil ; watermark
(StampedLock.))))
nil ;; watermark
(StampedLock.)))))

(defmethod ig/halt-key! :xtdb/indexer [_ ^AutoCloseable indexer]
(.close indexer))
44 changes: 28 additions & 16 deletions core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns xtdb.indexer.live-index
(:require [juxt.clojars-mirrors.integrant.core :as ig]
(:require [clojure.tools.logging :as log]
[juxt.clojars-mirrors.integrant.core :as ig]
[xtdb.buffer-pool]
[xtdb.object-store]
[xtdb.trie :as trie]
Expand All @@ -9,12 +10,14 @@
[xtdb.vector.writer :as vw])
(:import [clojure.lang MapEntry]
(java.lang AutoCloseable)
(java.time Duration)
(java.util ArrayList HashMap Map)
(java.util.concurrent CompletableFuture)
(java.util.function Function)
(org.apache.arrow.memory BufferAllocator)
(xtdb.object_store ObjectStore)
(xtdb.trie LiveHashTrie)
(xtdb.util RefCounter)
(xtdb.vector IRelationWriter IVectorWriter)
(xtdb.watermark ILiveIndexWatermark ILiveTableWatermark)))

Expand Down Expand Up @@ -191,7 +194,6 @@
{:keys [->live-trie]
:or {->live-trie (fn [iid-rdr]
(LiveHashTrie/emptyTrie iid-rdr))}}]

(util/with-close-on-catch [rel (trie/open-leaf-root allocator)]
(let [iid-wtr (.writerForName rel "xt$iid")
op-wtr (.writerForName rel "op")
Expand All @@ -211,7 +213,8 @@
(.setPageLimit page-limit))
(.build)))

(defrecord LiveIndex [^BufferAllocator allocator, ^ObjectStore object-store, ^Map tables, ^long log-limit, ^long page-limit]
(defrecord LiveIndex [^BufferAllocator allocator, ^ObjectStore object-store,
^Map tables, ^RefCounter wm-cnt, ^long log-limit, ^long page-limit]
ILiveIndex
(liveTable [_ table-name] (.get tables table-name))

Expand Down Expand Up @@ -252,26 +255,31 @@
(liveTable [_ table-name] (.get wms table-name))

AutoCloseable
(close [_] (util/close wms)))))
(close [_]
(util/close wms)))))

AutoCloseable
(close [_]))))

(openWatermark [_]
(.acquire wm-cnt)
(try
(util/with-close-on-catch [wms (HashMap.)]

(util/with-close-on-catch [wms (HashMap.)]

(doseq [[table-name ^ILiveTable live-table] tables]
(.put wms table-name (.openWatermark live-table true)))
(doseq [[table-name ^ILiveTable live-table] tables]
(.put wms table-name (.openWatermark live-table true)))

(reify ILiveIndexWatermark
(allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %)))
(reify ILiveIndexWatermark
(allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %)))

(liveTable [_ table-name] (.get wms table-name))
(liveTable [_ table-name] (.get wms table-name))

AutoCloseable
(close [_]
(util/close wms)))))
AutoCloseable
(close [_]
(util/close wms)
(.release wm-cnt))))
(catch Throwable _t
(.release wm-cnt))))

(finishChunk [_ chunk-idx next-chunk-idx]
(let [trie-key (trie/->trie-key 0 chunk-idx next-chunk-idx)
Expand All @@ -292,7 +300,10 @@

AutoCloseable
(close [_]
(util/close tables)))
(util/close tables)
(if-not (.tryClose wm-cnt (Duration/ofMinutes 1))
(log/warn "Failed to shut down live-index after 60s due to outstanding watermarks.")
(util/close allocator))))

(defmethod ig/prep-key :xtdb.indexer/live-index [_ opts]
(merge {:allocator (ig/ref :xtdb/allocator)
Expand All @@ -301,7 +312,8 @@

(defmethod ig/init-key :xtdb.indexer/live-index [_ {:keys [allocator object-store log-limit page-limit]
:or {log-limit 64 page-limit 1024}}]
(->LiveIndex allocator object-store (HashMap.) log-limit page-limit))
(util/with-close-on-catch [allocator (util/->child-allocator allocator "live-index")]
(->LiveIndex allocator object-store (HashMap.) (RefCounter.) log-limit page-limit)))

(defmethod ig/halt-key! :xtdb.indexer/live-index [_ live-idx]
(util/close live-idx))
15 changes: 9 additions & 6 deletions core/src/main/clojure/xtdb/log/watcher.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@
(throw (IllegalStateException. (format "Unrecognized log record type %d" (Byte/toUnsignedInt (.get ^ByteBuffer (.-record record) 0))))))))))
!cancel-hook))

(defmethod ig/init-key :xtdb.log/watcher [_ deps]
(let [!watcher-cancel-hook (watch-log! deps)]
(defmethod ig/init-key :xtdb.log/watcher [_ {:keys [allocator] :as deps}]
(util/with-close-on-catch [allocator (util/->child-allocator allocator "watcher")]
(let [!watcher-cancel-hook (watch-log! (-> deps
(assoc :allocator allocator)))]


(reify
AutoCloseable
(close [_]
(util/try-close @!watcher-cancel-hook)))))
(reify
AutoCloseable
(close [_]
(util/try-close @!watcher-cancel-hook)
(util/close allocator))))))

(defmethod ig/halt-key! :xtdb.log/watcher [_ log-watcher]
(util/close log-watcher))
3 changes: 2 additions & 1 deletion core/src/main/clojure/xtdb/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
(set! *unchecked-math* :warn-on-boxed)

(defmethod ig/init-key :xtdb/allocator [_ _] (RootAllocator.))
(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a] (.close a))
(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a]
(util/close a))

(defmethod ig/prep-key :xtdb/default-tz [_ default-tz]
(cond
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/operator.clj
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
(.acquire ref-ctr)
(let [^BufferAllocator allocator
(if allocator
(.newChildAllocator allocator "BoundQuery/openCursor" AllocationListener/NOOP 0 Long/MAX_VALUE)
(util/->child-allocator allocator "BoundQuery/openCursor")
(RootAllocator.))
wm (some-> wm-src (.openWatermark wm-tx))]
(try
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/clojure/xtdb/tx_producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[xtdb.vector :as vec]
[xtdb.vector.writer :as vw])
(:import (java.time Instant ZoneId)
(java.lang AutoCloseable)
(java.util ArrayList HashMap List)
org.apache.arrow.memory.BufferAllocator
(org.apache.arrow.vector VectorSchemaRoot)
Expand Down Expand Up @@ -403,7 +404,9 @@
(util/then-apply
(fn [^LogRecord result]
(cond-> (.tx result)
system-time (assoc :system-time system-time))))))))
system-time (assoc :system-time system-time)))))))
AutoCloseable
(close [_] (.close allocator)))

(defmethod ig/prep-key ::tx-producer [_ opts]
(merge {:log (ig/ref :xtdb/log)
Expand All @@ -412,4 +415,7 @@
opts))

(defmethod ig/init-key ::tx-producer [_ {:keys [log allocator default-tz]}]
(TxProducer. allocator log default-tz))
(TxProducer. (util/->child-allocator allocator "tx-producer") log default-tz))

(defmethod ig/halt-key! ::tx-producer [_ tx-producer]
(util/close tx-producer))
2 changes: 2 additions & 0 deletions core/src/main/clojure/xtdb/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -712,3 +712,5 @@
(defn normal-form-str->datalog-form-str ^String [^String s]
(NormalForm/datalogForm s))

(defn ->child-allocator [^BufferAllocator allocator name]
(.newChildAllocator allocator name (.getInitReservation allocator) (.getLimit allocator)))
33 changes: 18 additions & 15 deletions modules/flight-sql/src/main/clojure/xtdb/flight_sql.clj
Original file line number Diff line number Diff line change
Expand Up @@ -308,21 +308,24 @@
(defmethod ig/init-key ::server [_ {:keys [allocator node indexer ra-src wm-src host ^long port]}]
(let [fsql-txs (ConcurrentHashMap.)
stmts (ConcurrentHashMap.)
tickets (ConcurrentHashMap.)
server (doto (-> (FlightServer/builder allocator (Location/forGrpcInsecure host port)
(->fsql-producer {:allocator allocator, :node node, :idxer indexer, :ra-src ra-src, :wm-src wm-src
:fsql-txs fsql-txs, :stmts stmts, :tickets tickets}))

#_(doto with-error-logging-middleware)

(.build))
(.start))]
(log/infof "Flight SQL server started, port %d" port)
(reify AutoCloseable
(close [_]
(util/try-close server)
(run! util/try-close (vals stmts))
(log/info "Flight SQL server stopped")))))
tickets (ConcurrentHashMap.)]
(util/with-close-on-catch [allocator (util/->child-allocator allocator "flight-sql")
server (doto (-> (FlightServer/builder allocator (Location/forGrpcInsecure host port)
(->fsql-producer {:allocator allocator, :node node, :idxer indexer, :ra-src ra-src, :wm-src wm-src
:fsql-txs fsql-txs, :stmts stmts, :tickets tickets}))

#_(doto with-error-logging-middleware)

(.build))
(.start))]

(log/infof "Flight SQL server started, port %d" port)
(reify AutoCloseable
(close [_]
(util/try-close server)
(run! util/try-close (vals stmts))
(util/close allocator)
(log/info "Flight SQL server stopped"))))))

(defmethod ig/halt-key! ::server [_ server]
(util/try-close server))
8 changes: 5 additions & 3 deletions src/test/clojure/xtdb/indexer/live_table_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
(org.apache.arrow.memory RootAllocator)
(xtdb.indexer.live_index ILiveIndex TestLiveTable)
(xtdb.trie LiveHashTrie LiveHashTrie$Leaf)
(xtdb.util RefCounter)
xtdb.vector.IVectorPosition
xtdb.watermark.ILiveTableWatermark))

Expand Down Expand Up @@ -149,9 +150,10 @@
(let [uuids [#uuid "7fffffff-ffff-ffff-4fff-ffffffffffff"]
table-name "foo"]
(with-open [obj-store (obj-store-test/in-memory)
allocator (RootAllocator.)
^ILiveIndex live-index (live-index/->LiveIndex allocator obj-store (HashMap.) 64 1024)]
(let [live-index-tx (.startTx live-index (xtp/->TransactionInstant 0 (.toInstant #inst "2000")))
allocator (RootAllocator.)]
(let [live-index-allocator (util/->child-allocator allocator "live-index")
^ILiveIndex live-index (live-index/->LiveIndex live-index-allocator obj-store (HashMap.) (RefCounter.) 64 1024)
live-index-tx (.startTx live-index (xtp/->TransactionInstant 0 (.toInstant #inst "2000")))
live-table-tx (.liveTable live-index-tx table-name)]

(let [wp (IVectorPosition/build)]
Expand Down

0 comments on commit 03e8e6f

Please sign in to comment.