Skip to content

Commit

Permalink
Make more use of iid blooms for filtering pages
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 29, 2023
1 parent 6019bb8 commit c7e5dac
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 54 deletions.
20 changes: 14 additions & 6 deletions core/src/main/clojure/xtdb/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
java.lang.AutoCloseable
java.nio.ByteBuffer
(java.util HashMap HashSet Map NavigableMap Set TreeMap)
(java.util.concurrent ConcurrentHashMap CompletableFuture)
(java.util.function BiFunction Consumer Function)
(java.util.concurrent ConcurrentHashMap)
(java.util.function BiFunction Function)
(java.util.stream IntStream)
(org.apache.arrow.memory ArrowBuf)
(org.apache.arrow.vector FieldVector VectorLoader VectorSchemaRoot)
Expand All @@ -32,8 +32,9 @@
(definterface ITableMetadata
(^xtdb.vector.IVectorReader metadataReader [])
(^java.util.Set columnNames [])
(^Long rowIndex [^String column-name, ^int pageIdx])
(^long pageCount []))
(^Long rowIndex [^String columnName, ^int pageIdx])
(^long pageCount [])
(^org.roaringbitmap.buffer.ImmutableRoaringBitmap iidBloomBitmap [^int pageIdx]))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface IPageMetadataWriter
Expand Down Expand Up @@ -288,8 +289,15 @@
(reify ITableMetadata
(metadataReader [_] metadata-reader)
(columnNames [_] col-names)
(rowIndex [_ col-name block-idx] (get page-idx-cache [col-name block-idx]))
(pageCount [_] page-count)))
(rowIndex [_ col-name page-idx] (get page-idx-cache [col-name page-idx]))
(pageCount [_] page-count)
(iidBloomBitmap [_ page-idx]
(let [bloom-rdr (-> (.structKeyReader metadata-reader "columns")
(.listElementReader)
(.structKeyReader "bloom"))]
(when-let [bloom-vec-idx (get page-idx-cache ["xt$iid" page-idx])]
(when (not (nil? (.getObject bloom-rdr bloom-vec-idx)))
(bloom/bloom->bitmap bloom-rdr bloom-vec-idx)))))))

(deftype MetadataManager [^ObjectStore object-store
^IBufferPool buffer-pool
Expand Down
24 changes: 18 additions & 6 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
[org.apache.arrow.memory.util ArrowBufPointer]
(org.apache.arrow.vector VectorLoader VectorSchemaRoot)
[org.apache.arrow.vector.types.pojo Schema]
[org.roaringbitmap.buffer ImmutableRoaringBitmap]
xtdb.api.protocols.TransactionInstant
xtdb.buffer_pool.IBufferPool
xtdb.ICursor
xtdb.indexer.live_index.ILiveTableWatermark
(xtdb.metadata IMetadataManager)
(xtdb.metadata ITableMetadata)
xtdb.object_store.ObjectStore
xtdb.operator.IRelationSelector
(xtdb.trie ArrowHashTrie LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
Expand Down Expand Up @@ -455,7 +457,9 @@
(transient {:chunk-idx chunk-idx})
files))))))

(defn table-merge-plan [^IBufferPool buffer-pool, table-chunks, chunk-idx->page-idxs, ^ILiveTableWatermark live-table-wm]
(defn table-merge-plan [^IBufferPool buffer-pool, ^IMetadataManager metadata-mgr, table-chunks,
chunk-idx->page-idxs, ^ILiveTableWatermark live-table-wm, table-name]

(util/with-open [trie-roots (ArrayList. (count table-chunks))]
;; TODO these could be kicked off asynchronously
(let [tries (cond-> (vec (for [{:keys [trie-file chunk-idx]} table-chunks]
Expand All @@ -465,9 +469,16 @@
(.load loader record-batch)
(.add trie-roots root)
{:trie (ArrowHashTrie/from root)
:chunk-idx chunk-idx
:page-idxs (chunk-idx->page-idxs chunk-idx)})))))
live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))]
(trie/->merge-plan (mapv :trie tries) (mapv :page-idxs tries)))))
live-table-wm (conj {:trie (.compactLogs (.liveTrie live-table-wm))}))
chunk-idxs (mapv :chunk-idx tries)]
(letfn [(iid-bloom-bitmap ^ImmutableRoaringBitmap [ordinal page-idx]
@(meta/with-metadata metadata-mgr (nth chunk-idxs ordinal) table-name
(util/->jfn
(fn [^ITableMetadata table-meta]
(.iidBloomBitmap table-meta page-idx)))))]
(trie/->merge-plan (mapv :trie tries) (mapv :page-idxs tries) iid-bloom-bitmap)))))

