Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Jul 18, 2023
1 parent 784c765 commit ec5a7d8
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 167 deletions.
263 changes: 101 additions & 162 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -537,84 +537,83 @@
(tryAdvance [_ c]
(let [!merged-relation (volatile! nil)]
(loop []
(.tryAdvance trie-bucket-cursor
(reify Consumer
(accept [_ in-rel]
(vreset! !merged-relation in-rel))))

(if @!merged-relation
(let [!selection-vec (IntStream/builder)
iid-col (.vectorForName @!merged-relation "xt$iid")
op-col (.vectorForName @!merged-relation "op")
^DenseUnionVector op-vec (.getVector op-col)
^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0)))
^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1)))
^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader))
valid-time (aget temporal-range 0)
^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector ))
^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector))
cmp (cmp/->comparator iid-col iid-col :nulls-last)
!new (volatile! true)]
(dotimes [idx (.rowCount @!merged-relation)]
;; new iid
(when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx))))
(vreset! !new true))
(when @!new
(case (.getTypeId op-vec (.getIndex op-col idx))
;; a put won
0 (when (and ;; TODO one check might be enough here ?
(<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false)
(.add !selection-vec idx))
;; a delete won
1 (when (and ;; TODO one check might be enough here ?
(<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false))
;; TODO evict
(throw (ex-info "Should not happen!" {:idx idx
:indirection-idx (.getIndex op-col idx)
:type-id (.getTypeId op-vec (.getIndex op-col idx))})))))
(let [idxs (.toArray (.build !selection-vec))
^IIndirectRelation rel
(iv/->indirect-rel
(->> (for [col-name col-names
:let [normalized-name (util/str->normal-form-str col-name)]]
(some-> (cond
(= normalized-name "xt$system_from")
(iv/->indirect-vec (.getVector (.vectorForName @!merged-relation "xt$system_from")) idxs)

;; FIXME - hack for now
(= normalized-name "xt$system_to")
(iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs))
(int-array (range (alength idxs))))

(temporal/temporal-column? normalized-name)
(iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name))
(->> (map #(.getOffset op-vec %) idxs)
(int-array)))

:else
(let [col-vec (.readerForKey doc-vec normalized-name)]
(when-not (instance? NullIndirectVector col-vec)
(iv/->indirect-vec (.getVector col-vec)
(->> (map #(.getOffset op-vec %) idxs)
(int-array))))))
(.withName col-name)))
(filter some?))
(alength idxs))
^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred]
(iv/select rel (.select col-pred allocator rel params)))
rel
(vals col-preds))]
(if (pos? (.rowCount res))
(do (.accept c res)
true)
(recur))))
false))))
(let [has-next (.tryAdvance trie-bucket-cursor
(reify Consumer
(accept [_ in-rel]
(vreset! !merged-relation in-rel))))]
(if has-next
(let [!selection-vec (IntStream/builder)
iid-col (.vectorForName @!merged-relation "xt$iid")
op-col (.vectorForName @!merged-relation "op")
^DenseUnionVector op-vec (.getVector op-col)
^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0)))
^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1)))
^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader))
valid-time (aget temporal-range 0)
^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector ))
^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector))
cmp (cmp/->comparator iid-col iid-col :nulls-last)
!new (volatile! true)]
(dotimes [idx (.rowCount @!merged-relation)]
;; new iid
(when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx))))
(vreset! !new true))
(when @!new
(case (.getTypeId op-vec (.getIndex op-col idx))
;; a put won
0 (when (and ;; TODO one check might be enough here ?
(<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false)
(.add !selection-vec idx))
;; a delete won
1 (when (and ;; TODO one check might be enough here ?
(<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false))
;; TODO evict
(throw (ex-info "Should not happen!" {:idx idx
:indirection-idx (.getIndex op-col idx)
:type-id (.getTypeId op-vec (.getIndex op-col idx))})))))
(let [idxs (.toArray (.build !selection-vec))
^IIndirectRelation rel
(iv/->indirect-rel
(->> (for [col-name col-names
:let [normalized-name (util/str->normal-form-str col-name)]]
(some-> (cond
(= normalized-name "xt$system_from")
(iv/->indirect-vec (.getVector (.vectorForName @!merged-relation "xt$system_from")) idxs)

;; FIXME - hack for now
(= normalized-name "xt$system_to")
(iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs))
(int-array (range (alength idxs))))

(temporal/temporal-column? normalized-name)
(iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name))
(->> (map #(.getOffset op-vec %) idxs)
(int-array)))

:else
(let [col-vec (.readerForKey doc-vec normalized-name)]
(when-not (instance? NullIndirectVector col-vec)
(iv/->indirect-vec (.getVector col-vec)
(->> (map #(.getOffset op-vec %) idxs)
(int-array))))))
(.withName col-name)))
(filter some?))
(alength idxs))
^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred]
(iv/select rel (.select col-pred allocator rel params)))
rel
(vals col-preds))]
(if (pos? (.rowCount res))
(do (.accept c res)
true)
(recur))))
false)))))

(close [_]
(util/close trie-bucket-cursor)))
Expand Down Expand Up @@ -708,12 +707,34 @@
(.iterator))))))

