Skip to content

Commit

Permalink
range-point and range-range with TreeSet
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 7, 2023
1 parent 05d36bf commit 33e74cb
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 129 deletions.
213 changes: 106 additions & 107 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -298,27 +298,10 @@

;; A one-dimensional range tagged with a system-from value, stored as three
;; primitive longs. Elsewhere in this file instances are built from a row's
;; valid-from / valid-to values (as `start` / `end`) together with its
;; system-from value.
;; NOTE(review): whether `end` is meant to be inclusive or exclusive is not
;; stated here - confirm against the callers.
(deftype Interval [^long start, ^long end, ^long sys-from])

(defn- intersect
  "Returns true when `i1` and `i2` overlap, false otherwise.

  The comparison is inclusive on both ends, so two intervals that merely
  touch (the end of one equals the start of the other) count as intersecting."
  [^Interval i1 ^Interval i2]
  ;; Two intervals are disjoint exactly when one ends strictly before the
  ;; other starts; they intersect when neither is the case.
  (not (or (< (.end i2) (.start i1))
           (< (.end i1) (.start i2)))))

(defn- split
  "i1 comes before i2 in system time.

  Returns a vector holding the pieces of `i1` that stick out past `i2`:
  first the piece from `i1`'s start up to `i2`'s start (only when `i1` starts
  earlier), then the piece from `i2`'s end up to `i1`'s end (only when `i1`
  ends later). Every returned piece carries the sys-from of `i2`. The vector
  is empty when neither piece exists."
  [^Interval i1 ^Interval i2]
  (let [sys-from (.sys-from i2)
        leading (when (< (.start i1) (.start i2))
                  (Interval. (.start i1) (.start i2) sys-from))
        trailing (when (< (.end i2) (.end i1))
                   (Interval. (.end i2) (.end i1) sys-from))]
    ;; Keep only the pieces that exist, leading piece first.
    (vec (remove nil? [leading trailing]))))