(defn merge-plan->tasks ^java.lang.Iterable [{:keys [path node]}]
(when-let [[node-tag node-arg] node]
Expand Down Expand Up @@ -602,7 +613,8 @@
;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect
;; the skipping in another leaf consumer.

(defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool, ^IWatermark wm
(defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool,
^IMetadataManager metadata-mgr, ^IWatermark wm
table-name, col-names, ^longs temporal-range, ^List matching-tries
^Map col-preds, params, scan-opts]
(let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name))
Expand All @@ -611,7 +623,7 @@
chunk-idx->page-idxs (->> matching-tries
(filter #(not-empty (set/intersection normalized-col-names (:col-names %))))
(into {} (map (juxt :chunk-idx :page-idxs))))
merge-plan (table-merge-plan buffer-pool table-chunks chunk-idx->page-idxs live-table-wm)]
merge-plan (table-merge-plan buffer-pool metadata-mgr table-chunks chunk-idx->page-idxs live-table-wm table-name)]
(util/with-close-on-catch [leaves (open-leaves buffer-pool table-chunks live-table-wm)]
(->TrieCursor allocator leaves (.iterator (let [^Iterable c (or (merge-plan->tasks merge-plan) [])] c))
col-names col-preds
Expand Down Expand Up @@ -729,7 +741,7 @@
(fn [fvt]
(or fvt (if default-all-valid-time? [:all-time] [:at [:now :now]])))))]
(->4r-cursor allocator object-store buffer-pool
watermark
metadata-mgr watermark
normalized-table-name
(set/union content-col-names temporal-col-names)
(->temporal-range params basis scan-opts)
Expand Down
50 changes: 28 additions & 22 deletions core/src/main/clojure/xtdb/trie.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
org.apache.arrow.vector.types.UnionMode
(org.roaringbitmap RoaringBitmap)
(org.roaringbitmap.buffer MutableRoaringBitmap)
(xtdb.object_store ObjectStore)
(xtdb.trie ArrowHashTrie$Leaf HashTrie HashTrie$Node LiveHashTrie LiveHashTrie$Leaf)
(xtdb.util WritableByteBufferChannel)
Expand Down Expand Up @@ -206,7 +207,7 @@

(defn ->merge-plan
"Returns a tree of the tasks required to merge the given tries "
[tries, trie-page-idxs]
[tries, trie-page-idxs, iid-bloom-bitmap]

(letfn [(->merge-plan* [nodes path ^long level]
(let [trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
Expand All @@ -224,26 +225,31 @@
{:path (byte-array path)
:node [:branch branches]}))

(loop [ordinal 0
[node & more-nodes] nodes
node-taken? false
leaves []]
(cond
(not (< ordinal (count nodes))) (when node-taken?
{:path (byte-array path)
:node [:leaf leaves]})
node (condp = (class node)
ArrowHashTrie$Leaf
(let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
take-node? (or node-taken?
(some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
(.contains page-idx)))]
(recur (inc ordinal) more-nodes take-node?
(conj leaves (when take-node?
{:page-idx page-idx}))))
LiveHashTrie$Leaf
(recur (inc ordinal) more-nodes true (conj leaves node)))

:else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil)))))))]
(let [^MutableRoaringBitmap cumulative-iid-bitmap (MutableRoaringBitmap.)]
(loop [ordinal 0
[node & more-nodes] nodes
node-taken? false
leaves []]
(cond
(not (< ordinal (count nodes))) (when node-taken?
{:path (byte-array path)
:node [:leaf leaves]})
node (condp = (class node)
ArrowHashTrie$Leaf
(let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf node)
take-node? (some-> ^RoaringBitmap (nth trie-page-idxs ordinal)
(.contains page-idx))]
(when take-node?
(.or cumulative-iid-bitmap (iid-bloom-bitmap ordinal page-idx)))
(recur (inc ordinal) more-nodes (or node-taken? take-node?)
(conj leaves (when (or take-node?
(when node-taken?
(when-let [iid-bitmap (iid-bloom-bitmap ordinal page-idx)]
(MutableRoaringBitmap/intersects cumulative-iid-bitmap iid-bitmap))))
{:page-idx page-idx}))))
LiveHashTrie$Leaf
(recur (inc ordinal) more-nodes true (conj leaves node)))

:else (recur (inc ordinal) more-nodes node-taken? (conj leaves nil))))))))]

