Skip to content

Commit

Permalink
Fix xtdb#2700 and open-arrow-hash-trie to test-util
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 22, 2023
1 parent 4f93551 commit a63374b
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 14 deletions.
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/indexer/rrrr.clj
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@
t1-root (open-arrow-hash-trie-root al [[nil 1 nil 3] 1 nil 3])
log-root (open-arrow-hash-trie-root al 1)
log2-root (open-arrow-hash-trie-root al [nil nil 3 4])]
(trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)])))
(trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)] nil)))
21 changes: 11 additions & 10 deletions core/src/main/clojure/xtdb/trie.clj
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,21 @@
(range (alength first-children)))
(mapv (fn [bucket-idx]
(->merge-plan* (mapv (fn [node ^objects node-children]
(if node-children
(aget node-children bucket-idx)
node))
nodes trie-children)
(conj path bucket-idx)
(inc level)))))]}
(if node-children
(aget node-children bucket-idx)
node))
nodes trie-children)
(conj path bucket-idx)
(inc level)))))]}
{:path (byte-array path)
:node [:leaf (->> nodes
(into [] (keep-indexed
(fn [ordinal ^HashTrie$Node leaf-node]
(condp = (class leaf-node)
ArrowHashTrie$Leaf {:ordinal ordinal,
:trie-leaf {:page-idx (.getPageIndex ^ArrowHashTrie$Leaf leaf-node)}}
(when leaf-node
(condp = (class leaf-node)
ArrowHashTrie$Leaf {:ordinal ordinal,
:trie-leaf {:page-idx (.getPageIndex ^ArrowHashTrie$Leaf leaf-node)}}

LiveHashTrie$Leaf {:ordinal ordinal, :trie-leaf leaf-node})))))]})))]
LiveHashTrie$Leaf {:ordinal ordinal, :trie-leaf leaf-node}))))))]})))]

(->merge-plan* (map #(some-> ^HashTrie % (.rootNode)) tries) [] 0)))
49 changes: 46 additions & 3 deletions src/main/clojure/xtdb/test_util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@
[xtdb.util :as util]
[xtdb.vector :as vec]
[xtdb.vector.reader :as vr]
[xtdb.vector.writer :as vw])
[xtdb.vector.writer :as vw]
[xtdb.trie :as trie])
(:import [ch.qos.logback.classic Level Logger]
clojure.lang.ExceptionInfo
java.net.ServerSocket
(java.nio.file Files Path)
java.nio.file.attribute.FileAttribute
(java.time Duration Instant Period)
(java.util LinkedList)
java.util.function.Consumer
(java.util.function Consumer IntConsumer)
(org.apache.arrow.memory BufferAllocator RootAllocator)
(org.apache.arrow.vector FieldVector VectorSchemaRoot)
(org.apache.arrow.vector.types.pojo Schema)
org.slf4j.LoggerFactory
(xtdb ICursor InstantSource)
xtdb.indexer.IIndexer
(xtdb.operator IRaQuerySource PreparedQuery)
(xtdb.vector IVectorReader RelationReader)))
(xtdb.vector IVectorReader RelationReader)
(java.util.stream IntStream)))

