Skip to content

Commit

Permalink
iid fast path for scan
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 14, 2023
1 parent d821cda commit 6306a9b
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 17 deletions.
74 changes: 62 additions & 12 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
xtdb.watermark)
(:import (clojure.lang IPersistentMap MapEntry)
[java.lang AutoCloseable]
(java.util ArrayList Arrays Iterator LinkedList List ListIterator Map)
java.nio.ByteBuffer
(java.util ArrayList Iterator LinkedList List ListIterator Map)
(java.util.function IntConsumer)
org.apache.arrow.memory.ArrowBuf
org.apache.arrow.memory.BufferAllocator
(java.util.stream IntStream)
(org.apache.arrow.memory ArrowBuf BufferAllocator)
[org.apache.arrow.memory.util ArrowBufPointer]
(org.apache.arrow.vector VectorLoader VectorSchemaRoot)
xtdb.api.protocols.TransactionInstant
Expand All @@ -30,7 +31,7 @@
(xtdb.metadata IMetadataManager)
xtdb.object_store.ObjectStore
xtdb.operator.IRelationSelector
(xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
(xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
(xtdb.vector IRelationWriter IRowCopier IVectorReader IVectorWriter RelationReader)
(xtdb.watermark IWatermark IWatermarkSource Watermark)))

Expand Down Expand Up @@ -398,6 +399,34 @@
system-from
nil))))))))))

(def ^:private neg-or-zero-int? (complement pos-int?))

(defn- iid-selector [^ByteBuffer iid-bb]
(reify IRelationSelector
(select [_ _allocator rel-rdr _params]
(let [ptr (ArrowBufPointer.)
iid-rdr (.readerForName rel-rdr "xt$iid")
value-count (.valueCount iid-rdr)]
;; lower-bound
(loop [left 0 right (dec value-count)]
(if (= left right)
(if (zero? (util/compare-nio-buffers-unsigned iid-bb (util/arrow-buf-pointer->bb (.getPointer iid-rdr left ptr))))
;; upper bound
(loop [left1 left right value-count]
(if (= left1 right)
(.toArray (IntStream/range left left1))
(let [mid (quot (+ left1 right) 2)]
(assert (< mid value-count))
(if (neg-or-zero-int? (util/compare-nio-buffers-unsigned iid-bb (util/arrow-buf-pointer->bb (.getPointer iid-rdr mid ptr))))
(recur (inc mid) right)
(recur left1 mid)))))

(int-array 0))
(let [mid (quot (+ left right) 2)]
(if (neg-or-zero-int? (util/compare-nio-buffers-unsigned iid-bb (util/arrow-buf-pointer->bb (.getPointer iid-rdr mid ptr))))
(recur left mid)
(recur (inc mid) right)))))))))

(deftype TrieCursor [^BufferAllocator allocator, arrow-leaves, ^Iterator merge-tasks
col-names, ^Map col-preds, ^longs temporal-timestamps,
params, ^IPersistentMap picker-state]
Expand All @@ -407,7 +436,10 @@
(let [{task-leaves :leaves, :keys [path]} (.next merge-tasks)]
(with-open [out-rel (vw/->rel-writer allocator)]
(let [loaded-leaves (mapv (fn [{:keys [load-leaf]}]
(load-leaf))
(let [{:keys [^RelationReader rel-rdr] :as res} (load-leaf)]
(if-let [^IRelationSelector iid-pred (get col-preds "xt$iid")]
(assoc res :rel-rdr (.select rel-rdr (.select iid-pred allocator rel-rdr params)))
res)))
task-leaves)
merge-q (LeafMergeQueue. path
(into-array IVectorReader
Expand All @@ -433,15 +465,16 @@
(as-> rel (reduce (fn [^RelationReader rel, ^IRelationSelector col-pred]
(.select rel (.select col-pred allocator rel params)))
rel
(vals col-preds)))))))
(vals (dissoc col-preds "xt$iid"))))))))
true)

false))

(close [_]
(util/close arrow-leaves)))

