Skip to content

Commit

Permalink
Make more use of iid blooms for filtering pages
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 authored and jarohen committed Aug 30, 2023
1 parent 7dc48b6 commit 3bde725
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 54 deletions.
16 changes: 12 additions & 4 deletions core/src/main/clojure/xtdb/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
(definterface ITableMetadata
(^xtdb.vector.IVectorReader metadataReader [])
(^java.util.Set columnNames [])
(^Long rowIndex [^String column-name, ^int pageIdx])
(^long pageCount []))
(^Long rowIndex [^String columnName, ^int pageIdx])
(^long pageCount [])
(^org.roaringbitmap.buffer.ImmutableRoaringBitmap iidBloomBitmap [^int pageIdx]))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface IPageMetadataWriter
Expand Down Expand Up @@ -280,8 +281,15 @@
(reify ITableMetadata
(metadataReader [_] metadata-reader)
(columnNames [_] col-names)
(rowIndex [_ col-name block-idx] (get page-idx-cache [col-name block-idx]))
(pageCount [_] page-count)))
(rowIndex [_ col-name page-idx] (get page-idx-cache [col-name page-idx]))
(pageCount [_] page-count)
(iidBloomBitmap [_ page-idx]
(let [bloom-rdr (-> (.structKeyReader metadata-reader "columns")
(.listElementReader)
(.structKeyReader "bloom"))]
(when-let [bloom-vec-idx (get page-idx-cache ["xt$iid" page-idx])]
(when (not (nil? (.getObject bloom-rdr bloom-vec-idx)))
(bloom/bloom->bitmap bloom-rdr bloom-vec-idx)))))))

(deftype MetadataManager [^ObjectStore object-store
^IBufferPool buffer-pool
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
(org.apache.arrow.memory BufferAllocator)
[org.apache.arrow.memory.util ArrowBufPointer]
[org.roaringbitmap RoaringBitmap]
[org.roaringbitmap.buffer MutableRoaringBitmap]
[org.roaringbitmap.buffer ImmutableRoaringBitmap MutableRoaringBitmap]
xtdb.api.protocols.TransactionInstant
xtdb.buffer_pool.IBufferPool
xtdb.ICursor
Expand Down Expand Up @@ -597,7 +597,7 @@
(map (partial filter-trie-match metadata-mgr col-names))
(remove nil?)
(into {} (map (juxt :buf-key :page-idxs))))
merge-plan (trie/table-merge-plan buffer-pool table-chunks trie-file->page-idxs live-table-wm)]
merge-plan (trie/table-merge-plan buffer-pool metadata-mgr table-chunks trie-file->page-idxs live-table-wm)]

;; The consumers for different leafs need to share some state so the logic of how to advance
;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect
Expand Down
72 changes: 44 additions & 28 deletions core/src/main/clojure/xtdb/trie.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
(org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
org.apache.arrow.vector.types.UnionMode
(org.roaringbitmap RoaringBitmap)
(org.roaringbitmap.buffer ImmutableRoaringBitmap MutableRoaringBitmap)
xtdb.buffer_pool.IBufferPool
(xtdb.metadata IMetadataManager ITableMetadata)
(xtdb.object_store ObjectStore)
(xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie HashTrie$Node LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie LiveHashTrie$Leaf)
(xtdb.util WritableByteBufferChannel)
Expand Down Expand Up @@ -232,7 +234,7 @@

(defn ->merge-plan
"Returns a tree of the tasks required to merge the given tries "
[tries, trie-page-idxs]
[tries, trie-page-idxs, iid-bloom-bitmap]

(letfn [(->merge-plan* [nodes path ^long level]
(let [trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
Expand All @@ -250,43 +252,57 @@
{:path (byte-array path)
:node [:branch branches]}))

(loop [ordinal 0
[node & more-nodes] nodes
node-taken? false
leaves []]
(cond
(not (< ordinal (count nodes))) (when node-taken?
{:path (byte-array path)
:node [:leaf leaves]})
node (condp = (class node)
ArrowHashTrie$Leaf
(let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
take-node? (or node-taken?
(some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
(.contains page-idx)))]
(recur (inc ordinal) more-nodes take-node?
(conj leaves (when take-node?
{:page-idx page-idx}))))
LiveHashTrie$Leaf
(recur (inc ordinal) more-nodes true (conj leaves node)))

:else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil)))))))]
(let [^MutableRoaringBitmap cumulative-iid-bitmap (MutableRoaringBitmap.)]
(loop [ordinal 0
[node & more-nodes] nodes
node-taken? false
leaves []]
(cond
(not (< ordinal (count nodes))) (when node-taken?
{:path (byte-array path)
:node [:leaf leaves]})
node (condp = (class node)
ArrowHashTrie$Leaf
(let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
take-node? (some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
(.contains page-idx))]
(when take-node?
(.or cumulative-iid-bitmap (iid-bloom-bitmap ordinal page-idx)))
(recur (inc ordinal) more-nodes (or node-taken? take-node?)
(conj leaves (when (or take-node?
(when node-taken?
(when-let [iid-bitmap (iid-bloom-bitmap ordinal page-idx)]
(MutableRoaringBitmap/intersects cumulative-iid-bitmap iid-bitmap))))
{:page-idx page-idx}))))
LiveHashTrie$Leaf
(recur (inc ordinal) more-nodes true (conj leaves node)))

:else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil))))))))]

