From 46ae72836408281fbae2697ab72e872217cd3d32 Mon Sep 17 00:00:00 2001 From: FiVo Date: Tue, 8 Aug 2023 10:44:30 +0200 Subject: [PATCH] wip --- core/src/main/clojure/xtdb/operator/scan.clj | 143 +++++++------------ 1 file changed, 48 insertions(+), 95 deletions(-) diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index 1ad3a79137..1b6dc2a8a8 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -16,7 +16,7 @@ [xtdb.vector.writer :as vw] xtdb.watermark) (:import (clojure.lang IPersistentMap MapEntry ) - (java.util ArrayList Arrays Iterator LinkedList List Map TreeSet) + (java.util ArrayList Arrays Iterator LinkedList List Map TreeSet NavigableSet) (java.util.function IntConsumer) org.apache.arrow.memory.ArrowBuf org.apache.arrow.memory.BufferAllocator @@ -190,22 +190,11 @@ (deftype Rectangle [^long valid-from, ^long valid-to, ^long sys-from, ^long sys-to]) -(defn- print-ranges - ([ranges] (print-ranges "$$$$$$$$" ranges)) - ([identifier ranges] - (prn identifier) - (doseq [r ranges] - (prn [(util/micros->instant (.valid-from r)) - (util/micros->instant (.valid-to r))])))) - (defn range-range-row-picker ^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel col-names, ^longs temporal-ranges, {:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr point-point? range-point? range-range?]}] - ;; (prn point-point?) - ;; (prn range-point?) - ;; (prn range-range?) (let [iid-rdr (.readerForName leaf-rel "xt$iid") sys-from-rdr (.readerForName leaf-rel "xt$system_from") op-rdr (.readerForName leaf-rel "op") @@ -266,7 +255,49 @@ !new-ranges (LinkedList.)] (letfn [(duplicate-ptr [^ArrowBufPointer dst, ^ArrowBufPointer src] - (.set dst (.getBuf src) (.getOffset src) (.getLength src)))] + (.set dst (.getBuf src) (.getOffset src) (.getLength src))) + (calc-new-ranges [^Rectangle new-r ^NavigableSet sub-range valid-from valid-to system-from] + (if-not (.isEmpty sub-range) + (let [itr (.iterator sub-range)] + (loop [^Rectangle prev nil ^Rectangle next (.next itr)] + (let [new-valid-from (or (when prev (.valid-to prev)) valid-from) + new-valid-to (or (when next (.valid-from next)) valid-to)] + (when (< new-valid-from new-valid-to) + (.add !new-ranges (Rectangle. new-valid-from new-valid-to + system-from util/end-of-time-μs))) + (when next + (when range-range? + (.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next)) + system-from (.sys-from next)))) + (recur next (and (.hasNext itr) (.next itr))))))) + + (when (< valid-from valid-to) + (.add !new-ranges new-r)))) + (update-ranges [^Rectangle new-r ^NavigableSet sub-range valid-from valid-to] + (cond + point-point? + (doseq [^Rectangle r !new-ranges] + (.add !ranges r) + (when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r))) + (duplicate-ptr skip-iid-ptr current-iid-ptr))) + + range-point? + (doseq [r !new-ranges] + (.add !ranges r)) + + range-range? + (let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range)) + ^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range)) + itr (.iterator sub-range)] + (while (.hasNext itr) + (.next itr) + (.remove itr)) + (when (and lower (< (.valid-from lower) valid-from)) + (.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower)))) + (when (and upper (< valid-to (.valid-to upper))) + (.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper)))) + (.add !ranges new-r))) + (.clear !new-ranges))] (reify IntConsumer (accept [_ idx] @@ -279,7 +310,6 @@ (duplicate-ptr skip-iid-ptr current-iid-ptr) (let [system-from (.getLong sys-from-rdr idx)] (when (and (<= sys-from-lower system-from) (<= system-from sys-from-upper)) - ;; (print-ranges "ranges" !ranges) (case (.getLeg op-rdr idx) :put (let [valid-from (.getLong put-valid-from-rdr idx) @@ -293,23 +323,7 @@ true (Rectangle. valid-to -1 -1 -1) false)] - - (if-not (.isEmpty sub-range) - (let [itr (.iterator sub-range)] - (loop [^Rectangle prev nil ^Rectangle next (.next itr)] - (let [new-valid-from (or (when prev (.valid-to prev)) valid-from) - new-valid-to (or (when next (.valid-from next)) valid-to)] - (when (< new-valid-from new-valid-to) - (.add !new-ranges (Rectangle. new-valid-from new-valid-to - system-from util/end-of-time-μs))) - (when next - (when range-range? - (.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next)) - system-from (.sys-from next)))) - (recur next (and (.hasNext itr) (.next itr))))))) - - (when (< valid-from valid-to) - (.add !new-ranges r1))) + (calc-new-ranges r1 sub-range valid-from valid-to system-from) (doseq [^Rectangle r !new-ranges] (let [valid-from (.valid-from r) @@ -339,30 +353,7 @@ (.writeLong sys-to-wtr sys-to)) (.endRow out-rel)))) - (cond - point-point? - (doseq [^Rectangle r !new-ranges] - (.add !ranges r) - (when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r))) - (duplicate-ptr skip-iid-ptr current-iid-ptr))) - - range-point? - (doseq [r !new-ranges] - (.add !ranges r)) - - range-range? - (let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range)) - ^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range)) - itr (.iterator sub-range)] - (while (.hasNext itr) - (.next itr) - (.remove itr)) - (when (and lower (< (.valid-from lower) valid-from)) - (.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower)))) - (when (and upper (< valid-to (.valid-to upper))) - (.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper)))) - (.add !ranges r1))) - (.clear !new-ranges)) + (update-ranges r1 sub-range valid-from valid-to)) :delete (let [valid-from (.getLong delete-valid-from-rdr idx) @@ -378,47 +369,9 @@ true (Rectangle. valid-to -1 -1 -1) false)] + (calc-new-ranges r1 sub-range valid-from valid-to system-from) - (if-not (.isEmpty sub-range) - (let [itr (.iterator sub-range)] - (loop [^Rectangle prev nil ^Rectangle next (.next itr)] - (let [new-valid-from (or (when prev (.valid-to prev)) valid-from) - new-valid-to (or (when next (.valid-from next)) valid-to)] - (when (< new-valid-from new-valid-to) - (.add !new-ranges (Rectangle. new-valid-from new-valid-to - system-from util/end-of-time-μs))) - (when next - (when range-range? - (.add !new-ranges (Rectangle. (max valid-from (.valid-from next)) (min valid-to (.valid-to next)) - system-from (.sys-from next)))) - (recur next (and (.hasNext itr) (.next itr))))))) - (when (< valid-from valid-to ) - (.add !new-ranges r1))) - - (cond - point-point? - (doseq [^Rectangle r !new-ranges] - (.add !ranges r) - (when (and (<= (.valid-from r) valid-from-upper) (< valid-from-upper (.valid-to r))) - (duplicate-ptr skip-iid-ptr current-iid-ptr))) - - range-point? - (doseq [r !new-ranges] - (.add !ranges r)) - - range-range? - (let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range)) - ^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range)) - itr (.iterator sub-range)] - (while (.hasNext itr) - (.next itr) - (.remove itr)) - (when (and lower (< (.valid-from lower) valid-from)) - (.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower)))) - (when (and upper (< valid-to (.valid-to upper))) - (.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper)))) - (.add !ranges r1))) - (.clear !new-ranges)))))))))))) + (update-ranges r1 sub-range valid-from valid-to)))))))))))) (deftype TrieCursor [^BufferAllocator allocator, arrow-leaves ^Iterator merge-tasks, ^ints leaf-idxs, ^ints current-arrow-page-idxs