diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index 8237b9f8ff..67de58b96a 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -535,85 +535,79 @@ params] ICursor (tryAdvance [_ c] - (let [!merged-relation (volatile! nil)] - (loop [] - (let [has-next (.tryAdvance trie-bucket-cursor - (reify Consumer - (accept [_ in-rel] - (vreset! !merged-relation in-rel))))] - (if has-next - (let [!selection-vec (IntStream/builder) - iid-col (.vectorForName @!merged-relation "xt$iid") - op-col (.vectorForName @!merged-relation "op") - ^DenseUnionVector op-vec (.getVector op-col) - ^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0))) - ^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1))) - ^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader)) - valid-time (aget temporal-range 0) - ^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector)) - ^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector )) - ^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector)) - ^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector)) - cmp (cmp/->comparator iid-col iid-col :nulls-last) - !new (volatile! true)] - (dotimes [idx (.rowCount @!merged-relation)] - ;; new iid - (when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx)))) - (vreset! !new true)) - (when @!new - (case (.getTypeId op-vec (.getIndex op-col idx)) - ;; a put won - 0 (when (and ;; TODO one check might be enough here ? - (<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time) - (<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx)))) - (vreset! !new false) - (.add !selection-vec idx)) - ;; a delete won - 1 (when (and ;; TODO one check might be enough here ? - (<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time) - (<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx)))) - (vreset! !new false)) - ;; TODO evict - (throw (ex-info "Should not happen!" {:idx idx - :indirection-idx (.getIndex op-col idx) - :type-id (.getTypeId op-vec (.getIndex op-col idx))}))))) - (let [idxs (.toArray (.build !selection-vec)) - ^IIndirectRelation rel - (iv/->indirect-rel - (->> (for [col-name col-names - :let [normalized-name (util/str->normal-form-str col-name)]] - (some-> (cond - (= normalized-name "xt$system_from") - (iv/->indirect-vec (.getVector (.vectorForName @!merged-relation "xt$system_from")) idxs) - - ;; FIXME - hack for now - (= normalized-name "xt$system_to") - (iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs)) - (int-array (range (alength idxs)))) - - (temporal/temporal-column? normalized-name) - (iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name)) - (->> (map #(.getOffset op-vec %) idxs) - (int-array))) - - :else - (let [col-vec (.readerForKey doc-vec normalized-name)] - (when-not (instance? NullIndirectVector col-vec) - (iv/->indirect-vec (.getVector col-vec) - (->> (map #(.getOffset op-vec %) idxs) - (int-array)))))) - (.withName col-name))) - (filter some?)) - (alength idxs)) - ^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred] - (iv/select rel (.select col-pred allocator rel params))) - rel - (vals col-preds))] - (if (pos? (.rowCount res)) - (do (.accept c res) - true) - (recur)))) - false))))) + (.tryAdvance trie-bucket-cursor + (reify Consumer + (accept [_ block-irel] + (let [!selection-vec (IntStream/builder) + iid-col (.vectorForName block-irel "xt$iid") + op-col (.vectorForName block-irel "op") + ^DenseUnionVector op-vec (.getVector op-col) + ^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0))) + ^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1))) + ^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader)) + valid-time (aget temporal-range 0) + ^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector)) + ^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector )) + ^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector)) + ^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector)) + cmp (cmp/->comparator iid-col iid-col :nulls-last) + !new (volatile! true)] + (dotimes [idx (.rowCount block-irel)] + ;; new iid + (when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx)))) + (vreset! !new true)) + (when @!new + (case (.getTypeId op-vec (.getIndex op-col idx)) + ;; a put won + 0 (when (and ;; TODO one check might be enough here ? + (<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time) + (<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx)))) + (vreset! !new false) + (.add !selection-vec idx)) + ;; a delete won + 1 (when (and ;; TODO one check might be enough here ? + (<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time) + (<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx)))) + (vreset! !new false)) + ;; TODO evict + (throw (ex-info "Should not happen!" {:idx idx + :indirection-idx (.getIndex op-col idx) + :type-id (.getTypeId op-vec (.getIndex op-col idx))}))))) + (let [idxs (.toArray (.build !selection-vec)) + ^IIndirectRelation rel + (iv/->indirect-rel + (->> (for [col-name col-names + :let [normalized-name (util/str->normal-form-str col-name)]] + (some-> (cond + (= normalized-name "xt$system_from") + (iv/->indirect-vec (.getVector (.vectorForName block-irel "xt$system_from")) idxs) + + ;; FIXME - hack for now + (= normalized-name "xt$system_to") + (iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs)) + (int-array (range (alength idxs)))) + + (temporal/temporal-column? normalized-name) + (iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name)) + (->> (map #(.getOffset op-vec %) idxs) + (int-array))) + + :else + (let [col-vec (.readerForKey doc-vec normalized-name)] + (when-not (instance? NullIndirectVector col-vec) + (iv/->indirect-vec (.getVector col-vec) + (->> (map #(.getOffset op-vec %) idxs) + (int-array)))))) + (.withName col-name))) + (filter some?)) + (alength idxs)) + ^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred] + (iv/select rel (.select col-pred allocator rel params))) + rel + (vals col-preds))] + + (.accept c res) + true)))))) (close [_] (util/close trie-bucket-cursor))) @@ -653,7 +647,6 @@ (.load (VectorLoader. leaf-vsr) leaf-record-batch) (iv/<-root leaf-vsr))) - (defn print-leaf-paths [^IBufferPool buffer-pool, trie-filename leaf-filename] (with-open [^ArrowBuf trie-buf @(.getBuffer buffer-pool trie-filename) ^ArrowBuf leaf-buf @(.getBuffer buffer-pool leaf-filename)] @@ -679,14 +672,9 @@ (util/->arrow-record-batch-view (.get (.getRecordBatches leaf-footer) (.getPageIndex leaf)) leaf-buf) (.load (VectorLoader. leaf-batch))) - #_[iid-vec (->> (iv/<-root leaf-batch) - (.vectorForName "xt$iid") - (.getVector))] - [[(->> (range 0 (.getValueCount iid-vec)) (mapv #(vector (util/bytes->uuid (.getObject iid-vec %)))))]])))))))) - (defn calc-leaf-paths [^IBufferPool buffer-pool, trie-filename] (with-open [^ArrowBuf trie-buf @(.getBuffer buffer-pool trie-filename)] (let [trie-footer (util/read-arrow-footer trie-buf)] @@ -719,21 +707,8 @@ bytes)) (comment - (-> (bit-shift-left 8 4) - (bit-shift-right 4)) - - (bit-shift-right -128 4) - - (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0x80) 0)) - - (path->bytes '(8 0 0)) - - (bytes->path (path->bytes '(8 0 0))) - - (bytes->path (path->bytes '(9 0 0))) - - ) + (bytes->path (path->bytes '(9 0 0)))) (defn sub-path? "returns true if p1 is a subpath of p2." @@ -765,31 +740,11 @@ (compare-paths '(3 2) '(1 2 3)) (compare-paths '(1 3) '(1 2 3))) - ;; trie 1 - A --- - ;; trie 2 - A3 A7 --- - - ;; 1 iteration - ;; A3 A - - - ;; trie 1 A - ;; trie 2 AB - ;; trie 3 B4 - ;; trie 4 ABC ABD - - ;; The size of the lowest leaf-page becomes batch size. - - ;; page-identifier - {:path [0 0 1] - :trie-idx 1 - :page-idx 0 - :position 0} - ;; TODO make more efficient (defn uuid-byte-prefix? [path bytes] (= path (take (count path) (bytes->path bytes)))) - ;; TODO use more info about the + ;; TODO use more info about the sorting of the relations (defn merge-page-rels [^BufferAllocator allocator page-rels page-identifiers] (let [path (:path (first page-identifiers)) page-positions (int-array (map :position page-identifiers)) @@ -834,7 +789,6 @@ (aset new-page-positions rel-idx idx))) [(vw/rel-wtr->rdr rel-wtr) new-page-positions]))) - (defn trie-idx+page-idx->irel [trie-idx->page-idx+page-irel {:keys [trie-idx page-idx]} leaf-buf leaf-footer leaf-vsr] (if-let [[current-page-idx page-irel] (.get trie-idx->page-idx+page-irel trie-idx)] @@ -842,12 +796,14 @@ page-irel (let [new-page-rel (load-leaf-vsr leaf-buf leaf-vsr leaf-footer page-idx)] (.put trie-idx->page-idx+page-irel trie-idx [page-idx new-page-rel]) - ;; (.close page-irel) new-page-rel)) (let [new-page-rel (load-leaf-vsr leaf-buf leaf-vsr leaf-footer page-idx)] (.put trie-idx->page-idx+page-irel trie-idx [page-idx new-page-rel]) new-page-rel))) + + + ;; page-identifier {:path [0 0 1] :trie-idx 1 @@ -865,120 +821,76 @@ ^IIndirectRelation live-relation] ICursor (tryAdvance [_ c] - (prn "pq - " (.isEmpty pq) "-" (seq pq)) (if-not (.isEmpty pq) (let [smallest-page-identifier (.poll pq) page-identifiers (ArrayList. [smallest-page-identifier])] - (prn "path - " (:path smallest-page-identifier)) (while (and (not (.isEmpty pq)) (sub-path? (:path smallest-page-identifier) (:path (.peek pq)))) (.add page-identifiers (.poll pq))) ;; setting up merging + merging (let [trie-idxs (map :trie-idx page-identifiers) - irels (map (fn [{:keys [trie-idx] :as page-identifier}] - (trie-idx+page-idx->irel trie-idx->page-idx+irel - page-identifier - (aget leaf-bufs trie-idx) - (aget leaf-footers trie-idx) - (aget leaf-vsrs trie-idx))) - page-identifiers) - ;; row-ids - if it's -1 the rel was finished + irels (mapv (fn [{:keys [trie-idx] :as page-identifier}] + (trie-idx+page-idx->irel trie-idx->page-idx+irel + page-identifier + (aget leaf-bufs trie-idx) + (aget leaf-footers trie-idx) + (aget leaf-vsrs trie-idx))) + page-identifiers) + ;; if new-page-position is -1 the page was finished [block-irel new-page-positions] (merge-page-rels allocator irels page-identifiers)] - ;; do we go to the next page? - (doseq [[idx [trie-idx new-position]] (->> (map vector trie-idxs new-page-positions) - (map-indexed vector))] - (if (pos? new-position) - (.add pq (assoc (.get page-identifiers idx) :position new-position)) - - (let [leaf-iterator (.get leaf-iterators trie-idx)] - (.close (nth irels idx)) - (when (.hasNext leaf-iterator) - (let [trie-path (.next leaf-iterator)] - (.add pq {:path (butlast trie-path) - :trie-idx trie-idx - :page-idx (first (last trie-path)) - :position 0})))))) - ;; TODO check for nothing in here? - (.accept c block-irel) - true)) + (try + ;; get a new page or not + (doseq [[idx [trie-idx new-position]] (->> (map vector trie-idxs new-page-positions) + (map-indexed vector))] + (if (pos? new-position) + (.add pq (assoc (.get page-identifiers idx) :position new-position)) + + (let [leaf-iterator (.get leaf-iterators trie-idx)] + (.close (nth irels idx)) + (when (.hasNext leaf-iterator) + (let [trie-path (.next leaf-iterator)] + (.add pq {:path (butlast trie-path) + :trie-idx trie-idx + :page-idx (first (last trie-path)) + :position 0})))))) + (.accept c block-irel) + true + (finally (.close block-irel))))) false)) + ;; A A2 A2C + (close [_] (run! util/close leaf-vsrs) - (run! util/close leaf-bufs))) - - - ;; chunk-idx 0 - '(1 2) - ;; chunk-idx 1 - '(1) - - + (run! util/close leaf-bufs) + #_(run! (comp util/close second) (.values trie-idx->page-idx+irel)))) ;; filenames is a list of [trie-filename leaf-filename] (defn ->trie-bucket-cursor ^xtdb.ICursor [^BufferAllocator allocator, ^IBufferPool buffer-pool, filenames ^IIndirectRelation live-relation, live-selection] - (clojure.pprint/pprint (map #(print-leaf-paths buffer-pool (first %) (second %)) filenames)) + #_(clojure.pprint/pprint (map #(print-leaf-paths buffer-pool (first %) (second %)) filenames)) (if (seq filenames) (let [leaf-iterators (ArrayList. (map (comp #(calc-leaf-paths buffer-pool %) first) filenames)) - #_#_ _ (clojure.pprint/pprint (map iterator-seq leaf-iterators)) - ;; FIXME needs to be adapted for depth leaf-buffers (->> (map (comp #(deref (.getBuffer buffer-pool %)) second) filenames) object-array) leaf-footers (->> (map util/read-arrow-footer leaf-buffers) object-array) leaf-vsrs (->> (map #(VectorSchemaRoot/create (.getSchema %1) (.getAllocator (.getReferenceManager %2))) leaf-footers leaf-buffers) object-array) - pq (PriorityQueue. (fn [{path1 :path trie-idx1 :trie-idx} {path2 :path trie-idx2 :trie-idx}] - (compare-paths path1 path2) - #_(let [res (compare-paths path1 path2)] - (if (zero? res) - (- trie-idx2 trie-idx1) - res))))] - ;; (prn (count leaf-iterators)) + pq (PriorityQueue. (fn [{path1 :path} {path2 :path}] + (compare-paths path1 path2)))] (doseq [[idx trie-leaf-path] (->> (map #(.next %) leaf-iterators) (map-indexed vector))] - ;; (prn [idx trie-leaf-path]) (.add pq {:path (butlast trie-leaf-path) :trie-idx idx :page-idx (first (last trie-leaf-path)) :position 0})) - (TrieBucketCursor. allocator leaf-buffers leaf-footers leaf-vsrs pq leaf-iterators (HashMap.) - live-relation)) + (TrieBucketCursor. allocator leaf-buffers leaf-footers leaf-vsrs pq leaf-iterators + (HashMap.) live-relation)) (TrieBucketCursor. allocator nil nil nil nil nil (HashMap.) live-relation))) - ;; FIXME there must be some way to do this more cleanly - ;; FIXME potentially do selection of col-names before the time resolution - (defn files->rel [^BufferAllocator allocator ^IBufferPool buffer-pool trie-filename leaf-filename] - (let [^ArrowBuf trie-buf @(.getBuffer buffer-pool trie-filename) - ^ArrowBuf leaf-buf @(.getBuffer buffer-pool leaf-filename) - trie-footer (util/read-arrow-footer trie-buf) - leaf-footer (util/read-arrow-footer leaf-buf) - #_#_rel-wtr (vw/->strict-rel-writer allocator) - rel-wtr (vw/->rel-writer allocator)] - (with-open [^VectorSchemaRoot trie-batch (VectorSchemaRoot/create (.getSchema trie-footer) - (.getAllocator (.getReferenceManager trie-buf))) - trie-record-batch (util/->arrow-record-batch-view (first (.getRecordBatches trie-footer)) trie-buf) - ^VectorSchemaRoot leaf-batch (VectorSchemaRoot/create (.getSchema leaf-footer) - (.getAllocator (.getReferenceManager leaf-buf)))] - - (.load (VectorLoader. trie-batch) trie-record-batch) - (.accept (ArrowHashTrie/from trie-batch) - (reify ArrowHashTrie$NodeVisitor - (visitBranch [this branch] - (mapcat #(.accept ^ArrowHashTrie$Node % this) (.getChildren branch))) - - (visitLeaf [_ leaf] - (with-open [leaf-record-batch (util/->arrow-record-batch-view (.get (.getRecordBatches leaf-footer) - (.getPageIndex leaf)) leaf-buf)] - (.load (VectorLoader. leaf-batch) leaf-record-batch) - (vw/strict-append-rel rel-wtr (iv/<-root leaf-batch))))))) - (.close trie-buf) - (.close leaf-buf) - (.syncRowCount rel-wtr) - (vw/rel-wtr->rdr rel-wtr))) - - (defn- ->4r-cursor [^BufferAllocator allocator, ^IBufferPool buffer-pool, ^IMetadataManager metadata-mgr, ^ILiveTableWatermark wm, table-name, col-names, ^longs temporal-range @@ -988,7 +900,7 @@ trie-filename (live-index/->trie-obj-key table-name chunk-idx)]] [trie-filename leaf-filename]) live-relation (-> (.liveRelation wm) - (iv/with-absent-cols allocator col-names)) + #_(iv/with-absent-cols allocator col-names)) ^LiveTrie trie (.liveTrie wm) live-selection (-> (.compactLogs trie) (.accept (reify LiveTrie$NodeVisitor @@ -999,10 +911,8 @@ (visitLeaf [_ trie-leaf] (.data trie-leaf))))) live-selection (int-array live-selection) - #_#__ (prn (seq live-selection)) trie-bucket-curser (->trie-bucket-cursor allocator buffer-pool filenames live-relation live-selection)] - (cond (at-now? scan-opts) ;; needed because of future updates