diff --git a/core/src/main/clojure/xtdb/metadata.clj b/core/src/main/clojure/xtdb/metadata.clj
index a21b7a05f3..30f519d410 100644
--- a/core/src/main/clojure/xtdb/metadata.clj
+++ b/core/src/main/clojure/xtdb/metadata.clj
@@ -15,8 +15,8 @@
            java.lang.AutoCloseable
            java.nio.ByteBuffer
            (java.util HashMap HashSet Map NavigableMap Set TreeMap)
-           (java.util.concurrent ConcurrentHashMap CompletableFuture)
-           (java.util.function BiFunction Consumer Function)
+           (java.util.concurrent ConcurrentHashMap)
+           (java.util.function BiFunction Function)
            (java.util.stream IntStream)
            (org.apache.arrow.memory ArrowBuf)
            (org.apache.arrow.vector FieldVector VectorLoader VectorSchemaRoot)
@@ -32,8 +32,9 @@
 (definterface ITableMetadata
   (^xtdb.vector.IVectorReader metadataReader [])
   (^java.util.Set columnNames [])
-  (^Long rowIndex [^String column-name, ^int pageIdx])
-  (^long pageCount []))
+  (^Long rowIndex [^String columnName, ^int pageIdx])
+  (^long pageCount [])
+  (^org.roaringbitmap.buffer.ImmutableRoaringBitmap iidBloomBitmap [^int pageIdx]))
 
 #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
 (definterface IPageMetadataWriter
@@ -288,8 +289,15 @@
       (reify ITableMetadata
         (metadataReader [_] metadata-reader)
         (columnNames [_] col-names)
-        (rowIndex [_ col-name block-idx] (get page-idx-cache [col-name block-idx]))
-        (pageCount [_] page-count)))
+        (rowIndex [_ col-name page-idx] (get page-idx-cache [col-name page-idx]))
+        (pageCount [_] page-count)
+        (iidBloomBitmap [_ page-idx]
+          (let [bloom-rdr (-> (.structKeyReader metadata-reader "columns")
+                              (.listElementReader)
+                              (.structKeyReader "bloom"))]
+            (when-let [bloom-vec-idx (get page-idx-cache ["xt$iid" page-idx])]
+              (when (not (nil? (.getObject bloom-rdr bloom-vec-idx)))
+                (bloom/bloom->bitmap bloom-rdr bloom-vec-idx)))))))
 
 (deftype MetadataManager [^ObjectStore object-store
                           ^IBufferPool buffer-pool
diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj
index 9232c11aa8..8105918a31 100644
--- a/core/src/main/clojure/xtdb/operator/scan.clj
+++ b/core/src/main/clojure/xtdb/operator/scan.clj
@@ -25,11 +25,13 @@
            [org.apache.arrow.memory.util ArrowBufPointer]
            (org.apache.arrow.vector VectorLoader VectorSchemaRoot)
            [org.apache.arrow.vector.types.pojo Schema]
+           [org.roaringbitmap.buffer ImmutableRoaringBitmap]
            xtdb.api.protocols.TransactionInstant
            xtdb.buffer_pool.IBufferPool
            xtdb.ICursor
            xtdb.indexer.live_index.ILiveTableWatermark
            (xtdb.metadata IMetadataManager)
+           (xtdb.metadata ITableMetadata)
            xtdb.object_store.ObjectStore
            xtdb.operator.IRelationSelector
            (xtdb.trie ArrowHashTrie LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
@@ -455,7 +457,9 @@
                            (transient {:chunk-idx chunk-idx})
                            files))))))
 