(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm]
(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm
{:keys [^ByteBuffer iid-bb] :as _opts}]
(let [{trie-files :trie, leaf-files :leaf} (->> (.listObjects obj-store (format "tables/%s/chunks" table-name))
(keep (fn [file-name]
(when-let [[_ file-type chunk-idx-str] (re-find #"/(leaf|trie)-c(.+?)\.arrow$" file-name)]
Expand Down Expand Up @@ -485,8 +518,13 @@
(util/close leaf-buf))))
arrow-leaves)

:merge-tasks (vec (for [{:keys [path leaves]} (trie/trie-merge-tasks (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
live-table-wm (conj (.liveTrie live-table-wm))))]
:merge-tasks (vec (for [{:keys [path leaves]}
(if iid-bb
(vector (trie/iid-trie-merge-task (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
live-table-wm (conj (.liveTrie live-table-wm)))
(.array iid-bb)))
(trie/trie-merge-tasks (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
live-table-wm (conj (.liveTrie live-table-wm)))))]
{:path path
:leaves (mapv (fn [{:keys [ordinal leaf]}]
(condp = (class leaf)
Expand Down Expand Up @@ -521,7 +559,9 @@
table-name, col-names, ^longs temporal-range
^Map col-preds, params, scan-opts]
(let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name))
{:keys [arrow-leaves ^List merge-tasks]} (read-tries obj-store buffer-pool table-name live-table-wm)]

{:keys [arrow-leaves ^List merge-tasks]}
(read-tries obj-store buffer-pool table-name live-table-wm (select-keys scan-opts [:iid-bb]))]
(try
(->TrieCursor allocator arrow-leaves (.iterator merge-tasks)
col-names col-preds
Expand Down Expand Up @@ -576,7 +616,7 @@
(->> scan-cols
(into {} (map (juxt identity ->col-type))))))

(emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts} scan-col-types param-types]
(emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts :as opts} scan-col-types param-types]
(let [col-names (->> columns
(into [] (comp (map (fn [[col-type arg]]
(case col-type
Expand Down Expand Up @@ -608,6 +648,15 @@
(expr/->expression-relation-selector select-form {:col-types col-types, :param-types param-types})))
(into {}))

iid-bb (when-let [eid-select (or (get selects 'xt/id) (get selects 'xt$id))]
(let [eid (nth eid-select 2)]
(when (and (= '= (first eid-select)) (s/valid? ::lp/value eid))
(trie/->iid eid))))

col-preds (cond-> col-preds
iid-bb
(assoc "xt$iid" (iid-selector iid-bb)))

metadata-args (vec (for [[col-name select] selects
:when (not (types/temporal-column? (util/str->normal-form-str (str col-name))))]
select))
Expand All @@ -625,7 +674,8 @@
(let [_metadata-pred (expr.meta/->metadata-selector (cons 'and metadata-args) col-types params)
scan-opts (cond-> scan-opts
(nil? for-valid-time)
(assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])))]
(assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])
:iid-bb iid-bb))]
(->4r-cursor allocator object-store buffer-pool
watermark
normalized-table-name
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/clojure/xtdb/trie.clj
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,29 @@
{:ordinal ordinal, :leaf node})))))}])))]

(vec (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) []))))

(defn- bucket-for [^bytes iid level]
(let [level-offset-bits (* HashTrie/LEVEL_BITS (inc level))
level-offset-bytes (/ (- level-offset-bits HashTrie/LEVEL_BITS) Byte/SIZE)]
(bit-and (bit-shift-right (get iid level-offset-bytes) (mod level-offset-bits Byte/SIZE)) HashTrie/LEVEL_MASK)))

