Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Jul 13, 2023
1 parent 766d5b2 commit 5b1d76b
Showing 1 changed file with 126 additions and 133 deletions.
259 changes: 126 additions & 133 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -527,91 +527,96 @@
(iv/->StructReader v)))

;; NOTE(review): this span is a rendered diff hunk. Lines from the pre-change and
;; post-change versions of ValidPointTrieCursor appear together without +/- markers,
;; so it contains two field-list endings, two `tryAdvance` bodies and two `close` bodies.
;;
;; ValidPointTrieCursor is an ICursor that, for each run of rows sharing an iid, looks at
;; rows in order until it finds one whose valid-time range contains (aget temporal-range 0).
;; If that row is a put it is selected; if it is a delete nothing is selected for that iid.
;; The selected rows are projected onto `col-names` and filtered by `col-preds`.
(deftype ValidPointTrieCursor [^BufferAllocator allocator,
^IIndirectRelation merged-relation,
^ICursor trie-bucket-cursor,
^longs temporal-range,
^PersistentHashSet col-names
^Map col-preds
params
^:volatile-mutable finished]
params]
ICursor
;; tryAdvance variant that reads directly from the `merged-relation` field and uses the
;; `finished` flag so that it produces at most one result relation.
(tryAdvance [this c]
(if (and (not finished) (pos? (.rowCount merged-relation)))
(let [!selection-vec (IntStream/builder)
iid-col (.vectorForName merged-relation "xt$iid")
op-col (.vectorForName merged-relation "op")
^DenseUnionVector op-vec (.getVector op-col)
^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0)))
^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1)))
^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader))
valid-time (aget temporal-range 0)
^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector ))
^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector))
cmp (cmp/->comparator iid-col iid-col :nulls-last)
!new (volatile! true)]
(dotimes [idx (.rowCount merged-relation)]
;; new iid
(when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx))))
(vreset! !new true))
(when @!new
;; NOTE(review): the type id is looked up through (.getIndex op-col idx) while the
;; offsets below use the raw idx — confirm both address the same row.
(case (.getTypeId op-vec (.getIndex op-col idx))
;; a put won
0 (when (and ;; TODO one check might be enough here ?
(<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false)
(.add !selection-vec idx))
;; a delete won
1 (when (and ;; TODO one check might be enough here ?
(<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false))
(do
(throw (ex-info "Should not happen!" {:idx idx
:indirection-idx (.getIndex op-col idx)
:type-id (.getTypeId op-vec (.getIndex op-col idx))}))))))
;; build one output column per requested col-name over the selected row indices
(let [idxs (.toArray (.build !selection-vec))
^IIndirectRelation rel
(iv/->indirect-rel
(->> (for [col-name col-names
:let [normalized-name (util/str->normal-form-str col-name)]]
(some-> (cond
(= normalized-name "xt$system_from")
(iv/->indirect-vec (.getVector (.vectorForName merged-relation "xt$system_from")) idxs)

;; FIXME - hack for now
(= normalized-name "xt$system_to")
(iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs))
(int-array (range (alength idxs))))

(temporal/temporal-column? normalized-name)
(iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name))
(->> (map #(.getOffset op-vec %) idxs)
(int-array)))

:else
(let [col-vec (.readerForKey doc-vec normalized-name)]
(when-not (instance? NullIndirectVector col-vec)
(iv/->indirect-vec (.getVector col-vec)
;; tryAdvance variant that pulls its input relation from `trie-bucket-cursor` and loops
;; (via `recur`) until a non-empty result has been handed to `c` or no input is available.
(tryAdvance [_ c]
(let [!merged-relation (volatile! nil)]
(loop []
;; NOTE(review): the boolean returned by .tryAdvance is discarded and !merged-relation is
;; not reset before `recur`. If the upstream cursor stops calling the consumer, the
;; previous relation is still seen below — confirm this cannot loop forever when `res`
;; has zero rows.
(.tryAdvance trie-bucket-cursor
(reify Consumer
(accept [_ in-rel]
(vreset! !merged-relation in-rel))))

(if @!merged-relation
(let [!selection-vec (IntStream/builder)
iid-col (.vectorForName @!merged-relation "xt$iid")
op-col (.vectorForName @!merged-relation "op")
^DenseUnionVector op-vec (.getVector op-col)
^IStructReader put-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 0)))
^IStructReader delete-vec (struct-reader-or-nil (.getVectorByType op-vec (byte 1)))
^IStructReader doc-vec (some-> put-vec (.readerForKey "xt$doc") (.getVector) (iv/->StructReader))
valid-time (aget temporal-range 0)
^TimeStampMicroTZVector put-valid-from-vec (some-> put-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector put-valid-to-vec (some-> put-vec (.readerForKey "xt$valid_to") (.getVector ))
^TimeStampMicroTZVector delete-valid-from-vec (some-> delete-vec (.readerForKey "xt$valid_from") (.getVector))
^TimeStampMicroTZVector delete-valid-to-vec (some-> delete-vec (.readerForKey "xt$valid_to") (.getVector))
cmp (cmp/->comparator iid-col iid-col :nulls-last)
!new (volatile! true)]
(dotimes [idx (.rowCount @!merged-relation)]
;; new iid
(when (and (pos? idx) (not (zero? (.applyAsInt cmp (dec idx) idx))))
(vreset! !new true))
(when @!new
;; NOTE(review): the type id is looked up through (.getIndex op-col idx) while the
;; offsets below use the raw idx — confirm both address the same row.
(case (.getTypeId op-vec (.getIndex op-col idx))
;; a put won
0 (when (and ;; TODO one check might be enough here ?
(<= (.getObject put-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject put-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false)
(.add !selection-vec idx))
;; a delete won
1 (when (and ;; TODO one check might be enough here ?
(<= (.getObject delete-valid-from-vec (.getOffset op-vec idx)) valid-time)
(<= valid-time (.getObject delete-valid-to-vec (.getOffset op-vec idx))))
(vreset! !new false))
;; TODO evict
(throw (ex-info "Should not happen!" {:idx idx
:indirection-idx (.getIndex op-col idx)
:type-id (.getTypeId op-vec (.getIndex op-col idx))})))))
;; build one output column per requested col-name over the selected row indices
(let [idxs (.toArray (.build !selection-vec))
^IIndirectRelation rel
(iv/->indirect-rel
(->> (for [col-name col-names
:let [normalized-name (util/str->normal-form-str col-name)]]
(some-> (cond
(= normalized-name "xt$system_from")
(iv/->indirect-vec (.getVector (.vectorForName @!merged-relation "xt$system_from")) idxs)

;; FIXME - hack for now
(= normalized-name "xt$system_to")
(iv/->indirect-vec (->constant-time-stamp-vec allocator "xt$system_to" (alength idxs))
(int-array (range (alength idxs))))

(temporal/temporal-column? normalized-name)
(iv/->indirect-vec (.getVector (.readerForKey put-vec normalized-name))
(->> (map #(.getOffset op-vec %) idxs)
(int-array))))))
(.withName col-name)))
(filter some?))
(alength idxs))
;; apply every column predicate in turn, narrowing the relation each time
^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred]
(iv/select rel (.select col-pred allocator rel params)))
rel
(vals col-preds))]
;; mark the cursor as finished so later tryAdvance calls take the `false` branch
(set! (.finished this) true)
(if (pos? (.rowCount res))
(do (.accept c res)
true)
false)))
false))
(int-array)))

:else
(let [col-vec (.readerForKey doc-vec normalized-name)]
(when-not (instance? NullIndirectVector col-vec)
(iv/->indirect-vec (.getVector col-vec)
(->> (map #(.getOffset op-vec %) idxs)
(int-array))))))
(.withName col-name)))
(filter some?))
(alength idxs))
;; apply every column predicate in turn, narrowing the relation each time
^IIndirectRelation res (reduce (fn [^IIndirectRelation rel, ^IRelationSelector col-pred]
(iv/select rel (.select col-pred allocator rel params)))
rel
(vals col-preds))]
;; hand a non-empty result to the consumer; otherwise try the next input relation
(if (pos? (.rowCount res))
(do (.accept c res)
true)
(recur))))
false))))