(defn range-point-row-picker
^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel
col-names, ^longs temporal-ranges,
{:keys [^LinkedList !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}]
{:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}]
(let [leaf-row-count (.rowCount leaf-rel)
iid-rdr (.readerForName leaf-rel "xt$iid")
sys-from-rdr (.readerForName leaf-rel "xt$system_from")
Expand Down Expand Up @@ -391,17 +374,21 @@
(when (<= sys-from system-time)
(case (.getLeg op-rdr idx)
:put
(let [i1 (Interval. (.getLong put-valid-from-rdr idx) (.getLong put-valid-to-rdr idx) sys-from)
itr (.listIterator !ranges)]
(.add !new-ranges i1)
(while (.hasNext itr)
(let [i2 (.next itr)
inner-itr (.listIterator !new-ranges)]
(while (.hasNext inner-itr)
(let [i1 (.next inner-itr)]
(when (intersect i1 i2)
(.remove inner-itr)
(run! #(.add inner-itr %) (split i1 i2)))))))
(let [valid-from (.getLong put-valid-from-rdr idx)
valid-to (.getLong put-valid-to-rdr idx)
i1 (Interval. valid-from valid-to sys-from)
lower-bound (or (.lower !ranges i1) i1)
sub-range (.subSet !ranges lower-bound true (Interval. valid-to -1 -1) false)]
(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Interval prev nil ^Interval next (.next itr)]
(let [new-valid-from (or (and prev (.end prev)) valid-from)
new-valid-to (or (and next (.start next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Interval. new-valid-from new-valid-to sys-from)))
(when next
(recur next (and (.hasNext itr) (.next itr)))))))
(.add !new-ranges i1))
(doseq [^Interval i !new-ranges]
(.add !ranges i)
(let [valid-from (.start i)
Expand All @@ -424,55 +411,30 @@
(.clear !new-ranges))

:delete
(let [i1 (Interval. (.getLong delete-valid-from-rdr idx) (.getLong delete-valid-to-rdr idx) sys-from)
itr (.listIterator !ranges)]
(.add !new-ranges i1)
(while (.hasNext itr)
(let [i2 (.next itr)
inner-itr (.listIterator !new-ranges)]
(while (.hasNext inner-itr)
(let [i1 (.next inner-itr)]
(when (intersect i1 i2)
(.remove inner-itr)
(run! #(.add inner-itr %) (split i1 i2)))))))
(doseq [i !new-ranges]
(let [valid-from (.getLong delete-valid-from-rdr idx)
valid-to (.getLong delete-valid-to-rdr idx)
sub-range (.subSet !ranges (Interval. valid-from -1 -1) true (Interval. valid-to -1 -1) false)]
(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Interval prev nil ^Interval next (.next itr)]
(let [new-valid-from (or (and prev (.end prev)) valid-from)
new-valid-to (or (and next (.start next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Interval. new-valid-from new-valid-to sys-from)))
(when next
(recur next (and (.hasNext itr) (.next itr)))))))
(.add !new-ranges (Interval. valid-from valid-to sys-from)))
(doseq [^Interval i !new-ranges]
(.add !ranges i))
(.clear !new-ranges))))))))))))

;; A two-dimensional range: a valid-time span (`valid-from` .. `valid-to`)
;; crossed with a system-time span (`sys-from` .. `sys-to`), stored as four
;; primitive longs.
;; NOTE(review): whether the upper bounds are inclusive or exclusive is not
;; stated here - confirm against the callers.
(deftype Rectangle [^long valid-from, ^long valid-to,
^long sys-from, ^long sys-to])

(defn- rectangle-intersect
  "Returns true when `r1` and `r2` overlap in both the valid-time and the
  system-time dimension, false otherwise.

  All comparisons are inclusive, so rectangles that only share an edge count
  as intersecting."
  [^Rectangle r1 ^Rectangle r2]
  (and
   ;; valid-time dimension: each rectangle starts no later than the other ends
   (<= (.valid-from r2) (.valid-to r1))
   (<= (.valid-from r1) (.valid-to r2))
   ;; system-time dimension: same check
   (<= (.sys-from r2) (.sys-to r1))
   (<= (.sys-from r1) (.sys-to r2))))

(defn- rectangle-split
  "r1 comes before r2 in system time.

  Returns a vector with the pieces of `r1` that lie outside `r2`, in this
  order (each piece only when it exists):

  1. the full valid-time width of `r1` for system time before `r2` starts,
  2. the full valid-time width of `r1` for system time after `r2` ends,
  3. the valid-time part of `r1` before `r2` starts, limited to the system
     time range shared by both rectangles,
  4. the valid-time part of `r1` after `r2` ends, limited to that same shared
     system time range."
  [^Rectangle r1 ^Rectangle r2]
  (let [vf1 (.valid-from r1), vt1 (.valid-to r1)
        sf1 (.sys-from r1),   st1 (.sys-to r1)
        vf2 (.valid-from r2), vt2 (.valid-to r2)
        sf2 (.sys-from r2),   st2 (.sys-to r2)
        ;; system-time range common to both rectangles
        shared-sys-from (max sf1 sf2)
        shared-sys-to (min st1 st2)
        pieces [(when (< sf1 sf2)
                  (Rectangle. vf1 vt1 sf1 sf2))
                (when (< st2 st1)
                  (Rectangle. vf1 vt1 st2 st1))
                (when (< vf1 vf2)
                  (Rectangle. vf1 vf2 shared-sys-from shared-sys-to))
                (when (< vt2 vt1)
                  (Rectangle. vt2 vt1 shared-sys-from shared-sys-to))]]
    ;; Drop the pieces that do not exist, keeping the order above.
    (vec (remove nil? pieces))))

(defn range-range-row-picker
^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel
col-names, ^longs temporal-ranges,
{:keys [^LinkedList !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}]
{:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}]
(let [iid-rdr (.readerForName leaf-rel "xt$iid")
sys-from-rdr (.readerForName leaf-rel "xt$system_from")
op-rdr (.readerForName leaf-rel "op")
Expand Down Expand Up @@ -545,24 +507,28 @@
(if (= :evict (.getLeg op-rdr idx))
(duplicate-ptr skip-iid-ptr current-iid-ptr)
(let [system-from (.getLong sys-from-rdr idx)]
;; TODO potentially more fancy check here for skipping
(when (and (<= sys-from-lower system-from) (<= system-from sys-from-upper))
(case (.getLeg op-rdr idx)
:put
(let [r1 (Rectangle. (.getLong put-valid-from-rdr idx) (.getLong put-valid-to-rdr idx)
(.getLong sys-from-rdr idx) util/end-of-time-μs)
itr (.listIterator !ranges)]
(.add !new-ranges r1)
(while (.hasNext itr)
(let [r2 (.next itr)
inner-itr (.listIterator !new-ranges)]
(while (.hasNext inner-itr)
(let [r1 (.next inner-itr)]
(when (rectangle-intersect r1 r2)
(.remove inner-itr)
(run! #(.add inner-itr %) (rectangle-split r1 r2)))))))
(let [valid-from (.getLong put-valid-from-rdr idx)
valid-to (.getLong put-valid-to-rdr idx)
r1 (Rectangle. valid-from valid-to system-from util/end-of-time-μs)
lower-bound (or (.lower !ranges r1) r1)
sub-range (.subSet !ranges lower-bound true (Rectangle. valid-to -1 -1 -1) false)]
(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Rectangle prev nil ^Rectangle next (.next itr)]
(let [new-valid-from (or (when prev (.valid-to prev)) valid-from)
new-valid-to (or (when next (.valid-from next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Rectangle. new-valid-from new-valid-to
system-from util/end-of-time-μs)))
(when next
(.add !new-ranges (Rectangle. (.valid-from next) (min valid-to (.valid-to next))
system-from (.sys-from next)))
(recur next (and (.hasNext itr) (.next itr)))))))
(.add !new-ranges r1))
(doseq [^Rectangle r !new-ranges]
(.add !ranges r)
(let [valid-from (.valid-from r)
valid-to (.valid-to r)
sys-from (.sys-from r)
Expand All @@ -588,24 +554,49 @@
(doseq [^IVectorWriter sys-to-wtr sys-to-wtrs]
(.writeLong sys-to-wtr sys-to))
(.endRow out-rel))))
(.clear !new-ranges))

(let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range))
^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range))
itr (.iterator sub-range)]
(while (.hasNext itr)
(.next itr)
(.remove itr))
(when (and lower (< (.valid-from lower) valid-from))
(.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower))))
(when (and upper (< valid-to (.valid-to upper)))
(.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper))))
(.add !ranges r1)
(.clear !new-ranges)))
:delete
(let [r1 (Rectangle. (.getLong delete-valid-from-rdr idx) (.getLong delete-valid-to-rdr idx)
(.getLong sys-from-rdr idx) util/end-of-time-μs)
itr (.listIterator !ranges)]
(.add !new-ranges r1)
(while (.hasNext itr)
(let [r2 (.next itr)
inner-itr (.listIterator !new-ranges)]
(while (.hasNext inner-itr)
(let [r1 (.next inner-itr)]
(when (rectangle-intersect r1 r2)
(.remove inner-itr)
(run! #(.add inner-itr %) (rectangle-split r1 r2)))))))
(doseq [i !new-ranges]
(.add !ranges i))
(.clear !new-ranges))))))))))))
(let [valid-from (.getLong delete-valid-from-rdr idx)
valid-to (.getLong delete-valid-to-rdr idx)
r1 (Rectangle. valid-from valid-to system-from util/end-of-time-μs)
lower-bound (or (.lower !ranges r1) r1)
sub-range (.subSet !ranges lower-bound true (Rectangle. valid-to -1 -1 -1) false)]
(if-not (.isEmpty sub-range)
(let [itr (.iterator sub-range)]
(loop [^Rectangle prev nil ^Rectangle next (.next itr)]
(let [new-valid-from (or (when prev (.valid-to prev)) valid-from)
new-valid-to (or (when next (.valid-from next)) valid-to)]
(when (< new-valid-from new-valid-to)
(.add !new-ranges (Rectangle. new-valid-from new-valid-to
system-from util/end-of-time-μs)))
(when next
(.add !new-ranges (Rectangle. (.valid-from next) (min valid-to (.valid-to next))
system-from (.sys-from next)))
(recur next (and (.hasNext itr) (.next itr)))))))
(.add !new-ranges r1))
(let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range))
^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range))
itr (.iterator sub-range)]
(while (.hasNext itr)
(.next itr)
(.remove itr))
(when (and lower (< (.valid-from lower) valid-from))
(.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower))))
(when (and upper (< valid-to (.valid-to upper)))
(.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper))))
(.add !ranges r1)
(.clear !new-ranges)))))))))))))

