Skip to content

Commit

Permalink
iid fast path for scan
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 15, 2023
1 parent d821cda commit e04b5f5
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 48 deletions.
135 changes: 92 additions & 43 deletions core/src/main/clojure/xtdb/operator/scan.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
xtdb.watermark)
(:import (clojure.lang IPersistentMap MapEntry)
[java.lang AutoCloseable]
(java.util ArrayList Arrays Iterator LinkedList List ListIterator Map)
java.nio.ByteBuffer
(java.util ArrayList Iterator LinkedList List ListIterator Map)
(java.util.function IntConsumer)
org.apache.arrow.memory.ArrowBuf
org.apache.arrow.memory.BufferAllocator
(java.util.stream IntStream)
(org.apache.arrow.memory ArrowBuf BufferAllocator)
[org.apache.arrow.memory.util ArrowBufPointer]
(org.apache.arrow.vector VectorLoader VectorSchemaRoot)
xtdb.api.protocols.TransactionInstant
Expand All @@ -30,7 +31,7 @@
(xtdb.metadata IMetadataManager)
xtdb.object_store.ObjectStore
xtdb.operator.IRelationSelector
(xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
(xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf LeafMergeQueue LeafMergeQueue$LeafPointer LiveHashTrie$Leaf)
(xtdb.vector IRelationWriter IRowCopier IVectorReader IVectorWriter RelationReader)
(xtdb.watermark IWatermark IWatermarkSource Watermark)))

Expand Down Expand Up @@ -398,6 +399,31 @@
system-from
nil))))))))))

(defn iid-selector [^ByteBuffer iid-bb]
(reify IRelationSelector
(select [_ allocator rel-rdr _params]
(with-open [arrow-buf (util/->arrow-buf-view allocator iid-bb)]
(let [iid-ptr (ArrowBufPointer. arrow-buf 0 (.capacity iid-bb))
ptr (ArrowBufPointer.)
iid-rdr (.readerForName rel-rdr "xt$iid")
value-count (.valueCount iid-rdr)]
(if (pos-int? value-count)
;; lower-bound
(loop [left 0 right (dec value-count)]
(if (= left right)
(if (= iid-ptr (.getPointer iid-rdr left ptr))
;; upper bound
(loop [right left]
(if (or (>= right value-count) (not= iid-ptr (.getPointer iid-rdr right ptr)))
(.toArray (IntStream/range left right))
(recur (inc right))))
(int-array 0))
(let [mid (quot (+ left right) 2)]
(if (<= (.compareTo iid-ptr (.getPointer iid-rdr mid ptr)) 0)
(recur left mid)
(recur (inc mid) right)))))
(int-array 0)))))))

(deftype TrieCursor [^BufferAllocator allocator, arrow-leaves, ^Iterator merge-tasks
col-names, ^Map col-preds, ^longs temporal-timestamps,
params, ^IPersistentMap picker-state]
Expand All @@ -407,7 +433,10 @@
(let [{task-leaves :leaves, :keys [path]} (.next merge-tasks)]
(with-open [out-rel (vw/->rel-writer allocator)]
(let [loaded-leaves (mapv (fn [{:keys [load-leaf]}]
(load-leaf))
(let [{:keys [^RelationReader rel-rdr] :as res} (load-leaf)]
(if-let [^IRelationSelector iid-pred (get col-preds "xt$iid")]
(assoc res :rel-rdr (.select rel-rdr (.select iid-pred allocator rel-rdr params)))
res)))
task-leaves)
merge-q (LeafMergeQueue. path
(into-array IVectorReader
Expand All @@ -433,15 +462,16 @@
(as-> rel (reduce (fn [^RelationReader rel, ^IRelationSelector col-pred]
(.select rel (.select col-pred allocator rel params)))
rel
(vals col-preds)))))))
(vals (dissoc col-preds "xt$iid"))))))))
true)

false))

(close [_]
(util/close arrow-leaves)))

