Skip to content

Commit

Permalink
Unified row-picker for scan
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 11, 2023
1 parent 62a5e2b commit cc9c282
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 170 deletions.
213 changes: 111 additions & 102 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[xtdb.vector.writer :as vw]
xtdb.watermark)
(:import (clojure.lang IPersistentMap MapEntry )
(java.util ArrayList Arrays Iterator LinkedList List Map TreeSet NavigableSet)
(java.util ArrayList Arrays Iterator LinkedList List Map TreeSet NavigableSet ListIterator)
(java.util.function IntConsumer)
org.apache.arrow.memory.ArrowBuf
org.apache.arrow.memory.BufferAllocator
Expand Down Expand Up @@ -190,11 +190,28 @@
(deftype Rectangle [^long valid-from, ^long valid-to,
^long sys-from, ^long sys-to])

(defn- ranges-invariant [^LinkedList !ranges]
(every? true? (map (fn [^Rectangle r1 ^Rectangle r2] (<= (.valid-to r1) (.valid-from r2)))
!ranges (rest !ranges))))

(defn- print-rectangle [^Rectangle r]
(prn {:valid-from (util/micros->instant (.valid-from r))
:valid-to (util/micros->instant (.valid-to r))
:system-from (util/micros->instant (.sys-from r))
:sys-to (util/micros->instant (.sys-to r))}))

(defn- print-rectangles
([^LinkedList rs] (print-rectangles "$$$$$$$$" rs))
([identifier ^LinkedList rs]
(prn identifier)
(doseq [r rs]
(print-rectangle r))))

(defn range-range-row-picker
^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel
col-names, ^longs temporal-ranges,
{:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr
point-point? range-point? range-range?]}]
{:keys [^LinkedList !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr
point-point? range-range?]}]
(let [iid-rdr (.readerForName leaf-rel "xt$iid")
sys-from-rdr (.readerForName leaf-rel "xt$system_from")
op-rdr (.readerForName leaf-rel "op")
Expand Down Expand Up @@ -250,55 +267,55 @@
sys-to-wtrs (vec
(for [col-name col-names
:when (= "xt$system_to" (util/str->normal-form-str col-name))]
(.writerForName out-rel col-name types/temporal-col-type)))]
(.writerForName out-rel col-name types/temporal-col-type)))

!overlapping-ranges (ArrayList.)]

(letfn [(duplicate-ptr [^ArrowBufPointer dst, ^ArrowBufPointer src]
(.set dst (.getBuf src) (.getOffset src) (.getLength src)))

(calc-new-ranges [^Rectangle new-r ^NavigableSet sub-range ^LinkedList !new-ranges
valid-from valid-to system-from]
(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Rectangle prev nil ^Rectangle next (.next itr)]
(let [new-valid-from (or (when prev (.valid-to prev)) valid-from)
new-valid-to (or (when next (.valid-from next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Rectangle. new-valid-from new-valid-to
system-from util/end-of-time-μs)))
(when next
(when range-range?
(.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next))
system-from (.sys-from next))))
(recur next (and (.hasNext itr) (.next itr)))))))

(when (< valid-from valid-to)
(.add !new-ranges new-r))))

(update-ranges [^Rectangle new-r ^NavigableSet sub-range ^LinkedList !new-ranges
valid-from valid-to]
(cond
point-point?
(doseq [^Rectangle r !new-ranges]
(.add !ranges r)
(when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r)))
(duplicate-ptr skip-iid-ptr current-iid-ptr)))

range-point?
(doseq [r !new-ranges]
(.add !ranges r))