(defn path->bytes ^bytes [path]
(assert (every? #(<= 0 % 15) path))
(assert (every? #(<= 0 % 0xf) path))
(->> (partition-all 2 path)
(map (fn [[high low]]
(bit-or (bit-shift-left high 4) (or low 0))))
byte-array))

(defn bytes->path [bytes]
(mapcat #(list (mod (bit-shift-right % 4) 16)
(bit-and % (dec (bit-shift-left 1 4))))
bytes))

(comment
(-> (bit-shift-left 8 4)
(bit-shift-right 4))

(bit-shift-right -128 4)

(util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0x80) 0))

(path->bytes '(8 0 0))


(bytes->path (path->bytes '(8 0 0)))

(bytes->path (path->bytes '(9 0 0)))

)

(defn sub-path?
"returns true if p1 is a subpath of p2."
[p1 p2]
Expand Down Expand Up @@ -764,49 +785,9 @@
:page-idx 0
:position 0}

(comment
(util/uuid->bytes (java.util.UUID. (Long/reverse 1) 0))
(util/uuid->bytes (java.util.UUID. (Long/reverse 1) 0))

(util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0xf0) 0))

(path->bytes '(15 15 0))

(path->bytes '(8 0 0))

(bit-shift-left 8 4)

(util/compare-nio-buffers-unsigned
(ByteBuffer/wrap (path->bytes '(15 0 0)))
(ByteBuffer/wrap (util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0xf1) 0)))
)

(byte-prefix?
(path->bytes '(15 0 0))
(util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0xf0) 0))
)

(count '(8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0))

(util/bytes->uuid (path->bytes '(8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0)))


(util/uuid->bytes (java.util.UUID. (Long/reverseBytes 0x80) 0))

(path->bytes '(8 0 0))

)

(defn bytes->path [bytes]
(mapcat #(list (bit-shift-right % 4)
(bit-and % (dec (bit-shift-left 1 4))))
bytes))

(bytes->path (path->bytes '(0 8 0)))

;; TODO make more efficient
(defn uuid-byte-prefix? [path bytes]
(= path (take (bytes->path bytes))))
(= path (take (count path) (bytes->path bytes))))

;; TODO use more info about the
(defn merge-page-rels [^BufferAllocator allocator page-rels page-identifiers]
Expand Down Expand Up @@ -841,8 +822,6 @@
(when (not (.isEmpty prio))
(let [[rel-idx idx] (.poll prio)
^bytes uuid-bytes (.getObject (.getVector (aget iid-vecs rel-idx)) idx)]
(prn (util/bytes->uuid uuid-bytes))
(prn [path (bytes->path uuid-bytes)])
(when (uuid-byte-prefix? path uuid-bytes)
(.copyRow ^IRowCopier (aget copiers rel-idx) idx)
(when (< idx (dec (.getValueCount ^IIndirectVector (aget iid-vecs rel-idx))))
Expand All @@ -855,46 +834,6 @@
(aset new-page-positions rel-idx idx)))
[(vw/rel-wtr->rdr rel-wtr) new-page-positions])))

;; load that page
#_(let [trie+page-idx->irel (fn [page-identifier] )
pq (PriorityQueue. (comparator (fn [{path1 :path} {path2 :path}] (compare-paths path1 path2))))
trie-leaf-paths (map ->iterator '([(0 (0)) (1 0 0 (1))] [(0 0 0 (0)) (1 (1))]))]
(doseq [[idx trie-leaf-path] (->> (map #(.next %) trie-leaf-paths)
(map-indexed vector))]
(.add pq {:path (butlast trie-leaf-path)
:trie-idx idx
:page-idx (first (last trie-leaf-path))
:position 0}))

;; gets the page-identifiers for current merge
(let [smallest-page-identifier (.poll pq)
page-identifiers (ArrayList.)]
(while (and (not (.isEmpty pq))
(sub-path? (:path smallest-page-identifier) (:path (.peek pq))))
(.add page-identifiers (.poll pq)))

;; setting up merging + merging
(let [trie-idxs (map :trie-idx page-identifiers)
irels (map trie+page-idx->irel page-identifiers)
page-positions (map :position page-identifiers)
;; row-ids - if it's -1 the rel was finished
[res-irel new-page-positions] (merge-rels irels page-positions)]

;; do we go to the next page?
(for [[idx new-position] (map vector trie-idxs new-page-positions)]
(if (pos? new-position)
(.add pq (assoc (.get page-identifiers idx) :position new-position))

(let [trie-idx (.get trie-idxs idx)
trie-leaves (nth trie-leaf-paths trie-idx)]
(when (.hasNext trie-leaves)
(let [trie-path (.next trie-leaf-paths)]
(.add pq {:path (butlast trie-path)
:trie-idx trie-idx
:page-idx (first (last trie-path))
:position 0}))))))
res-irel)))


(defn trie-idx+page-idx->irel [trie-idx->page-idx+page-irel {:keys [trie-idx page-idx]}
leaf-buf leaf-footer leaf-vsr]
Expand Down
6 changes: 1 addition & 5 deletions src/test/clojure/xtdb/operator/scan_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@
(->> (tu/query-ra '[:scan {:table xt_docs} [xt/id]]
{:node node})
(map :xt/id)
set)))

(t/is (= 15
(count (tu/query-ra '[:scan {:table xt_docs} [xt/id]]
{:node node})))))))
set))))))


(t/deftest test-trie-skew
Expand Down

0 comments on commit ec5a7d8

Please sign in to comment.