(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm]
(defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm
{:keys [^ByteBuffer iid-bb] :as _opts}]
(let [{trie-files :trie, leaf-files :leaf} (->> (.listObjects obj-store (format "tables/%s/chunks" table-name))
(keep (fn [file-name]
(when-let [[_ file-type chunk-idx-str] (re-find #"/(leaf|trie)-c(.+?)\.arrow$" file-name)]
Expand Down Expand Up @@ -477,41 +507,45 @@
(with-open [record-batch (util/->arrow-record-batch-view (first arrow-blocks) buf)]
(.load loader record-batch)
(.add trie-roots root)))))

{:arrow-leaves (mapv (fn [{:keys [leaf-buf leaf-root]}]
(reify AutoCloseable
(close [_]
(util/close leaf-root)
(util/close leaf-buf))))
arrow-leaves)

:merge-tasks (vec (for [{:keys [path leaves]} (trie/trie-merge-tasks (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
live-table-wm (conj (.liveTrie live-table-wm))))]
{:path path
:leaves (mapv (fn [{:keys [ordinal leaf]}]
(condp = (class leaf)
ArrowHashTrie$Leaf
(let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf leaf)
{:keys [leaf-buf ^VectorLoader loader, ^VectorSchemaRoot leaf-root arrow-blocks,
!current-page-idx
^LeafMergeQueue$LeafPointer leaf-ptr]} (nth arrow-leaves ordinal)]
(let [hash-tries (cond-> (mapv #(ArrowHashTrie/from %) trie-roots)
live-table-wm (conj (.liveTrie live-table-wm)))]

{:arrow-leaves (mapv (fn [{:keys [leaf-buf leaf-root]}]
(reify AutoCloseable
(close [_]
(util/close leaf-root)
(util/close leaf-buf))))
arrow-leaves)

:merge-tasks (vec (for [{:keys [path leaves]}
(if iid-bb
(vector (trie/iid-trie-merge-task hash-tries (.array iid-bb)))
(trie/trie-merge-tasks hash-tries))]
{:path path
:leaves (mapv (fn [{:keys [ordinal leaf]}]
(condp = (class leaf)
ArrowHashTrie$Leaf
(let [page-idx (.getPageIndex ^ArrowHashTrie$Leaf leaf)
{:keys [leaf-buf ^VectorLoader loader, ^VectorSchemaRoot leaf-root arrow-blocks,
!current-page-idx
^LeafMergeQueue$LeafPointer leaf-ptr]} (nth arrow-leaves ordinal)]
{:load-leaf (fn []
(when-not (= page-idx @!current-page-idx)
(reset! !current-page-idx page-idx)

(with-open [rb (util/->arrow-record-batch-view (nth arrow-blocks page-idx) leaf-buf)]
(.load loader rb)
(.reset leaf-ptr)))

{:rel-rdr (vr/<-root leaf-root)
:leaf-ptr leaf-ptr})})

LiveHashTrie$Leaf
{:load-leaf (fn []
(when-not (= page-idx @!current-page-idx)
(reset! !current-page-idx page-idx)

(with-open [rb (util/->arrow-record-batch-view (nth arrow-blocks page-idx) leaf-buf)]
(.load loader rb)
(.reset leaf-ptr)))

{:rel-rdr (vr/<-root leaf-root)
:leaf-ptr leaf-ptr})})

LiveHashTrie$Leaf
{:load-leaf (fn []
{:rel-rdr (.select (.liveRelation live-table-wm) (.data ^LiveHashTrie$Leaf leaf))
:leaf-ptr (LeafMergeQueue$LeafPointer. (count arrow-leaves))})}))
{:rel-rdr (.select (.liveRelation live-table-wm) (.data ^LiveHashTrie$Leaf leaf))
:leaf-ptr (LeafMergeQueue$LeafPointer. (count arrow-leaves))})}))

leaves)}))})))))
leaves)}))}))))))

;; The consumers for different leafs need to share some state so the logic of how to advance
;; is correct. For example if the `skip-iid-ptr` gets set in one leaf consumer it should also affect
Expand All @@ -521,7 +555,9 @@
table-name, col-names, ^longs temporal-range
^Map col-preds, params, scan-opts]
(let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name))
{:keys [arrow-leaves ^List merge-tasks]} (read-tries obj-store buffer-pool table-name live-table-wm)]

{:keys [arrow-leaves ^List merge-tasks]}
(read-tries obj-store buffer-pool table-name live-table-wm (select-keys scan-opts [:iid-bb]))]
(try
(->TrieCursor allocator arrow-leaves (.iterator merge-tasks)
col-names col-preds
Expand All @@ -538,6 +574,12 @@
(util/close arrow-leaves)
(throw t)))))

