diff --git a/core/src/main/clojure/xtdb/bloom.clj b/core/src/main/clojure/xtdb/bloom.clj index 49a8490120..3238502c7e 100644 --- a/core/src/main/clojure/xtdb/bloom.clj +++ b/core/src/main/clojure/xtdb/bloom.clj @@ -36,15 +36,15 @@ (^double [^long n ^long k ^long m] (Math/pow (- 1 (Math/exp (/ (- k) (double (/ m n))))) k))) -(defn bloom->bitmap ^org.roaringbitmap.buffer.ImmutableRoaringBitmap [^VarBinaryVector bloom-vec ^long idx] - (let [pointer (.getDataPointer bloom-vec idx) +(defn bloom->bitmap ^org.roaringbitmap.buffer.ImmutableRoaringBitmap [^IVectorReader bloom-rdr ^long idx] + (let [pointer (.getPointer bloom-rdr idx) nio-buffer (.nioBuffer (.getBuf pointer) (.getOffset pointer) (.getLength pointer))] (ImmutableRoaringBitmap. nio-buffer))) -(defn bloom-contains? [^VarBinaryVector bloom-vec +(defn bloom-contains? [^IVectorReader bloom-rdr ^long idx ^ints hashes] - (let [^ImmutableRoaringBitmap bloom (bloom->bitmap bloom-vec idx)] + (let [^ImmutableRoaringBitmap bloom (bloom->bitmap bloom-rdr idx)] (loop [n 0] (if (= n (alength hashes)) true diff --git a/core/src/main/clojure/xtdb/expression/metadata.clj b/core/src/main/clojure/xtdb/expression/metadata.clj index 61d10f6735..95613124b3 100644 --- a/core/src/main/clojure/xtdb/expression/metadata.clj +++ b/core/src/main/clojure/xtdb/expression/metadata.clj @@ -106,17 +106,17 @@ (bloom/literal-hashes params param-expr col-type)))) (def ^:private table-metadata-sym (gensym "table-metadata")) -(def ^:private metadata-root-sym (gensym "metadata-root")) -(def ^:private cols-vec-sym (gensym "cols-vec")) -(def ^:private cols-data-vec-sym (gensym "cols-data-vec")) -(def ^:private block-idx-sym (gensym "block-idx")) -(def ^:private types-vec-sym (gensym "types-vec")) -(def ^:private bloom-vec-sym (gensym "bloom-vec")) +(def ^:private metadata-rdr-sym (gensym "metadata-rdr")) +(def ^:private cols-rdr-sym (gensym "cols-rdr")) +(def ^:private col-rdr-sym (gensym "col-rdr")) +(def ^:private page-idx-sym (gensym "page-idx")) +(def ^:private types-rdr-sym (gensym "types-rdr")) +(def ^:private bloom-rdr-sym (gensym "bloom-rdr")) (defmethod expr/codegen-expr :metadata-vp-call [{:keys [f meta-value field param-expr col-type bloom-hash-sym]} opts] (let [field-name (util/str->normal-form-str (str field)) - idx-code `(.rowIndex ~table-metadata-sym ~field-name ~block-idx-sym)] + idx-code `(.rowIndex ~table-metadata-sym ~field-name ~page-idx-sym)] (if (= meta-value :bloom-filter) {:return-type :bool @@ -124,7 +124,7 @@ (cont :bool `(boolean (when-let [~expr/idx-sym ~idx-code] - (bloom/bloom-contains? ~bloom-vec-sym ~expr/idx-sym ~bloom-hash-sym)))))} + (bloom/bloom-contains? ~bloom-rdr-sym ~expr/idx-sym ~bloom-hash-sym)))))} (let [col-sym (gensym 'meta_col) col-field (types/col-type->field col-type) @@ -141,9 +141,8 @@ (assoc-in [:var->col-type col-sym] (types/merge-col-types col-type :null))))] {:return-type :bool :batch-bindings [[(-> col-sym (expr/with-tag IVectorReader)) - `(some-> ^StructVector (.getChild ~types-vec-sym ~(.getName col-field)) - (.getChild ~(name meta-value)) - vr/vec->reader)]] + `(some-> ^IVectorReader (.structKeyReader ~types-rdr-sym ~(.getName col-field)) + (.structKeyReader ~(name meta-value)))]] :children [emitted-expr] :continue (fn [cont] (cont :bool @@ -165,17 +164,17 @@ :f (-> `(fn [~(-> table-metadata-sym (expr/with-tag ITableMetadata)) ~(-> expr/params-sym (expr/with-tag RelationReader)) [~@(keep :bloom-hash-sym (ewalk/expr-seq expr))]] - (let [~metadata-root-sym (.metadataRoot ~table-metadata-sym) - ~(-> cols-vec-sym (expr/with-tag ListVector)) (.getVector ~metadata-root-sym "columns") - ~(-> cols-data-vec-sym (expr/with-tag StructVector)) (.getDataVector ~cols-vec-sym) - ~(-> types-vec-sym (expr/with-tag StructVector)) (.getChild ~cols-data-vec-sym "types") - ~(-> bloom-vec-sym (expr/with-tag VarBinaryVector)) (.getChild ~cols-data-vec-sym "bloom") + (let [~metadata-rdr-sym (.metadataReader ~table-metadata-sym) + ~(-> cols-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~metadata-rdr-sym "columns") + ~(-> col-rdr-sym (expr/with-tag IVectorReader)) (.listElementReader ~cols-rdr-sym) + ~(-> types-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~col-rdr-sym "types") + ~(-> bloom-rdr-sym (expr/with-tag IVectorReader)) (.structKeyReader ~col-rdr-sym "bloom") ~@(expr/batch-bindings emitted-expr)] (reify IntPredicate - (~'test [_ ~block-idx-sym] + (~'test [_ ~page-idx-sym] (boolean ~(continue (fn [_ code] code))))))) - #_(doto clojure.pprint/pprint) + ;; (doto clojure.pprint/pprint) (eval))})) (util/lru-memoize))) diff --git a/core/src/main/clojure/xtdb/indexer/live_index.clj b/core/src/main/clojure/xtdb/indexer/live_index.clj index 2f49938d6d..b7d94884eb 100644 --- a/core/src/main/clojure/xtdb/indexer/live_index.clj +++ b/core/src/main/clojure/xtdb/indexer/live_index.clj @@ -166,7 +166,7 @@ (let [live-rel-rdr (vw/rel-wtr->rdr live-rel)] (when-let [bufs (trie/live-trie->bufs allocator (-> live-trie (.compactLogs)) live-rel-rdr)] (let [chunk-idx-str (util/->lex-hex-string chunk-idx) - !fut (trie/write-trie-bufs! obj-store (format "tables/%s/chunks" table-name) chunk-idx-str bufs) + !fut (trie/write-trie-bufs! obj-store table-name chunk-idx-str bufs) table-metadata (MapEntry/create table-name {:col-types (live-rel->col-types live-rel-rdr) :row-count (.rowCount live-rel-rdr)})] diff --git a/core/src/main/clojure/xtdb/metadata.clj b/core/src/main/clojure/xtdb/metadata.clj index 565464655f..4bdeb4c373 100644 --- a/core/src/main/clojure/xtdb/metadata.clj +++ b/core/src/main/clojure/xtdb/metadata.clj @@ -9,35 +9,33 @@ [xtdb.transit :as xt.transit] [xtdb.types :as types] [xtdb.util :as util] - [xtdb.vector :as vec] + [xtdb.vector.reader :as vr] [xtdb.vector.writer :as vw]) - (:import [clojure.lang Keyword MapEntry] - (java.io ByteArrayInputStream ByteArrayOutputStream) + (:import (java.io ByteArrayInputStream ByteArrayOutputStream) java.lang.AutoCloseable java.nio.ByteBuffer (java.util HashMap HashSet Map NavigableMap Set TreeMap) - (java.util.concurrent ConcurrentHashMap) - java.util.concurrent.atomic.AtomicInteger + (java.util.concurrent ConcurrentHashMap CompletableFuture) (java.util.function BiFunction Consumer Function) (java.util.stream IntStream) - (org.apache.arrow.memory ArrowBuf BufferAllocator) - (org.apache.arrow.vector FieldVector IntVector VectorSchemaRoot) - (org.apache.arrow.vector.complex ListVector StructVector) - (org.apache.arrow.vector.types.pojo ArrowType$Union Schema) + (org.apache.arrow.memory ArrowBuf) + (org.apache.arrow.vector FieldVector VectorLoader VectorSchemaRoot) + (org.apache.arrow.vector.types.pojo ArrowType$Union) (org.roaringbitmap RoaringBitmap) xtdb.buffer_pool.IBufferPool xtdb.object_store.ObjectStore - (xtdb.vector IVectorReader IVectorWriter RelationReader ValueVectorReader$DuvReader))) + (xtdb.vector IVectorReader IVectorWriter RelationReader))) (set! *unchecked-math* :warn-on-boxed) #_{:clj-kondo/ignore [:unused-binding :clojure-lsp/unused-public-var]} (definterface ITableMetadata - (^org.apache.arrow.vector.VectorSchemaRoot metadataRoot []) + (^xtdb.vector.IVectorReader metadataReader []) (^java.util.Set columnNames []) - (^Long rowIndex [^String column-name, ^int blockIdx] - "pass blockIdx = -1 for metadata about the whole chunk") - (^long blockCount [])) + (^Long rowIndex [^String column-name, ^int pageIdx] + ;; "pass blockIdx = -1 for metadata about the whole chunk" + ) + (^long pageCount [])) #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} (definterface IPageMetadataWriter @@ -56,7 +54,7 @@ (definterface IMetadataPredicate (^java.util.function.IntPredicate build [^xtdb.metadata.ITableMetadata tableMetadata])) -(defrecord ChunkMatch [^long chunk-idx, ^RoaringBitmap block-idxs, ^Set col-names]) +(defrecord TrieMatch [^long trie-idx, ^RoaringBitmap page-idxs, ^Set col-names]) (defn- with-single-root [^IBufferPool buffer-pool, obj-key, f] (-> (.getBuffer buffer-pool obj-key) @@ -180,10 +178,10 @@ (.endStruct struct-wtr)))))))) -(doseq [type-head #{:null :bool :fixed-size-binary :uuid :clj-form}] +(doseq [type-head #{:null :bool :fixed-size-binary :clj-form}] (defmethod type->metadata-writer type-head [_write-col-meta! metadata-root col-type] (->bool-type-handler metadata-root col-type))) -(doseq [type-head #{:int :float :utf8 :varbinary :keyword :uri +(doseq [type-head #{:int :float :utf8 :varbinary :keyword :uri :uuid :timestamp-tz :timestamp-local :date :interval :time-local}] (defmethod type->metadata-writer type-head [_write-col-meta! metadata-root col-type] (->min-max-type-handler metadata-root col-type))) @@ -270,8 +268,53 @@ (write-col-meta! true col)) (.endList cols-wtr)))))) +(defn ->table-metadata-idxs [^IVectorReader metadata-rdr] + (let [page-idx-cache (HashMap.) + meta-row-count (.valueCount metadata-rdr) + ^IVectorReader #_ page-idx-rdr (.structKeyReader metadata-rdr "page-idx") + ^IVectorReader #_ cols-rdr(.structKeyReader metadata-rdr "columns") + ^IVectorReader col-rdr (.listElementReader cols-rdr) + ^IVectorReader column-name-rdr (.structKeyReader col-rdr "col-name") + ^IVectorReader root-col-rdr (.structKeyReader col-rdr "root-col?") + col-names (HashSet.)] + + (dotimes [meta-idx meta-row-count] + (let [cols-start-idx (.getListStartIndex cols-rdr meta-idx) + page-idx (if-let [page-idx (.getObject page-idx-rdr meta-idx)] + page-idx + -1)] + (dotimes [cols-data-idx (.getListCount cols-rdr meta-idx)] + (let [cols-data-idx (+ cols-start-idx cols-data-idx) + col-name (str (.getObject column-name-rdr cols-data-idx))] + (.add col-names col-name) + (when (.getBoolean root-col-rdr cols-data-idx) + (.put page-idx-cache [col-name page-idx] cols-data-idx)))))) + + {:col-names (into #{} col-names) + :page-idx-cache (into {} page-idx-cache) + :page-count (loop [page-count 0, idx 0] + (cond + (>= idx meta-row-count) (inc page-count) + :else (recur (cond-> page-count + (not (nil? (.getObject page-idx-rdr idx))) inc) + (inc idx))))})) + +;; TODO copied because circular dep with xtdb.trie +(defn- table-name->dir [table-name] (format "tables/%s/chunks" table-name)) +(defn ->table-trie-obj-key [table-name chunk-idx] + (format "%s/trie-c%s.arrow" (table-name->dir table-name) (util/->lex-hex-string chunk-idx))) + +(defn ->table-metadata ^xtdb.metadata.ITableMetadata [^IVectorReader metadata-reader, {:keys [col-names page-idx-cache, page-count]}] + (reify ITableMetadata + (metadataReader [_] metadata-reader) + (columnNames [_] col-names) + (rowIndex [_ col-name block-idx] (get page-idx-cache [col-name block-idx])) + (pageCount [_] page-count))) + (deftype MetadataManager [^ObjectStore object-store + ^IBufferPool buffer-pool ^NavigableMap chunks-metadata + ^Map table-metadata-idxs ^:volatile-mutable ^Map col-types] IMetadataManager (finishChunk [this chunk-idx new-chunk-metadata] @@ -281,16 +324,23 @@ (.put chunks-metadata chunk-idx new-chunk-metadata)) (withMetadata [_ chunk-idx table-name f] - (throw (UnsupportedOperationException.)) - #_ - (with-single-root buffer-pool (->table-metadata-obj-key chunk-idx table-name) - (fn [metadata-root] - (.apply f (->table-metadata metadata-root - (.computeIfAbsent table-metadata-idxs - [chunk-idx table-name] - (reify Function - (apply [_ _] - (->table-metadata-idxs metadata-root))))))))) + (CompletableFuture/completedFuture + (with-open [^ArrowBuf trie-buf (-> @(.getBuffer buffer-pool (->table-trie-obj-key table-name chunk-idx)) + (util/rethrowing-cause))] + (let [{:keys [^VectorLoader loader ^VectorSchemaRoot root arrow-blocks]} (util/read-arrow-buf trie-buf)] + (try + (with-open [record-batch (util/->arrow-record-batch-view (first arrow-blocks) trie-buf)] + (.load loader record-batch)) + (let [^RelationReader trie-rdr (vr/<-root root) + ^IVectorReader metadata-reader (.metadataReader (.typeIdReader (.readerForName trie-rdr "nodes") (byte 2)))] + (.apply f (->table-metadata metadata-reader + (.computeIfAbsent table-metadata-idxs + [chunk-idx table-name] + (reify Function + (apply [_ _] + (->table-metadata-idxs metadata-reader))))))) + (finally + (.close root))))))) (chunksMetadata [_] chunks-metadata) @@ -333,23 +383,21 @@ :buffer-pool (ig/ref :xtdb.buffer-pool/buffer-pool)} opts)) -(defmethod ig/init-key ::metadata-manager [_ {:keys [^ObjectStore object-store], :as deps}] +(defmethod ig/init-key ::metadata-manager [_ {:keys [^ObjectStore object-store, ^IBufferPool buffer-pool], :as deps}] (let [chunks-metadata (load-chunks-metadata deps)] (MetadataManager. object-store + buffer-pool chunks-metadata + (ConcurrentHashMap.) (->> (vals chunks-metadata) (reduce merge-col-types {}))))) (defmethod ig/halt-key! ::metadata-manager [_ mgr] (util/try-close mgr)) (defn with-metadata [^IMetadataManager metadata-mgr, ^long chunk-idx, ^String table-name, ^Function f] - (throw (UnsupportedOperationException.)) - #_ ; TODO reinstate when we bring back metadata into scan (.withMetadata metadata-mgr chunk-idx table-name f)) (defn with-all-metadata [^IMetadataManager metadata-mgr, table-name, ^BiFunction f] - (throw (UnsupportedOperationException.)) - #_ ; TODO reinstate when we bring back metadata into scan (->> (for [[^long chunk-idx, chunk-metadata] (.chunksMetadata metadata-mgr) :let [table (get-in chunk-metadata [:tables table-name])] :when table] @@ -361,19 +409,15 @@ (into [] (keep deref)) (util/rethrowing-cause))) -(defn matching-chunks [^IMetadataManager metadata-mgr, table-name, ^IMetadataPredicate metadata-pred] - (throw (UnsupportedOperationException.)) - #_ ; TODO reinstate when we bring back metadata into scan +(defn matching-tries [^IMetadataManager metadata-mgr, table-name, ^IMetadataPredicate metadata-pred] (with-all-metadata metadata-mgr table-name (util/->jbifn - (fn [^long chunk-idx, ^ITableMetadata table-metadata] - (let [pred (.build metadata-pred table-metadata)] - (when (.test pred -1) - (let [block-idxs (RoaringBitmap.)] - (dotimes [block-idx (.blockCount table-metadata)] - (when (.test pred block-idx) - (.add block-idxs block-idx))) - - (when-not (.isEmpty block-idxs) - (->ChunkMatch chunk-idx block-idxs (.columnNames table-metadata)))))))))) - + (fn [^long chunk-idx, ^ITableMetadata table-metadata] + (let [pred (.build metadata-pred table-metadata) + page-idxs (RoaringBitmap.)] + (dotimes [page-idx (.pageCount table-metadata)] + (prn page-idx) + (when (.test pred page-idx) + (.add page-idxs page-idx))) + (when-not (.isEmpty page-idxs) + (->TrieMatch chunk-idx page-idxs (.columnNames table-metadata)))))))) diff --git a/core/src/main/clojure/xtdb/operator/scan.clj b/core/src/main/clojure/xtdb/operator/scan.clj index 375cd09488..1acb323853 100644 --- a/core/src/main/clojure/xtdb/operator/scan.clj +++ b/core/src/main/clojure/xtdb/operator/scan.clj @@ -471,15 +471,19 @@ (util/close arrow-leaves))) (defn- read-tries [^ObjectStore obj-store, ^IBufferPool buffer-pool, ^String table-name, ^ILiveTableWatermark live-table-wm - {:keys [^ByteBuffer iid-bb] :as _opts}] + ^LinkedList matching-tries {:keys [^ByteBuffer iid-bb] :as _opts}] (let [{trie-files :trie, leaf-files :leaf} (->> (.listObjects obj-store (format "tables/%s/chunks" table-name)) (keep (fn [file-name] - (when-let [[_ file-type chunk-idx-str] (re-find #"/(leaf|trie)-c(.+?)\.arrow$" file-name)] + (when-let [[_ file-type chunk-idx-str] + (re-find #"/(leaf|trie)-c(.+?)\.arrow$" file-name)] {:file-name file-name :file-type (case file-type "leaf" :leaf, "trie" :trie) :chunk-idx chunk-idx-str}))) (group-by :file-type)) - leaf-files (into {} (map (juxt :chunk-idx identity)) leaf-files)] + leaf-files (into {} (map (juxt :chunk-idx identity)) leaf-files) + trie-idx->page-idxs (into {} (map (juxt :trie-idx :page-idxs)) matching-tries)] + + (prn matching-tries) (util/with-close-on-catch [leaf-bufs (ArrayList.)] ;; TODO get hold of these a page at a time if it's a small query, @@ -520,7 +524,7 @@ :merge-tasks (vec (for [{:keys [path leaves]} (if iid-bb (vector (trie/iid-trie-merge-task hash-tries (.array iid-bb))) - (trie/trie-merge-tasks hash-tries))] + (trie/trie-merge-tasks hash-tries trie-idx->page-idxs))] {:path path :leaves (mapv (fn [{:keys [ordinal leaf]}] (condp = (class leaf) @@ -552,12 +556,12 @@ ;; the skipping in another leaf consumer. (defn ->4r-cursor [^BufferAllocator allocator, ^ObjectStore obj-store, ^IBufferPool buffer-pool, ^IWatermark wm - table-name, col-names, ^longs temporal-range + table-name, col-names, ^longs temporal-range, ^LinkedList matching-tries ^Map col-preds, params, scan-opts] (let [^ILiveTableWatermark live-table-wm (some-> (.liveIndex wm) (.liveTable table-name)) {:keys [arrow-leaves ^List merge-tasks]} - (read-tries obj-store buffer-pool table-name live-table-wm (select-keys scan-opts [:iid-bb]))] + (read-tries obj-store buffer-pool table-name live-table-wm matching-tries (select-keys scan-opts [:iid-bb]))] (try (->TrieCursor allocator arrow-leaves (.iterator merge-tasks) col-names col-preds @@ -670,7 +674,7 @@ :stats {:row-count row-count} :->cursor (fn [{:keys [allocator, ^IWatermark watermark, basis, params default-all-valid-time?]}] ;; TODO reinstate metadata checks on pages - (let [_metadata-pred (expr.meta/->metadata-selector (cons 'and metadata-args) col-types params) + (let [metadata-pred (expr.meta/->metadata-selector (cons 'and metadata-args) col-types params) scan-opts (cond-> scan-opts (nil? for-valid-time) (assoc :for-valid-time (if default-all-valid-time? [:all-time] [:at [:now :now]]) @@ -680,6 +684,7 @@ normalized-table-name (set/union content-col-names temporal-col-names) (->temporal-range params basis scan-opts) + (ArrayList. (or (meta/matching-tries metadata-mgr (str table) metadata-pred) [])) col-preds params scan-opts)))})))) diff --git a/core/src/main/clojure/xtdb/trie.clj b/core/src/main/clojure/xtdb/trie.clj index 381c8c618e..e52e91ae9b 100644 --- a/core/src/main/clojure/xtdb/trie.clj +++ b/core/src/main/clojure/xtdb/trie.clj @@ -7,17 +7,18 @@ [xtdb.vector.writer :as vw]) (:import (java.nio ByteBuffer) java.security.MessageDigest - (java.util Arrays) + java.security.MessageDigest + (java.util Arrays ArrayList) (java.util.concurrent.atomic AtomicInteger) (java.util.function IntConsumer Supplier) (java.util.stream IntStream) - java.security.MessageDigest (org.apache.arrow.memory BufferAllocator) (org.apache.arrow.vector VectorSchemaRoot) org.apache.arrow.vector.types.UnionMode (org.apache.arrow.vector.types.pojo ArrowType$Union Schema) + (org.roaringbitmap RoaringBitmap) (xtdb.object_store ObjectStore) - (xtdb.trie HashTrie HashTrie$Node LiveHashTrie LiveHashTrie$Leaf) + (xtdb.trie ArrowHashTrie ArrowHashTrie$Leaf HashTrie HashTrie$Node LiveHashTrie LiveHashTrie$Leaf) (xtdb.vector IVectorReader RelationReader))) (def ^:private ^java.lang.ThreadLocal !msg-digest @@ -149,14 +150,22 @@ {:leaf-buf leaf-buf :trie-buf (util/root->arrow-ipc-byte-buffer trie-vsr :file)})))) -(defn write-trie-bufs! [^ObjectStore obj-store, ^String dir, ^String chunk-idx +(defn- table-name->dir [table-name] (format "tables/%s/chunks" table-name)) + +(defn ->table-leaf-obj-key [table-name chunk-idx] + (format "%s/leaf-c%s.arrow" (table-name->dir table-name) chunk-idx)) + +(defn ->table-trie-obj-key [table-name chunk-idx] + (format "%s/trie-c%s.arrow" (table-name->dir table-name) chunk-idx)) + +(defn write-trie-bufs! [^ObjectStore obj-store, ^String table-name, ^String chunk-idx {:keys [^ByteBuffer leaf-buf ^ByteBuffer trie-buf]}] - (-> (.putObject obj-store (format "%s/leaf-c%s.arrow" dir chunk-idx) leaf-buf) + (-> (.putObject obj-store (->table-leaf-obj-key table-name chunk-idx) leaf-buf) (util/then-compose - (fn [_] - (.putObject obj-store (format "%s/trie-c%s.arrow" dir chunk-idx) trie-buf))))) + (fn [_] + (.putObject obj-store (->table-trie-obj-key table-name chunk-idx) trie-buf))))) -(defn trie-merge-tasks [tries] +(defn trie-merge-tasks [tries trie-idx->page-idxs] (letfn [(trie-merge-tasks* [nodes path] (let [trie-children (mapv #(some-> ^HashTrie$Node % (.children)) nodes)] (if-let [^objects first-children (some identity trie-children)] @@ -170,12 +179,26 @@ (conj path bucket-idx))))) [{:path (byte-array path) :leaves (->> nodes - (into [] (keep-indexed - (fn [ordinal ^HashTrie$Node node] - (when node - {:ordinal ordinal, :leaf node})))))}])))] - - (vec (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) [])))) + (map-indexed vector) + (reduce (fn [[take-previous? acc] [ordinal ^HashTrie$Node node]] + (if node + (condp = (class node) + ArrowHashTrie$Leaf + (let [^ArrowHashTrie$Leaf node node] + (if (or take-previous? + (some-> ^RoaringBitmap (get trie-idx->page-idxs ordinal) + (.contains (.getPageIndex node)))) + [true (conj acc {:ordinal ordinal, :leaf node})] + [false acc])) + LiveHashTrie$Leaf + [true (conj acc {:ordinal ordinal, :leaf node})]) + [take-previous? acc])) + [false []]) + second)}])))] + + (->> (trie-merge-tasks* (map #(some-> ^HashTrie % (.rootNode)) tries) []) + (filter (comp seq :leaves)) + vec))) (defn- bucket-for [^bytes iid level] (let [level-offset-bits (* HashTrie/LEVEL_BITS (inc level)) diff --git a/core/src/main/clojure/xtdb/util.clj b/core/src/main/clojure/xtdb/util.clj index e072c04cb6..aff106f2cd 100644 --- a/core/src/main/clojure/xtdb/util.clj +++ b/core/src/main/clojure/xtdb/util.clj @@ -20,11 +20,11 @@ java.time.temporal.ChronoUnit (java.util ArrayList Collections Date Iterator LinkedHashMap LinkedList Map Queue UUID WeakHashMap) (java.util.concurrent CompletableFuture ExecutionException ExecutorService Executors ThreadFactory TimeUnit) - (java.util.function Consumer Function Supplier) + (java.util.function BiFunction Consumer Function Supplier) (org.apache.arrow.compression CommonsCompressionFactory) (org.apache.arrow.flatbuf Footer Message RecordBatch) (org.apache.arrow.memory AllocationManager ArrowBuf BufferAllocator) - (org.apache.arrow.memory.util ArrowBufPointer ByteFunctionHelpers MemoryUtil) + (org.apache.arrow.memory.util ByteFunctionHelpers MemoryUtil) (org.apache.arrow.vector ValueVector VectorLoader VectorSchemaRoot) (org.apache.arrow.vector.ipc ArrowFileWriter ArrowStreamWriter ArrowWriter) (org.apache.arrow.vector.ipc.message ArrowBlock ArrowFooter ArrowRecordBatch MessageSerializer) @@ -344,6 +344,11 @@ (apply [_ v] (f v)))) +(defn ->jbifn {:style/indent :defn} ^java.util.function.BiFunction [f] + (reify BiFunction + (apply [_ a b] + (f a b)))) + (defn then-apply {:style/indent :defn} ^java.util.concurrent.CompletableFuture [^CompletableFuture fut f] diff --git a/src/test/clojure/xtdb/operator/scan_test.clj b/src/test/clojure/xtdb/operator/scan_test.clj index ea94585fac..0d83d62f5f 100644 --- a/src/test/clojure/xtdb/operator/scan_test.clj +++ b/src/test/clojure/xtdb/operator/scan_test.clj @@ -5,14 +5,12 @@ [xtdb.operator :as op] [xtdb.operator.scan :as scan] [xtdb.test-util :as tu] - [xtdb.trie :as trie] [xtdb.util :as util] [xtdb.vector.writer :as vw]) (:import (java.util LinkedList) xtdb.operator.IRaQuerySource xtdb.operator.IRelationSelector - (xtdb.operator.scan RowConsumer) - (xtdb.vector RelationReader))) + (xtdb.operator.scan RowConsumer))) (t/use-fixtures :each tu/with-mock-clock tu/with-allocator) @@ -57,6 +55,39 @@ (set (tu/query-ra '[:scan {:table xt_docs} [xt/id]] {:node node})))))) +(t/deftest test-metadata + (with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 20}})] + (->> (for [i (range 100)] + [:put :xt_docs {:xt/id i}]) + (partition-all 20) + (mapv #(xt/submit-tx node %))) + + (t/is (= (set (concat (for [i (range 20)] {:xt/id i}) (for [i (range 80 100)] {:xt/id i}))) + (set (tu/query-ra '[:scan {:table xt_docs} [{xt/id (or (< xt/id 20) + (>= xt/id 80))}]] + {:node node}))) + "testing only getting some trie matches")) + + (with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 20}})] + (xt/submit-tx node (for [i (range 20)] [:put :xt_docs {:xt/id i}])) + (xt/submit-tx node (for [i (range 20)] [:delete :xt_docs i])) + + (t/is (= [] + (tu/query-ra '[:scan {:table xt_docs} [{xt/id (< xt/id 20)}]] + {:node node})) + "testing newer chunks relevant even if not matching")) + + + (with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 16} + :xtdb.indexer/live-index {:page-limit 16}})] + (xt/submit-tx node (for [i (range 20)] [:put :xt_docs {:xt/id i}])) + + (t/is (= [{:xt/id 10}] + (tu/query-ra '[:scan {:table xt_docs} [{xt/id (and (< xt/id 11) + (> xt/id 9))}]] + {:node node})) + "testing when only certain pages match"))) + (t/deftest test-past-point-point-queries (with-open [node (node/start-node {})] (let [tx1 (xt/submit-tx node [[:put :xt_docs {:xt/id :doc1 :v 1} {:for-valid-time [:from #inst "2015"]}]