(deftype TrieCursor [^BufferAllocator allocator, arrow-leaves
^Iterator merge-tasks, ^ints leaf-idxs, ^ints current-arrow-page-idxs
Expand Down Expand Up @@ -718,12 +709,6 @@
;; The consumers for different leaves need to share some state so that the logic for advancing
;; is correct. For example, if the `skip-iid-ptr` gets set in one leaf consumer, it should also
;; affect the skipping in another leaf consumer.
(defn- ->picker-state
  "Creates a fresh picker state map containing:

  :current-bounds   a new long array of length 2
  :!ranges          a new, empty LinkedList
  :skip-iid-ptr     a new ArrowBufPointer
  :prev-iid-ptr     a new ArrowBufPointer
  :current-iid-ptr  a new ArrowBufPointer

  Every call returns newly allocated objects."
  []
  (let [bounds (long-array 2)
        ranges (LinkedList.)
        skip-ptr (ArrowBufPointer.)
        prev-ptr (ArrowBufPointer.)
        current-ptr (ArrowBufPointer.)]
    {:current-bounds bounds
     :!ranges ranges
     :skip-iid-ptr skip-ptr
     :prev-iid-ptr prev-ptr
     :current-iid-ptr current-ptr}))

(defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool, ^IWatermark wm
table-name, col-names, ^longs temporal-range
Expand All @@ -748,7 +733,15 @@
col-names col-preds
temporal-range
params
(->picker-state))
{:current-bounds (long-array 2)
:!ranges (if (range-point-query? wm basis scan-opts)
(TreeSet. (fn [^Interval i1 ^Interval i2]
(< (.start i1) (.start i2))))
(TreeSet. (fn [^Rectangle i1 ^Rectangle i2]
(< (.valid-from i1) (.valid-from i2)))))
:skip-iid-ptr (ArrowBufPointer.)
:prev-iid-ptr (ArrowBufPointer.)
:current-iid-ptr (ArrowBufPointer.)})

(catch Throwable t
(util/close (map :leaf-buf arrow-leaves))
Expand All @@ -760,6 +753,12 @@
:buffer-pool (ig/ref ::bp/buffer-pool)
:object-store (ig/ref :xtdb/object-store)}))

