From 3bde725e00102421dfbe1b30738e13e1fd4a684c Mon Sep 17 00:00:00 2001
From: FiVo
Date: Fri, 25 Aug 2023 19:04:17 +0200
Subject: [PATCH] Make more use of iid blooms for filtering pages

---
Review note: in `->merge-plan`, the iid bloom of a taken page is now only OR-ed
into the cumulative bitmap when one exists; `iidBloomBitmap` returns nil when the
page has no `xt$iid` entry in the page-idx cache or no bloom object. The blob ids
on the `index` lines below are unchanged from the original commit.

 core/src/main/clojure/xtdb/metadata.clj      | 16 +++--
 core/src/main/clojure/xtdb/operator/scan.clj |  4 +-
 core/src/main/clojure/xtdb/trie.clj          | 74 +++++++++++++--------
 src/test/clojure/xtdb/trie_test.clj          | 45 ++++++------
 4 files changed, 85 insertions(+), 54 deletions(-)

diff --git a/core/src/main/clojure/xtdb/metadata.clj b/core/src/main/clojure/xtdb/metadata.clj
index 5e7a3c54bd..b9966e9dfc 100644
--- a/core/src/main/clojure/xtdb/metadata.clj
+++ b/core/src/main/clojure/xtdb/metadata.clj
@@ -32,8 +32,9 @@
 (definterface ITableMetadata
   (^xtdb.vector.IVectorReader metadataReader [])
   (^java.util.Set columnNames [])
-  (^Long rowIndex [^String column-name, ^int pageIdx])
-  (^long pageCount []))
+  (^Long rowIndex [^String columnName, ^int pageIdx])
+  (^long pageCount [])
+  (^org.roaringbitmap.buffer.ImmutableRoaringBitmap iidBloomBitmap [^int pageIdx]))
 
 #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
 (definterface IPageMetadataWriter
@@ -280,8 +281,15 @@
     (reify ITableMetadata
       (metadataReader [_] metadata-reader)
       (columnNames [_] col-names)
-      (rowIndex [_ col-name block-idx] (get page-idx-cache [col-name block-idx]))
-      (pageCount [_] page-count)))
+      (rowIndex [_ col-name page-idx] (get page-idx-cache [col-name page-idx]))
+      (pageCount [_] page-count)
+      (iidBloomBitmap [_ page-idx]
+        (let [bloom-rdr (-> (.structKeyReader metadata-reader "columns")
+                            (.listElementReader)
+                            (.structKeyReader "bloom"))]
+          (when-let [bloom-vec-idx (get page-idx-cache ["xt$iid" page-idx])]
+            (when (not (nil? (.getObject bloom-rdr bloom-vec-idx)))
+              (bloom/bloom->bitmap bloom-rdr bloom-vec-idx)))))))
 
 (deftype MetadataManager [^ObjectStore object-store
                           ^IBufferPool buffer-pool
diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj
index 5b6d780ee3..a61c4f6e7b 100644
--- a/core/src/main/clojure/xtdb/operator/scan.clj
+++ b/core/src/main/clojure/xtdb/operator/scan.clj
@@ -24,7 +24,7 @@
            (org.apache.arrow.memory BufferAllocator)
            [org.apache.arrow.memory.util ArrowBufPointer]
            [org.roaringbitmap RoaringBitmap]
-           [org.roaringbitmap.buffer MutableRoaringBitmap]
+           [org.roaringbitmap.buffer ImmutableRoaringBitmap MutableRoaringBitmap]
            xtdb.api.protocols.TransactionInstant
            xtdb.buffer_pool.IBufferPool
            xtdb.ICursor
@@ -597,7 +597,7 @@
                                            (map (partial filter-trie-match metadata-mgr col-names))
                                            (remove nil?)
                                            (into {} (map (juxt :buf-key :page-idxs))))
-                  merge-plan (trie/table-merge-plan buffer-pool table-chunks trie-file->page-idxs live-table-wm)]
+                  merge-plan (trie/table-merge-plan buffer-pool metadata-mgr table-chunks trie-file->page-idxs live-table-wm)]
 
               ;; The consumers for different leafs need to share some state so the logic of how to advance
               ;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect
