From 03253abf6feba1845f9b740de0756e5aa237479e Mon Sep 17 00:00:00 2001 From: FiVo Date: Fri, 15 Sep 2023 10:40:30 +0200 Subject: [PATCH] Give system components child allocators --- core/src/main/clojure/xtdb/buffer_pool.clj | 7 +- core/src/main/clojure/xtdb/compactor.clj | 44 ++++++----- core/src/main/clojure/xtdb/indexer.clj | 20 ++--- .../main/clojure/xtdb/indexer/live_index.clj | 75 ++++++++++++------- core/src/main/clojure/xtdb/log/watcher.clj | 15 ++-- core/src/main/clojure/xtdb/node.clj | 3 +- core/src/main/clojure/xtdb/operator.clj | 2 +- core/src/main/clojure/xtdb/tx_producer.clj | 10 ++- core/src/main/clojure/xtdb/util.clj | 2 + .../src/main/clojure/xtdb/flight_sql.clj | 33 ++++---- .../clojure/xtdb/indexer/live_table_test.clj | 10 ++- 11 files changed, 133 insertions(+), 88 deletions(-) diff --git a/core/src/main/clojure/xtdb/buffer_pool.clj b/core/src/main/clojure/xtdb/buffer_pool.clj index 8944ab1563..12e019f6d3 100644 --- a/core/src/main/clojure/xtdb/buffer_pool.clj +++ b/core/src/main/clojure/xtdb/buffer_pool.clj @@ -113,7 +113,8 @@ (util/close (.next i)) (.remove i))) (finally - (.unlock buffers-lock stamp)))))) + (.unlock buffers-lock stamp))) + (util/close allocator)))) (defn- buffer-cache-bytes-size ^long [^Map buffers] (long (reduce + (for [^ArrowBuf buffer (vals buffers)] @@ -142,7 +143,9 @@ (when cache-path (util/delete-dir cache-path) (util/mkdirs cache-path)) - (->BufferPool allocator object-store (->buffer-cache cache-entries-size cache-bytes-size) (StampedLock.) cache-path)) + (util/with-close-on-catch [allocator (util/->child-allocator allocator "buffer-pool")] + (->BufferPool allocator object-store + (->buffer-cache cache-entries-size cache-bytes-size) (StampedLock.) cache-path))) (defmethod ig/halt-key! 
::buffer-pool [_ ^BufferPool buffer-pool] (.close buffer-pool)) diff --git a/core/src/main/clojure/xtdb/compactor.clj b/core/src/main/clojure/xtdb/compactor.clj index d86740d2a5..3b33839119 100644 --- a/core/src/main/clojure/xtdb/compactor.clj +++ b/core/src/main/clojure/xtdb/compactor.clj @@ -10,6 +10,7 @@ [xtdb.vector.reader :as vr] [xtdb.vector.writer :as vw]) (:import (java.util.function IntPredicate) + (java.lang AutoCloseable) [org.apache.arrow.memory BufferAllocator] org.apache.arrow.vector.types.pojo.Field org.apache.arrow.vector.VectorSchemaRoot @@ -123,25 +124,30 @@ opts)) (defmethod ig/init-key :xtdb/compactor [_ {:keys [allocator ^ObjectStore obj-store metadata-mgr buffer-pool]}] - (reify ICompactor - (compactAll [_] - (log/info "compact-all") - (loop [] - (let [jobs (for [table-name (->> (.listObjects obj-store "tables") - ;; TODO should obj-store listObjects only return keys from the current level? - (into #{} (keep #(second (re-find #"^tables/([^/]+)" %))))) - job (compaction-jobs table-name (->> (trie/list-table-trie-files obj-store table-name) - (trie/current-table-tries)))] - job) - jobs? (boolean (seq jobs))] - - (doseq [job jobs] - (exec-compaction-job! allocator obj-store metadata-mgr buffer-pool job)) - - (when jobs? - (recur))))))) - -(defmethod ig/halt-key! :xtdb/compactor [_ _compactor]) + (util/with-close-on-catch [allocator (util/->child-allocator allocator "compactor")] + (reify ICompactor + (compactAll [_] + (log/info "compact-all") + (loop [] + (let [jobs (for [table-name (->> (.listObjects obj-store "tables") + ;; TODO should obj-store listObjects only return keys from the current level? + (into #{} (keep #(second (re-find #"^tables/([^/]+)" %))))) + job (compaction-jobs table-name (->> (trie/list-table-trie-files obj-store table-name) + (trie/current-table-tries)))] + job) + jobs? (boolean (seq jobs))] + + (doseq [job jobs] + (exec-compaction-job! allocator obj-store metadata-mgr buffer-pool job)) + + (when jobs? 
+ (recur))))) + AutoCloseable + (close [_] + (util/close allocator))))) + +(defmethod ig/halt-key! :xtdb/compactor [_ compactor] + (util/close compactor)) #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} (defn compact-all! [node] diff --git a/core/src/main/clojure/xtdb/indexer.clj b/core/src/main/clojure/xtdb/indexer.clj index a36ec5ac56..f6253be9cb 100644 --- a/core/src/main/clojure/xtdb/indexer.clj +++ b/core/src/main/clojure/xtdb/indexer.clj @@ -628,6 +628,7 @@ Closeable (close [_] + (util/close allocator) (some-> shared-wm .close))) (defmethod ig/prep-key :xtdb/indexer [_ opts] @@ -644,19 +645,20 @@ [_ {:keys [allocator object-store metadata-mgr scan-emitter, ra-src, live-index, rows-per-chunk]}] (let [{:keys [latest-completed-tx next-chunk-idx], :or {next-chunk-idx 0}} (meta/latest-chunk-metadata metadata-mgr)] - (->Indexer allocator object-store metadata-mgr scan-emitter ra-src live-index + (util/with-close-on-catch [allocator (util/->child-allocator allocator "indexer")] + (->Indexer allocator object-store metadata-mgr scan-emitter ra-src live-index - nil ; indexer-error + nil ;; indexer-error - latest-completed-tx - latest-completed-tx - (PriorityBlockingQueue.) + latest-completed-tx + latest-completed-tx + (PriorityBlockingQueue.) - (RowCounter. next-chunk-idx) - rows-per-chunk + (RowCounter. next-chunk-idx) + rows-per-chunk - nil ; watermark - (StampedLock.)))) + nil ;; watermark + (StampedLock.))))) (defmethod ig/halt-key! 
:xtdb/indexer [_ ^AutoCloseable indexer] (.close indexer)) diff --git a/core/src/main/clojure/xtdb/indexer/live_index.clj b/core/src/main/clojure/xtdb/indexer/live_index.clj index c68fd56cd1..ca7fd25890 100644 --- a/core/src/main/clojure/xtdb/indexer/live_index.clj +++ b/core/src/main/clojure/xtdb/indexer/live_index.clj @@ -1,5 +1,6 @@ (ns xtdb.indexer.live-index - (:require [juxt.clojars-mirrors.integrant.core :as ig] + (:require [clojure.tools.logging :as log] + [juxt.clojars-mirrors.integrant.core :as ig] [xtdb.buffer-pool] [xtdb.object-store] [xtdb.trie :as trie] @@ -9,12 +10,14 @@ [xtdb.vector.writer :as vw]) (:import [clojure.lang MapEntry] (java.lang AutoCloseable) + (java.time Duration) (java.util ArrayList HashMap Map) (java.util.concurrent CompletableFuture) (java.util.function Function) (org.apache.arrow.memory BufferAllocator) (xtdb.object_store ObjectStore) (xtdb.trie LiveHashTrie) + (xtdb.util RefCounter) (xtdb.vector IRelationWriter IVectorWriter) (xtdb.watermark ILiveIndexWatermark ILiveTableWatermark))) @@ -191,7 +194,6 @@ {:keys [->live-trie] :or {->live-trie (fn [iid-rdr] (LiveHashTrie/emptyTrie iid-rdr))}}] - (util/with-close-on-catch [rel (trie/open-leaf-root allocator)] (let [iid-wtr (.writerForName rel "xt$iid") op-wtr (.writerForName rel "op") @@ -211,7 +213,8 @@ (.setPageLimit page-limit)) (.build))) -(defrecord LiveIndex [^BufferAllocator allocator, ^ObjectStore object-store, ^Map tables, ^long log-limit, ^long page-limit] +(defrecord LiveIndex [^BufferAllocator allocator, ^ObjectStore object-store, + ^Map tables, ^RefCounter wm-cnt, ^long log-limit, ^long page-limit] ILiveIndex (liveTable [_ table-name] (.get tables table-name)) @@ -239,39 +242,49 @@ (.abort live-table-tx))) (openWatermark [_] - (util/with-close-on-catch [wms (HashMap.)] - (doseq [[table-name ^ILiveTableTx live-table-tx] table-txs] - (.put wms table-name (.openWatermark live-table-tx))) - - (doseq [[table-name ^ILiveTable live-table] tables] - (.computeIfAbsent wms 
table-name - (util/->jfn (fn [_] (.openWatermark live-table false))))) - - (reify ILiveIndexWatermark - (allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %))) - (liveTable [_ table-name] (.get wms table-name)) - - AutoCloseable - (close [_] (util/close wms))))) + (.acquire wm-cnt) + (try + (util/with-close-on-catch [wms (HashMap.)] + (doseq [[table-name ^ILiveTableTx live-table-tx] table-txs] + (.put wms table-name (.openWatermark live-table-tx))) + + (doseq [[table-name ^ILiveTable live-table] tables] + (.computeIfAbsent wms table-name + (util/->jfn (fn [_] (.openWatermark live-table false))))) + + (reify ILiveIndexWatermark + (allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %))) + (liveTable [_ table-name] (.get wms table-name)) + + AutoCloseable + (close [_] + (util/close wms) + (.release wm-cnt)))) + (catch Throwable t + (.release wm-cnt) (throw t)))) AutoCloseable (close [_])))) (openWatermark [_] + (.acquire wm-cnt) + (try + (util/with-close-on-catch [wms (HashMap.)] - (util/with-close-on-catch [wms (HashMap.)] + (doseq [[table-name ^ILiveTable live-table] tables] + (.put wms table-name (.openWatermark live-table true))) - (doseq [[table-name ^ILiveTable live-table] tables] - (.put wms table-name (.openWatermark live-table true))) + (reify ILiveIndexWatermark + (allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %))) - (reify ILiveIndexWatermark - (allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %))) + (liveTable [_ table-name] (.get wms table-name)) - (liveTable [_ table-name] (.get wms table-name)) - - AutoCloseable - (close [_] - (util/close wms))))) + AutoCloseable + (close [_] + (util/close wms) + (.release wm-cnt)))) + (catch Throwable t + (.release wm-cnt) (throw t)))) (finishChunk [_ chunk-idx next-chunk-idx] (let [trie-key (trie/->trie-key 0 chunk-idx next-chunk-idx) @@ -292,7 +305,10 @@ AutoCloseable (close [_] - (util/close tables))) + (util/close tables) + (if-not (.tryClose
wm-cnt (Duration/ofMinutes 1)) + (log/warn "Failed to shut down live-index after 60s due to outstanding watermarks.") + (util/close allocator)))) (defmethod ig/prep-key :xtdb.indexer/live-index [_ opts] (merge {:allocator (ig/ref :xtdb/allocator) @@ -301,7 +317,8 @@ (defmethod ig/init-key :xtdb.indexer/live-index [_ {:keys [allocator object-store log-limit page-limit] :or {log-limit 64 page-limit 1024}}] - (->LiveIndex allocator object-store (HashMap.) log-limit page-limit)) + (util/with-close-on-catch [allocator (util/->child-allocator allocator "live-index")] + (->LiveIndex allocator object-store (HashMap.) (RefCounter.) log-limit page-limit))) (defmethod ig/halt-key! :xtdb.indexer/live-index [_ live-idx] (util/close live-idx)) diff --git a/core/src/main/clojure/xtdb/log/watcher.clj b/core/src/main/clojure/xtdb/log/watcher.clj index 4c5d56c6f1..48674f1831 100644 --- a/core/src/main/clojure/xtdb/log/watcher.clj +++ b/core/src/main/clojure/xtdb/log/watcher.clj @@ -62,14 +62,17 @@ (throw (IllegalStateException. (format "Unrecognized log record type %d" (Byte/toUnsignedInt (.get ^ByteBuffer (.-record record) 0)))))))))) !cancel-hook)) -(defmethod ig/init-key :xtdb.log/watcher [_ deps] - (let [!watcher-cancel-hook (watch-log! deps)] +(defmethod ig/init-key :xtdb.log/watcher [_ {:keys [allocator] :as deps}] + (util/with-close-on-catch [allocator (util/->child-allocator allocator "watcher")] + (let [!watcher-cancel-hook (watch-log! (-> deps + (assoc :allocator allocator)))] - (reify - AutoCloseable - (close [_] - (util/try-close @!watcher-cancel-hook))))) + (reify + AutoCloseable + (close [_] + (util/try-close @!watcher-cancel-hook) + (util/close allocator)))))) (defmethod ig/halt-key! 
:xtdb.log/watcher [_ log-watcher] (util/close log-watcher)) diff --git a/core/src/main/clojure/xtdb/node.clj b/core/src/main/clojure/xtdb/node.clj index e7a6462b7d..7a72c14915 100644 --- a/core/src/main/clojure/xtdb/node.clj +++ b/core/src/main/clojure/xtdb/node.clj @@ -22,7 +22,8 @@ (set! *unchecked-math* :warn-on-boxed) (defmethod ig/init-key :xtdb/allocator [_ _] (RootAllocator.)) -(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a] (.close a)) +(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a] + (util/close a)) (defmethod ig/prep-key :xtdb/default-tz [_ default-tz] (cond diff --git a/core/src/main/clojure/xtdb/operator.clj b/core/src/main/clojure/xtdb/operator.clj index 960f27d80f..2be08e5e04 100644 --- a/core/src/main/clojure/xtdb/operator.clj +++ b/core/src/main/clojure/xtdb/operator.clj @@ -132,7 +132,7 @@ (.acquire ref-ctr) (let [^BufferAllocator allocator (if allocator - (.newChildAllocator allocator "BoundQuery/openCursor" AllocationListener/NOOP 0 Long/MAX_VALUE) + (util/->child-allocator allocator "BoundQuery/openCursor") (RootAllocator.)) wm (some-> wm-src (.openWatermark wm-tx))] (try diff --git a/core/src/main/clojure/xtdb/tx_producer.clj b/core/src/main/clojure/xtdb/tx_producer.clj index 99b6d1cd42..d7e9916b9a 100644 --- a/core/src/main/clojure/xtdb/tx_producer.clj +++ b/core/src/main/clojure/xtdb/tx_producer.clj @@ -9,6 +9,7 @@ [xtdb.vector :as vec] [xtdb.vector.writer :as vw]) (:import (java.time Instant ZoneId) + (java.lang AutoCloseable) (java.util ArrayList HashMap List) org.apache.arrow.memory.BufferAllocator (org.apache.arrow.vector VectorSchemaRoot) @@ -403,7 +404,9 @@ (util/then-apply (fn [^LogRecord result] (cond-> (.tx result) - system-time (assoc :system-time system-time)))))))) + system-time (assoc :system-time system-time))))))) + AutoCloseable + (close [_] (.close allocator))) (defmethod ig/prep-key ::tx-producer [_ opts] (merge {:log (ig/ref :xtdb/log) @@ -412,4 +415,7 @@ opts)) (defmethod ig/init-key 
::tx-producer [_ {:keys [log allocator default-tz]}] - (TxProducer. allocator log default-tz)) + (TxProducer. (util/->child-allocator allocator "tx-producer") log default-tz)) + +(defmethod ig/halt-key! ::tx-producer [_ tx-producer] + (util/close tx-producer)) diff --git a/core/src/main/clojure/xtdb/util.clj b/core/src/main/clojure/xtdb/util.clj index 0d16b72014..f8831718f5 100644 --- a/core/src/main/clojure/xtdb/util.clj +++ b/core/src/main/clojure/xtdb/util.clj @@ -712,3 +712,5 @@ (defn normal-form-str->datalog-form-str ^String [^String s] (NormalForm/datalogForm s)) +(defn ->child-allocator [^BufferAllocator allocator name] + (.newChildAllocator allocator name (.getInitReservation allocator) (.getLimit allocator))) diff --git a/modules/flight-sql/src/main/clojure/xtdb/flight_sql.clj b/modules/flight-sql/src/main/clojure/xtdb/flight_sql.clj index 0f417206c1..5d47003af1 100644 --- a/modules/flight-sql/src/main/clojure/xtdb/flight_sql.clj +++ b/modules/flight-sql/src/main/clojure/xtdb/flight_sql.clj @@ -308,21 +308,24 @@ (defmethod ig/init-key ::server [_ {:keys [allocator node indexer ra-src wm-src host ^long port]}] (let [fsql-txs (ConcurrentHashMap.) stmts (ConcurrentHashMap.) - tickets (ConcurrentHashMap.) - server (doto (-> (FlightServer/builder allocator (Location/forGrpcInsecure host port) - (->fsql-producer {:allocator allocator, :node node, :idxer indexer, :ra-src ra-src, :wm-src wm-src - :fsql-txs fsql-txs, :stmts stmts, :tickets tickets})) - - #_(doto with-error-logging-middleware) - - (.build)) - (.start))] - (log/infof "Flight SQL server started, port %d" port) - (reify AutoCloseable - (close [_] - (util/try-close server) - (run! 
util/try-close (vals stmts)) - (log/info "Flight SQL server stopped"))))) + tickets (ConcurrentHashMap.)] + (util/with-close-on-catch [allocator (util/->child-allocator allocator "flight-sql") + server (doto (-> (FlightServer/builder allocator (Location/forGrpcInsecure host port) + (->fsql-producer {:allocator allocator, :node node, :idxer indexer, :ra-src ra-src, :wm-src wm-src + :fsql-txs fsql-txs, :stmts stmts, :tickets tickets})) + + #_(doto with-error-logging-middleware) + + (.build)) + (.start))] + + (log/infof "Flight SQL server started, port %d" port) + (reify AutoCloseable + (close [_] + (util/try-close server) + (run! util/try-close (vals stmts)) + (util/close allocator) + (log/info "Flight SQL server stopped")))))) (defmethod ig/halt-key! ::server [_ server] (util/try-close server)) diff --git a/src/test/clojure/xtdb/indexer/live_table_test.clj b/src/test/clojure/xtdb/indexer/live_table_test.clj index f474ebc6c8..29d1f7d407 100644 --- a/src/test/clojure/xtdb/indexer/live_table_test.clj +++ b/src/test/clojure/xtdb/indexer/live_table_test.clj @@ -14,6 +14,7 @@ (org.apache.arrow.memory RootAllocator) (xtdb.indexer.live_index ILiveIndex TestLiveTable) (xtdb.trie LiveHashTrie LiveHashTrie$Leaf) + (xtdb.util RefCounter) xtdb.vector.IVectorPosition xtdb.watermark.ILiveTableWatermark)) @@ -149,9 +150,10 @@ (let [uuids [#uuid "7fffffff-ffff-ffff-4fff-ffffffffffff"] table-name "foo"] (with-open [obj-store (obj-store-test/in-memory) - allocator (RootAllocator.) - ^ILiveIndex live-index (live-index/->LiveIndex allocator obj-store (HashMap.) 64 1024)] - (let [live-index-tx (.startTx live-index (xtp/->TransactionInstant 0 (.toInstant #inst "2000"))) + allocator (RootAllocator.)] + (let [live-index-allocator (util/->child-allocator allocator "live-index") + ^ILiveIndex live-index (live-index/->LiveIndex live-index-allocator obj-store (HashMap.) (RefCounter.) 
64 1024) + live-index-tx (.startTx live-index (xtp/->TransactionInstant 0 (.toInstant #inst "2000"))) live-table-tx (.liveTable live-index-tx table-name)] (let [wp (IVectorPosition/build)] @@ -166,7 +168,7 @@ (let [live-table-before (live-table-wm->data (.liveTable live-index-wm table-name))] (.finishChunk live-index 0 10) - (.close live-index) + (future (.close live-index)) (let [live-table-after (live-table-wm->data (.liveTable live-index-wm table-name))]