range-range?
(let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range))
^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range))
itr (.iterator sub-range)]
(while (.hasNext itr)
(.next itr)
(.remove itr))
(when (and lower (< (.valid-from lower) valid-from))
(.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower))))
(when (and upper (< valid-to (.valid-to upper)))
(.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper))))
(.add !ranges new-r))))]
(write-r [idx valid-from valid-to sys-from sys-to]
(when (and (<= valid-from-lower valid-from)
(<= valid-from valid-from-upper)
(<= valid-to-lower valid-to)
(<= valid-to valid-to-upper)
(<= sys-from-lower sys-from)
(<= sys-from sys-from-upper)
(<= sys-to-lower sys-to)
(<= sys-to sys-to-upper)
(not= valid-from valid-to)
(not= sys-from sys-to))
(when point-point? (duplicate-ptr skip-iid-ptr current-iid-ptr))
(.startRow out-rel)
(doseq [^IRowCopier copier row-copiers]
(.copyRow copier idx))
(doseq [^IVectorWriter valid-from-wtr valid-from-wtrs]
(.writeLong valid-from-wtr valid-from))
(doseq [^IVectorWriter valid-to-wtr valid-to-wtrs]
(.writeLong valid-to-wtr valid-to))
(doseq [^IVectorWriter sys-from-wtr sys-from-wtrs]
(.writeLong sys-from-wtr sys-from))
(doseq [^IVectorWriter sys-to-wtr sys-to-wtrs]
(.writeLong sys-to-wtr sys-to))
(.endRow out-rel)))
(calc-overlapping-ranges [valid-from valid-to]
(let [^ListIterator itr (.iterator !ranges)
^Rectangle cur (loop []
(when (.hasNext itr)
(let [^Rectangle next (.next itr)]
(if (<= (.valid-to next) valid-from)
(recur)
next))))]
(loop [^Rectangle cur cur]
(when cur
(if (< (.valid-from cur) valid-to)
(do
(.add !overlapping-ranges cur)
(.remove itr)
(recur (when (.hasNext itr) (.next itr))))
(when (<= valid-to (.valid-from cur))
(.previous itr)))))
itr))]

