Skip to content

Commit

Permalink
scan metadata support
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 21, 2023
1 parent 2b16323 commit a5b004d
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 95 deletions.
8 changes: 4 additions & 4 deletions core/src/main/clojure/xtdb/bloom.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@
(^double [^long n ^long k ^long m]
(Math/pow (- 1 (Math/exp (/ (- k) (double (/ m n))))) k)))

(defn bloom->bitmap ^org.roaringbitmap.buffer.ImmutableRoaringBitmap [^VarBinaryVector bloom-vec ^long idx]
(let [pointer (.getDataPointer bloom-vec idx)
(defn bloom->bitmap ^org.roaringbitmap.buffer.ImmutableRoaringBitmap [^IVectorReader bloom-rdr ^long idx]
(let [pointer (.getPointer bloom-rdr idx)
nio-buffer (.nioBuffer (.getBuf pointer) (.getOffset pointer) (.getLength pointer))]
(ImmutableRoaringBitmap. nio-buffer)))

(defn bloom-contains? [^VarBinaryVector bloom-vec
(defn bloom-contains? [^IVectorReader bloom-rdr
^long idx
^ints hashes]
(let [^ImmutableRoaringBitmap bloom (bloom->bitmap bloom-vec idx)]
(let [^ImmutableRoaringBitmap bloom (bloom->bitmap bloom-rdr idx)]
(loop [n 0]
(if (= n (alength hashes))
true
Expand Down
35 changes: 17 additions & 18 deletions core/src/main/clojure/xtdb/expression/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,25 @@
(bloom/literal-hashes params param-expr col-type))))

(def ^:private table-metadata-sym (gensym "table-metadata"))
(def ^:private metadata-root-sym (gensym "metadata-root"))
(def ^:private cols-vec-sym (gensym "cols-vec"))
(def ^:private cols-data-vec-sym (gensym "cols-data-vec"))
(def ^:private block-idx-sym (gensym "block-idx"))
(def ^:private types-vec-sym (gensym "types-vec"))
(def ^:private bloom-vec-sym (gensym "bloom-vec"))
(def ^:private metadata-rdr-sym (gensym "metadata-rdr"))
(def ^:private cols-rdr-sym (gensym "cols-rdr"))
(def ^:private col-rdr-sym (gensym "col-rdr"))
(def ^:private page-idx-sym (gensym "page-idx"))
(def ^:private types-rdr-sym (gensym "types-rdr"))
(def ^:private bloom-rdr-sym (gensym "bloom-rdr"))

(defmethod expr/codegen-expr :metadata-vp-call [{:keys [f meta-value field param-expr col-type bloom-hash-sym]} opts]
(let [field-name (util/str->normal-form-str (str field))

idx-code `(.rowIndex ~table-metadata-sym ~field-name ~block-idx-sym)]
idx-code `(.rowIndex ~table-metadata-sym ~field-name ~page-idx-sym)]

(if (= meta-value :bloom-filter)
{:return-type :bool
:continue (fn [cont]
(cont :bool
`(boolean
(when-let [~expr/idx-sym ~idx-code]
(bloom/bloom-contains? ~bloom-vec-sym ~expr/idx-sym ~bloom-hash-sym)))))}
(bloom/bloom-contains? ~bloom-rdr-sym ~expr/idx-sym ~bloom-hash-sym)))))}

(let [col-sym (gensym 'meta_col)
col-field (types/col-type->field col-type)
Expand All @@ -141,9 +141,8 @@
(assoc-in [:var->col-type col-sym] (types/merge-col-types col-type :null))))]
{:return-type :bool
:batch-bindings [[(-> col-sym (expr/with-tag IVectorReader))
`(some-> ^StructVector (.getChild ~types-vec-sym ~(.getName col-field))
(.getChild ~(name meta-value))
vr/vec->reader)]]
`(some-> ^IVectorReader (.structKeyReader ~types-rdr-sym ~(.getName col-field))
(.structKeyReader ~(name meta-value)))]]
:children [emitted-expr]
:continue (fn [cont]
(cont :bool
Expand All @@ -165,17 +164,17 @@
:f (-> `(fn [~(-> table-metadata-sym (expr/with-tag ITableMetadata))
~(-> expr/params-sym (expr/with-tag RelationReader))
[~@(keep :bloom-hash-sym (ewalk/expr-seq expr))]]
(let [~metadata-root-sym (.metadataRoot ~table-metadata-sym)
~(-> cols-vec-sym (expr/with-tag ListVector)) (.getVector ~metadata-root-sym "columns")
~(-> cols-data-vec-sym (expr/with-tag StructVector)) (.getDataVector ~cols-vec-sym)
~(-> types-vec-sym (expr/with-tag StructVector)) (.getChild ~cols-data-vec-sym "types")
~(-> bloom-vec-sym (expr/with-tag VarBinaryVector)) (.getChild ~cols-data-vec-sym "bloom")
(let [~metadata-rdr-sym (.metadataReader ~table-metadata-sym)
~(-> cols-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~metadata-rdr-sym "columns")
~(-> col-rdr-sym (expr/with-tag IVectorReader)) (.listElementReader ~cols-rdr-sym)
~(-> types-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~col-rdr-sym "types")
~(-> bloom-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~col-rdr-sym "bloom")

~@(expr/batch-bindings emitted-expr)]
(reify IntPredicate
(~'test [_ ~block-idx-sym]
(~'test [_ ~page-idx-sym]
(boolean ~(continue (fn [_ code] code)))))))
#_(doto clojure.pprint/pprint)
;; (doto clojure.pprint/pprint)
(eval))}))

(util/lru-memoize)))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
(let [live-rel-rdr (vw/rel-wtr->rdr live-rel)]
(when-let [bufs (trie/live-trie->bufs allocator (-> live-trie (.compactLogs)) live-rel-rdr)]
(let [chunk-idx-str (util/->lex-hex-string chunk-idx)
!fut (trie/write-trie-bufs! obj-store (format "tables/%s/chunks" table-name) chunk-idx-str bufs)
!fut (trie/write-trie-bufs! obj-store table-name chunk-idx-str bufs)
table-metadata (MapEntry/create table-name
{:col-types (live-rel->col-types live-rel-rdr)
:row-count (.rowCount live-rel-rdr)})]
Expand Down
136 changes: 90 additions & 46 deletions core/src/main/clojure/xtdb/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,33 @@
[xtdb.transit :as xt.transit]
[xtdb.types :as types]
[xtdb.util :as util]
[xtdb.vector :as vec]
[xtdb.vector.reader :as vr]
[xtdb.vector.writer :as vw])
(:import [clojure.lang Keyword MapEntry]
(java.io ByteArrayInputStream ByteArrayOutputStream)
(:import (java.io ByteArrayInputStream ByteArrayOutputStream)
java.lang.AutoCloseable
java.nio.ByteBuffer
(java.util HashMap HashSet Map NavigableMap Set TreeMap)
(java.util.concurrent ConcurrentHashMap)
java.util.concurrent.atomic.AtomicInteger
(java.util.concurrent ConcurrentHashMap CompletableFuture)
(java.util.function BiFunction Consumer Function)
(java.util.stream IntStream)
(org.apache.arrow.memory ArrowBuf BufferAllocator)
(org.apache.arrow.vector FieldVector IntVector VectorSchemaRoot)
(org.apache.arrow.vector.complex ListVector StructVector)
(org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
(org.apache.arrow.memory ArrowBuf)
(org.apache.arrow.vector FieldVector VectorLoader VectorSchemaRoot)
(org.apache.arrow.vector.types.pojo ArrowType$Union)
(org.roaringbitmap RoaringBitmap)
xtdb.buffer_pool.IBufferPool
xtdb.object_store.ObjectStore
(xtdb.vector IVectorReader IVectorWriter RelationReader ValueVectorReader$DuvReader)))
(xtdb.vector IVectorReader IVectorWriter RelationReader)))

(set! *unchecked-math* :warn-on-boxed)

#_{:clj-kondo/ignore [:unused-binding :clojure-lsp/unused-public-var]}
(definterface ITableMetadata
(^org.apache.arrow.vector.VectorSchemaRoot metadataRoot [])
(^xtdb.vector.IVectorReader metadataReader [])
(^java.util.Set columnNames [])
(^Long rowIndex [^String column-name, ^int blockIdx]
"pass blockIdx = -1 for metadata about the whole chunk")
(^long blockCount []))
(^Long rowIndex [^String column-name, ^int pageIdx]
;; "pass pageIdx = -1 for metadata about the whole chunk"
)
(^long pageCount []))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface IPageMetadataWriter
Expand All @@ -56,7 +54,7 @@
;; A predicate factory over table metadata: `build` takes one `ITableMetadata`
;; and returns a `java.util.function.IntPredicate`. In this file the returned
;; predicate is tested against page indexes (see `matching-tries`).
(definterface IMetadataPredicate
(^java.util.function.IntPredicate build [^xtdb.metadata.ITableMetadata tableMetadata]))

(defrecord ChunkMatch [^long chunk-idx, ^RoaringBitmap block-idxs, ^Set col-names])
(defrecord TrieMatch [^long trie-idx, ^RoaringBitmap page-idxs, ^Set col-names])

(defn- with-single-root [^IBufferPool buffer-pool, obj-key, f]
(-> (.getBuffer buffer-pool obj-key)
Expand Down Expand Up @@ -180,10 +178,10 @@
(.endStruct struct-wtr))))))))


(doseq [type-head #{:null :bool :fixed-size-binary :uuid :clj-form}]
(doseq [type-head #{:null :bool :fixed-size-binary :clj-form}]
(defmethod type->metadata-writer type-head [_write-col-meta! metadata-root col-type] (->bool-type-handler metadata-root col-type)))

(doseq [type-head #{:int :float :utf8 :varbinary :keyword :uri
(doseq [type-head #{:int :float :utf8 :varbinary :keyword :uri :uuid
:timestamp-tz :timestamp-local :date :interval :time-local}]
(defmethod type->metadata-writer type-head [_write-col-meta! metadata-root col-type] (->min-max-type-handler metadata-root col-type)))

Expand Down Expand Up @@ -270,8 +268,53 @@
(write-col-meta! true col))
(.endList cols-wtr))))))

(defn ->table-metadata-idxs
  "Walks every row of `metadata-rdr` once and returns a map of lookup data:

  - `:col-names`      - set of every column name seen (each passed through `str`).
  - `:page-idx-cache` - map of `[col-name page-idx]` -> index into the list-element
                        reader of \"columns\", recorded only for entries whose
                        \"root-col?\" flag is true. Rows whose \"page-idx\" is nil
                        are keyed under page-idx -1.
  - `:page-count`     - number of rows with a non-nil \"page-idx\", plus one."
  [^IVectorReader metadata-rdr]
  (let [row-count (.valueCount metadata-rdr)
        ^IVectorReader page-idx-rdr (.structKeyReader metadata-rdr "page-idx")
        ^IVectorReader cols-rdr (.structKeyReader metadata-rdr "columns")
        ^IVectorReader col-rdr (.listElementReader cols-rdr)
        ^IVectorReader col-name-rdr (.structKeyReader col-rdr "col-name")
        ^IVectorReader root-col?-rdr (.structKeyReader col-rdr "root-col?")
        seen-col-names (HashSet.)
        row-idx-by-col+page (HashMap.)]

    (dotimes [meta-idx row-count]
      (let [first-col-idx (.getListStartIndex cols-rdr meta-idx)
            ;; a nil page-idx is stored under the sentinel -1
            page-idx (or (.getObject page-idx-rdr meta-idx) -1)
            col-count (.getListCount cols-rdr meta-idx)]
        (dotimes [offset col-count]
          ;; translate the position within this row's list into an index
          ;; into the flattened list-element reader
          (let [col-idx (+ first-col-idx offset)
                col-name (str (.getObject col-name-rdr col-idx))]
            (.add seen-col-names col-name)
            (when (.getBoolean root-col?-rdr col-idx)
              (.put row-idx-by-col+page [col-name page-idx] col-idx))))))

    {:col-names (into #{} seen-col-names)
     :page-idx-cache (into {} row-idx-by-col+page)
     ;; NOTE(review): the result is (non-nil page-idx rows) + 1; the reason for
     ;; the extra one is not visible in this file - confirm it is intended.
     :page-count (loop [idx 0, non-nil-pages 0]
                   (if (< idx row-count)
                     (recur (inc idx)
                            (if (some? (.getObject page-idx-rdr idx))
                              (inc non-nil-pages)
                              non-nil-pages))
                     (inc non-nil-pages)))}))

;; TODO copied because circular dep with xtdb.trie
(defn- table-name->dir
  "Returns the object-store directory for `table-name`: \"tables/<table-name>/chunks\"."
  [table-name]
  (format "tables/%s/chunks" table-name))
(defn ->table-trie-obj-key
  "Returns the object key of the trie file for `table-name` at `chunk-idx`:
  \"<table dir>/trie-c<chunk-idx as lex-hex string>.arrow\"."
  [table-name chunk-idx]
  (let [dir (table-name->dir table-name)
        chunk-idx-str (util/->lex-hex-string chunk-idx)]
    (format "%s/trie-c%s.arrow" dir chunk-idx-str)))

(defn ->table-metadata
  "Wraps `metadata-reader` and a precomputed index map in an `ITableMetadata`.

  The second argument is a map with `:col-names`, `:page-idx-cache` and
  `:page-count` (the shape returned by `->table-metadata-idxs`)."
  ^xtdb.metadata.ITableMetadata
  [^IVectorReader metadata-reader, {:keys [col-names page-idx-cache, page-count]}]
  (reify ITableMetadata
    (metadataReader [_]
      metadata-reader)

    (columnNames [_]
      col-names)

    ;; nil when there is no cached entry for this [column, page] pair
    (rowIndex [_ col-name page-idx]
      (get page-idx-cache [col-name page-idx]))

    (pageCount [_]
      page-count)))

(deftype MetadataManager [^ObjectStore object-store
^IBufferPool buffer-pool
^NavigableMap chunks-metadata
^Map table-metadata-idxs
^:volatile-mutable ^Map col-types]
IMetadataManager
(finishChunk [this chunk-idx new-chunk-metadata]
Expand All @@ -281,16 +324,23 @@
(.put chunks-metadata chunk-idx new-chunk-metadata))

(withMetadata [_ chunk-idx table-name f]
(throw (UnsupportedOperationException.))
#_
(with-single-root buffer-pool (->table-metadata-obj-key chunk-idx table-name)
(fn [metadata-root]
(.apply f (->table-metadata metadata-root
(.computeIfAbsent table-metadata-idxs
[chunk-idx table-name]
(reify Function
(apply [_ _]
(->table-metadata-idxs metadata-root)))))))))
(CompletableFuture/completedFuture
(with-open [^ArrowBuf trie-buf (-> @(.getBuffer buffer-pool (->table-trie-obj-key table-name chunk-idx))
(util/rethrowing-cause))]
(let [{:keys [^VectorLoader loader ^VectorSchemaRoot root arrow-blocks]} (util/read-arrow-buf trie-buf)]
(try
(with-open [record-batch (util/->arrow-record-batch-view (first arrow-blocks) trie-buf)]
(.load loader record-batch))
(let [^RelationReader trie-rdr (vr/<-root root)
^IVectorReader metadata-reader (.metadataReader (.typeIdReader (.readerForName trie-rdr "nodes") (byte 2)))]
(.apply f (->table-metadata metadata-reader
(.computeIfAbsent table-metadata-idxs
[chunk-idx table-name]
(reify Function
(apply [_ _]
(->table-metadata-idxs metadata-reader)))))))
(finally
(.close root)))))))

(chunksMetadata [_] chunks-metadata)

Expand Down Expand Up @@ -333,23 +383,21 @@
:buffer-pool (ig/ref :xtdb.buffer-pool/buffer-pool)}
opts))