;; two `close` bodies are shown: one closes the `merged-relation` field, the other closes
;; the `trie-bucket-cursor` field.
(close [_]
(util/close merged-relation)))
(util/close trie-bucket-cursor)))

;; assumption here is that rels are ordered from new to old
(defn merge-rels [^BufferAllocator allocator rels]
Expand Down Expand Up @@ -672,7 +677,9 @@
^"[Lorg.apache.arrow.vector.ipc.message.ArrowFooter;" leaf-footers,
^"[Lorg.apache.arrow.vector.VectorSchemaRoot;" leaf-vsrs,
^ints page-idxs
^:volatile-mutable finished]
^:volatile-mutable finished
^IIndirectRelation live-relation
^ints live-selection]
ICursor
(tryAdvance [this c]
(if-not finished
Expand All @@ -681,7 +688,12 @@
(aget leaf-footers %1)
(aget page-idxs %1))
(range (alength page-idxs)))
rel (merge-rels allocator rels)]
live-part (iv/->indirect-rel
(for [^IIndirectVector col live-relation]
(iv/->indirect-vec (.getVector col) live-selection)))
rel (merge-rels allocator (cond->> rels
(pos? (alength live-selection))
(cons live-part)))]
(set! (.finished this) true)
(if (pos? (.rowCount rel))
(do (.accept c rel)
Expand All @@ -695,17 +707,19 @@
(run! util/close leaf-bufs)))

