diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index 168be84061..8237b9f8ff 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -537,84 +537,83 @@ (tryAdvance [_ c] (let [!merged-relation (volatile! nil)] (loop [] - (.tryAdvance trie-bucket-cursor - (reify Consumer - (accept [_ in-rel] - (vreset! !merged-relation in-rel)))) - - (if @!merged-relation - (let [!selection-vec (IntStream/builder) - iid-col (.vectorForName @!merged-relation "xt$iid") - op-col (.vectorForName @!merged-relation "op") - ^DenseUnionVector op-vec (.getVector op-col) - ^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0))) - ^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1))) - ^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader)) - valid-time (aget temporal-range 0) - ^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector)) - ^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector )) - ^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector)) - ^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector)) - cmp (cmp/->comparator iid-col iid-col :nulls-last) - !new (volatile! true)] - (dotimes [idx (.rowCount @!merged-relation)] - ;; new iid - (when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx)))) - (vreset! !new true)) - (when @!new - (case (.getTypeId op-vec (.getIndex op-col idx)) - ;; a put won - 0 (when (and ;; TODO one check might be enough here ? - (<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time) - (<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx)))) - (vreset! !new false) - (.add !selection-vec idx)) - ;; a delete won - 1 (when (and ;; TODO one check might be enough here ? - (<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time) - (<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx)))) - (vreset! !new false)) - ;; TODO evict - (throw (ex-info "Should not happen!" {:idx idx - :indirection-idx (.getIndex op-col idx) - :type-id (.getTypeId op-vec (.getIndex op-col idx))}))))) - (let [idxs (.toArray (.build !selection-vec)) - ^IIndirectRelation rel - (iv/->indirect-rel - (->> (for [col-name col-names - :let [normalized-name (util/str->normal-form-str col-name)]] - (some-> (cond - (= normalized-name "xt$system_from") - (iv/->indirect-vec (.getVector (.vectorForName @!merged-relation "xt$system_from")) idxs) - - ;; FIXME - hack for now - (= normalized-name "xt$system_to") - (iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs)) - (int-array (range (alength idxs)))) - - (temporal/temporal-column? normalized-name) - (iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name)) - (->> (map #(.getOffset op-vec %) idxs) - (int-array))) - - :else - (let [col-vec (.readerForKey doc-vec normalized-name)] - (when-not (instance? NullIndirectVector col-vec) - (iv/->indirect-vec (.getVector col-vec) - (->> (map #(.getOffset op-vec %) idxs) - (int-array)))))) - (.withName col-name))) - (filter some?)) - (alength idxs)) - ^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred] - (iv/select rel (.select col-pred allocator rel params))) - rel - (vals col-preds))] - (if (pos? (.rowCount res)) - (do (.accept c res) - true) - (recur)))) - false)))) + (let [has-next (.tryAdvance trie-bucket-cursor + (reify Consumer + (accept [_ in-rel] + (vreset! !merged-relation in-rel))))] + (if has-next + (let [!selection-vec (IntStream/builder) + iid-col (.vectorForName @!merged-relation "xt$iid") + op-col (.vectorForName @!merged-relation "op") + ^DenseUnionVector op-vec (.getVector op-col) + ^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0))) + ^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1))) + ^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader)) + valid-time (aget temporal-range 0) + ^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector)) + ^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector )) + ^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector)) + ^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector)) + cmp (cmp/->comparator iid-col iid-col :nulls-last) + !new (volatile! true)] + (dotimes [idx (.rowCount @!merged-relation)] + ;; new iid + (when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx)))) + (vreset! !new true)) + (when @!new + (case (.getTypeId op-vec (.getIndex op-col idx)) + ;; a put won + 0 (when (and ;; TODO one check might be enough here ? + (<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time) + (<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx)))) + (vreset! !new false) + (.add !selection-vec idx)) + ;; a delete won + 1 (when (and ;; TODO one check might be enough here ? + (<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time) + (<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx)))) + (vreset! !new false)) + ;; TODO evict + (throw (ex-info "Should not happen!" {:idx idx + :indirection-idx (.getIndex op-col idx) + :type-id (.getTypeId op-vec (.getIndex op-col idx))}))))) + (let [idxs (.toArray (.build !selection-vec)) + ^IIndirectRelation rel + (iv/->indirect-rel + (->> (for [col-name col-names + :let [normalized-name (util/str->normal-form-str col-name)]] + (some-> (cond + (= normalized-name "xt$system_from") + (iv/->indirect-vec (.getVector (.vectorForName @!merged-relation "xt$system_from")) idxs) + + ;; FIXME - hack for now + (= normalized-name "xt$system_to") + (iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs)) + (int-array (range (alength idxs)))) + + (temporal/temporal-column? normalized-name) + (iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name)) + (->> (map #(.getOffset op-vec %) idxs) + (int-array))) + + :else + (let [col-vec (.readerForKey doc-vec normalized-name)] + (when-not (instance? NullIndirectVector col-vec) + (iv/->indirect-vec (.getVector col-vec) + (->> (map #(.getOffset op-vec %) idxs) + (int-array)))))) + (.withName col-name))) + (filter some?)) + (alength idxs)) + ^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred] + (iv/select rel (.select col-pred allocator rel params))) + rel + (vals col-preds))] + (if (pos? (.rowCount res)) + (do (.accept c res) + true) + (recur)))) + false))))) (close [_] (util/close trie-bucket-cursor))) @@ -708,12 +707,34 @@ (.iterator)))))) (defn path->bytes ^bytes [path] - (assert (every? #(<= 0 % 15) path)) + (assert (every? #(<= 0 % 0xf) path)) (->> (partition-all 2 path) (map (fn [[high low]] (bit-or (bit-shift-left high 4) (or low 0)))) byte-array)) + (defn bytes->path [bytes] + (mapcat #(list (mod (bit-shift-right % 4) 16) + (bit-and % (dec (bit-shift-left 1 4)))) + bytes)) + + (comment + (-> (bit-shift-left 8 4) + (bit-shift-right 4)) + + (bit-shift-right -128 4) + + (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0x80) 0)) + + (path->bytes '(8 0 0)) + + + (bytes->path (path->bytes '(8 0 0))) + + (bytes->path (path->bytes '(9 0 0))) + + ) + (defn sub-path? "returns true if p1 is a subpath of p2." [p1 p2] @@ -764,49 +785,9 @@ :page-idx 0 :position 0} - (comment - (util/uuid->bytes (java.util.UUID. (Long/reverse 1) 0)) - (util/uuid->bytes (java.util.UUID. (Long/reverse 1) 0)) - - (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0xf0) 0)) - - (path->bytes '(15 15 0)) - - (path->bytes '(8 0 0)) - - (bit-shift-left 8 4) - - (util/compare-nio-buffers-unsigned - (ByteBuffer/wrap (path->bytes '(15 0 0))) - (ByteBuffer/wrap (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0xf1) 0))) - ) - - (byte-prefix? - (path->bytes '(15 0 0)) - (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0xf0) 0)) - ) - - (count '(8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0)) - - (util/bytes->uuid (path->bytes '(8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0))) - - - (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0x80) 0)) - - (path->bytes '(8 0 0)) - - ) - - (defn bytes->path [bytes] - (mapcat #(list (bit-shift-right % 4) - (bit-and % (dec (bit-shift-left 1 4)))) - bytes)) - - (bytes->path (path->bytes '(0 8 0))) - ;; TODO make more efficient (defn uuid-byte-prefix? [path bytes] - (= path (take (bytes->path bytes)))) + (= path (take (count path) (bytes->path bytes)))) ;; TODO use more info about the (defn merge-page-rels [^BufferAllocator allocator page-rels page-identifiers] @@ -841,8 +822,6 @@ (when (not (.isEmpty prio)) (let [[rel-idx idx] (.poll prio) ^bytes uuid-bytes (.getObject (.getVector (aget iid-vecs rel-idx)) idx)] - (prn (util/bytes->uuid uuid-bytes)) - (prn [path (bytes->path uuid-bytes)]) (when (uuid-byte-prefix? path uuid-bytes) (.copyRow ^IRowCopier (aget copiers rel-idx) idx) (when (< idx (dec (.getValueCount ^IIndirectVector (aget iid-vecs rel-idx)))) @@ -855,46 +834,6 @@ (aset new-page-positions rel-idx idx))) [(vw/rel-wtr->rdr rel-wtr) new-page-positions]))) - ;; load that page - #_(let [trie+page-idx->irel (fn [page-identifier] ) - pq (PriorityQueue. (comparator (fn [{path1 :path} {path2 :path}] (compare-paths path1 path2)))) - trie-leaf-paths (map ->iterator '([(0 (0)) (1 0 0 (1))] [(0 0 0 (0)) (1 (1))]))] - (doseq [[idx trie-leaf-path] (->> (map #(.next %) trie-leaf-paths) - (map-indexed vector))] - (.add pq {:path (butlast trie-leaf-path) - :trie-idx idx - :page-idx (first (last trie-leaf-path)) - :position 0})) - - ;; gets the page-identifiers for current merge - (let [smallest-page-identifier (.poll pq) - page-identifiers (ArrayList.)] - (while (and (not (.isEmpty pq)) - (sub-path? (:path smallest-page-identifier) (:path (.peek pq)))) - (.add page-identifiers (.poll pq))) - - ;; setting up merging + merging - (let [trie-idxs (map :trie-idx page-identifiers) - irels (map trie+page-idx->irel page-identifiers) - page-positions (map :position page-identifiers) - ;; row-ids - if it's -1 the rel was finished - [res-irel new-page-positions] (merge-rels irels page-positions)] - - ;; do we go to the next page? - (for [[idx new-position] (map vector trie-idxs new-page-positions)] - (if (pos? new-position) - (.add pq (assoc (.get page-identifiers idx) :position new-position)) - - (let [trie-idx (.get trie-idxs idx) - trie-leaves (nth trie-leaf-paths trie-idx)] - (when (.hasNext trie-leaves) - (let [trie-path (.next trie-leaf-paths)] - (.add pq {:path (butlast trie-path) - :trie-idx trie-idx - :page-idx (first (last trie-path)) - :position 0})))))) - res-irel))) - (defn trie-idx+page-idx->irel [trie-idx->page-idx+page-irel {:keys [trie-idx page-idx]} leaf-buf leaf-footer leaf-vsr] diff --git a/src/test/clojure/xtdb/operator/scan_test.clj b/src/test/clojure/xtdb/operator/scan_test.clj index 6c6b33a7dc..a9d053a8a2 100644 --- a/src/test/clojure/xtdb/operator/scan_test.clj +++ b/src/test/clojure/xtdb/operator/scan_test.clj @@ -73,11 +73,7 @@ (->> (tu/query-ra '[:scan {:table xt_docs} [xt/id]] {:node node}) (map :xt/id) - set))) - - (t/is (= 15 - (count (tu/query-ra '[:scan {:table xt_docs} [xt/id]] - {:node node}))))))) + set)))))) (t/deftest test-trie-skew