Skip to content

Commit

Permalink
scan metadata support
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 23, 2023
1 parent dfad96c commit fdbb5d5
Show file tree
Hide file tree
Showing 21 changed files with 460 additions and 203 deletions.
8 changes: 4 additions & 4 deletions core/src/main/clojure/xtdb/bloom.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@
(^double [^long n ^long k ^long m]
(Math/pow (- 1 (Math/exp (/ (- k) (double (/ m n))))) k)))

(defn bloom->bitmap ^org.roaringbitmap.buffer.ImmutableRoaringBitmap [^VarBinaryVector bloom-vec ^long idx]
(let [pointer (.getDataPointer bloom-vec idx)
(defn bloom->bitmap ^org.roaringbitmap.buffer.ImmutableRoaringBitmap [^IVectorReader bloom-rdr ^long idx]
(let [pointer (.getPointer bloom-rdr idx)
nio-buffer (.nioBuffer (.getBuf pointer) (.getOffset pointer) (.getLength pointer))]
(ImmutableRoaringBitmap. nio-buffer)))

(defn bloom-contains? [^VarBinaryVector bloom-vec
(defn bloom-contains? [^IVectorReader bloom-rdr
^long idx
^ints hashes]
(let [^ImmutableRoaringBitmap bloom (bloom->bitmap bloom-vec idx)]
(let [^ImmutableRoaringBitmap bloom (bloom->bitmap bloom-rdr idx)]
(loop [n 0]
(if (= n (alength hashes))
true
Expand Down
75 changes: 40 additions & 35 deletions core/src/main/clojure/xtdb/expression/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
[xtdb.expression :as expr]
[xtdb.expression.walk :as ewalk]
[xtdb.metadata :as meta]
[xtdb.util :as util]
[xtdb.types :as types]
[xtdb.vector.reader :as vr])
(:import (xtdb.metadata IMetadataPredicate ITableMetadata)
(xtdb.vector RelationReader IVectorReader)
java.util.function.IntPredicate
[org.apache.arrow.vector VarBinaryVector]
[org.apache.arrow.vector.complex ListVector StructVector]))
[xtdb.util :as util])
(:import java.util.function.IntPredicate
(xtdb.metadata IMetadataPredicate ITableMetadata)
(xtdb.vector IVectorReader RelationReader)))

(set! *unchecked-math* :warn-on-boxed)

Expand All @@ -23,16 +20,20 @@

(declare meta-expr)

(def ^:private bool-metadata-types #{:null :bool :fixed-size-binary :clj-form})

(defn call-meta-expr [{:keys [f args] :as expr} {:keys [col-types] :as opts}]
(letfn [(var-param-expr [f meta-value field {:keys [param-type] :as param-expr}]
(let [base-col-types (-> (get col-types field)
types/flatten-union-types)]
(simplify-and-or-expr
{:op :call
:f :or
;; TODO this seems like it could make better use
;; of the polymorphic expr patterns?
:args (vec
;; TODO adapt for boolean metadata writer
(when-not (contains? bool-metadata-types param-type)
(let [base-col-types (-> (get col-types field)
types/flatten-union-types)]
(simplify-and-or-expr
{:op :call
:f :or
;; TODO this seems like it could make better use
;; of the polymorphic expr patterns?
:args (vec
(for [col-type (cond
(isa? types/col-type-hierarchy param-type :num)
(filterv types/num-types base-col-types)
Expand All @@ -50,7 +51,7 @@
:field field,
:param-expr param-expr
:bloom-hash-sym (when (= meta-value :bloom-filter)
(gensym 'bloom-hashes))}))})))
(gensym 'bloom-hashes))}))}))))

(bool-expr [var-param-f var-param-meta-fn
param-var-f param-var-meta-fn]
Expand Down Expand Up @@ -106,26 +107,31 @@
(bloom/literal-hashes params param-expr col-type))))

(def ^:private table-metadata-sym (gensym "table-metadata"))
(def ^:private metadata-root-sym (gensym "metadata-root"))
(def ^:private cols-vec-sym (gensym "cols-vec"))
(def ^:private cols-data-vec-sym (gensym "cols-data-vec"))
(def ^:private block-idx-sym (gensym "block-idx"))
(def ^:private types-vec-sym (gensym "types-vec"))
(def ^:private bloom-vec-sym (gensym "bloom-vec"))
(def ^:private metadata-rdr-sym (gensym "metadata-rdr"))
(def ^:private cols-rdr-sym (gensym "cols-rdr"))
(def ^:private col-rdr-sym (gensym "col-rdr"))
(def ^:private page-idx-sym (gensym "page-idx"))
(def ^:private types-rdr-sym (gensym "types-rdr"))
(def ^:private bloom-rdr-sym (gensym "bloom-rdr"))


(defmethod expr/codegen-expr :metadata-vp-call [{:keys [f meta-value field param-expr col-type bloom-hash-sym]} opts]
(let [field-name (util/str->normal-form-str (str field))

idx-code `(.rowIndex ~table-metadata-sym ~field-name ~block-idx-sym)]
idx-code `(.rowIndex ~table-metadata-sym ~field-name ~page-idx-sym)]

(prn meta-value)

(if (= meta-value :bloom-filter)
(cond
(= meta-value :bloom-filter)
{:return-type :bool
:continue (fn [cont]
(cont :bool
`(boolean
(when-let [~expr/idx-sym ~idx-code]
(bloom/bloom-contains? ~bloom-vec-sym ~expr/idx-sym ~bloom-hash-sym)))))}
(bloom/bloom-contains? ~bloom-rdr-sym ~expr/idx-sym ~bloom-hash-sym)))))}

:else
(let [col-sym (gensym 'meta_col)
col-field (types/col-type->field col-type)

Expand All @@ -141,9 +147,8 @@
(assoc-in [:var->col-type col-sym] (types/merge-col-types col-type :null))))]
{:return-type :bool
:batch-bindings [[(-> col-sym (expr/with-tag IVectorReader))
`(some-> ^StructVector (.getChild ~types-vec-sym ~(.getName col-field))
(.getChild ~(name meta-value))
vr/vec->reader)]]
`(some-> ^IVectorReader (.structKeyReader ~types-rdr-sym ~(.getName col-field))
(.structKeyReader ~(name meta-value)))]]
:children [emitted-expr]
:continue (fn [cont]
(cont :bool
Expand All @@ -165,17 +170,17 @@
:f (-> `(fn [~(-> table-metadata-sym (expr/with-tag ITableMetadata))
~(-> expr/params-sym (expr/with-tag RelationReader))
[~@(keep :bloom-hash-sym (ewalk/expr-seq expr))]]
(let [~metadata-root-sym (.metadataRoot ~table-metadata-sym)
~(-> cols-vec-sym (expr/with-tag ListVector)) (.getVector ~metadata-root-sym "columns")
~(-> cols-data-vec-sym (expr/with-tag StructVector)) (.getDataVector ~cols-vec-sym)
~(-> types-vec-sym (expr/with-tag StructVector)) (.getChild ~cols-data-vec-sym "types")
~(-> bloom-vec-sym (expr/with-tag VarBinaryVector)) (.getChild ~cols-data-vec-sym "bloom")
(let [~metadata-rdr-sym (.metadataReader ~table-metadata-sym)
~(-> cols-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~metadata-rdr-sym "columns")
~(-> col-rdr-sym (expr/with-tag IVectorReader)) (.listElementReader ~cols-rdr-sym)
~(-> types-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~col-rdr-sym "types")
~(-> bloom-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~col-rdr-sym "bloom")

~@(expr/batch-bindings emitted-expr)]
(reify IntPredicate
(~'test [_ ~block-idx-sym]
(~'test [_ ~page-idx-sym]
(boolean ~(continue (fn [_ code] code)))))))
#_(doto clojure.pprint/pprint)
;; (doto clojure.pprint/pprint)
(eval))}))

(util/lru-memoize)))
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/clojure/xtdb/indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
xtdb.operator.IRaQuerySource
(xtdb.operator.scan IScanEmitter)
xtdb.util.RowCounter
(xtdb.vector IRowCopier IVectorReader RelationReader)
(xtdb.vector IRowCopier IVectorReader IVectorPosition RelationReader)
(xtdb.watermark IWatermark IWatermarkSource)))

(set! *unchecked-math* :warn-on-boxed)
Expand Down Expand Up @@ -454,6 +454,8 @@
^RowCounter row-counter
^long rows-per-chunk

^IVectorPosition trie-counter

^:volatile-mutable ^IWatermark shared-wm
^StampedLock wm-lock]

Expand Down Expand Up @@ -547,7 +549,7 @@

tx-key)))

(await/notify-tx tx-key awaiters)
(await/notify-tx tx-key awaiters)

(catch Throwable t
(set! (.indexer-error this) t)
Expand Down Expand Up @@ -609,7 +611,8 @@
(.finishChunk metadata-mgr chunk-idx
{:latest-completed-tx latest-completed-tx
:next-chunk-idx (+ chunk-idx (.getChunkRowCount row-counter))
:tables table-metadata}))
:tables table-metadata
:trie-idx (.getPositionAndIncrement trie-counter)}))

(.nextChunk row-counter)
(set! (.-latest_completed_chunk_tx this) latest-completed-tx)
Expand Down Expand Up @@ -654,6 +657,8 @@
(RowCounter. next-chunk-idx)
rows-per-chunk

(IVectorPosition/build 0)

nil ; watermark
(StampedLock.))))

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
(when (pos? (.rowCount live-rel-rdr))
(let [bufs (trie/live-trie->bufs allocator live-trie live-rel-rdr)
chunk-idx-str (util/->lex-hex-string chunk-idx)
!fut (trie/write-trie-bufs! obj-store (format "tables/%s/chunks" table-name) chunk-idx-str bufs)
!fut (trie/write-trie-bufs! obj-store table-name chunk-idx-str bufs)
table-metadata (MapEntry/create table-name
{:col-types (live-rel->col-types live-rel-rdr)
:row-count (.rowCount live-rel-rdr)})]
Expand Down
Loading

0 comments on commit fdbb5d5

Please sign in to comment.