;; Builds a TrieBucketCursor over the leaf files named in `filenames`.
;; filenames is a list of [trie-filename leaf-filename]
;; NOTE(review): this span is a rendered diff hunk — both the 3-argument signature and the
;; 5-argument signature (adding `live-relation` and `live-selection`) are shown, as are two
;; `page-idxs` bindings and two TrieBucketCursor constructor calls.
(defn ->trie-bucket-cursor [^BufferAllocator allocator, ^IBufferPool buffer-pool, filenames]
(defn ->trie-bucket-cursor ^xtdb.ICursor [^BufferAllocator allocator, ^IBufferPool buffer-pool, filenames
^IIndirectRelation live-relation, live-selection]
;; leaf indices are computed from the trie filename (first element of each pair)
(let [leaf-indices (map (comp #(calc-leaf-indices buffer-pool %) first) filenames)
;; FIXME needs to be adapted for depth
;; only the first leaf index of each trie is used; the second binding wraps them in an int-array
page-idxs (map first leaf-indices)
page-idxs (int-array (map first leaf-indices))
;; leaf buffers are fetched from the buffer pool by leaf filename (second element of each
;; pair); `deref` is called on each result of .getBuffer
leaf-buffers (->> (map (comp #(deref (.getBuffer buffer-pool %)) second) filenames)
object-array)
leaf-footers (->> (map util/read-arrow-footer leaf-buffers) object-array)
;; one VectorSchemaRoot per leaf, created from the footer's schema with the allocator
;; obtained from the buffer's reference manager
leaf-vsrs (->> (map #(VectorSchemaRoot/create (.getSchema %1)
(.getAllocator (.getReferenceManager %2))) leaf-footers leaf-buffers)
object-array)]
;; `false` is the initial value of the cursor's `finished` field
(TrieBucketCursor. allocator leaf-buffers leaf-footers leaf-vsrs page-idxs false)))
(TrieBucketCursor. allocator leaf-buffers leaf-footers leaf-vsrs page-idxs false
live-relation live-selection)))

;; FIXME there must be some way to do this more cleanly
;; FIXME potentially do selection of col-names before the time resolution
Expand Down Expand Up @@ -738,62 +752,41 @@
(.syncRowCount rel-wtr)
(vw/rel-wtr->rdr rel-wtr)))


(defn ->merged-rel
  "Merges the per-chunk relations of `table-name` with the optional `live-rel`
  and returns the merged relation.

  Chunk keys from `(.chunksMetadata metadata-mgr)` are rendered as lex-hex
  strings and reverse-sorted (presumably newest first, matching the ordering
  `merge-rels` assumes). One relation per chunk is loaded with `files->rel`;
  when `live-rel` is non-nil it is placed in front of the chunk relations.

  The per-chunk relations are always closed before returning, including when
  loading or merging throws. If an exception is raised after the merged
  relation has been built, the merged relation is closed as well. `live-rel`
  is never closed here; it remains owned by the caller."
  [^BufferAllocator allocator ^IBufferPool buffer-pool
   ^IMetadataManager metadata-mgr table-name, ^IIndirectRelation live-rel]
  (let [;; computed once and shared by the filename list and the relation loading
        chunk-idxs (-> (map util/->lex-hex-string (keys (.chunksMetadata metadata-mgr))) sort reverse)
        filenames (for [chunk-idx chunk-idxs]
                    [(live-index/->trie-obj-key table-name chunk-idx)
                     (live-index/->leaf-obj-key table-name chunk-idx)])
        ;; every relation opened so far, so `finally` can close exactly those
        !rels (volatile! [])]
    (try
      (doseq [[trie-filename leaf-filename] filenames]
        (vswap! !rels conj (files->rel allocator buffer-pool trie-filename leaf-filename)))
      ;; `sequence` hands merge-rels a seq (never nil), as the original lazy `for` did
      (let [res (merge-rels allocator (cond->> (sequence @!rels) live-rel (cons live-rel)))]
        (try
          ;; kept from the original: a trie bucket cursor is created and closed
          ;; straight away; it contributes nothing to the returned relation
          (let [trie-cursor (->trie-bucket-cursor allocator buffer-pool filenames)]
            (.close trie-cursor))
          res
          (catch Throwable t
            ;; do not leak the merged relation if the cursor round-trip fails
            (util/close res)
            (throw t))))
      (finally
        (run! util/close @!rels)))))

