Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 8, 2023
1 parent de86f58 commit 46ae728
Showing 1 changed file with 48 additions and 95 deletions.
143 changes: 48 additions & 95 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[xtdb.vector.writer :as vw]
xtdb.watermark)
(:import (clojure.lang IPersistentMap MapEntry )
(java.util ArrayList Arrays Iterator LinkedList List Map TreeSet)
(java.util ArrayList Arrays Iterator LinkedList List Map TreeSet NavigableSet)
(java.util.function IntConsumer)
org.apache.arrow.memory.ArrowBuf
org.apache.arrow.memory.BufferAllocator
Expand Down Expand Up @@ -190,22 +190,11 @@
(deftype Rectangle [^long valid-from, ^long valid-to,
^long sys-from, ^long sys-to])

(defn- print-ranges
([ranges] (print-ranges "$$$$$$$$" ranges))
([identifier ranges]
(prn identifier)
(doseq [r ranges]
(prn [(util/micros->instant (.valid-from r))
(util/micros->instant (.valid-to r))]))))

(defn range-range-row-picker
^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel
col-names, ^longs temporal-ranges,
{:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr
point-point? range-point? range-range?]}]
;; (prn point-point?)
;; (prn range-point?)
;; (prn range-range?)
(let [iid-rdr (.readerForName leaf-rel "xt$iid")
sys-from-rdr (.readerForName leaf-rel "xt$system_from")
op-rdr (.readerForName leaf-rel "op")
Expand Down Expand Up @@ -266,7 +255,49 @@
!new-ranges (LinkedList.)]

(letfn [(duplicate-ptr [^ArrowBufPointer dst, ^ArrowBufPointer src]
(.set dst (.getBuf src) (.getOffset src) (.getLength src)))]
(.set dst (.getBuf src) (.getOffset src) (.getLength src)))
(calc-new-ranges [^Rectangle new-r ^NavigableSet sub-range valid-from valid-to system-from]
(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Rectangle prev nil ^Rectangle next (.next itr)]
(let [new-valid-from (or (when prev (.valid-to prev)) valid-from)
new-valid-to (or (when next (.valid-from next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Rectangle. new-valid-from new-valid-to
system-from util/end-of-time-μs)))
(when next
(when range-range?
(.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next))
system-from (.sys-from next))))
(recur next (and (.hasNext itr) (.next itr)))))))

(when (< valid-from valid-to)
(.add !new-ranges new-r))))
(update-ranges [^Rectangle new-r ^NavigableSet sub-range valid-from valid-to]
(cond
point-point?
(doseq [^Rectangle r !new-ranges]
(.add !ranges r)
(when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r)))
(duplicate-ptr skip-iid-ptr current-iid-ptr)))

range-point?
(doseq [r !new-ranges]
(.add !ranges r))

range-range?
(let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range))
^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range))
itr (.iterator sub-range)]
(while (.hasNext itr)
(.next itr)
(.remove itr))
(when (and lower (< (.valid-from lower) valid-from))
(.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower))))
(when (and upper (< valid-to (.valid-to upper)))
(.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper))))
(.add !ranges new-r)))
(.clear !new-ranges))]

(reify IntConsumer
(accept [_ idx]
Expand All @@ -279,7 +310,6 @@
(duplicate-ptr skip-iid-ptr current-iid-ptr)
(let [system-from (.getLong sys-from-rdr idx)]
(when (and (<= sys-from-lower system-from) (<= system-from sys-from-upper))
;; (print-ranges "ranges" !ranges)
(case (.getLeg op-rdr idx)
:put
(let [valid-from (.getLong put-valid-from-rdr idx)
Expand All @@ -293,23 +323,7 @@
true
(Rectangle. valid-to -1 -1 -1)
false)]

(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Rectangle prev nil ^Rectangle next (.next itr)]
(let [new-valid-from (or (when prev (.valid-to prev)) valid-from)
new-valid-to (or (when next (.valid-from next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Rectangle. new-valid-from new-valid-to
system-from util/end-of-time-μs)))
(when next
(when range-range?
(.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next))
system-from (.sys-from next))))
(recur next (and (.hasNext itr) (.next itr)))))))

(when (< valid-from valid-to)
(.add !new-ranges r1)))
(calc-new-ranges r1 sub-range valid-from valid-to system-from)

(doseq [^Rectangle r !new-ranges]
(let [valid-from (.valid-from r)
Expand Down Expand Up @@ -339,30 +353,7 @@
(.writeLong sys-to-wtr sys-to))
(.endRow out-rel))))

(cond
point-point?
(doseq [^Rectangle r !new-ranges]
(.add !ranges r)
(when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r)))
(duplicate-ptr skip-iid-ptr current-iid-ptr)))

range-point?
(doseq [r !new-ranges]
(.add !ranges r))

range-range?
(let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range))
^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range))
itr (.iterator sub-range)]
(while (.hasNext itr)
(.next itr)
(.remove itr))
(when (and lower (< (.valid-from lower) valid-from))
(.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower))))
(when (and upper (< valid-to (.valid-to upper)))
(.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper))))
(.add !ranges r1)))
(.clear !new-ranges))
(update-ranges r1 sub-range valid-from valid-to))

:delete
(let [valid-from (.getLong delete-valid-from-rdr idx)
Expand All @@ -378,47 +369,9 @@
true
(Rectangle. valid-to -1 -1 -1)
false)]
(calc-new-ranges r1 sub-range valid-from valid-to system-from)

(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Rectangle prev nil ^Rectangle next (.next itr)]
(let [new-valid-from (or (when prev (.valid-to prev)) valid-from)
new-valid-to (or (when next (.valid-from next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Rectangle. new-valid-from new-valid-to
system-from util/end-of-time-μs)))
(when next
(when range-range?
(.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next))
system-from (.sys-from next))))
(recur next (and (.hasNext itr) (.next itr)))))))
(when (< valid-from valid-to )
(.add !new-ranges r1)))

(cond
point-point?
(doseq [^Rectangle r !new-ranges]
(.add !ranges r)
(when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r)))
(duplicate-ptr skip-iid-ptr current-iid-ptr)))

range-point?
(doseq [r !new-ranges]
(.add !ranges r))

range-range?
(let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range))
^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range))
itr (.iterator sub-range)]
(while (.hasNext itr)
(.next itr)
(.remove itr))
(when (and lower (< (.valid-from lower) valid-from))
(.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower))))
(when (and upper (< valid-to (.valid-to upper)))
(.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper))))
(.add !ranges r1)))
(.clear !new-ranges))))))))))))
(update-ranges r1 sub-range valid-from valid-to))))))))))))

(deftype TrieCursor [^BufferAllocator allocator, arrow-leaves
^Iterator merge-tasks, ^ints leaf-idxs, ^ints current-arrow-page-idxs
Expand Down

0 comments on commit 46ae728

Please sign in to comment.