diff --git a/core/src/main/clojure/xtdb/trie.clj b/core/src/main/clojure/xtdb/trie.clj
index 8e2d4cdc73..9be5315724 100644
--- a/core/src/main/clojure/xtdb/trie.clj
+++ b/core/src/main/clojure/xtdb/trie.clj
@@ -20,7 +20,9 @@
            (org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
            org.apache.arrow.vector.types.UnionMode
            (org.roaringbitmap RoaringBitmap)
+           (org.roaringbitmap.buffer ImmutableRoaringBitmap MutableRoaringBitmap)
            xtdb.buffer_pool.IBufferPool
+           (xtdb.metadata IMetadataManager ITableMetadata)
            (xtdb.object_store ObjectStore)
            (xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie HashTrie$Node LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie LiveHashTrie$Leaf)
            (xtdb.util WritableByteBufferChannel)
@@ -232,7 +234,7 @@
 
 (defn ->merge-plan
   "Returns a tree of the tasks required to merge the given tries "
-  [tries, trie-page-idxs]
+  [tries, trie-page-idxs, iid-bloom-bitmap]
 
   (letfn [(->merge-plan* [nodes path ^long level]
             (let [trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
@@ -250,43 +252,59 @@
                   {:path (byte-array path)
                    :node [:branch branches]}))
 
-                (loop [ordinal 0
-                       [node & more-nodes] nodes
-                       node-taken? false
-                       leaves []]
-                  (cond
-                    (not (< ordinal (count nodes))) (when node-taken?
-                                                      {:path (byte-array path)
-                                                       :node [:leaf leaves]})
-                    node (condp = (class node)
-                           ArrowHashTrie$Leaf
-                           (let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
-                                 take-node? (or node-taken?
-                                                (some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
-                                                        (.contains page-idx)))]
-                             (recur (inc ordinal) more-nodes take-node?
-                                    (conj leaves (when take-node?
-                                                   {:page-idx page-idx}))))
-                           LiveHashTrie$Leaf
-                           (recur (inc ordinal) more-nodes true (conj leaves node)))
-
-                    :else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil)))))))]
+                (let [^MutableRoaringBitmap cumulative-iid-bitmap (MutableRoaringBitmap.)]
+                  (loop [ordinal 0
+                         [node & more-nodes] nodes
+                         node-taken? false
+                         leaves []]
+                    (cond
+                      (not (< ordinal (count nodes))) (when node-taken?
+                                                        {:path (byte-array path)
+                                                         :node [:leaf leaves]})
+                      node (condp = (class node)
+                             ArrowHashTrie$Leaf
+                             (let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
+                                   take-node? (some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
+                                                      (.contains page-idx))]
+                               (when take-node?
+                                 ;; a page may have no iid bloom (nil); only accumulate when one exists
+                                 (when-let [iid-bitmap (iid-bloom-bitmap ordinal page-idx)]
+                                   (.or cumulative-iid-bitmap iid-bitmap)))
+                               (recur (inc ordinal) more-nodes (or node-taken? take-node?)
+                                      (conj leaves (when (or take-node?
+                                                             (when node-taken?
+                                                               (when-let [iid-bitmap (iid-bloom-bitmap ordinal page-idx)]
+                                                                 (MutableRoaringBitmap/intersects cumulative-iid-bitmap iid-bitmap))))
+                                                     {:page-idx page-idx}))))
+                             LiveHashTrie$Leaf
+                             (recur (inc ordinal) more-nodes true (conj leaves node)))
+
+                      :else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil))))))))]
 
     (->merge-plan* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
 
-(defn table-merge-plan [^IBufferPool buffer-pool, table-tries, trie-file->page-idxs, ^ILiveTableWatermark live-table-wm]
-  (util/with-open [trie-roots (ArrayList. (count table-tries))]
+(defn table-merge-plan [^IBufferPool buffer-pool, ^IMetadataManager metadata-mgr, table-chunks,
+                        trie-file->page-idxs, ^ILiveTableWatermark live-table-wm]
+
+  (util/with-open [trie-roots (ArrayList. (count table-chunks))]
     ;; TODO these could be kicked off asynchronously
-    (let [tries (cond-> (vec (for [{:keys [trie-file]} table-tries]
+    (let [tries (cond-> (vec (for [{:keys [trie-file chunk-idx]} table-chunks]
                                (with-open [^ArrowBuf buf @(.getBuffer buffer-pool trie-file)]
                                  (let [{:keys [^VectorLoader loader root arrow-blocks]} (util/read-arrow-buf buf)]
                                    (with-open [record-batch (util/->arrow-record-batch-view (first arrow-blocks) buf)]
                                      (.load loader record-batch)
                                      (.add trie-roots root)
                                      {:trie (ArrowHashTrie/from root)
-                                      :page-idxs (trie-file->page-idxs trie-file)})))))
-                  live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))]
-      (->merge-plan (mapv :trie tries) (mapv :page-idxs tries)))))
+                                      :trie-file trie-file
+                                      :page-idxs (trie-file->page-idxs chunk-idx)})))))
+                  live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))
+          trie-files (mapv :trie-file tries)]
+      (letfn [(iid-bloom-bitmap ^ImmutableRoaringBitmap [ordinal page-idx]
+                @(meta/with-metadata metadata-mgr (nth trie-files ordinal)
+                   (util/->jfn
+                     (fn [^ITableMetadata table-meta]
+                       (.iidBloomBitmap table-meta page-idx)))))]
+        (->merge-plan (mapv :trie tries) (mapv :page-idxs tries) iid-bloom-bitmap)))))
 
 #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
 (definterface ILeafLoader
diff --git a/src/test/clojure/xtdb/trie_test.clj b/src/test/clojure/xtdb/trie_test.clj
index 1cf29042de..ebd0c9ea6f 100644
--- a/src/test/clojure/xtdb/trie_test.clj
+++ b/src/test/clojure/xtdb/trie_test.clj
@@ -4,9 +4,10 @@
             [xtdb.test-util :as tu]
             [xtdb.trie :as trie])
   (:import (clojure.lang MapEntry)
+           (org.apache.arrow.memory RootAllocator)
            (org.roaringbitmap RoaringBitmap)
-           (xtdb.trie ArrowHashTrie)
-           (org.apache.arrow.memory RootAllocator)))
+           (org.roaringbitmap.buffer MutableRoaringBitmap)
+           (xtdb.trie ArrowHashTrie)))
 
 (deftest test-merge-plan-with-nil-nodes-2700
   (with-open [al (RootAllocator.)
@@ -16,21 +17,25 @@
     (let [trie-page-idxs [(RoaringBitmap.)
                           (RoaringBitmap/bitmapOf (int-array '(0 1 2 3)))
                           (RoaringBitmap/bitmapOf (int-array '(0)))
-                          (RoaringBitmap/bitmapOf (int-array '(0 1)))]]
-      (t/is (= {:path [],
-                :node [:branch
-                       [{:path [0],
-                         :node [:branch
-                                [{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
-                                 {:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
-                                 {:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
-                                 {:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
-                        {:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
-                        {:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
-                        {:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
-               (->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
-                                       trie-page-idxs)
-                    (walk/postwalk (fn [x]
-                                     (if (and (map-entry? x) (= :path (key x)))
-                                       (MapEntry/create :path (into [] (val x)))
-                                       x)))))))))
+                          (RoaringBitmap/bitmapOf (int-array '(0 1)))]
+          constant-iid-bloom-bitmap (.toImmutableRoaringBitmap (MutableRoaringBitmap/bitmapOf (int-array '(1000 1001 1002))))]
+      (letfn [(iid-bloom-bitmap [_ordinal _page-idx]
+                constant-iid-bloom-bitmap)]
+        (t/is (= {:path [],
+                  :node [:branch
+                         [{:path [0],
+                           :node [:branch
+                                  [{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
+                                   {:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
+                                   {:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
+                                   {:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
+                          {:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
+                          {:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
+                          {:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
+                 (->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
+                                         trie-page-idxs
+                                         iid-bloom-bitmap)
+                      (walk/postwalk (fn [x]
+                                       (if (and (map-entry? x) (= :path (key x)))
+                                         (MapEntry/create :path (into [] (val x)))
+                                         x))))))))))