(defmethod ig/init-key ::metadata-manager [_ {:keys [^ObjectStore object-store], :as deps}]
(defmethod ig/init-key ::metadata-manager [_ {:keys [^ObjectStore object-store, ^IBufferPool buffer-pool], :as deps}]
(let [chunks-metadata (load-chunks-metadata deps)]
(MetadataManager. object-store
buffer-pool
chunks-metadata
(ConcurrentHashMap.)
(->> (vals chunks-metadata) (reduce merge-col-types {})))))

;; halt-key! handler for ::metadata-manager: hands the manager instance to
;; `util/try-close`.
(defmethod ig/halt-key! ::metadata-manager [_ mgr]
(util/try-close mgr))

(defn with-metadata [^IMetadataManager metadata-mgr, ^long chunk-idx, ^String table-name, ^Function f]
(throw (UnsupportedOperationException.))
#_ ; TODO reinstate when we bring back metadata into scan
(.withMetadata metadata-mgr chunk-idx table-name f))

(defn with-all-metadata [^IMetadataManager metadata-mgr, table-name, ^BiFunction f]
(throw (UnsupportedOperationException.))
#_ ; TODO reinstate when we bring back metadata into scan
(->> (for [[^long chunk-idx, chunk-metadata] (.chunksMetadata metadata-mgr)
:let [table (get-in chunk-metadata [:tables table-name])]
:when table]
Expand All @@ -361,19 +409,15 @@
(into [] (keep deref))
(util/rethrowing-cause)))

(defn matching-chunks [^IMetadataManager metadata-mgr, table-name, ^IMetadataPredicate metadata-pred]
(throw (UnsupportedOperationException.))
#_ ; TODO reinstate when we bring back metadata into scan
(defn matching-tries [^IMetadataManager metadata-mgr, table-name, ^IMetadataPredicate metadata-pred]
(with-all-metadata metadata-mgr table-name
(util/->jbifn
(fn [^long chunk-idx, ^ITableMetadata table-metadata]
(let [pred (.build metadata-pred table-metadata)]
(when (.test pred -1)
(let [block-idxs (RoaringBitmap.)]
(dotimes [block-idx (.blockCount table-metadata)]
(when (.test pred block-idx)
(.add block-idxs block-idx)))

(when-not (.isEmpty block-idxs)
(->ChunkMatch chunk-idx block-idxs (.columnNames table-metadata))))))))))

(fn [^long chunk-idx, ^ITableMetadata table-metadata]
  ;; Called once per chunk that has metadata for the table. Builds the
  ;; page-level predicate for this chunk's metadata, collects every page index
  ;; in [0, pageCount) that passes it, and yields a TrieMatch - or nil when no
  ;; page matched.
  (let [pred (.build metadata-pred table-metadata)
        page-idxs (RoaringBitmap.)]
    (dotimes [page-idx (.pageCount table-metadata)]
      ;; the leftover debug `(prn page-idx)` was removed here: it wrote to
      ;; stdout for every page of every chunk
      (when (.test pred page-idx)
        (.add page-idxs page-idx)))
    (when-not (.isEmpty page-idxs)
      (->TrieMatch chunk-idx page-idxs (.columnNames table-metadata))))))))
Loading

0 comments on commit a5b004d

Please sign in to comment.