diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj
index f8cc9866e2..d7c1908ca5 100644
--- a/core/src/main/clojure/xtdb/operator/scan.clj
+++ b/core/src/main/clojure/xtdb/operator/scan.clj
@@ -17,10 +17,11 @@
             xtdb.watermark)
   (:import (clojure.lang IPersistentMap MapEntry)
            [java.lang AutoCloseable]
-           (java.util ArrayList Arrays Iterator LinkedList List ListIterator Map)
+           java.nio.ByteBuffer
+           (java.util ArrayList Iterator LinkedList List ListIterator Map)
            (java.util.function IntConsumer)
-           org.apache.arrow.memory.ArrowBuf
-           org.apache.arrow.memory.BufferAllocator
+           (java.util.stream IntStream)
+           (org.apache.arrow.memory ArrowBuf BufferAllocator)
            [org.apache.arrow.memory.util ArrowBufPointer]
            (org.apache.arrow.vector VectorLoader VectorSchemaRoot)
            xtdb.api.protocols.TransactionInstant
@@ -30,7 +31,7 @@
            (xtdb.metadata IMetadataManager)
            xtdb.object_store.ObjectStore
            xtdb.operator.IRelationSelector
-           (xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
+           (xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
            (xtdb.vector IRelationWriter IRowCopier IVectorReader IVectorWriter RelationReader)
            (xtdb.watermark IWatermark IWatermarkSource Watermark)))
 
@@ -398,6 +399,31 @@
                                           system-from nil))))))))))
 
+(defn iid-selector [^ByteBuffer iid-bb]
+  (reify IRelationSelector
+    (select [_ allocator rel-rdr _params]
+      (with-open [arrow-buf (util/->arrow-buf-view allocator iid-bb)]
+        (let [iid-ptr (ArrowBufPointer. arrow-buf 0 (.capacity iid-bb))
+              ptr (ArrowBufPointer.)
+              iid-rdr (.readerForName rel-rdr "xt$iid")
+              value-count (.valueCount iid-rdr)]
+          (if (pos-int? value-count)
+            ;; lower-bound
+            (loop [left 0 right (dec value-count)]
+              (if (= left right)
+                (if (= iid-ptr (.getPointer iid-rdr left ptr))
+                  ;; upper bound
+                  (loop [right left]
+                    (if (or (>= right value-count) (not= iid-ptr (.getPointer iid-rdr right ptr)))
+                      (.toArray (IntStream/range left right))
+                      (recur (inc right))))
+                  (int-array 0))
+                (let [mid (quot (+ left right) 2)]
+                  (if (<= (.compareTo iid-ptr (.getPointer iid-rdr mid ptr)) 0)
+                    (recur left mid)
+                    (recur (inc mid) right)))))
+            (int-array 0)))))))
+
 (deftype TrieCursor [^BufferAllocator allocator, arrow-leaves, ^Iterator merge-tasks
                      col-names, ^Map col-preds, ^longs temporal-timestamps,
                      params, ^IPersistentMap picker-state]
@@ -407,7 +433,10 @@
     (let [{task-leaves :leaves, :keys [path]} (.next merge-tasks)]
       (with-open [out-rel (vw/->rel-writer allocator)]
         (let [loaded-leaves (mapv (fn [{:keys [load-leaf]}]
-                                    (load-leaf))
+                                    (let [{:keys [^RelationReader rel-rdr] :as res} (load-leaf)]
+                                      (if-let [^IRelationSelector iid-pred (get col-preds "xt$iid")]
+                                        (assoc res :rel-rdr (.select rel-rdr (.select iid-pred allocator rel-rdr params)))
+                                        res)))
                                   task-leaves)
              merge-q (LeafMergeQueue. path
                                       (into-array IVectorReader
@@ -433,7 +462,7 @@
              (as-> rel (reduce (fn [^RelationReader rel, ^IRelationSelector col-pred]
                                  (.select rel (.select col-pred allocator rel params)))
                                rel
-                               (vals col-preds)))))))
+                               (vals (dissoc col-preds "xt$iid"))))))))
        true)
      false))
 
@@ -441,7 +470,8 @@
   (close [_]
     (util/close arrow-leaves)))
 
-(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm]
+(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm
+                   {:keys [^ByteBuffer iid-bb] :as _opts}]
   (let [{trie-files :trie, leaf-files :leaf} (->> (.listObjects obj-store (format "tables/%s/chunks" table-name))
                                                   (keep (fn [file-name]
                                                           (when-let [[_ file-type chunk-idx-str] (re-find #"/(leaf|trie)-c(.+?)\.arrow$" file-name)]
@@ -477,41 +507,45 @@
           (with-open [record-batch (util/->arrow-record-batch-view (first arrow-blocks) buf)]
             (.load loader record-batch)
             (.add trie-roots root)))))
-
-    {:arrow-leaves (mapv (fn [{:keys [leaf-buf leaf-root]}]
-                           (reify AutoCloseable
-                             (close [_]
-                               (util/close leaf-root)
-                               (util/close leaf-buf))))
-                         arrow-leaves)
-
-     :merge-tasks (vec (for [{:keys [path leaves]} (trie/trie-merge-tasks (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
-                                                                            live-table-wm (conj (.liveTrie live-table-wm))))]
-                         {:path path
-                          :leaves (mapv (fn [{:keys [ordinal leaf]}]
-                                          (condp = (class leaf)
-                                            ArrowHashTrie$Leaf
-                                            (let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf leaf)
-                                                  {:keys [leaf-buf ^VectorLoader loader, ^VectorSchemaRoot leaf-root arrow-blocks,
-                                                          !current-page-idx
-                                                          ^LeafMergeQueue$LeafPointer leaf-ptr]} (nth arrow-leaves ordinal)]
+    (let [hash-tries (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
+                       live-table-wm (conj (.liveTrie live-table-wm)))]
+
+      {:arrow-leaves (mapv (fn [{:keys [leaf-buf leaf-root]}]
+                             (reify AutoCloseable
+                               (close [_]
+                                 (util/close leaf-root)
+                                 (util/close leaf-buf))))
+                           arrow-leaves)
+
+       :merge-tasks (vec (for [{:keys [path leaves]}
+                               (if iid-bb
+                                 (vector (trie/iid-trie-merge-task hash-tries (.array iid-bb)))
+                                 (trie/trie-merge-tasks hash-tries))]
+                           {:path path
+                            :leaves (mapv (fn [{:keys [ordinal leaf]}]
+                                            (condp = (class leaf)
+                                              ArrowHashTrie$Leaf
+                                              (let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf leaf)
+                                                    {:keys [leaf-buf ^VectorLoader loader, ^VectorSchemaRoot leaf-root arrow-blocks,
+                                                            !current-page-idx
+                                                            ^LeafMergeQueue$LeafPointer leaf-ptr]} (nth arrow-leaves ordinal)]
+                                                {:load-leaf (fn []
+                                                              (when-not (= page-idx @!current-page-idx)
+                                                                (reset! !current-page-idx page-idx)
+
+                                                                (with-open [rb (util/->arrow-record-batch-view (nth arrow-blocks page-idx) leaf-buf)]
+                                                                  (.load loader rb)
+                                                                  (.reset leaf-ptr)))
+
+                                                              {:rel-rdr (vr/<-root leaf-root)
+                                                               :leaf-ptr leaf-ptr})})
+
+                                              LiveHashTrie$Leaf
                                               {:load-leaf (fn []
-                                                            (when-not (= page-idx @!current-page-idx)
-                                                              (reset! !current-page-idx page-idx)
-
-                                                              (with-open [rb (util/->arrow-record-batch-view (nth arrow-blocks page-idx) leaf-buf)]
-                                                                (.load loader rb)
-                                                                (.reset leaf-ptr)))
-
-                                                            {:rel-rdr (vr/<-root leaf-root)
-                                                             :leaf-ptr leaf-ptr})})
-
-                                            LiveHashTrie$Leaf
-                                            {:load-leaf (fn []
-                                                          {:rel-rdr (.select (.liveRelation live-table-wm) (.data ^LiveHashTrie$Leaf leaf))
-                                                           :leaf-ptr (LeafMergeQueue$LeafPointer. (count arrow-leaves))})}))
+                                                            {:rel-rdr (.select (.liveRelation live-table-wm) (.data ^LiveHashTrie$Leaf leaf))
+                                                             :leaf-ptr (LeafMergeQueue$LeafPointer. (count arrow-leaves))})}))
-                                        leaves)}))}))))
+                                          leaves)}))}))))))
 
 ;; The consumers for different leafs need to share some state so the logic of how to advance
 ;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect
@@ -521,7 +555,9 @@
                      table-name, col-names, ^longs temporal-range
                      ^Map col-preds, params, scan-opts]
   (let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name))
-        {:keys [arrow-leaves ^List merge-tasks]} (read-tries obj-store buffer-pool table-name live-table-wm)]
+
+        {:keys [arrow-leaves ^List merge-tasks]}
+        (read-tries obj-store buffer-pool table-name live-table-wm (select-keys scan-opts [:iid-bb]))]
     (try
       (->TrieCursor allocator arrow-leaves (.iterator merge-tasks)
                     col-names col-preds
@@ -538,6 +574,12 @@
         (util/close arrow-leaves)
         (throw t)))))
 
+(defn selects->iid-byte-buffer ^ByteBuffer [selects]
+  (when-let [eid-select (or (get selects 'xt/id) (get selects 'xt$id))]
+    (let [eid (nth eid-select 2)]
+      (when (and (= '= (first eid-select)) (s/valid? ::lp/value eid))
+        (trie/->iid eid)))))
+
 (defmethod ig/prep-key ::scan-emitter [_ opts]
   (merge opts
          {:metadata-mgr (ig/ref ::meta/metadata-manager)
@@ -576,7 +618,7 @@
       (->> scan-cols
            (into {} (map (juxt identity ->col-type))))))
 
-  (emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts} scan-col-types param-types]
+  (emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts :as opts} scan-col-types param-types]
     (let [col-names (->> columns
                          (into [] (comp (map (fn [[col-type arg]]
                                                (case col-type
@@ -602,12 +644,18 @@
                               (first arg))
                             (into {}))
 
+          iid-bb (selects->iid-byte-buffer selects)
+
           col-preds (->> (for [[col-name select-form] selects]
                            ;; for temporal preds, we may not need to re-apply these if they can be represented as a temporal range.
                            (MapEntry/create (str col-name)
                                             (expr/->expression-relation-selector select-form {:col-types col-types, :param-types param-types})))
                          (into {}))
 
+          col-preds (cond-> col-preds
+                      iid-bb
+                      (assoc "xt$iid" (iid-selector iid-bb)))
+
           metadata-args (vec (for [[col-name select] selects
                                    :when (not (types/temporal-column? (util/str->normal-form-str (str col-name))))]
                                select))
@@ -625,7 +673,8 @@
       (let [_metadata-pred (expr.meta/->metadata-selector (cons 'and metadata-args) col-types params)
             scan-opts (cond-> scan-opts
                         (nil? for-valid-time)
-                        (assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])))]
+                        (assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])
+                               :iid-bb iid-bb))]
 
         (->4r-cursor allocator object-store buffer-pool watermark
                      normalized-table-name
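For context (not part of the patch itself): `iid-selector` relies on each leaf's `xt$iid` column being sorted, so it can binary-search for the first row equal to the target iid and then scan forward to the end of the matching run, returning that contiguous range of row indices. A minimal standalone sketch of the same lower-bound/upper-bound strategy over a plain sorted vector — `matching-range` is a hypothetical name, used only for illustration:

;; illustration only: [lower upper) bounds of x in a sorted vector,
;; mirroring the binary search + forward scan in `iid-selector`
(defn matching-range [sorted-v x]
  (let [n (count sorted-v)]
    (when (pos? n)
      (let [lower (loop [left 0 right (dec n)]
                    (if (= left right)
                      left
                      (let [mid (quot (+ left right) 2)]
                        (if (<= (compare x (nth sorted-v mid)) 0)
                          (recur left mid)
                          (recur (inc mid) right)))))]
        (when (= x (nth sorted-v lower))
          (loop [upper lower]
            (if (or (= upper n) (not= x (nth sorted-v upper)))
              [lower upper]
              (recur (inc upper)))))))))

(matching-range [10 20 20 30] 20) ;; => [1 3], i.e. rows 1 and 2 match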
diff --git a/core/src/main/clojure/xtdb/trie.clj b/core/src/main/clojure/xtdb/trie.clj
index d7b9f7a207..381c8c618e 100644
--- a/core/src/main/clojure/xtdb/trie.clj
+++ b/core/src/main/clojure/xtdb/trie.clj
@@ -176,3 +176,29 @@
                                                           {:ordinal ordinal, :leaf node})))))}])))]
     (vec (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) []))))
+
+(defn- bucket-for [^bytes iid level]
+  (let [level-offset-bits (* HashTrie/LEVEL_BITS (inc level))
+        level-offset-bytes (/ (- level-offset-bits HashTrie/LEVEL_BITS) Byte/SIZE)]
+    (bit-and (bit-shift-right (get iid level-offset-bytes) (mod level-offset-bits Byte/SIZE)) HashTrie/LEVEL_MASK)))
+
+(defn iid-trie-merge-task [tries ^bytes iid]
+  (letfn [(trie-merge-tasks* [nodes path level]
+            (let [bucket-idx (bucket-for iid level)
+                  trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
+              (if (some identity trie-children)
+                (trie-merge-tasks* (mapv (fn [node ^objects node-children]
+                                           (if node-children
+                                             (aget node-children bucket-idx)
+                                             node))
+                                         nodes trie-children)
+                                   (conj path bucket-idx)
+                                   (inc level))
+                {:path (byte-array path)
+                 :leaves (->> nodes
+                              (into [] (keep-indexed
+                                        (fn [ordinal ^HashTrie$Node node]
+                                          (when node
+                                            {:ordinal ordinal, :leaf node})))))})))]
+
+    (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
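For context (not part of the patch itself): `bucket-for` reads the trie branch index for a given level straight out of the iid bytes, and `iid-trie-merge-task` then follows only that one branch per level rather than fanning out over all children as `trie-merge-tasks` does. A rough standalone analogue of the bit arithmetic, hard-coding 4 bits per level purely for illustration (the real constants come from `HashTrie/LEVEL_BITS` and `HashTrie/LEVEL_MASK`):

;; illustration only: assumes 4-bit levels, i.e. each trie level consumes one
;; nibble of the iid, high nibble of byte 0 first
(defn bucket-for* [^bytes iid level]
  (let [level-bits 4
        level-mask 0xf
        level-offset-bits (* level-bits (inc level))
        level-offset-bytes (quot (- level-offset-bits level-bits) Byte/SIZE)]
    (bit-and (bit-shift-right (get iid level-offset-bytes)
                              (mod level-offset-bits Byte/SIZE))
             level-mask)))

(bucket-for* (byte-array [0x2f]) 0) ;; => 2  (high nibble of byte 0)
(bucket-for* (byte-array [0x2f]) 1) ;; => 15 (low nibble of byte 0)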
diff --git a/core/src/main/clojure/xtdb/util.clj b/core/src/main/clojure/xtdb/util.clj
index 78d0196dc1..080d06a129 100644
--- a/core/src/main/clojure/xtdb/util.clj
+++ b/core/src/main/clojure/xtdb/util.clj
@@ -24,7 +24,7 @@
            (org.apache.arrow.compression CommonsCompressionFactory)
            (org.apache.arrow.flatbuf Footer Message RecordBatch)
            (org.apache.arrow.memory AllocationManager ArrowBuf BufferAllocator)
-           (org.apache.arrow.memory.util ByteFunctionHelpers MemoryUtil)
+           (org.apache.arrow.memory.util ArrowBufPointer ByteFunctionHelpers MemoryUtil)
            (org.apache.arrow.vector ValueVector VectorLoader VectorSchemaRoot)
            (org.apache.arrow.vector.ipc ArrowFileWriter ArrowStreamWriter ArrowWriter)
            (org.apache.arrow.vector.ipc.message ArrowBlock ArrowFooter ArrowRecordBatch MessageSerializer)
@@ -117,6 +117,9 @@
     (.position bb 0)
     bb))
 
+(defn byte-buffer->uuid [^ByteBuffer bb]
+  (UUID. (.getLong bb 0) (.getLong bb 8)))
+
 (defn ->lex-hex-string
   "Turn a long into a lexicographically-sortable hex string by prepending the length"
   [^long l]
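For context (not part of the patch itself): `byte-buffer->uuid` is presumably intended as the inverse of the existing `uuid->byte-buffer`, reading the most-significant long at byte offset 0 and the least-significant long at byte offset 8. A quick REPL-style check of that round-trip property, assuming `uuid->byte-buffer` writes msb then lsb and rewinds the buffer to position 0:

(require '[xtdb.util :as util])

(let [uuid #uuid "80000000-0000-0000-0000-000000000000"]
  (= uuid (util/byte-buffer->uuid (util/uuid->byte-buffer uuid))))
;; => true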
diff --git a/src/test/clojure/xtdb/operator/scan_test.clj b/src/test/clojure/xtdb/operator/scan_test.clj
index b02186219f..2fd1c01141 100644
--- a/src/test/clojure/xtdb/operator/scan_test.clj
+++ b/src/test/clojure/xtdb/operator/scan_test.clj
@@ -3,12 +3,16 @@
             [xtdb.api :as xt]
             [xtdb.node :as node]
             [xtdb.operator :as op]
+            [xtdb.operator.scan :as scan]
             [xtdb.test-util :as tu]
+            [xtdb.trie :as trie]
             [xtdb.util :as util]
-            [xtdb.operator.scan :as scan])
-  (:import xtdb.operator.IRaQuerySource
-           (java.util LinkedList)
-           (xtdb.operator.scan RowConsumer) ))
+            [xtdb.vector.writer :as vw])
+  (:import (java.util LinkedList)
+           xtdb.operator.IRaQuerySource
+           xtdb.operator.IRelationSelector
+           (xtdb.operator.scan RowConsumer)
+           (xtdb.vector RelationReader)))
 
 (t/use-fixtures :each tu/with-mock-clock tu/with-allocator)
 
@@ -386,6 +390,96 @@
                      '[:scan {:table xt_docs} [xt/id {col-x (= col-x "toto")}]]
                      {:node node})))))
 
+(t/deftest test-iid-fast-path
+  (let [before-uuid #uuid "00000000-0000-0000-0000-000000000000"
+        search-uuid #uuid "80000000-0000-0000-0000-000000000000"
+        after-uuid #uuid "f0000000-0000-0000-0000-000000000000"]
+    (with-open [node (node/start-node {})]
+      (xt/submit-tx node [[:put :xt-docs {:xt/id before-uuid :version 1}]
+                          [:put :xt-docs {:xt/id search-uuid :version 1}]
+                          [:put :xt-docs {:xt/id after-uuid :version 1}]])
+      (xt/submit-tx node [[:put :xt-docs {:xt/id search-uuid :version 2}]])
+
+      (t/is (nil? (scan/selects->iid-byte-buffer {})))
+
+      (t/is (= (util/uuid->byte-buffer search-uuid)
+               (scan/selects->iid-byte-buffer {'xt/id (list '= 'xt/id search-uuid)})))
+
+      (t/is (nil? (scan/selects->iid-byte-buffer {'xt/id (list '< 'xt/id search-uuid)})))
+
+      (t/is (= [{:version 2, :xt/id search-uuid}]
+               (tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]]
+                            {:node node})))
+
+      (t/is (= [{:version 2, :xt/id search-uuid}
+                {:version 1, :xt/id search-uuid}]
+               (tu/query-ra [:scan {:table 'xt_docs
+                                    :for-valid-time :all-time}
+                             ['version {'xt/id (list '= 'xt/id search-uuid)}]]
+                            {:node node}))))))
+
+(t/deftest test-iid-fast-path-chunk-boundary
+  (let [before-uuid #uuid "00000000-0000-0000-0000-000000000000"
+        search-uuid #uuid "80000000-0000-0000-0000-000000000000"
+        after-uuid #uuid "f0000000-0000-0000-0000-000000000000"
+        uuids [before-uuid search-uuid after-uuid]
+        !search-uuid-versions (atom [])]
+    (with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 20}})]
+      (->> (for [i (range 110)]
+             (let [uuid (rand-nth uuids)]
+               (when (= uuid search-uuid)
+                 (swap! !search-uuid-versions conj i))
+               [[:put :xt_docs {:xt/id uuid :version i}]]))
+           (mapv #(xt/submit-tx node %)))
+
+      (t/is (= [{:version (last @!search-uuid-versions), :xt/id search-uuid}]
+               (tu/query-ra [:scan {:table 'xt_docs} ['version {'xt/id (list '= 'xt/id search-uuid)}]]
+                            {:node node})))
+
+      (t/is (= (into #{} @!search-uuid-versions)
+               (->> (tu/query-ra [:scan {:table 'xt_docs
+                                         :for-valid-time :all-time}
+                                  ['version {'xt/id (list '= 'xt/id search-uuid)}]]
+                                 {:node node})
+                    (map :version)
+                    set))))))
+
+(t/deftest test-iid-selector
+  (let [before-uuid #uuid "00000000-0000-0000-0000-000000000000"
+        search-uuid #uuid "80000000-0000-0000-0000-000000000000"
+        after-uuid #uuid "f0000000-0000-0000-0000-000000000000"
+        ^IRelationSelector iid-selector (scan/iid-selector (util/uuid->byte-buffer search-uuid))]
+    (letfn [(test-uuids [uuids]
+              (with-open [rel-wrt (vw/->rel-writer tu/*allocator*)]
+                (let [iid-wtr (.writerForName rel-wrt "xt$iid" [:fixed-size-binary 16])]
+                  (doseq [uuid uuids]
+                    (.writeBytes iid-wtr (util/uuid->byte-buffer uuid))))
+                (.select iid-selector tu/*allocator* (vw/rel-wtr->rdr rel-wrt) nil)))]
+
+      (t/is (= nil
+               (seq (test-uuids [])))
+            "empty relation")
+
+      (t/is (= nil
+               (seq (test-uuids [before-uuid before-uuid])))
+            "only \"smaller\" uuids")
+
+      (t/is (= nil
+               (seq (test-uuids [after-uuid after-uuid])))
+            "only \"larger\" uuids")
+
+      (t/is (= [1 2]
+               (seq (test-uuids [before-uuid search-uuid search-uuid])))
+            "smaller uuids and no larger ones")
+
+      (t/is (= [0 1]
+               (seq (test-uuids [search-uuid search-uuid after-uuid])))
+            "no smaller uuids but larger ones")
+
+      (t/is (= [1 2]
+               (seq (test-uuids [before-uuid search-uuid search-uuid after-uuid])))
+            "general case"))))
+
 (t/deftest test-correct-rectangle-cutting
   (letfn [(test-er [& events]
             (let [!state (atom [])