Skip to content

Commit

Permalink
Fix LeafMergeQueue for non-consecutive ordinals
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 23, 2023
1 parent 753cc01 commit dfad96c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
11 changes: 6 additions & 5 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -556,14 +556,15 @@
iid-pred (mapv #(update % :rel-rdr (fn [^RelationReader rel-rdr]
(.select rel-rdr (.select iid-pred allocator rel-rdr params))))))
merge-q (->merge-queue loaded-leaves merge-task)
^"[Ljava.util.function.IntConsumer;"
row-pickers (into-array IntConsumer
(for [{:keys [rel-rdr]} loaded-leaves]
(range-range-row-picker out-rel rel-rdr col-names temporal-timestamps picker-state)))]
ordinal->row-pickers (->> (for [{:keys [^LeafMergeQueue$LeafPointer leaf-ptr rel-rdr]} loaded-leaves]
(MapEntry/create
(.getOrdinal leaf-ptr)
(range-range-row-picker out-rel rel-rdr col-names temporal-timestamps picker-state)))
(into {}))]

(loop []
(when-let [lp (.poll merge-q)]
(.accept ^IntConsumer (aget row-pickers (.getOrdinal lp)) (.getIndex lp))
(.accept ^IntConsumer (ordinal->row-pickers (.getOrdinal lp)) (.getIndex lp))
(.advance merge-q lp)
(recur)))

Expand Down
26 changes: 19 additions & 7 deletions core/src/main/java/xtdb/trie/LeafMergeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import xtdb.vector.IVectorReader;

import java.util.Collection;
import java.util.HashMap;
import java.util.PriorityQueue;

public class LeafMergeQueue {
Expand Down Expand Up @@ -34,32 +35,43 @@ public String toString() {
}

private final byte[] path;
private final IVectorReader[] rdrs;
private final HashMap<Integer, IVectorReader> ordinalToVectorReader;
private final PriorityQueue<LeafPointer> pq;

private final ArrowBufPointer leftCmp = new ArrowBufPointer();
private final ArrowBufPointer rightCmp = new ArrowBufPointer();
private final ArrowBufPointer isValidCmp = new ArrowBufPointer();

private HashMap<Integer, IVectorReader> createOrdinalToVectorReader(IVectorReader [] rdrs, Collection<LeafPointer> lps) {
var res = new HashMap<Integer, IVectorReader>();
var itr = lps.iterator();
int index = 0;
while (itr.hasNext()) {
res.put(itr.next().ordinal, rdrs[index]);
index++;
}
return res;
}

public LeafMergeQueue(byte[] path, IVectorReader[] rdrs, Collection<LeafPointer> lps) {
this.rdrs = rdrs;
this.path = path;
this.ordinalToVectorReader = createOrdinalToVectorReader(rdrs, lps);

this.pq = new PriorityQueue<>((l, r) -> {
int cmp = getPointer(l, leftCmp).compareTo(getPointer(r, rightCmp));
if (cmp != 0) return cmp;
return Long.compare(r.ordinal, l.ordinal);
int cmp = getPointer(l, leftCmp).compareTo(getPointer(r, rightCmp));
if (cmp != 0) return cmp;
return Long.compare(r.ordinal, l.ordinal);
});

pq.addAll(lps.stream().filter(this::isValid).toList());
}

private ArrowBufPointer getPointer(LeafPointer lp, ArrowBufPointer ptr) {
return rdrs[lp.ordinal].getPointer(lp.index, ptr);
return ordinalToVectorReader.get(lp.ordinal).getPointer(lp.index, ptr);
}

private boolean isValid(LeafPointer lp) {
return lp.index < rdrs[lp.ordinal].valueCount() && HashTrie.compareToPath(getPointer(lp, isValidCmp), path) <= 0;
return lp.index < ordinalToVectorReader.get(lp.ordinal).valueCount() && HashTrie.compareToPath(getPointer(lp, isValidCmp), path) <= 0;
}

public void advance(LeafPointer lp) {
Expand Down
31 changes: 31 additions & 0 deletions src/test/clojure/xtdb/trie/leaf_merge_queue_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
(ns xtdb.trie.leaf-merge-queue-test
(:require [clojure.test :as t :refer [deftest]]
[xtdb.util :as util]
[xtdb.vector.writer :as vw])
(:import (org.apache.arrow.memory RootAllocator)
(xtdb.trie LeafMergeQueue LeafMergeQueue$LeafPointer)
xtdb.vector.IVectorReader))

(deftest test-leaf-merge-queue-non-consecutive-ordinals-2714
(with-open [allocator (RootAllocator.)
rel1 (vw/->rel-writer allocator)
iid-wrt1 (.writerForName rel1 "xt$iid" [:fixed-size-binary 16])
rel2 (vw/->rel-writer allocator)
iid-wrt2 (.writerForName rel2 "xt$iid" [:fixed-size-binary 16])]
(.writeBytes iid-wrt1 (util/uuid->byte-buffer #uuid "00000000-0000-0000-0000-000000000000"))
(.writeBytes iid-wrt2 (util/uuid->byte-buffer #uuid "01000000-0000-0000-0000-000000000000"))
(.writeBytes iid-wrt1 (util/uuid->byte-buffer #uuid "02000000-0000-0000-0000-000000000000"))
(.writeBytes iid-wrt2 (util/uuid->byte-buffer #uuid "03000000-0000-0000-0000-000000000000"))
(let [lmq (LeafMergeQueue. (byte-array 0)
(into-array IVectorReader
[(vw/vec-wtr->rdr iid-wrt1)
(vw/vec-wtr->rdr iid-wrt2)])
[(LeafMergeQueue$LeafPointer. 1)
(LeafMergeQueue$LeafPointer. 3)])]
(letfn [(poll-and-advance []
(let [leaf-ptr (.poll lmq)]
(when leaf-ptr
(.advance lmq leaf-ptr)
(.getOrdinal leaf-ptr))))]
(t/is (= '(1 3 1 3 nil)
(repeatedly 5 poll-and-advance)))))))

0 comments on commit dfad96c

Please sign in to comment.