(defn selects->iid-byte-buffer ^ByteBuffer [selects]
(when-let [eid-select (or (get selects 'xt/id) (get selects 'xt$id))]
(let [eid (nth eid-select 2)]
(when (and (= '= (first eid-select)) (s/valid? ::lp/value eid))
(trie/->iid eid)))))

(defmethod ig/prep-key ::scan-emitter [_ opts]
(merge opts
{:metadata-mgr (ig/ref ::meta/metadata-manager)
Expand Down Expand Up @@ -576,7 +618,7 @@
(->> scan-cols
(into {} (map (juxt identity ->col-type))))))

(emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts} scan-col-types param-types]
(emitScan [_ {:keys [columns], {:keys [table for-valid-time] :as scan-opts} :scan-opts :as opts} scan-col-types param-types]
(let [col-names (->> columns
(into [] (comp (map (fn [[col-type arg]]
(case col-type
Expand All @@ -602,12 +644,18 @@
(first arg))
(into {}))

iid-bb (selects->iid-byte-buffer selects)

col-preds (->> (for [[col-name select-form] selects]
;; for temporal preds, we may not need to re-apply these if they can be represented as a temporal range.
(MapEntry/create (str col-name)
(expr/->expression-relation-selector select-form {:col-types col-types, :param-types param-types})))
(into {}))

col-preds (cond-> col-preds
iid-bb
(assoc "xt$iid" (iid-selector iid-bb)))

metadata-args (vec (for [[col-name select] selects
:when (not (types/temporal-column? (util/str->normal-form-str (str col-name))))]
select))
Expand All @@ -625,7 +673,8 @@
(let [_metadata-pred (expr.meta/->metadata-selector (cons 'and metadata-args) col-types params)
scan-opts (cond-> scan-opts
(nil? for-valid-time)
(assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])))]
(assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]])
:iid-bb iid-bb))]
(->4r-cursor allocator object-store buffer-pool
watermark
normalized-table-name
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/clojure/xtdb/trie.clj
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,29 @@
{:ordinal ordinal, :leaf node})))))}])))]

(vec (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) []))))

(defn- bucket-for [^bytes iid level]
(let [level-offset-bits (* HashTrie/LEVEL_BITS (inc level))
level-offset-bytes (/ (- level-offset-bits HashTrie/LEVEL_BITS) Byte/SIZE)]
(bit-and (bit-shift-right (get iid level-offset-bytes) (mod level-offset-bits Byte/SIZE)) HashTrie/LEVEL_MASK)))

(defn iid-trie-merge-task [tries ^bytes iid]
(letfn [(trie-merge-tasks* [nodes path level]
(let [bucket-idx (bucket-for iid level)
trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)]
(if (some identity trie-children)
(trie-merge-tasks* (mapv (fn [node ^objects node-children]
(if node-children
(aget node-children bucket-idx)
node))
nodes trie-children)
(conj path bucket-idx)
(inc level))
{:path (byte-array path)
:leaves (->> nodes
(into [] (keep-indexed
(fn [ordinal ^HashTrie$Node node]
(when node
{:ordinal ordinal, :leaf node})))))})))]

(trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
5 changes: 4 additions & 1 deletion core/src/main/clojure/xtdb/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
(org.apache.arrow.compression CommonsCompressionFactory)
(org.apache.arrow.flatbuf Footer Message RecordBatch)
(org.apache.arrow.memory AllocationManager ArrowBuf BufferAllocator)
(org.apache.arrow.memory.util ByteFunctionHelpers MemoryUtil)
(org.apache.arrow.memory.util ArrowBufPointer ByteFunctionHelpers MemoryUtil)
(org.apache.arrow.vector ValueVector VectorLoader VectorSchemaRoot)
(org.apache.arrow.vector.ipc ArrowFileWriter ArrowStreamWriter ArrowWriter)
(org.apache.arrow.vector.ipc.message ArrowBlock ArrowFooter ArrowRecordBatch MessageSerializer)
Expand Down Expand Up @@ -117,6 +117,9 @@
(.position bb 0)
bb))

(defn byte-buffer->uuid [^ByteBuffer bb]
(UUID. (.getLong bb 0) (.getLong bb 1)))

(defn ->lex-hex-string
"Turn a long into a lexicographically-sortable hex string by prepending the length"
[^long l]
Expand Down
Loading

0 comments on commit e04b5f5

Please sign in to comment.