diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index 5f20c577d8..95f0d3b94a 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -298,27 +298,10 @@ (deftype Interval [^long start, ^long end, ^long sys-from]) -(defn- intersect [^Interval i1 ^Interval i2] - (and (<= (.start i1) (.end i2)) (<= (.start i2) (.end i1)))) - -(defn- split - "i1 comes before i2 in system time" - [^Interval i1 ^Interval i2] - (let [start1 (.start i1) - start2 (.start i2) - end1 (.end i1) - end2 (.end i2) - new-sys-from (.sys-from i2)] - (cond-> [] - (< start1 start2) - (conj (Interval. start1 start2 new-sys-from)) - (< end2 end1) - (conj (Interval. end2 end1 new-sys-from))))) - (defn range-point-row-picker ^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel col-names, ^longs temporal-ranges, - {:keys [^LinkedList !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}] + {:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}] (let [leaf-row-count (.rowCount leaf-rel) iid-rdr (.readerForName leaf-rel "xt$iid") sys-from-rdr (.readerForName leaf-rel "xt$system_from") @@ -391,17 +374,21 @@ (when (<= sys-from system-time) (case (.getLeg op-rdr idx) :put - (let [i1 (Interval. (.getLong put-valid-from-rdr idx) (.getLong put-valid-to-rdr idx) sys-from) - itr (.listIterator !ranges)] - (.add !new-ranges i1) - (while (.hasNext itr) - (let [i2 (.next itr) - inner-itr (.listIterator !new-ranges)] - (while (.hasNext inner-itr) - (let [i1 (.next inner-itr)] - (when (intersect i1 i2) - (.remove inner-itr) - (run! #(.add inner-itr %) (split i1 i2))))))) + (let [valid-from (.getLong put-valid-from-rdr idx) + valid-to (.getLong put-valid-to-rdr idx) + i1 (Interval. valid-from valid-to sys-from) + lower-bound (or (.lower !ranges i1) i1) + sub-range (.subSet !ranges lower-bound true (Interval. valid-to -1 -1) false)] + (if-not (.isEmpty sub-range) + (let [itr (.iterator sub-range)] + (loop [^Interval prev nil ^Interval next (.next itr)] + (let [new-valid-from (or (and prev (.end prev)) valid-from) + new-valid-to (or (and next (.start next)) valid-to)] + (when (< new-valid-from new-valid-to) + (.add !new-ranges (Interval. new-valid-from new-valid-to sys-from))) + (when next + (recur next (and (.hasNext itr) (.next itr))))))) + (.add !new-ranges i1)) (doseq [^Interval i !new-ranges] (.add !ranges i) (let [valid-from (.start i) @@ -424,55 +411,30 @@ (.clear !new-ranges)) :delete - (let [i1 (Interval. (.getLong delete-valid-from-rdr idx) (.getLong delete-valid-to-rdr idx) sys-from) - itr (.listIterator !ranges)] - (.add !new-ranges i1) - (while (.hasNext itr) - (let [i2 (.next itr) - inner-itr (.listIterator !new-ranges)] - (while (.hasNext inner-itr) - (let [i1 (.next inner-itr)] - (when (intersect i1 i2) - (.remove inner-itr) - (run! #(.add inner-itr %) (split i1 i2))))))) - (doseq [i !new-ranges] + (let [valid-from (.getLong delete-valid-from-rdr idx) + valid-to (.getLong delete-valid-to-rdr idx) + sub-range (.subSet !ranges (Interval. valid-from -1 -1) true (Interval. valid-to -1 -1) false)] + (if-not (.isEmpty sub-range) + (let [itr (.iterator sub-range)] + (loop [^Interval prev nil ^Interval next (.next itr)] + (let [new-valid-from (or (and prev (.end prev)) valid-from) + new-valid-to (or (and next (.start next)) valid-to)] + (when (< new-valid-from new-valid-to) + (.add !new-ranges (Interval. new-valid-from new-valid-to sys-from))) + (when next + (recur next (and (.hasNext itr) (.next itr))))))) + (.add !new-ranges (Interval. valid-from valid-to sys-from))) + (doseq [^Interval i !new-ranges] (.add !ranges i)) (.clear !new-ranges)))))))))))) (deftype Rectangle [^long valid-from, ^long valid-to, ^long sys-from, ^long sys-to]) -(defn- rectangle-intersect [^Rectangle r1 ^Rectangle r2] - (not (or (> (.valid-from r2) (.valid-to r1)) - (> (.valid-from r1) (.valid-to r2)) - (> (.sys-from r2) (.sys-to r1)) - (> (.sys-from r1) (.sys-to r2))))) - -(defn- rectangle-split - "r1 comes before r2 in system time" - [^Rectangle r1 ^Rectangle r2] - (let [valid-from1 (.valid-from r1) - valid-from2 (.valid-from r2) - valid-to1 (.valid-to r1) - valid-to2 (.valid-to r2) - sys-from1 (.sys-from r1) - sys-from2 (.sys-from r2) - sys-to1 (.sys-to r1) - sys-to2 (.sys-to r2)] - (cond-> [] - (< sys-from1 sys-from2) - (conj (Rectangle. valid-from1 valid-to1 sys-from1 sys-from2)) - (< sys-to2 sys-to1) - (conj (Rectangle. valid-from1 valid-to1 sys-to2 sys-to1)) - (< valid-from1 valid-from2) - (conj (Rectangle. valid-from1 valid-from2 (max sys-from1 sys-from2) (min sys-to1 sys-to2))) - (< valid-to2 valid-to1) - (conj (Rectangle. valid-to2 valid-to1 (max sys-from1 sys-from2) (min sys-to1 sys-to2)))))) - (defn range-range-row-picker ^java.util.function.IntConsumer [^IRelationWriter out-rel, ^RelationReader leaf-rel col-names, ^longs temporal-ranges, - {:keys [^LinkedList !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}] + {:keys [^TreeSet !ranges skip-iid-ptr prev-iid-ptr current-iid-ptr]}] (let [iid-rdr (.readerForName leaf-rel "xt$iid") sys-from-rdr (.readerForName leaf-rel "xt$system_from") op-rdr (.readerForName leaf-rel "op") @@ -545,24 +507,28 @@ (if (= :evict (.getLeg op-rdr idx)) (duplicate-ptr skip-iid-ptr current-iid-ptr) (let [system-from (.getLong sys-from-rdr idx)] - ;; TODO potentially more fancy check here for skipping (when (and (<= sys-from-lower system-from) (<= system-from sys-from-upper)) (case (.getLeg op-rdr idx) :put - (let [r1 (Rectangle. (.getLong put-valid-from-rdr idx) (.getLong put-valid-to-rdr idx) - (.getLong sys-from-rdr idx) util/end-of-time-μs) - itr (.listIterator !ranges)] - (.add !new-ranges r1) - (while (.hasNext itr) - (let [r2 (.next itr) - inner-itr (.listIterator !new-ranges)] - (while (.hasNext inner-itr) - (let [r1 (.next inner-itr)] - (when (rectangle-intersect r1 r2) - (.remove inner-itr) - (run! #(.add inner-itr %) (rectangle-split r1 r2))))))) + (let [valid-from (.getLong put-valid-from-rdr idx) + valid-to (.getLong put-valid-to-rdr idx) + r1 (Rectangle. valid-from valid-to system-from util/end-of-time-μs) + lower-bound (or (.lower !ranges r1) r1) + sub-range (.subSet !ranges lower-bound true (Rectangle. valid-to -1 -1 -1) false)] + (if-not (.isEmpty sub-range) + (let [itr (.iterator sub-range)] + (loop [^Rectangle prev nil ^Rectangle next (.next itr)] + (let [new-valid-from (or (when prev (.valid-to prev)) valid-from) + new-valid-to (or (when next (.valid-from next)) valid-to)] + (when (< new-valid-from new-valid-to) + (.add !new-ranges (Rectangle. new-valid-from new-valid-to + system-from util/end-of-time-μs))) + (when next + (.add !new-ranges (Rectangle. (.valid-from next) (min valid-to (.valid-to next)) + system-from (.sys-from next))) + (recur next (and (.hasNext itr) (.next itr))))))) + (.add !new-ranges r1)) (doseq [^Rectangle r !new-ranges] - (.add !ranges r) (let [valid-from (.valid-from r) valid-to (.valid-to r) sys-from (.sys-from r) @@ -588,24 +554,49 @@ (doseq [^IVectorWriter sys-to-wtr sys-to-wtrs] (.writeLong sys-to-wtr sys-to)) (.endRow out-rel)))) - (.clear !new-ranges)) - + (let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range)) + ^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range)) + itr (.iterator sub-range)] + (while (.hasNext itr) + (.next itr) + (.remove itr)) + (when (and lower (< (.valid-from lower) valid-from)) + (.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower)))) + (when (and upper (< valid-to (.valid-to upper))) + (.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper)))) + (.add !ranges r1) + (.clear !new-ranges))) :delete - (let [r1 (Rectangle. (.getLong delete-valid-from-rdr idx) (.getLong delete-valid-to-rdr idx) - (.getLong sys-from-rdr idx) util/end-of-time-μs) - itr (.listIterator !ranges)] - (.add !new-ranges r1) - (while (.hasNext itr) - (let [r2 (.next itr) - inner-itr (.listIterator !new-ranges)] - (while (.hasNext inner-itr) - (let [r1 (.next inner-itr)] - (when (rectangle-intersect r1 r2) - (.remove inner-itr) - (run! #(.add inner-itr %) (rectangle-split r1 r2))))))) - (doseq [i !new-ranges] - (.add !ranges i)) - (.clear !new-ranges)))))))))))) + (let [valid-from (.getLong delete-valid-from-rdr idx) + valid-to (.getLong delete-valid-to-rdr idx) + r1 (Rectangle. valid-from valid-to system-from util/end-of-time-μs) + lower-bound (or (.lower !ranges r1) r1) + sub-range (.subSet !ranges lower-bound true (Rectangle. valid-to -1 -1 -1) false)] + (if-not (.isEmpty sub-range) + (let [itr (.iterator sub-range)] + (loop [^Rectangle prev nil ^Rectangle next (.next itr)] + (let [new-valid-from (or (when prev (.valid-to prev)) valid-from) + new-valid-to (or (when next (.valid-from next)) valid-to)] + (when (< new-valid-from new-valid-to) + (.add !new-ranges (Rectangle. new-valid-from new-valid-to + system-from util/end-of-time-μs))) + (when next + (.add !new-ranges (Rectangle. (.valid-from next) (min valid-to (.valid-to next)) + system-from (.sys-from next))) + (recur next (and (.hasNext itr) (.next itr))))))) + (.add !new-ranges r1)) + (let [^Rectangle lower (when (not (.isEmpty sub-range)) (.first sub-range)) + ^Rectangle upper (when (not (.isEmpty sub-range)) (.last sub-range)) + itr (.iterator sub-range)] + (while (.hasNext itr) + (.next itr) + (.remove itr)) + (when (and lower (< (.valid-from lower) valid-from)) + (.add !ranges (Rectangle. (.valid-from lower) valid-from (.sys-from lower) (.sys-to lower)))) + (when (and upper (< valid-to (.valid-to upper))) + (.add !ranges (Rectangle. valid-to (.valid-to upper) (.sys-from upper) (.sys-to upper)))) + (.add !ranges r1) + (.clear !new-ranges))))))))))))) (deftype TrieCursor [^BufferAllocator allocator, arrow-leaves ^Iterator merge-tasks, ^ints leaf-idxs, ^ints current-arrow-page-idxs @@ -718,12 +709,6 @@ ;; The consumers for different leafs need to share some state so the logic of how to advance ;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect ;; the skipping in another leaf consumer. -(defn- ->picker-state [] - {:current-bounds (long-array 2) - :!ranges (LinkedList.) - :skip-iid-ptr (ArrowBufPointer.) - :prev-iid-ptr (ArrowBufPointer.) - :current-iid-ptr (ArrowBufPointer.)}) (defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool, ^IWatermark wm table-name, col-names, ^longs temporal-range @@ -748,7 +733,15 @@ col-names col-preds temporal-range params - (->picker-state)) + {:current-bounds (long-array 2) + :!ranges (if (range-point-query? wm basis scan-opts) + (TreeSet. (fn [^Interval i1 ^Interval i2] + (< (.start i1) (.start i2)))) + (TreeSet. (fn [^Rectangle i1 ^Rectangle i2] + (< (.valid-from i1) (.valid-from i2))))) + :skip-iid-ptr (ArrowBufPointer.) + :prev-iid-ptr (ArrowBufPointer.) + :current-iid-ptr (ArrowBufPointer.)}) (catch Throwable t (util/close (map :leaf-buf arrow-leaves)) @@ -760,6 +753,12 @@ :buffer-pool (ig/ref ::bp/buffer-pool) :object-store (ig/ref :xtdb/object-store)})) +(defn print-temporal-data [^longs data] + (prn "$$$$$$$$$$") + (doseq [t data] + (prn (util/micros->instant t))) + (prn "$$$$$$$$$$")) + (defmethod ig/init-key ::scan-emitter [_ {:keys [^ObjectStore object-store ^IMetadataManager metadata-mgr, ^IBufferPool buffer-pool]}] (reify IScanEmitter (tableColNames [_ wm table-name] diff --git a/src/test/clojure/xtdb/datalog_test.clj b/src/test/clojure/xtdb/datalog_test.clj index 76844085a7..ebc4002fef 100644 --- a/src/test/clojure/xtdb/datalog_test.clj +++ b/src/test/clojure/xtdb/datalog_test.clj @@ -1504,13 +1504,13 @@ "cross-time join - who was here in both 2018 and 2023?") (t/is (= #{{:vt-start (util/->zdt #inst "2021") - :vt-end (util/->zdt util/end-of-time) - :tt-start (util/->zdt #inst "2020-01-01") - :tt-end (util/->zdt #inst "2020-01-02")} - {:vt-start (util/->zdt #inst "2021") :vt-end (util/->zdt #inst "2022") - :tt-start (util/->zdt #inst "2020-01-02") - :tt-end (util/->zdt util/end-of-time)}} + :tt-start (util/->zdt #inst "2020") + :tt-end (util/->zdt util/end-of-time)} + {:vt-start (util/->zdt #inst "2022") + :vt-end (util/->zdt util/end-of-time) + :tt-start (util/->zdt #inst "2020") + :tt-end (util/->zdt #inst "2020-01-02")}} (set (q '{:find [vt-start vt-end tt-start tt-end] :where [(match :xt_docs {:xt/id :luke :xt/valid-from vt-start @@ -1657,13 +1657,14 @@ tx1, nil))) "as-of 18 Jan") - (t/is (= [{:cust 145, :app-start (util/->zdt #inst "1998-01-05")} - {:cust 827, :app-start (util/->zdt #inst "1998-01-12")}] - (q '{:find [cust app-start] - :where [(match :docs {:customer-number cust, :xt/valid-from app-start} - {:for-valid-time :all-time})] - :order-by [[app-start :asc]]} - tx5, nil)) + (t/is (= #{{:cust 145, :app-start (util/->zdt #inst "1998-01-05")} + {:cust 827, :app-start (util/->zdt #inst "1998-01-12")}} + (set (q '{:find [cust app-start] + :where [(match :docs {:customer-number cust, + :xt/valid-from app-start} + {:for-valid-time :all-time})] + :order-by [[app-start :asc]]} + tx5, nil))) "as-of 29 Jan") (t/is (= [{:cust 827, :app-start (util/->zdt #inst "1998-01-12"), :app-end (util/->zdt #inst "1998-01-20")}] diff --git a/src/test/clojure/xtdb/operator/scan_test.clj b/src/test/clojure/xtdb/operator/scan_test.clj index 4a7c71ad5a..a3989075c1 100644 --- a/src/test/clojure/xtdb/operator/scan_test.clj +++ b/src/test/clojure/xtdb/operator/scan_test.clj @@ -224,18 +224,30 @@ #_(tu/finish-chunk! node) - (t/is (= #{{:xt/system-from (util/->zdt #inst "3000") - :xt/system-to (util/->zdt #inst "3001") - :last_updated "tx1"} - {:xt/system-from (util/->zdt #inst "3001") - :xt/system-to (util/->zdt util/end-of-time) - :last_updated "tx1"} - {:xt/system-from (util/->zdt #inst "3001") - :xt/system-to (util/->zdt util/end-of-time) - :last_updated "tx2"}} + (t/is (= #{{:last_updated "tx2", + :xt/valid-from #time/zoned-date-time "3001-01-01T00:00Z[UTC]", + :xt/valid-to + #time/zoned-date-time "9999-12-31T23:59:59.999999Z[UTC]", + :xt/system-from #time/zoned-date-time "3001-01-01T00:00Z[UTC]", + :xt/system-to + #time/zoned-date-time "9999-12-31T23:59:59.999999Z[UTC]"} + {:last_updated "tx1", + :xt/valid-from #time/zoned-date-time "3000-01-01T00:00Z[UTC]", + :xt/valid-to #time/zoned-date-time "3001-01-01T00:00Z[UTC]", + :xt/system-from #time/zoned-date-time "3000-01-01T00:00Z[UTC]", + :xt/system-to + #time/zoned-date-time "9999-12-31T23:59:59.999999Z[UTC]"} + {:last_updated "tx1", + :xt/valid-from #time/zoned-date-time "3001-01-01T00:00Z[UTC]", + :xt/valid-to + #time/zoned-date-time "9999-12-31T23:59:59.999999Z[UTC]", + :xt/system-from #time/zoned-date-time "3000-01-01T00:00Z[UTC]", + :xt/system-to #time/zoned-date-time "3001-01-01T00:00Z[UTC]"}} (set (tu/query-ra '[:scan {:table foo, :for-system-time :all-time} [{xt/system-from (< xt/system-from #time/zoned-date-time "3002-01-01T00:00Z")} {xt/system-to (> xt/system-to #time/zoned-date-time "2999-01-01T00:00Z")} + xt/valid-from + xt/valid-to last_updated]] {:node node :default-all-valid-time? true}))))))