Skip to content

Commit

Permalink
remove the live-chunk files, xtdb#2663
Browse files Browse the repository at this point in the history
  • Loading branch information
jarohen committed Aug 4, 2023
1 parent cc24631 commit c044fe2
Show file tree
Hide file tree
Showing 105 changed files with 114 additions and 8,289 deletions.
219 changes: 70 additions & 149 deletions core/src/main/clojure/xtdb/indexer.clj

Large diffs are not rendered by default.

451 changes: 0 additions & 451 deletions core/src/main/clojure/xtdb/live_chunk.clj

This file was deleted.

3 changes: 0 additions & 3 deletions core/src/main/clojure/xtdb/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@
(defn- ->table-metadata-obj-key [chunk-idx table-name]
(format "chunk-%s/%s/metadata.arrow" (util/->lex-hex-string chunk-idx) table-name))

(defn ->chunk-obj-key [chunk-idx table-name column-name]
(format "chunk-%s/%s/content-%s.arrow" (util/->lex-hex-string chunk-idx) table-name column-name))

(defn- obj-key->chunk-idx [obj-key]
(some-> (second (re-matches #"chunk-metadata/(\p{XDigit}+).transit.json" obj-key))
(util/<-lex-hex-string)))
Expand Down
1 change: 0 additions & 1 deletion core/src/main/clojure/xtdb/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@
:xtdb/allocator {}
:xtdb/default-tz nil
:xtdb/indexer {}
:xtdb/live-chunk {}
:xtdb.indexer/live-index {}
:xtdb/ingester {}
:xtdb.metadata/metadata-manager {}
Expand Down
14 changes: 4 additions & 10 deletions core/src/main/clojure/xtdb/watermark.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@
(:require [clojure.tools.logging :as log]
xtdb.api.protocols
xtdb.indexer.live-index
xtdb.live-chunk
[xtdb.util :as util])
(:import java.lang.AutoCloseable
java.util.concurrent.atomic.AtomicInteger
xtdb.api.protocols.TransactionInstant
xtdb.indexer.live_index.ILiveIndexWatermark
xtdb.live_chunk.ILiveChunkWatermark))
xtdb.indexer.live_index.ILiveIndexWatermark))

#_{:clj-kondo/ignore [:unused-binding :clojure-lsp/unused-public-var]}
(definterface IWatermark
(^xtdb.api.protocols.TransactionInstant txBasis [])
(^xtdb.live_chunk.ILiveChunkWatermark liveChunk [])
(^xtdb.indexer.live_index.ILiveIndexWatermark liveIndex [])

(^void retain [])
Expand All @@ -25,11 +22,9 @@
(definterface IWatermarkSource
(^xtdb.watermark.IWatermark openWatermark [^xtdb.api.protocols.TransactionInstant txKey]))

(deftype Watermark [^TransactionInstant tx-key, ^ILiveChunkWatermark live-chunk, ^ILiveIndexWatermark live-idx-wm
^AtomicInteger ref-cnt]
(deftype Watermark [^TransactionInstant tx-key, ^ILiveIndexWatermark live-idx-wm, ^AtomicInteger ref-cnt]
IWatermark
(txBasis [_] tx-key)
(liveChunk [_] live-chunk)
(liveIndex [_] live-idx-wm)

(retain [this]
Expand All @@ -43,8 +38,7 @@

(when (zero? (.decrementAndGet ref-cnt))
(log/trace "close wm" (hash this))
(util/try-close live-chunk)
(util/try-close live-idx-wm))))

(defn ->wm ^xtdb.watermark.IWatermark [tx-key live-chunk live-idx-wm]
(Watermark. tx-key live-chunk live-idx-wm (AtomicInteger. 1)))
(defn ->wm ^xtdb.watermark.IWatermark [tx-key live-idx-wm]
(Watermark. tx-key live-idx-wm (AtomicInteger. 1)))
6 changes: 1 addition & 5 deletions src/main/clojure/xtdb/test_util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@
(with-opts {:xtdb.log/memory-log {:instant-src (->mock-clock)}} f))

(defn finish-chunk! [node]
(idx/finish-block! (component node :xtdb/indexer))
(idx/finish-chunk! (component node :xtdb/indexer)))

(defn open-vec
Expand Down Expand Up @@ -246,10 +245,7 @@
:xtdb.buffer-pool/buffer-pool {:cache-path (.resolve node-dir buffers-dir)}
:xtdb.object-store/file-system-object-store {:root-path (.resolve node-dir "objects")}
:xtdb/indexer (->> {:rows-per-chunk rows-per-chunk}
(into {} (filter val)))
:xtdb/live-chunk (->> {:rows-per-block rows-per-block
:rows-per-chunk rows-per-chunk}
(into {} (filter val)))})))
(into {} (filter val)))})))