#_{:clj-kondo/ignore [:uninitialized-var]}
(def ^:dynamic ^org.apache.arrow.memory.BufferAllocator *allocator*)
Expand Down Expand Up @@ -299,3 +301,44 @@
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defmacro with-log-level [ns level & body]
`(with-log-levels {~ns ~level} ~@body))

(defn open-arrow-hash-trie-root ^org.apache.arrow.vector.VectorSchemaRoot [^BufferAllocator al, paths]
(util/with-close-on-catch [trie-root (VectorSchemaRoot/create trie/trie-schema al)]
(let [trie-wtr (vw/root->writer trie-root)
trie-wp (.writerPosition trie-wtr)
nodes-wtr (.writerForName trie-wtr "nodes")
nil-wtr (.writerForTypeId nodes-wtr (byte 0))
branch-wtr (.writerForTypeId nodes-wtr (byte 1))
branch-el-wtr (.listElementWriter branch-wtr)
leaf-wtr (.writerForTypeId nodes-wtr (byte 2))
page-idx-wtr (.structKeyWriter leaf-wtr "page-idx")]
(letfn [(write-paths [paths]
(cond
(nil? paths) (.writeNull nil-wtr nil)

(number? paths) (do
(.startStruct leaf-wtr)
(.writeInt page-idx-wtr paths)
(.endStruct leaf-wtr))

(vector? paths) (let [!page-idxs (IntStream/builder)]
(doseq [child paths]
(.add !page-idxs (if child
(do
(write-paths child)
(dec (.getPosition trie-wp)))
-1)))
(.startList branch-wtr)
(.forEach (.build !page-idxs)
(reify IntConsumer
(accept [_ idx]
(if (= idx -1)
(.writeNull branch-el-wtr nil)
(.writeInt branch-el-wtr idx)))))
(.endList branch-wtr)))
(.endRow trie-wtr))]
(write-paths paths))

(.syncRowCount trie-wtr))

trie-root))
11 changes: 11 additions & 0 deletions src/test/clojure/xtdb/operator/scan_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
(set (tu/query-ra '[:scan {:table xt_docs} [xt/id]]
{:node node}))))))

(t/deftest test-smaller-page-limit
(with-open [node (node/start-node {:xtdb.indexer/live-index {:page-limit 16}})]
(xt/submit-tx node (for [i (range 20)] [:put :xt_docs {:xt/id i}]))

(tu/finish-chunk! node)

(t/is (= (into #{} (map #(hash-map :xt/id %)) (range 20))
(set (tu/query-ra '[:scan {:table xt_docs} [xt/id]]
{:node node}))))))


(t/deftest test-past-point-point-queries
(with-open [node (node/start-node {})]
(let [tx1 (xt/submit-tx node [[:put :xt_docs {:xt/id :doc1 :v 1} {:for-valid-time [:from #inst "2015"]}]
Expand Down
55 changes: 55 additions & 0 deletions src/test/clojure/xtdb/trie_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
(ns xtdb.trie-test
(:require [clojure.test :as t :refer [deftest]]
[clojure.walk :as walk]
[xtdb.test-util :as tu]
[xtdb.trie :as trie])
(:import (clojure.lang MapEntry)
(xtdb.trie ArrowHashTrie)
(org.apache.arrow.memory RootAllocator)))

(deftest test-merge-plan-with-nil-nodes-2700
(with-open [al (RootAllocator.)
t1-root (tu/open-arrow-hash-trie-root al [[nil 1 nil 2] 1 nil 3])
log-root (tu/open-arrow-hash-trie-root al 1)
log2-root (tu/open-arrow-hash-trie-root al [nil nil 1 2])]
(t/is (= {:path [],
:node
[:branch
[{:path [0],
:node
[:branch
[{:path [0 0],
:node [:leaf [{:ordinal 2, :trie-leaf {:page-idx 1}}]]}
{:path [0 1],
:node
[:leaf
[{:ordinal 1, :trie-leaf {:page-idx 1}}
{:ordinal 2, :trie-leaf {:page-idx 1}}]]}
{:path [0 2],
:node [:leaf [{:ordinal 2, :trie-leaf {:page-idx 1}}]]}
{:path [0 3],
:node
[:leaf
[{:ordinal 1, :trie-leaf {:page-idx 2}}
{:ordinal 2, :trie-leaf {:page-idx 1}}]]}]]}
{:path [1],
:node
[:leaf
[{:ordinal 1, :trie-leaf {:page-idx 1}}
{:ordinal 2, :trie-leaf {:page-idx 1}}]]}
{:path [2],
:node
[:leaf
[{:ordinal 2, :trie-leaf {:page-idx 1}}
{:ordinal 3, :trie-leaf {:page-idx 1}}]]}
{:path [3],
:node
[:leaf
[{:ordinal 1, :trie-leaf {:page-idx 3}}
{:ordinal 2, :trie-leaf {:page-idx 1}}
{:ordinal 3, :trie-leaf {:page-idx 2}}]]}]]}
(->> (trie/->merge-plan [nil (ArrowHashTrie/from t1-root) (ArrowHashTrie/from log-root) (ArrowHashTrie/from log2-root)] nil)
(walk/postwalk (fn [x]
(if (and (map-entry? x) (= :path (key x)))
(MapEntry/create :path (into [] (val x)))
x))))))))

0 comments on commit a63374b

Please sign in to comment.