-(defn table-merge-plan [^IBufferPool buffer-pool, table-chunks, chunk-idx->page-idxs, ^ILiveTableWatermark live-table-wm]
+(defn table-merge-plan [^IBufferPool buffer-pool, ^IMetadataManager metadata-mgr, table-chunks,
+                        chunk-idx->page-idxs, ^ILiveTableWatermark live-table-wm, table-name]
+
   (util/with-open [trie-roots (ArrayList. (count table-chunks))]
     ;; TODO these could be kicked off asynchronously
     (let [tries (cond-> (vec (for [{:keys [trie-file chunk-idx]} table-chunks]
@@ -465,9 +469,16 @@
                                      (.load loader record-batch)
                                      (.add trie-roots root)
                                      {:trie (ArrowHashTrie/from root)
+                                      :chunk-idx chunk-idx
                                       :page-idxs (chunk-idx->page-idxs chunk-idx)})))))
-                  live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))]
-      (trie/->merge-plan (mapv :trie tries) (mapv :page-idxs tries)))))
+                  live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))
+          chunk-idxs (mapv :chunk-idx tries)]
+      (letfn [(iid-bloom-bitmap ^ImmutableRoaringBitmap [ordinal page-idx]
+                @(meta/with-metadata metadata-mgr (nth chunk-idxs ordinal) table-name
+                   (util/->jfn
+                     (fn [^ITableMetadata table-meta]
+                       (.iidBloomBitmap table-meta page-idx)))))]
+        (trie/->merge-plan (mapv :trie tries) (mapv :page-idxs tries) iid-bloom-bitmap)))))
 
 (defn merge-plan->tasks ^java.lang.Iterable [{:keys [path node]}]
   (when-let [[node-tag node-arg] node]
@@ -602,7 +613,8 @@
 ;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect
 ;; the skipping in another leaf consumer.
 
-(defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool, ^IWatermark wm
+(defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool,
+                   ^IMetadataManager metadata-mgr, ^IWatermark wm
                    table-name, col-names, ^longs temporal-range, ^List matching-tries
                    ^Map col-preds, params, scan-opts]
   (let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name))
@@ -611,7 +623,7 @@
         chunk-idx->page-idxs (->> matching-tries
                                   (filter #(not-empty (set/intersection normalized-col-names (:col-names %))))
                                   (into {} (map (juxt :chunk-idx :page-idxs))))
-        merge-plan (table-merge-plan buffer-pool table-chunks chunk-idx->page-idxs live-table-wm)]
+        merge-plan (table-merge-plan buffer-pool metadata-mgr table-chunks chunk-idx->page-idxs live-table-wm table-name)]
     (util/with-close-on-catch [leaves (open-leaves buffer-pool table-chunks live-table-wm)]
       (->TrieCursor allocator leaves (.iterator (let [^Iterable c (or (merge-plan->tasks merge-plan) [])] c))
                     col-names col-preds
@@ -724,7 +736,7 @@
                                           (fn [fvt]
                                             (or fvt (if default-all-valid-time? [:all-time] [:at [:now :now]])))))]
                     (->4r-cursor allocator object-store buffer-pool
-                                 watermark
+                                 metadata-mgr watermark
                                  normalized-table-name
                                  (set/union content-col-names temporal-col-names)
                                  (->temporal-range params basis scan-opts)
diff --git a/core/src/main/clojure/xtdb/trie.clj b/core/src/main/clojure/xtdb/trie.clj
index 98e31c0e2c..25be23bf4f 100644
--- a/core/src/main/clojure/xtdb/trie.clj
+++ b/core/src/main/clojure/xtdb/trie.clj
@@ -18,6 +18,7 @@
            (org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
            org.apache.arrow.vector.types.UnionMode
            (org.roaringbitmap RoaringBitmap)
+           (org.roaringbitmap.buffer MutableRoaringBitmap)
            (xtdb.object_store ObjectStore)
            (xtdb.trie ArrowHashTrie$Leaf HashTrie HashTrie$Node LiveHashTrie LiveHashTrie$Leaf)
            (xtdb.util WritableByteBufferChannel)
@@ -206,7 +207,7 @@
 
 (defn ->merge-plan
   "Returns a tree of the tasks required to merge the given tries "
-  [tries, trie-page-idxs]
+  [tries, trie-page-idxs, iid-bloom-bitmap]
 
   (letfn [(->merge-plan* [nodes path ^long level]
             (let [trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
@@ -224,26 +225,33 @@
                 {:path (byte-array path)
                  :node [:branch branches]}))
 
-            (loop [ordinal 0
-                   [node & more-nodes] nodes
-                   node-taken? false
-                   leaves []]
-              (cond
-                (not (< ordinal (count nodes))) (when node-taken?
-                                                  {:path (byte-array path)
-                                                   :node [:leaf leaves]})
-                node (condp = (class node)
-                       ArrowHashTrie$Leaf
-                       (let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
-                             take-node? (or node-taken?
-                                            (some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
-                                                    (.contains page-idx)))]
-                         (recur (inc ordinal) more-nodes take-node?
-                                (conj leaves (when take-node?
-                                               {:page-idx page-idx}))))
-                       LiveHashTrie$Leaf
-                       (recur (inc ordinal) more-nodes true (conj leaves node)))
-
-                :else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil)))))))]
+            (let [^MutableRoaringBitmap cumulative-iid-bitmap (MutableRoaringBitmap.)]
+              (loop [ordinal 0
+                     [node & more-nodes] nodes
+                     node-taken? false
+                     leaves []]
+                (cond
+                  (not (< ordinal (count nodes))) (when node-taken?
+                                                    {:path (byte-array path)
+                                                     :node [:leaf leaves]})
+                  node (condp = (class node)
+                         ArrowHashTrie$Leaf
+                         (let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
+                               take-node? (some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
+                                                  (.contains page-idx))]
+                           (when take-node?
+                             ;; iid-bloom-bitmap may return nil (page without a bloom entry); skip it rather than NPE in .or
+                             (when-let [iid-bitmap (iid-bloom-bitmap ordinal page-idx)]
+                               (.or cumulative-iid-bitmap iid-bitmap)))
+                           (recur (inc ordinal) more-nodes (or node-taken? take-node?)
+                                  (conj leaves (when (or take-node?
+                                                         (when node-taken?
+                                                           (when-let [iid-bitmap (iid-bloom-bitmap ordinal page-idx)]
+                                                             (MutableRoaringBitmap/intersects cumulative-iid-bitmap iid-bitmap))))
+                                                 {:page-idx page-idx}))))
+                         LiveHashTrie$Leaf
+                         (recur (inc ordinal) more-nodes true (conj leaves node)))
+
+                  :else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil))))))))]
 
     (->merge-plan* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
diff --git a/src/test/clojure/xtdb/trie_test.clj b/src/test/clojure/xtdb/trie_test.clj
index 1cf29042de..ebd0c9ea6f 100644
--- a/src/test/clojure/xtdb/trie_test.clj
+++ b/src/test/clojure/xtdb/trie_test.clj
@@ -4,9 +4,10 @@
             [xtdb.test-util :as tu]
             [xtdb.trie :as trie])
   (:import (clojure.lang MapEntry)
+           (org.apache.arrow.memory RootAllocator)
            (org.roaringbitmap RoaringBitmap)
-           (xtdb.trie ArrowHashTrie)
-           (org.apache.arrow.memory RootAllocator)))
+           (org.roaringbitmap.buffer MutableRoaringBitmap)
+           (xtdb.trie ArrowHashTrie)))
 
 (deftest test-merge-plan-with-nil-nodes-2700
   (with-open [al (RootAllocator.)
@@ -16,21 +17,25 @@
     (let [trie-page-idxs [(RoaringBitmap.)
                           (RoaringBitmap/bitmapOf (int-array '(0 1 2 3)))
                           (RoaringBitmap/bitmapOf (int-array '(0)))
-                          (RoaringBitmap/bitmapOf (int-array '(0 1)))]]
-      (t/is (= {:path [],
-                :node [:branch
-                       [{:path [0],
-                         :node [:branch
-                                [{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
-                                 {:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
-                                 {:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
-                                 {:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
-                        {:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
-                        {:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
-                        {:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
-               (->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
-                                       trie-page-idxs)
-                    (walk/postwalk (fn [x]
-                                     (if (and (map-entry? x) (= :path (key x)))
-                                       (MapEntry/create :path (into [] (val x)))
-                                       x)))))))))
+                          (RoaringBitmap/bitmapOf (int-array '(0 1)))]
+          constant-iid-bloom-bitmap (.toImmutableRoaringBitmap (MutableRoaringBitmap/bitmapOf (int-array '(1000 1001 1002))))]
+      (letfn [(iid-bloom-bitmap [_ordinal _page-idx]
+                constant-iid-bloom-bitmap)]
+        (t/is (= {:path [],
+                  :node [:branch
+                         [{:path [0],
+                           :node [:branch
+                                  [{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
+                                   {:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
+                                   {:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
+                                   {:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
+                          {:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
+                          {:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
+                          {:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
+                 (->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
+                                         trie-page-idxs
+                                         iid-bloom-bitmap)
+                      (walk/postwalk (fn [x]
+                                       (if (and (map-entry? x) (= :path (key x)))
+                                         (MapEntry/create :path (into [] (val x)))
+                                         x))))))))))