Skip to content

Commit

Permalink
Add xt/txs to indexer for new tries
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Jul 11, 2023
1 parent 1fe5b00 commit 292f38b
Showing 1 changed file with 42 additions and 18 deletions.
60 changes: 42 additions & 18 deletions core/src/main/clojure/xtdb/indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -466,33 +466,55 @@
(def ^:private ^:const ^String txs-table
"xt$txs")

(defn- add-tx-row! [^ILiveChunkTx live-chunk-tx, ^ITemporalTxIndexer temporal-tx, ^IInternalIdManager iid-mgr, ^TransactionInstant tx-key, ^Throwable t]
(defn- add-tx-row! [^BufferAllocator allocator,
^ILiveChunkTx live-chunk-tx, ^ILiveIndexTx live-index-tx ,^ITemporalTxIndexer temporal-tx,
^IInternalIdManager iid-mgr, ^TransactionInstant tx-key, ^Throwable t]
(let [tx-id (.tx-id tx-key)
system-time-µs (util/instant->micros (.system-time tx-key))
live-table (.liveTable live-chunk-tx txs-table)
live-index-table (.liveTable live-index-tx txs-table)
row-id (.nextRowId live-chunk-tx)
writer (.writer live-table)
iid (.getOrCreateInternalId iid-mgr txs-table tx-id row-id)]
(with-open [table-wtr (vw/->rel-writer allocator)]

(.writeRowId live-table row-id)
(.writeRowId live-table row-id)

(doto (.writerForName writer "xt$id" :i64)
(.writeLong tx-id))
(doto (.writerForName writer "xt$id" :i64)
(.writeLong tx-id))
(doto (.writerForName table-wtr "xt$id" :i64)
(.writeLong tx-id))

(doto (.writerForName writer "xt$tx_time" t/temporal-col-type)
(.writeLong system-time-µs))

(doto (.writerForName writer "xt$committed?" :bool)
(.writeBoolean (nil? t)))
(doto (.writerForName writer "xt$tx_time" t/temporal-col-type)
(.writeLong system-time-µs))
(doto (.writerForName table-wtr "xt$tx_time" t/temporal-col-type)
(.writeLong system-time-µs))

(let [e-wtr (.writerForName writer "xt$error" [:union #{:null :clj-form}])]
(if (or (nil? t) (= t abort-exn))
(doto (.writerForType e-wtr :null)
(.writeNull nil))
(doto (.writerForType e-wtr :clj-form)
(.writeObject (pr-str t)))))
(doto (.writerForName writer "xt$committed?" :bool)
(.writeBoolean (nil? t)))
(doto (.writerForName table-wtr "xt$committed?" :bool)
(.writeBoolean (nil? t)))

(.indexPut temporal-tx iid row-id system-time-µs util/end-of-time-μs true)))

(let [e-wtr (.writerForName writer "xt$error" [:union #{:null :clj-form}])
table-e-wtr (.writerForName table-wtr "xt$error" [:union #{:null :clj-form}]) ]
(if (or (nil? t) (= t abort-exn))
(do
(doto (.writerForType e-wtr :null)
(.writeNull nil))
(doto (.writerForType table-e-wtr :null)
(.writeNull nil)))
(do (doto (.writerForType e-wtr :clj-form)
(.writeObject (pr-str t)))
(doto (.writerForType table-e-wtr :clj-form)
(.writeObject (pr-str t))))))

(.endRow table-wtr)
(let [table-row-copier (.rowCopier live-index-table (vw/rel-wtr->rdr table-wtr))]
(.logPut live-index-table (->iid tx-id) system-time-µs util/end-of-time-μs table-row-copier 0))

(.indexPut temporal-tx iid row-id system-time-µs util/end-of-time-μs true))))

(deftype Indexer [^BufferAllocator allocator
^ObjectStore object-store
Expand Down Expand Up @@ -570,14 +592,16 @@
(log/debug e "aborted tx"))
(.abort temporal-idxer)

(with-open [live-chunk-tx (.startTx live-chunk)]
(with-open [live-chunk-tx (.startTx live-chunk)
live-idx-tx (.startTx live-idx tx-key)]
(let [temporal-tx (.startTx temporal-mgr tx-key)]
(add-tx-row! live-chunk-tx temporal-tx iid-mgr tx-key e)
(add-tx-row! allocator live-chunk-tx live-idx-tx temporal-tx iid-mgr tx-key e)
(.commit live-chunk-tx)
(.commit live-idx-tx)
(.commit temporal-tx))))

(do
(add-tx-row! live-chunk-tx temporal-idxer iid-mgr tx-key nil)
(add-tx-row! allocator live-chunk-tx live-idx-tx temporal-idxer iid-mgr tx-key nil)

(.commit live-chunk-tx)
(.commit live-idx-tx)
Expand Down

0 comments on commit 292f38b

Please sign in to comment.