(defn ->local-submit-node ^java.lang.AutoCloseable [{:keys [^Path node-dir]}]
(node/start-submit-node {:xtdb.tx-producer/tx-producer {:clock (->mock-clock)}
Expand Down
10 changes: 5 additions & 5 deletions src/test/clojure/xtdb/datalog_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@
:spectre]))))

(t/deftest bug-non-string-table-names-599
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 10, :rows-per-chunk 1000}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 1000}})]
(letfn [(submit-ops! [ids]
(last (for [tx-ops (->> (for [id ids]
[:put :t1 {:xt/id id,
Expand All @@ -1095,7 +1095,7 @@
(t/is (= 160 (count-table tx)))))))

(t/deftest bug-dont-throw-on-non-existing-column-597
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 10, :rows-per-chunk 1000}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 1000}})]
(letfn [(submit-ops! [ids]
(last (for [tx-ops (->> (for [id ids]
[:put :t1 {:xt/id id,
Expand All @@ -1117,7 +1117,7 @@
:where [(match :xt_docs [xt/id some-attr])]}))))))

(t/deftest add-better-metadata-support-for-keywords
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 10, :rows-per-chunk 1000}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 1000}})]
(letfn [(submit-ops! [ids]
(last (for [tx-ops (->> (for [id ids]
[:put :t1 {:xt/id id,
Expand Down Expand Up @@ -2190,7 +2190,7 @@
:where [(match :foo {:xt/id id})]}))))

(t/deftest test-metadata-filtering-for-time-data-607
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 1, :rows-per-chunk 1}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 1}})]
(xt/submit-tx node [[:put :xt_docs {:xt/id 1 :start-date #time/date "2000-01-01"}]
[:put :xt_docs {:xt/id 2 :start-date #time/date "3000-01-01"}]])
(t/is (= [{:id 1}]
Expand Down Expand Up @@ -2319,7 +2319,7 @@
[(= #time/time "08:12:13.366" #time/time "08:12:13.366") d]]}))))

(t/deftest bug-temporal-queries-wrong-at-boundary-2531
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 10, :rows-per-chunk 10}
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 10}
:xtdb.tx-producer/tx-producer {:instant-src (tu/->mock-clock)}
:xtdb.log/memory-log {:instant-src (tu/->mock-clock)}})]
(doseq [i (range 10)]
Expand Down
119 changes: 25 additions & 94 deletions src/test/clojure/xtdb/indexer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -99,35 +99,17 @@
(t/is (= last-tx-key
(tu/then-await-tx last-tx-key node (Duration/ofSeconds 2))))

(t/testing "watermark"
(with-open [^IWatermark watermark (.openWatermark wm-src last-tx-key)]
(let [live-blocks (-> (.liveChunk watermark)
(.liveTable "device_info")
(.liveBlocks #{"xt$id" "model"} nil))
!res (volatile! [])]
(.forEachRemaining live-blocks
(reify Consumer
(accept [_ content-cols]
(vswap! !res conj (vr/rel->rows content-cols)))))

(t/is (= [[{:xt$id "device-info-demo000000", :model "pinto"}
{:xt$id "device-info-demo000001", :model "mustang"}]]
@!res)))))

(tu/finish-chunk! node)

(t/is (= {:latest-completed-tx last-tx-key
:next-chunk-idx 6}
(-> (meta/latest-chunk-metadata mm)
(select-keys [:latest-completed-tx :next-chunk-idx]))))

(let [objects-list (->> (.listObjects os "chunk-00/device_info") (filter #(str/ends-with? % "/metadata.arrow")))]
(t/is (= 1 (count objects-list)))
(t/is (= ["chunk-00/device_info/metadata.arrow"] objects-list)))

(tj/check-json (.toPath (io/as-file (io/resource "xtdb/indexer-test/can-build-chunk-as-arrow-ipc-file-format")))
(.resolve node-dir "objects"))

#_ ; TODO port to buffer pool test that doesn't depend on the structure of the indexer
(t/testing "buffer pool"
(let [buffer-name "chunk-00/device_info/metadata.arrow"
^ArrowBuf buffer @(.getBuffer bp buffer-name)
Expand Down Expand Up @@ -419,10 +401,12 @@
(select-keys [:latest-completed-tx :next-chunk-idx]))))

(let [objs (.listObjects os)]
(t/is (= 1 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_info/metadata\.arrow" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_readings/metadata\.arrow" %) objs))))
(t/is (= 1 (count (filter #(re-matches #"chunk-.*/device_info/content-api_version\.arrow" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"chunk-.*/device_readings/content-battery_level\.arrow" %) objs))))))))))
(t/is (= 4 (count (filter #(re-matches #"chunk-metadata/\p{XDigit}+\.transit.json" %) objs))))
(t/is (= 2 (count (filter #(re-matches #"tables/device_info/chunks/.+\.arrow" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/device_readings/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/device_readings/chunks/trie-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/xt\$txs/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/xt\$txs/chunks/trie-\p{XDigit}+\.arrow" %) objs))))))))))

(t/deftest can-ingest-ts-devices-mini-into-multiple-nodes
(let [node-dir (util/->path "target/can-ingest-ts-devices-mini-into-multiple-nodes")
Expand Down Expand Up @@ -455,9 +439,12 @@
(t/is (= last-tx-key (tu/latest-completed-tx node)))

(let [objs (.listObjects os)]
(t/is (= 13 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_(?:info|readings)/metadata.arrow" %) objs))))
(t/is (= 2 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_info/content-api_version\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_readings/content-battery_level\.arrow" %) objs)))))))))))
(t/is (= 11 (count (filter #(re-matches #"chunk-metadata/\p{XDigit}+\.transit.json" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/device_info/chunks/.+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/device_readings/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/device_readings/chunks/trie-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/xt\$txs/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/xt\$txs/chunks/trie-\p{XDigit}+\.arrow" %) objs)))))))))))

(t/deftest can-ingest-ts-devices-mini-with-stop-start-and-reach-same-state
(let [node-dir (util/->path "target/can-ingest-ts-devices-mini-with-stop-start-and-reach-same-state")
Expand Down Expand Up @@ -499,10 +486,12 @@
(t/is (< next-chunk-idx (count first-half-tx-ops)))

(let [objs (.listObjects os)]
(t/is (= 2 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_info/metadata\.arrow" %) objs))))
(t/is (= 5 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_readings/metadata\.arrow" %) objs))))
(t/is (= 2 (count (filter #(re-matches #"chunk-.*/device_info/content-api_version\.arrow" %) objs))))
(t/is (= 5 (count (filter #(re-matches #"chunk-.*/device_readings/content-battery_level\.arrow" %) objs))))))
(t/is (= 5 (count (filter #(re-matches #"chunk-metadata/\p{XDigit}+\.transit.json" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/device_info/chunks/.+\.arrow" %) objs))))
(t/is (= 5 (count (filter #(re-matches #"tables/device_readings/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 5 (count (filter #(re-matches #"tables/device_readings/chunks/trie-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 5 (count (filter #(re-matches #"tables/xt\$txs/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 5 (count (filter #(re-matches #"tables/xt\$txs/chunks/trie-\p{XDigit}+\.arrow" %) objs))))))

(t/is (= :utf8 (.columnType mm "device_readings" "xt$id")))

Expand Down Expand Up @@ -538,10 +527,12 @@
^IMetadataManager mm (tu/component node ::meta/metadata-manager)]]

(let [objs (.listObjects os)]
(t/is (= 2 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_info/metadata\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"chunk-\p{XDigit}+/device_readings/metadata\.arrow" %) objs))))
(t/is (= 2 (count (filter #(re-matches #"chunk-.*/device_info/content-api_version\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"chunk-.*/device_readings/content-battery_level\.arrow" %) objs)))))
(t/is (= 11 (count (filter #(re-matches #"chunk-metadata/\p{XDigit}+\.transit.json" %) objs))))
(t/is (= 4 (count (filter #(re-matches #"tables/device_info/chunks/.+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/device_readings/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/device_readings/chunks/trie-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/xt\$txs/chunks/leaf-\p{XDigit}+\.arrow" %) objs))))
(t/is (= 11 (count (filter #(re-matches #"tables/xt\$txs/chunks/trie-\p{XDigit}+\.arrow" %) objs)))))

(t/is (= :utf8 (.columnType mm "device_info" "xt$id")))))))))))))

Expand Down Expand Up @@ -625,63 +616,3 @@

(tj/check-json (.toPath (io/as-file (io/resource "xtdb/indexer-test/can-index-sql-insert")))
(.resolve node-dir "objects"))))))

(deftest test-skips-irrelevant-live-blocks-632
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 2, :rows-per-chunk 10}})]
(-> (xt/submit-tx node [[:put :xt_docs {:name "Håkan", :xt/id :hak}]])
(tu/then-await-tx node))

(tu/finish-chunk! node)

(xt/submit-tx node [[:put :xt_docs {:name "Dan", :xt/id :dan}]
[:put :xt_docs {:name "Ivan", :xt/id :iva}]])

(-> (xt/submit-tx node [[:put :xt_docs {:name "James", :xt/id :jms}]
[:put :xt_docs {:name "Jon", :xt/id :jon}]])
(tu/then-await-tx node))

(let [^IMetadataManager metadata-mgr (tu/component node ::meta/metadata-manager)
^IWatermarkSource wm-src (tu/component node :xtdb/indexer)]
(with-open [params (tu/open-params {'?name "Ivan"})]
(let [gt-literal-selector (expr.meta/->metadata-selector '(> name "Ivan") '{name :utf8} {})
gt-param-selector (expr.meta/->metadata-selector '(> name ?name) '{name :utf8} params)]

(t/is (= #{0} (set (keys (.chunksMetadata metadata-mgr)))))

(letfn [(test-live-blocks [^IWatermark wm, metadata-pred]
(with-open [live-blocks (-> (.liveChunk wm)
(.liveTable "xt_docs")
(.liveBlocks #{"_row_id" "name"} metadata-pred))]
(let [!res (atom [])]
(.forEachRemaining live-blocks
(reify Consumer
(accept [_ in-rel]
(swap! !res conj (vr/rel->rows in-rel)))))
@!res)))]

(with-open [wm1 (.openWatermark wm-src nil)]
(t/is (= [[{:_row_id 2, :name "Dan"} {:_row_id 3, :name "Ivan"}]
[{:_row_id 5, :name "James"} {:_row_id 6, :name "Jon"}]]
(test-live-blocks wm1 nil))
"no selector")

(t/is (= [[{:_row_id 5, :name "James"} {:_row_id 6, :name "Jon"}]]
(test-live-blocks wm1 gt-literal-selector))
"only second block, literal selector")

(t/is (= [[{:_row_id 5, :name "James"} {:_row_id 6, :name "Jon"}]]
(test-live-blocks wm1 gt-param-selector))
"only second block, param selector")

(let [next-tx (-> (xt/submit-tx node [[:put :xt_docs {:name "Jeremy", :xt/id :jdt}]])
(tu/then-await-tx node))]

(with-open [wm2 (.openWatermark wm-src next-tx)]
(t/is (= [[{:_row_id 5, :name "James"} {:_row_id 6, :name "Jon"}]]
(test-live-blocks wm1 gt-literal-selector))
"replay with wm1")

(t/is (= [[{:_row_id 5, :name "James"} {:_row_id 6, :name "Jon"}]
[{:_row_id 8, :name "Jeremy"}]]
(test-live-blocks wm2 gt-literal-selector))
"now on wm2"))))))))))
2 changes: 1 addition & 1 deletion src/test/clojure/xtdb/operator/scan_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
{:node node})))))

(t/deftest test-chunk-boundary
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 5, :rows-per-chunk 20}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 20}})]
(->> (for [i (range 110)]
[:put :xt_docs {:xt/id i}])
(partition-all 10)
Expand Down
11 changes: 7 additions & 4 deletions src/test/clojure/xtdb/operator_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
(t/use-fixtures :once tu/with-allocator)

(t/deftest test-find-gt-ivan
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 2, :rows-per-chunk 10}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 10}})]
(-> (xt/submit-tx node [[:put :xt_docs {:name "Håkan", :xt/id :hak}]])
(tu/then-await-tx node))

Expand All @@ -39,8 +39,9 @@

(t/is (= #{0 2} (set (keys (.chunksMetadata metadata-mgr)))))

#_ ; TODO reinstate metadata
(let [expected-match [(meta/map->ChunkMatch
{:chunk-idx 2, :block-idxs (doto (RoaringBitmap.) (.add 1)), :col-names #{"_row_id" "xt$id" "name"}})]]
{:chunk-idx 2, :block-idxs (doto (RoaringBitmap.) (.add 1)), :col-names #{"xt$id" "name"}})]]
(t/is (= expected-match
(meta/matching-chunks metadata-mgr "xt_docs"
(expr.meta/->metadata-selector '(> name "Ivan") '{name :utf8} {})))
Expand All @@ -63,7 +64,7 @@
tx2)))))))

(t/deftest test-find-eq-ivan
(with-open [node (node/start-node {:xtdb/live-chunk {:rows-per-block 3, :rows-per-chunk 10}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 10}})]
(-> (xt/submit-tx node [[:put :xt_docs {:name "Håkan", :xt/id :hak}]
[:put :xt_docs {:name "James", :xt/id :jms}]
[:put :xt_docs {:name "Ivan", :xt/id :iva}]])
Expand All @@ -78,8 +79,10 @@
(tu/finish-chunk! node)
(let [^IMetadataManager metadata-mgr (tu/component node ::meta/metadata-manager)]
(t/is (= #{0 4} (set (keys (.chunksMetadata metadata-mgr)))))

#_ ; TODO reinstate metadata
(let [expected-match [(meta/map->ChunkMatch
{:chunk-idx 0, :block-idxs (doto (RoaringBitmap.) (.add 0)), :col-names #{"_row_id" "xt$id" "name"}})]]
{:chunk-idx 0, :block-idxs (doto (RoaringBitmap.) (.add 0)), :col-names #{"xt$id" "name"}})]]
(t/is (= expected-match
(meta/matching-chunks metadata-mgr "xt_docs"
(expr.meta/->metadata-selector '(= name "Ivan") '{name :utf8} {})))
Expand Down
3 changes: 1 addition & 2 deletions src/test/clojure/xtdb/stats_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
(t/use-fixtures :each tu/with-allocator)

(deftest test-scan
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 2}
:xtdb/live-chunk {:rows-per-block 2 , :rows-per-chunk 2}})]
(with-open [node (node/start-node {:xtdb/indexer {:rows-per-chunk 2}})]
(let [scan-emitter (util/component node :xtdb.operator.scan/scan-emitter)]
(xt/submit-tx node [[:put :foo {:xt/id "foo1"}]
[:put :bar {:xt/id "bar1"}]])
Expand Down
Loading

0 comments on commit c044fe2

Please sign in to comment.