(reify IntConsumer
(accept [_ idx]
Expand All @@ -307,69 +324,62 @@
(.clear !ranges)
(duplicate-ptr prev-iid-ptr current-iid-ptr))

(assert (ranges-invariant !ranges))

(if (= :evict (.getLeg op-rdr idx))
(duplicate-ptr skip-iid-ptr current-iid-ptr)
(let [!new-ranges (LinkedList.)
system-from (.getLong sys-from-rdr idx)]
(let [system-from (.getLong sys-from-rdr idx)]
(.clear !overlapping-ranges)
(when (and (<= sys-from-lower system-from) (<= system-from sys-from-upper))
(case (.getLeg op-rdr idx)
:put
(let [valid-from (.getLong put-valid-from-rdr idx)
valid-to (.getLong put-valid-to-rdr idx)
r1 (Rectangle. valid-from valid-to system-from util/end-of-time-μs)
^Rectangle lower-bound (.lower !ranges r1)
sub-range (.subSet !ranges
(if (and lower-bound (< valid-from (.valid-to lower-bound))) lower-bound r1)
true
(Rectangle. valid-to -1 -1 -1)
false)]
(calc-new-ranges r1 sub-range !new-ranges valid-from valid-to system-from)

(doseq [^Rectangle r !new-ranges]
(let [valid-from (.valid-from r)
valid-to (.valid-to r)
sys-from (.sys-from r)
sys-to (.sys-to r)]
(when (and (<= valid-from-lower valid-from)
(<= valid-from valid-from-upper)
(<= valid-to-lower valid-to)
(<= valid-to valid-to-upper)
(<= sys-from-lower sys-from)
(<= sys-from sys-from-upper)
(<= sys-to-lower sys-to)
(<= sys-to sys-to-upper)
(not= valid-from valid-to)
(not= sys-from sys-to))
(.startRow out-rel)
(doseq [^IRowCopier copier row-copiers]
(.copyRow copier idx))
(doseq [^IVectorWriter valid-from-wtr valid-from-wtrs]
(.writeLong valid-from-wtr valid-from))
(doseq [^IVectorWriter valid-to-wtr valid-to-wtrs]
(.writeLong valid-to-wtr valid-to))
(doseq [^IVectorWriter sys-from-wtr sys-from-wtrs]
(.writeLong sys-from-wtr sys-from))
(doseq [^IVectorWriter sys-to-wtr sys-to-wtrs]
(.writeLong sys-to-wtr sys-to))
(.endRow out-rel))))

(update-ranges r1 sub-range !new-ranges valid-from valid-to))
^ListIterator itr (calc-overlapping-ranges valid-from valid-to)]

(when-let [^Rectangle begin (first !overlapping-ranges)]
(when (< (.valid-from begin) valid-from)
(.add itr (Rectangle. (.valid-from begin) valid-from (.sys-from begin) (.sys-to begin))))
(when (< valid-from (.valid-from begin))
(write-r idx valid-from (.valid-from begin) system-from util/end-of-time-μs)))

(if (seq !overlapping-ranges)
(do
(dorun (map (fn [^Rectangle r1 ^Rectangle r2]
(when (< (.valid-to r1) (.valid-from r2))
(write-r idx (.valid-to r1) (.valid-from r2) system-from util/end-of-time-μs)))
!overlapping-ranges (rest !overlapping-ranges)))

(when range-range?
(doseq [^Rectangle r !overlapping-ranges]
(let [new-valid-from (max valid-from (.valid-from r))
new-valid-to (min valid-to (.valid-to r))]
(when (< new-valid-from new-valid-to)
(write-r idx new-valid-from new-valid-to system-from (.sys-from r)))))))
(write-r idx valid-from valid-to system-from util/end-of-time-μs))

(.add itr (Rectangle. valid-from valid-to system-from util/end-of-time-μs))

(when-let [^Rectangle end (last !overlapping-ranges)]
(when (< valid-to (.valid-to end))
(.add itr (Rectangle. valid-to (.valid-to end) (.sys-from end) (.sys-to end))))
(when (< (.valid-to end) valid-to)
(write-r idx (.valid-to end) valid-to system-from util/end-of-time-μs))))

:delete
(let [valid-from (.getLong delete-valid-from-rdr idx)
valid-to (.getLong delete-valid-to-rdr idx)
r1 (Rectangle. valid-from valid-to system-from util/end-of-time-μs)
^Rectangle lower-bound (.lower !ranges r1)
sub-range (.subSet !ranges
(if (and lower-bound (< valid-from (.valid-to lower-bound)))
lower-bound
r1)
true
(Rectangle. valid-to -1 -1 -1)
false)]
(calc-new-ranges r1 sub-range !new-ranges valid-from valid-to system-from)

(update-ranges r1 sub-range !new-ranges valid-from valid-to))))))))))))
^ListIterator itr (calc-overlapping-ranges valid-from valid-to)]

(when-let [^Rectangle begin (first !overlapping-ranges)]
(when (< (.valid-from begin) valid-from)
(.add itr (Rectangle. (.valid-from begin) valid-from (.sys-from begin) (.sys-to begin)))))

(.add itr (Rectangle. valid-from valid-to system-from util/end-of-time-μs))

(when-let [^Rectangle end (last !overlapping-ranges)]
(when (< valid-to (.valid-to end))
(.add itr (Rectangle. valid-to (.valid-to end) (.sys-from end) (.sys-to end))))))))))))))))

(deftype TrieCursor [^BufferAllocator allocator, arrow-leaves
^Iterator merge-tasks, ^ints leaf-idxs, ^ints current-arrow-page-idxs
Expand Down Expand Up @@ -496,8 +506,7 @@
col-names col-preds
temporal-range
params
(merge {:!ranges (TreeSet. (fn [^Rectangle r1 ^Rectangle r2]
(< (.valid-from r1) (.valid-from r2))))
(merge {:!ranges (LinkedList.)
:skip-iid-ptr (ArrowBufPointer.)
:prev-iid-ptr (ArrowBufPointer.)
:current-iid-ptr (ArrowBufPointer.)}
Expand Down
Loading

0 comments on commit cc9c282

Please sign in to comment.