(->merge-plan* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))

(defn table-merge-plan [^IBufferPool buffer-pool, table-tries, trie-file->page-idxs, ^ILiveTableWatermark live-table-wm]
(util/with-open [trie-roots (ArrayList. (count table-tries))]
(defn table-merge-plan [^IBufferPool buffer-pool, ^IMetadataManager metadata-mgr, table-chunks,
trie-file->page-idxs, ^ILiveTableWatermark live-table-wm]

(util/with-open [trie-roots (ArrayList. (count table-chunks))]
;; TODO these could be kicked off asynchronously
(let [tries (cond-> (vec (for [{:keys [trie-file]} table-tries]
(let [tries (cond-> (vec (for [{:keys [trie-file chunk-idx]} table-chunks]
(with-open [^ArrowBuf buf @(.getBuffer buffer-pool trie-file)]
(let [{:keys [^VectorLoader loader root arrow-blocks]} (util/read-arrow-buf buf)]
(with-open [record-batch (util/->arrow-record-batch-view (first arrow-blocks) buf)]
(.load loader record-batch)
(.add trie-roots root)
{:trie (ArrowHashTrie/from root)
:page-idxs (trie-file->page-idxs trie-file)})))))
live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))]
(->merge-plan (mapv :trie tries) (mapv :page-idxs tries)))))
:trie-file trie-file
:page-idxs (trie-file->page-idxs chunk-idx)})))))
live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))
trie-files (mapv :trie-file tries)]
(letfn [(iid-bloom-bitmap ^ImmutableRoaringBitmap [ordinal page-idx]
@(meta/with-metadata metadata-mgr (nth trie-files ordinal)
(util/->jfn
(fn [^ITableMetadata table-meta]
(.iidBloomBitmap table-meta page-idx)))))]
(->merge-plan (mapv :trie tries) (mapv :page-idxs tries) iid-bloom-bitmap)))))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface ILeafLoader
Expand Down
45 changes: 25 additions & 20 deletions src/test/clojure/xtdb/trie_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
[xtdb.test-util :as tu]
[xtdb.trie :as trie])
(:import (clojure.lang MapEntry)
(org.apache.arrow.memory RootAllocator)
(org.roaringbitmap RoaringBitmap)
(xtdb.trie ArrowHashTrie)
(org.apache.arrow.memory RootAllocator)))
(org.roaringbitmap.buffer MutableRoaringBitmap)
(xtdb.trie ArrowHashTrie)))

(deftest test-merge-plan-with-nil-nodes-2700
(with-open [al (RootAllocator.)
Expand All @@ -16,21 +17,25 @@
(let [trie-page-idxs [(RoaringBitmap.)
(RoaringBitmap/bitmapOf (int-array '(0 1 2 3)))
(RoaringBitmap/bitmapOf (int-array '(0)))
(RoaringBitmap/bitmapOf (int-array '(0 1)))]]
(t/is (= {:path [],
:node [:branch
[{:path [0],
:node [:branch
[{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
{:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
{:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
{:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
{:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
(->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
trie-page-idxs)
(walk/postwalk (fn [x]
(if (and (map-entry? x) (= :path (key x)))
(MapEntry/create :path (into [] (val x)))
x)))))))))
(RoaringBitmap/bitmapOf (int-array '(0 1)))]
constant-iid-bloom-bitmap (.toImmutableRoaringBitmap (MutableRoaringBitmap/bitmapOf (int-array '(1000 1001 1002))))]
(letfn [(iid-bloom-bitmap [_ordinal _page-idx]
constant-iid-bloom-bitmap)]
(t/is (= {:path [],
:node [:branch
[{:path [0],
:node [:branch
[{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
{:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
{:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
{:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
{:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
(->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
trie-page-idxs
iid-bloom-bitmap)
(walk/postwalk (fn [x]
(if (and (map-entry? x) (= :path (key x)))
(MapEntry/create :path (into [] (val x)))
x))))))))))

0 comments on commit 3bde725

Please sign in to comment.