;; Builds the cursor used for the "4r" scan path: a ValidPointTrieCursor for both the
;; `at-now?` and `at-valid-time-point?` scan options; any other option throws.
;; NOTE(review): this span is a rendered diff hunk — the pre-change body (collecting leaves
;; into `!leaves`, flattening them into `data`, calling ->merged-rel) and the post-change
;; body (computing `live-indices` and building a trie bucket cursor) appear together
;; without +/- markers.
;; NOTE(review): `basis` is not referenced in the lines shown here.
(defn- ->4r-cursor [^BufferAllocator allocator, ^IBufferPool buffer-pool,
^IMetadataManager metadata-mgr, ^ILiveTableWatermark wm,
table-name, col-names, ^longs temporal-range
^Map col-preds, params, scan-opts, basis]
(let [!leaves (Stream/builder)
;; one [trie-filename leaf-filename] pair per chunk, in reverse-sorted lex-hex chunk order
(let [filenames (for [chunk-idx (-> (map util/->lex-hex-string (keys (.chunksMetadata metadata-mgr))) sort reverse)
:let [leaf-filename (live-index/->leaf-obj-key table-name chunk-idx)
trie-filename (live-index/->trie-obj-key table-name chunk-idx)]]
[trie-filename leaf-filename])
live-relation (-> (.liveRelation wm)
(iv/with-absent-cols allocator col-names))
^LiveTrie trie (.liveTrie wm)]
;; visitor that walks every branch and adds each leaf to `!leaves`
(-> (.compactLogs trie)
(.accept (reify LiveTrie$NodeVisitor
(visitBranch [this branch]
(run! #(.accept ^LiveTrie$Node % this) (.children branch)))

(visitLeaf [_ trie-leaf]
(.add !leaves trie-leaf)))))
;; `data` is sized to hold the concatenation of every leaf's index array
(let [data-idxs (->> (iterator-seq (.iterator (.build !leaves)))
(map #(.data ^LiveTrie$Leaf %)))
data (int-array (transduce (map alength) + data-idxs))]
(loop [cur 0 data-idxs data-idxs]
(when-let [^ints data-idx (first data-idxs)]
;; NOTE(review): System/arraycopy takes (src, srcPos, dest, destPos, length); as written
;; this copies from `data` into `data-idx`, not from the leaf array into `data` — confirm
;; the intended direction.
(System/arraycopy data cur data-idx 0 (alength data-idx))
(recur (+ cur (alength data-idx)) (rest data-idxs))))

;; re-point every live column at the row indices held in `data`
(let [live-relation (iv/->indirect-rel
(for [^IIndirectVector col live-relation]
(iv/->indirect-vec (.getVector col) data)))
merged-rel (->merged-rel allocator buffer-pool metadata-mgr table-name
(when (pos? (.rowCount live-relation)) live-relation))]

(cond
(at-now? scan-opts)
;; needed because of future updates
#_(NowTrieCursor. allocator log-relation (.iterator (.build !leaves)) temporal-range col-names col-preds params)
(ValidPointTrieCursor. allocator merged-rel temporal-range col-names col-preds params false)

(at-valid-time-point? scan-opts)
(ValidPointTrieCursor. allocator merged-rel temporal-range col-names col-preds params false)

:else (throw (ex-info "TODO - invalid 4r option" {}))))))))
^LiveTrie trie (.liveTrie wm)
;; visitor that returns, for each leaf, its data array prefixed with the child indices of
;; the branches passed through on the way down; nil children are skipped
live-indices (-> (.compactLogs trie)
(.accept (reify LiveTrie$NodeVisitor
(visitBranch [this branch]
(into [] (comp (map-indexed (fn [idx ^ArrowHashTrie$Node child]
(map #(cons idx %) (when child (.accept child this)))))
cat)
(.getChildren branch)))

(visitLeaf [_ trie-leaf]
[(.data trie-leaf)]))))
;; NOTE(review): only the first entry of `live-indices` is used as the live selection
live-selection (first live-indices)
;; NOTE(review): `trie-bucket-curser` is spelled this way at the binding and at both uses below
trie-bucket-curser (->trie-bucket-cursor allocator buffer-pool filenames
live-relation live-selection)]

(cond
(at-now? scan-opts)
;; needed because of future updates
;; the NowTrieCursor form below is discarded by the reader (`#_`)
#_(NowTrieCursor. allocator log-relation (.iterator (.build !leaves)) temporal-range col-names col-preds params)
(ValidPointTrieCursor. allocator trie-bucket-curser temporal-range col-names col-preds params)

(at-valid-time-point? scan-opts)
(ValidPointTrieCursor. allocator trie-bucket-curser temporal-range col-names col-preds params)

:else (throw (ex-info "TODO - invalid 4r option" {}))))))

(defn tables-with-cols [basis ^IWatermarkSource wm-src ^IScanEmitter scan-emitter]
(let [{:keys [tx, after-tx]} basis
Expand Down

0 comments on commit 5b1d76b

Please sign in to comment.