(defn print-temporal-data
  "Debug helper: prints every element of the long array `data`, converted
  with `util/micros->instant`, one per line via `prn`, framed by a marker
  line of dollar signs before and after. Returns nil."
  [^longs data]
  (let [marker "$$$$$$$$$$"]
    (prn marker)
    (run! (fn [micros]
            (prn (util/micros->instant micros)))
          data)
    (prn marker)))

(defmethod ig/init-key ::scan-emitter [_ {:keys [^ObjectStore object-store ^IMetadataManager metadata-mgr, ^IBufferPool buffer-pool]}]
(reify IScanEmitter
(tableColNames [_ wm table-name]
Expand Down
27 changes: 14 additions & 13 deletions src/test/clojure/xtdb/datalog_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1504,13 +1504,13 @@
"cross-time join - who was here in both 2018 and 2023?")

(t/is (= #{{:vt-start (util/->zdt #inst "2021")
:vt-end (util/->zdt util/end-of-time)
:tt-start (util/->zdt #inst "2020-01-01")
:tt-end (util/->zdt #inst "2020-01-02")}
{:vt-start (util/->zdt #inst "2021")
:vt-end (util/->zdt #inst "2022")
:tt-start (util/->zdt #inst "2020-01-02")
:tt-end (util/->zdt util/end-of-time)}}
:tt-start (util/->zdt #inst "2020")
:tt-end (util/->zdt util/end-of-time)}
{:vt-start (util/->zdt #inst "2022")
:vt-end (util/->zdt util/end-of-time)
:tt-start (util/->zdt #inst "2020")
:tt-end (util/->zdt #inst "2020-01-02")}}
(set (q '{:find [vt-start vt-end tt-start tt-end]
:where [(match :xt_docs {:xt/id :luke
:xt/valid-from vt-start
Expand Down Expand Up @@ -1657,13 +1657,14 @@
tx1, nil)))
"as-of 18 Jan")

(t/is (= [{:cust 145, :app-start (util/->zdt #inst "1998-01-05")}
{:cust 827, :app-start (util/->zdt #inst "1998-01-12")}]
(q '{:find [cust app-start]
:where [(match :docs {:customer-number cust, :xt/valid-from app-start}
{:for-valid-time :all-time})]
:order-by [[app-start :asc]]}
tx5, nil))
(t/is (= #{{:cust 145, :app-start (util/->zdt #inst "1998-01-05")}
{:cust 827, :app-start (util/->zdt #inst "1998-01-12")}}
(set (q '{:find [cust app-start]
:where [(match :docs {:customer-number cust,
:xt/valid-from app-start}
{:for-valid-time :all-time})]
:order-by [[app-start :asc]]}
tx5, nil)))
"as-of 29 Jan")

(t/is (= [{:cust 827, :app-start (util/->zdt #inst "1998-01-12"), :app-end (util/->zdt #inst "1998-01-20")}]
Expand Down
Loading

0 comments on commit 33e74cb

Please sign in to comment.