From 6306a9bba29b0821c229daf9ec21212e8dd10b44 Mon Sep 17 00:00:00 2001 From: FiVo Date: Wed, 9 Aug 2023 11:52:32 +0200 Subject: [PATCH] iid fast path for scan --- core/src/main/clojure/xtdb/operator/scan.clj | 74 ++++++++++++++++---- core/src/main/clojure/xtdb/trie.clj | 26 +++++++ core/src/main/clojure/xtdb/util.clj | 14 +++- src/test/clojure/xtdb/operator/scan_test.clj | 63 +++++++++++++++-- 4 files changed, 160 insertions(+), 17 deletions(-) diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index f8cc9866e2..0cc79c8407 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -17,10 +17,11 @@ xtdb.watermark) (:import (clojure.lang IPersistentMap MapEntry) [java.lang AutoCloseable] - (java.util ArrayList Arrays Iterator LinkedList List ListIterator Map) + java.nio.ByteBuffer + (java.util ArrayList Iterator LinkedList List ListIterator Map) (java.util.function IntConsumer) - org.apache.arrow.memory.ArrowBuf - org.apache.arrow.memory.BufferAllocator + (java.util.stream IntStream) + (org.apache.arrow.memory ArrowBuf BufferAllocator) [org.apache.arrow.memory.util ArrowBufPointer] (org.apache.arrow.vector VectorLoader VectorSchemaRoot) xtdb.api.protocols.TransactionInstant @@ -30,7 +31,7 @@ (xtdb.metadata IMetadataManager) xtdb.object_store.ObjectStore xtdb.operator.IRelationSelector - (xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf) + (xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf) (xtdb.vector IRelationWriter IRowCopier IVectorReader IVectorWriter RelationReader) (xtdb.watermark IWatermark IWatermarkSource Watermark))) @@ -398,6 +399,34 @@ system-from nil)))))))))) +(def ^:private neg-or-zero-int? (complement pos-int?)) + +(defn- iid-selector [^ByteBuffer iid-bb] + (reify IRelationSelector + (select [_ _allocator rel-rdr _params] + (let [ptr (ArrowBufPointer.) + iid-rdr (.readerForName rel-rdr "xt$iid") + value-count (.valueCount iid-rdr)] + ;; lower-bound + (loop [left 0 right (dec value-count)] + (if (= left right) + (if (zero? (util/compare-nio-buffers-unsigned iid-bb (util/arrow-buf-pointer->bb (.getPointer iid-rdr left ptr)))) + ;; upper bound + (loop [left1 left right value-count] + (if (= left1 right) + (.toArray (IntStream/range left left1)) + (let [mid (quot (+ left1 right) 2)] + (assert (< mid value-count)) + (if (neg-or-zero-int? (util/compare-nio-buffers-unsigned iid-bb (util/arrow-buf-pointer->bb (.getPointer iid-rdr mid ptr)))) + (recur (inc mid) right) + (recur left1 mid))))) + + (int-array 0)) + (let [mid (quot (+ left right) 2)] + (if (neg-or-zero-int? (util/compare-nio-buffers-unsigned iid-bb (util/arrow-buf-pointer->bb (.getPointer iid-rdr mid ptr)))) + (recur left mid) + (recur (inc mid) right))))))))) + (deftype TrieCursor [^BufferAllocator allocator, arrow-leaves, ^Iterator merge-tasks col-names, ^Map col-preds, ^longs temporal-timestamps, params, ^IPersistentMap picker-state] @@ -407,7 +436,10 @@ (let [{task-leaves :leaves, :keys [path]} (.next merge-tasks)] (with-open [out-rel (vw/->rel-writer allocator)] (let [loaded-leaves (mapv (fn [{:keys [load-leaf]}] - (load-leaf)) + (let [{:keys [^RelationReader rel-rdr] :as res} (load-leaf)] + (if-let [^IRelationSelector iid-pred (get col-preds "xt$iid")] + (assoc res :rel-rdr (.select rel-rdr (.select iid-pred allocator rel-rdr params))) + res))) task-leaves) merge-q (LeafMergeQueue. path (into-array IVectorReader @@ -433,7 +465,7 @@ (as-> rel (reduce (fn [^RelationReader rel, ^IRelationSelector col-pred] (.select rel (.select col-pred allocator rel params))) rel - (vals col-preds))))))) + (vals (dissoc col-preds "xt$iid")))))))) true) false)) @@ -441,7 +473,8 @@ (close [_] (util/close arrow-leaves))) -(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm] +(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm + {:keys [^ByteBuffer iid-bb] :as _opts}] (let [{trie-files :trie, leaf-files :leaf} (->> (.listObjects obj-store (format "tables/%s/chunks" table-name)) (keep (fn [file-name] (when-let [[_ file-type chunk-idx-str] (re-find #"/(leaf|trie)-c(.+?)\.arrow$" file-name)] @@ -485,8 +518,13 @@ (util/close leaf-buf)))) arrow-leaves) - :merge-tasks (vec (for [{:keys [path leaves]} (trie/trie-merge-tasks (cond-> (mapv #(ArrowHashTrie/from %) trie-roots) - live-table-wm (conj (.liveTrie live-table-wm))))] + :merge-tasks (vec (for [{:keys [path leaves]} + (if iid-bb + (vector (trie/iid-trie-merge-task (cond-> (mapv #(ArrowHashTrie/from %) trie-roots) + live-table-wm (conj (.liveTrie live-table-wm))) + (.array iid-bb))) + (trie/trie-merge-tasks (cond-> (mapv #(ArrowHashTrie/from %) trie-roots) + live-table-wm (conj (.liveTrie live-table-wm)))))] {:path path :leaves (mapv (fn [{:keys [ordinal leaf]}] (condp = (class leaf) @@ -521,7 +559,9 @@ table-name, col-names, ^longs temporal-range ^Map col-preds, params, scan-opts] (let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name)) - {:keys [arrow-leaves ^List merge-tasks]} (read-tries obj-store buffer-pool table-name live-table-wm)] + + {:keys [arrow-leaves ^List merge-tasks]} + (read-tries obj-store buffer-pool table-name live-table-wm (select-keys scan-opts [:iid-bb]))] (try (->TrieCursor allocator arrow-leaves (.iterator merge-tasks) col-names col-preds @@ -576,7 +616,7 @@ (->> scan-cols (into {} (map (juxt identity ->col-type)))))) - (emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts} scan-col-types param-types] + (emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts :as opts} scan-col-types param-types] (let [col-names (->> columns (into [] (comp (map (fn [[col-type arg]] (case col-type @@ -608,6 +648,15 @@ (expr/->expression-relation-selector select-form {:col-types col-types, :param-types param-types}))) (into {})) + iid-bb (when-let [eid-select (or (get selects 'xt/id) (get selects 'xt$id))] + (let [eid (nth eid-select 2)] + (when (and (= '= (first eid-select)) (s/valid? ::lp/value eid)) + (trie/->iid eid)))) + + col-preds (cond-> col-preds + iid-bb + (assoc "xt$iid" (iid-selector iid-bb))) + metadata-args (vec (for [[col-name select] selects :when (not (types/temporal-column? (util/str->normal-form-str (str col-name))))] select)) @@ -625,7 +674,8 @@ (let [_metadata-pred (expr.meta/->metadata-selector (cons 'and metadata-args) col-types params) scan-opts (cond-> scan-opts (nil? for-valid-time) - (assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])))] + (assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]]) + :iid-bb iid-bb))] (->4r-cursor allocator object-store buffer-pool watermark normalized-table-name diff --git a/core/src/main/clojure/xtdb/trie.clj b/core/src/main/clojure/xtdb/trie.clj index d7b9f7a207..381c8c618e 100644 --- a/core/src/main/clojure/xtdb/trie.clj +++ b/core/src/main/clojure/xtdb/trie.clj @@ -176,3 +176,29 @@ {:ordinal ordinal, :leaf node})))))}])))] (vec (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) [])))) + +(defn- bucket-for [^bytes iid level] + (let [level-offset-bits (* HashTrie/LEVEL_BITS (inc level)) + level-offset-bytes (/ (- level-offset-bits HashTrie/LEVEL_BITS) Byte/SIZE)] + (bit-and (bit-shift-right (get iid level-offset-bytes) (mod level-offset-bits Byte/SIZE)) HashTrie/LEVEL_MASK))) + +(defn iid-trie-merge-task [tries ^bytes iid] + (letfn [(trie-merge-tasks* [nodes path level] + (let [bucket-idx (bucket-for iid level) + trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)] + (if (some identity trie-children) + (trie-merge-tasks* (mapv (fn [node ^objects node-children] + (if node-children + (aget node-children bucket-idx) + node)) + nodes trie-children) + (conj path bucket-idx) + (inc level)) + {:path (byte-array path) + :leaves (->> nodes + (into [] (keep-indexed + (fn [ordinal ^HashTrie$Node node] + (when node + {:ordinal ordinal, :leaf node})))))})))] + + (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0))) diff --git a/core/src/main/clojure/xtdb/util.clj b/core/src/main/clojure/xtdb/util.clj index 78d0196dc1..8d665d964d 100644 --- a/core/src/main/clojure/xtdb/util.clj +++ b/core/src/main/clojure/xtdb/util.clj @@ -24,7 +24,7 @@ (org.apache.arrow.compression CommonsCompressionFactory) (org.apache.arrow.flatbuf Footer Message RecordBatch) (org.apache.arrow.memory AllocationManager ArrowBuf BufferAllocator) - (org.apache.arrow.memory.util ByteFunctionHelpers MemoryUtil) + (org.apache.arrow.memory.util ArrowBufPointer ByteFunctionHelpers MemoryUtil) (org.apache.arrow.vector ValueVector VectorLoader VectorSchemaRoot) (org.apache.arrow.vector.ipc ArrowFileWriter ArrowStreamWriter ArrowWriter) (org.apache.arrow.vector.ipc.message ArrowBlock ArrowFooter ArrowRecordBatch MessageSerializer) @@ -117,6 +117,18 @@ (.position bb 0) bb)) +(defn byte-buffer->uuid [^ByteBuffer bb] + (UUID. (.getLong bb 0) (.getLong bb 1))) + +(defn arrow-buf-pointer->bb [^ArrowBufPointer ptr] + (let [^ByteBuffer buf (.nioBuffer (.getBuf ptr)) + res (ByteBuffer/allocate (.getLength ptr))] + (.position buf (.getOffset ptr)) + (dotimes [_ (.getLength ptr)] + (.put res (.get buf))) + (.position res 0) + res)) + (defn ->lex-hex-string "Turn a long into a lexicographically-sortable hex string by prepending the length" [^long l] diff --git a/src/test/clojure/xtdb/operator/scan_test.clj b/src/test/clojure/xtdb/operator/scan_test.clj index b02186219f..64512feb87 100644 --- a/src/test/clojure/xtdb/operator/scan_test.clj +++ b/src/test/clojure/xtdb/operator/scan_test.clj @@ -3,11 +3,12 @@ [xtdb.api :as xt] [xtdb.node :as node] [xtdb.operator :as op] + [xtdb.operator.scan :as scan] [xtdb.test-util :as tu] - [xtdb.util :as util] - [xtdb.operator.scan :as scan]) - (:import xtdb.operator.IRaQuerySource - (java.util LinkedList) + [xtdb.trie :as trie] + [xtdb.util :as util]) + (:import (java.util LinkedList) + xtdb.operator.IRaQuerySource (xtdb.operator.scan RowConsumer) )) (t/use-fixtures :each tu/with-mock-clock tu/with-allocator) @@ -386,6 +387,60 @@ '[:scan {:table xt_docs} [xt/id {col-x (= col-x "toto")}]] {:node node}))))) +(t/deftest test-iid-fast-path + (let [before-uuid #uuid "00000000-0000-0000-0000-000000000000" + search-uuid #uuid "80000000-0000-0000-0000-000000000000" + after-uuid #uuid "f0000000-0000-0000-0000-000000000000"] + (with-open [node (node/start-node {})] + (xt/submit-tx node [[:put :xt-docs {:xt/id before-uuid :version 1}] + [:put :xt-docs {:xt/id search-uuid :version 1}] + [:put :xt-docs {:xt/id after-uuid :version 1}]]) + (xt/submit-tx node [[:put :xt-docs {:xt/id search-uuid :version 2}]]) + + (with-redefs [trie/iid-trie-merge-task (fn [& _args] (throw (UnsupportedOperationException. "")))] + (t/is (thrown-with-msg? + UnsupportedOperationException + #"" + (tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]] + {:node node})))) + + (t/is (= [{:version 2, :xt/id search-uuid}] + (tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]] + {:node node}))) + + (t/is (= [{:version 2, :xt/id search-uuid} + {:version 1, :xt/id search-uuid}] + (tu/query-ra [:scan {:table 'xt_docs + :for-valid-time :all-time} + ['version {'xt/id (list '= 'xt/id search-uuid)}]] + {:node node})))))) + +(t/deftest test-iid-fast-path-chunk-boundary + (let [before-uuid #uuid "00000000-0000-0000-0000-000000000000" + search-uuid #uuid "80000000-0000-0000-0000-000000000000" + after-uuid #uuid "f0000000-0000-0000-0000-000000000000" + uuids [before-uuid search-uuid after-uuid] + !search-uuid-versions (atom [])] + (with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 20}})] + (->> (for [i (range 110)] + (let [uuid (rand-nth uuids)] + (when (= uuid search-uuid) + (swap! !search-uuid-versions conj i)) + [[:put :xt_docs {:xt/id uuid :version i}]])) + (mapv #(xt/submit-tx node %))) + + (t/is (= [{:version (last @!search-uuid-versions), :xt/id search-uuid}] + (tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]] + {:node node}))) + + (t/is (= (into #{} @!search-uuid-versions) + (->> (tu/query-ra [:scan {:table 'xt_docs + :for-valid-time :all-time} + ['version {'xt/id (list '= 'xt/id search-uuid)}]] + {:node node}) + (map :version) + set)))))) + (t/deftest test-correct-rectangle-cutting (letfn [(test-er [& events] (let [!state (atom [])