(->merge-plan* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
45 changes: 25 additions & 20 deletions src/test/clojure/xtdb/trie_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
[xtdb.test-util :as tu]
[xtdb.trie :as trie])
(:import (clojure.lang MapEntry)
(org.apache.arrow.memory RootAllocator)
(org.roaringbitmap RoaringBitmap)
(xtdb.trie ArrowHashTrie)
(org.apache.arrow.memory RootAllocator)))
(org.roaringbitmap.buffer MutableRoaringBitmap)
(xtdb.trie ArrowHashTrie)))

(deftest test-merge-plan-with-nil-nodes-2700
(with-open [al (RootAllocator.)
Expand All @@ -16,21 +17,25 @@
(let [trie-page-idxs [(RoaringBitmap.)
(RoaringBitmap/bitmapOf (int-array '(0 1 2 3)))
(RoaringBitmap/bitmapOf (int-array '(0)))
(RoaringBitmap/bitmapOf (int-array '(0 1)))]]
(t/is (= {:path [],
:node [:branch
[{:path [0],
:node [:branch
[{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
{:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
{:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
{:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
{:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
(->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
trie-page-idxs)
(walk/postwalk (fn [x]
(if (and (map-entry? x) (= :path (key x)))
(MapEntry/create :path (into [] (val x)))
x)))))))))
(RoaringBitmap/bitmapOf (int-array '(0 1)))]
constant-iid-bloom-bitmap (.toImmutableRoaringBitmap (MutableRoaringBitmap/bitmapOf (int-array '(1000 1001 1002))))]
(letfn [(iid-bloom-bitmap [_ordinal _page-idx]
constant-iid-bloom-bitmap)]
(t/is (= {:path [],
:node [:branch
[{:path [0],
:node [:branch
[{:path [0 0], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 1], :node [:leaf [nil {:page-idx 0} {:page-idx 0} nil]]}
{:path [0 2], :node [:leaf [nil nil {:page-idx 0} nil]]}
{:path [0 3], :node [:leaf [nil {:page-idx 1} {:page-idx 0} nil]]}]]}
{:path [1], :node [:leaf [nil {:page-idx 2} {:page-idx 0} nil]]}
{:path [2], :node [:leaf [nil nil {:page-idx 0} {:page-idx 0}]]}
{:path [3], :node [:leaf [nil {:page-idx 3} {:page-idx 0} {:page-idx 1}]]}]]}
(->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)]
trie-page-idxs
iid-bloom-bitmap)
(walk/postwalk (fn [x]
(if (and (map-entry? x) (= :path (key x)))
(MapEntry/create :path (into [] (val x)))
x))))))))))

0 comments on commit c7e5dac

Please sign in to comment.