# Review notes for the patch below. The patch text itself is unchanged.
#
# Purpose: stop treating a leaf pointer's ordinal as a dense array index, so
# that ordinals need not be consecutive or start at 0.
#
# core/src/main/clojure/xtdb/operator/scan.clj
#   - Replaces the IntConsumer array `row-pickers`, which was indexed with
#     (.getOrdinal lp), by a map `ordinal->row-pickers` keyed by each loaded
#     leaf's (.getOrdinal leaf-ptr).
#   - NOTE(review): the map lookup yields nil for an ordinal that was never
#     inserted, so the following .accept call would fail on nil -- confirm that
#     every pointer polled from merge-q originates from loaded-leaves.
#
# core/src/main/java/xtdb/trie/LeafMergeQueue.java
#   - Replaces the field `IVectorReader[] rdrs` with a HashMap
#     `ordinalToVectorReader`, built by the new private helper
#     createOrdinalToVectorReader(rdrs, lps).
#   - The helper walks `lps` in iterator order and stores rdrs[i] under the
#     ordinal of the i-th pointer, i.e. `rdrs` and `lps` are paired by position.
#   - getPointer and isValid now look the reader up by lp.ordinal instead of
#     indexing the array with it.
#   - The comparator lines are re-added with the same tokens (an indentation-only
#     change, as far as this text shows): compare the pointers first, then break
#     ties with Long.compare(r.ordinal, l.ordinal).
#   - NOTE(review): the helper reads rdrs[index] with no length check; more
#     pointers than readers would throw ArrayIndexOutOfBoundsException, and two
#     pointers sharing an ordinal would silently overwrite each other in the map.
#   - NOTE(review): HashMap, Collection and PriorityQueue appear here without
#     type arguments, yet `itr.next().ordinal` needs a typed Collection to
#     compile -- presumably the type arguments were lost when this text was
#     captured; confirm against the repository.
#
# src/test/clojure/xtdb/trie/leaf_merge_queue_test.clj (new file)
#   - Builds a LeafMergeQueue over two iid readers with leaf pointers of
#     ordinal 1 and 3, and expects repeated poll/advance to yield the ordinals
#     1 3 1 3 and then nil.
#   - NOTE(review): this path starts at src/test/..., while the other two files
#     are under core/src/... -- confirm the intended location.
#
# NOTE(review): the line breaks inside the hunks appear to have been collapsed
# into spaces; they would need to be restored before a patch tool can apply this.
#
diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index dfdf96cb4d..6dc46c5e08 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -555,14 +555,15 @@ iid-pred (mapv #(update % :rel-rdr (fn [^RelationReader rel-rdr] (.select rel-rdr (.select iid-pred allocator rel-rdr params)))))) merge-q (->merge-queue loaded-leaves merge-task) - ^"[Ljava.util.function.IntConsumer;" - row-pickers (into-array IntConsumer - (for [{:keys [rel-rdr]} loaded-leaves] - (range-range-row-picker out-rel rel-rdr col-names temporal-timestamps picker-state)))] + ordinal->row-pickers (->> (for [{:keys [^LeafMergeQueue$LeafPointer leaf-ptr rel-rdr]} loaded-leaves] + (MapEntry/create + (.getOrdinal leaf-ptr) + (range-range-row-picker out-rel rel-rdr col-names temporal-timestamps picker-state))) + (into {}))] (loop [] (when-let [lp (.poll merge-q)] - (.accept ^IntConsumer (aget row-pickers (.getOrdinal lp)) (.getIndex lp)) + (.accept ^IntConsumer (ordinal->row-pickers (.getOrdinal lp)) (.getIndex lp)) (.advance merge-q lp) (recur))) diff --git a/core/src/main/java/xtdb/trie/LeafMergeQueue.java b/core/src/main/java/xtdb/trie/LeafMergeQueue.java index 2021949243..30eabcd945 100644 --- a/core/src/main/java/xtdb/trie/LeafMergeQueue.java +++ b/core/src/main/java/xtdb/trie/LeafMergeQueue.java @@ -4,6 +4,7 @@ import xtdb.vector.IVectorReader; import java.util.Collection; +import java.util.HashMap; import java.util.PriorityQueue; public class LeafMergeQueue { @@ -34,32 +35,43 @@ public String toString() { } private final byte[] path; - private final IVectorReader[] rdrs; + private final HashMap ordinalToVectorReader; private final PriorityQueue pq; private final ArrowBufPointer leftCmp = new ArrowBufPointer(); private final ArrowBufPointer rightCmp = new ArrowBufPointer(); private final ArrowBufPointer isValidCmp = new ArrowBufPointer(); + private HashMap 
createOrdinalToVectorReader(IVectorReader [] rdrs, Collection lps) { + var res = new HashMap(); + var itr = lps.iterator(); + int index = 0; + while (itr.hasNext()) { + res.put(itr.next().ordinal, rdrs[index]); + index++; + } + return res; + } + public LeafMergeQueue(byte[] path, IVectorReader[] rdrs, Collection lps) { - this.rdrs = rdrs; this.path = path; + this.ordinalToVectorReader = createOrdinalToVectorReader(rdrs, lps); this.pq = new PriorityQueue<>((l, r) -> { - int cmp = getPointer(l, leftCmp).compareTo(getPointer(r, rightCmp)); - if (cmp != 0) return cmp; - return Long.compare(r.ordinal, l.ordinal); + int cmp = getPointer(l, leftCmp).compareTo(getPointer(r, rightCmp)); + if (cmp != 0) return cmp; + return Long.compare(r.ordinal, l.ordinal); }); pq.addAll(lps.stream().filter(this::isValid).toList()); } private ArrowBufPointer getPointer(LeafPointer lp, ArrowBufPointer ptr) { - return rdrs[lp.ordinal].getPointer(lp.index, ptr); + return ordinalToVectorReader.get(lp.ordinal).getPointer(lp.index, ptr); } private boolean isValid(LeafPointer lp) { - return lp.index < rdrs[lp.ordinal].valueCount() && HashTrie.compareToPath(getPointer(lp, isValidCmp), path) <= 0; + return lp.index < ordinalToVectorReader.get(lp.ordinal).valueCount() && HashTrie.compareToPath(getPointer(lp, isValidCmp), path) <= 0; } public void advance(LeafPointer lp) { diff --git a/src/test/clojure/xtdb/trie/leaf_merge_queue_test.clj b/src/test/clojure/xtdb/trie/leaf_merge_queue_test.clj new file mode 100644 index 0000000000..e8a5008a59 --- /dev/null +++ b/src/test/clojure/xtdb/trie/leaf_merge_queue_test.clj @@ -0,0 +1,31 @@ +(ns xtdb.trie.leaf-merge-queue-test + (:require [clojure.test :as t :refer [deftest]] + [xtdb.util :as util] + [xtdb.vector.writer :as vw]) + (:import (org.apache.arrow.memory RootAllocator) + (xtdb.trie LeafMergeQueue LeafMergeQueue$LeafPointer) + xtdb.vector.IVectorReader)) + +(deftest test-leaf-merge-queue-non-consecutive-ordinals-2714 + (with-open [allocator 
(RootAllocator.) + rel1 (vw/->rel-writer allocator) + iid-wrt1 (.writerForName rel1 "xt$iid" [:fixed-size-binary 16]) + rel2 (vw/->rel-writer allocator) + iid-wrt2 (.writerForName rel2 "xt$iid" [:fixed-size-binary 16])] + (.writeBytes iid-wrt1 (util/uuid->byte-buffer #uuid "00000000-0000-0000-0000-000000000000")) + (.writeBytes iid-wrt2 (util/uuid->byte-buffer #uuid "01000000-0000-0000-0000-000000000000")) + (.writeBytes iid-wrt1 (util/uuid->byte-buffer #uuid "02000000-0000-0000-0000-000000000000")) + (.writeBytes iid-wrt2 (util/uuid->byte-buffer #uuid "03000000-0000-0000-0000-000000000000")) + (let [lmq (LeafMergeQueue. (byte-array 0) + (into-array IVectorReader + [(vw/vec-wtr->rdr iid-wrt1) + (vw/vec-wtr->rdr iid-wrt2)]) + [(LeafMergeQueue$LeafPointer. 1) + (LeafMergeQueue$LeafPointer. 3)])] + (letfn [(poll-and-advance [] + (let [leaf-ptr (.poll lmq)] + (when leaf-ptr + (.advance lmq leaf-ptr) + (.getOrdinal leaf-ptr))))] + (t/is (= '(1 3 1 3 nil) + (repeatedly 5 poll-and-advance)))))))