(defn iid-trie-merge-task [tries ^bytes iid]
(letfn [(trie-merge-tasks* [nodes path level]
(let [bucket-idx (bucket-for iid level)
trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
(if (some identity trie-children)
(trie-merge-tasks* (mapv (fn [node ^objects node-children]
(if node-children
(aget node-children bucket-idx)
node))
nodes trie-children)
(conj path bucket-idx)
(inc level))
{:path (byte-array path)
:leaves (->> nodes
(into [] (keep-indexed
(fn [ordinal ^HashTrie$Node node]
(when node
{:ordinal ordinal, :leaf node})))))})))]

(trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
14 changes: 13 additions & 1 deletion core/src/main/clojure/xtdb/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
(org.apache.arrow.compression CommonsCompressionFactory)
(org.apache.arrow.flatbuf Footer Message RecordBatch)
(org.apache.arrow.memory AllocationManager ArrowBuf BufferAllocator)
(org.apache.arrow.memory.util ByteFunctionHelpers MemoryUtil)
(org.apache.arrow.memory.util ArrowBufPointer ByteFunctionHelpers MemoryUtil)
(org.apache.arrow.vector ValueVector VectorLoader VectorSchemaRoot)
(org.apache.arrow.vector.ipc ArrowFileWriter ArrowStreamWriter ArrowWriter)
(org.apache.arrow.vector.ipc.message ArrowBlock ArrowFooter ArrowRecordBatch MessageSerializer)
Expand Down Expand Up @@ -117,6 +117,18 @@
(.position bb 0)
bb))

(defn byte-buffer->uuid [^ByteBuffer bb]
(UUID. (.getLong bb 0) (.getLong bb 1)))

(defn arrow-buf-pointer->bb [^ArrowBufPointer ptr]
(let [^ByteBuffer buf (.nioBuffer (.getBuf ptr))
res (ByteBuffer/allocate (.getLength ptr))]
(.position buf (.getOffset ptr))
(dotimes [_ (.getLength ptr)]
(.put res (.get buf)))
(.position res 0)
res))

(defn ->lex-hex-string
"Turn a long into a lexicographically-sortable hex string by prepending the length"
[^long l]
Expand Down
63 changes: 59 additions & 4 deletions src/test/clojure/xtdb/operator/scan_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
[xtdb.api :as xt]
[xtdb.node :as node]
[xtdb.operator :as op]
[xtdb.operator.scan :as scan]
[xtdb.test-util :as tu]
[xtdb.util :as util]
[xtdb.operator.scan :as scan])
(:import xtdb.operator.IRaQuerySource
(java.util LinkedList)
[xtdb.trie :as trie]
[xtdb.util :as util])
(:import (java.util LinkedList)
xtdb.operator.IRaQuerySource
(xtdb.operator.scan RowConsumer) ))

(t/use-fixtures :each tu/with-mock-clock tu/with-allocator)
Expand Down Expand Up @@ -386,6 +387,60 @@
'[:scan {:table xt_docs} [xt/id {col-x (= col-x "toto")}]]
{:node node})))))

(t/deftest test-iid-fast-path
(let [before-uuid #uuid "00000000-0000-0000-0000-000000000000"
search-uuid #uuid "80000000-0000-0000-0000-000000000000"
after-uuid #uuid "f0000000-0000-0000-0000-000000000000"]
(with-open [node (node/start-node {})]
(xt/submit-tx node [[:put :xt-docs {:xt/id before-uuid :version 1}]
[:put :xt-docs {:xt/id search-uuid :version 1}]
[:put :xt-docs {:xt/id after-uuid :version 1}]])
(xt/submit-tx node [[:put :xt-docs {:xt/id search-uuid :version 2}]])

(with-redefs [trie/iid-trie-merge-task (fn [& _args] (throw (UnsupportedOperationException. "")))]
(t/is (thrown-with-msg?
UnsupportedOperationException
#""
(tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]]
{:node node}))))

(t/is (= [{:version 2, :xt/id search-uuid}]
(tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]]
{:node node})))

(t/is (= [{:version 2, :xt/id search-uuid}
{:version 1, :xt/id search-uuid}]
(tu/query-ra [:scan {:table 'xt_docs
:for-valid-time :all-time}
['version {'xt/id (list '= 'xt/id search-uuid)}]]
{:node node}))))))

(t/deftest test-iid-fast-path-chunk-boundary
(let [before-uuid #uuid "00000000-0000-0000-0000-000000000000"
search-uuid #uuid "80000000-0000-0000-0000-000000000000"
after-uuid #uuid "f0000000-0000-0000-0000-000000000000"
uuids [before-uuid search-uuid after-uuid]
!search-uuid-versions (atom [])]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 20}})]
(->> (for [i (range 110)]
(let [uuid (rand-nth uuids)]
(when (= uuid search-uuid)
(swap! !search-uuid-versions conj i))
[[:put :xt_docs {:xt/id uuid :version i}]]))
(mapv #(xt/submit-tx node %)))

(t/is (= [{:version (last @!search-uuid-versions), :xt/id search-uuid}]
(tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]]
{:node node})))

(t/is (= (into #{} @!search-uuid-versions)
(->> (tu/query-ra [:scan {:table 'xt_docs
:for-valid-time :all-time}
['version {'xt/id (list '= 'xt/id search-uuid)}]]
{:node node})
(map :version)
set))))))

(t/deftest test-correct-rectangle-cutting
(letfn [(test-er [& events]
(let [!state (atom [])
Expand Down

0 comments on commit 6306a9b

Please sign in to comment.