Skip to content

Commit

Permalink
Merge branch 'remove-old-indexes-2663' into 2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
jarohen committed Aug 4, 2023
2 parents e69c7c9 + c044fe2 commit e33367f
Show file tree
Hide file tree
Showing 166 changed files with 429 additions and 13,409 deletions.
379 changes: 135 additions & 244 deletions core/src/main/clojure/xtdb/indexer.clj

Large diffs are not rendered by default.

67 changes: 0 additions & 67 deletions core/src/main/clojure/xtdb/indexer/internal_id_manager.clj

This file was deleted.

96 changes: 63 additions & 33 deletions core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
@@ -1,46 +1,52 @@
(ns xtdb.indexer.live-index
(:require [juxt.clojars-mirrors.integrant.core :as ig]
[xtdb.buffer-pool]
[xtdb.metadata :as meta]
[xtdb.object-store]
[xtdb.trie :as trie]
[xtdb.types :as types]
[xtdb.util :as util]
[xtdb.vector.reader :as vr]
[xtdb.vector.writer :as vw])
(:import (java.lang AutoCloseable)
(:import [clojure.lang MapEntry]
(java.lang AutoCloseable)
(java.nio ByteBuffer)
(java.util ArrayList HashMap Map)
(java.util.concurrent CompletableFuture)
(java.util.function Function)
(org.apache.arrow.memory BufferAllocator)
[org.apache.arrow.vector.types.pojo Field]
(xtdb.object_store ObjectStore)
(xtdb.trie LiveHashTrie)
(xtdb.vector IRelationWriter IVectorWriter)))
(xtdb.vector IRelationWriter IVectorWriter RelationReader)))

;;
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface ILiveTableWatermark
(^java.util.Map columnTypes [])
(^xtdb.vector.RelationReader liveRelation [])
(^xtdb.trie.LiveHashTrie liveTrie []))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface ILiveTableTx
(^xtdb.indexer.live_index.ILiveTableWatermark openWatermark [^boolean retain])
(^xtdb.vector.IVectorWriter docWriter [])
(^void logPut [^bytes iid, ^long legacyIid, ^long validFrom, ^long validTo, writeDocFn])
(^void logDelete [^bytes iid, ^long legacyIid, ^long validFrom, ^long validTo])
(^void logEvict [^bytes iid, ^long legacyIid])
(^void logPut [^java.nio.ByteBuffer iid, ^long validFrom, ^long validTo, writeDocFn])
(^void logDelete [^java.nio.ByteBuffer iid, ^long validFrom, ^long validTo])
(^void logEvict [^java.nio.ByteBuffer iid])
(^void commit [])
(^void close []))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface ILiveTable
(^xtdb.indexer.live_index.ILiveTableTx startTx [^xtdb.api.protocols.TransactionInstant txKey])
(^xtdb.indexer.live_index.ILiveTableWatermark openWatermark [^boolean retain])
(^java.util.concurrent.CompletableFuture #_<?> finishChunk [^long chunkIdx])
(^java.util.concurrent.CompletableFuture #_<List<Map$Entry>> finishChunk [^long chunkIdx])
(^void close []))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(definterface ILiveIndexWatermark
(^java.util.Map allColumnTypes [])
(^xtdb.indexer.live_index.ILiveTableWatermark liveTable [^String tableName]))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
Expand All @@ -55,13 +61,22 @@
(^xtdb.indexer.live_index.ILiveTable liveTable [^String tableName])
(^xtdb.indexer.live_index.ILiveIndexTx startTx [^xtdb.api.protocols.TransactionInstant txKey])
(^xtdb.indexer.live_index.ILiveIndexWatermark openWatermark [])
(^void finishChunk [^long chunkIdx])
(^java.util.Map finishChunk [^long chunkIdx])
(^void close []))

(defprotocol TestLiveTable
(^xtdb.trie.LiveHashTrie live-trie [test-live-table])
(^xtdb.vector.IRelationWriter live-rel [test-live-table]))

(defn- live-rel->col-types [^RelationReader live-rel]
(->> (for [^Field child-field (-> (.readerForName live-rel "op")
(.legReader :put)
(.structKeyReader "xt$doc")
(.getField)
(.getChildren))]
(MapEntry/create (.getName child-field) (types/field->col-type child-field)))
(into {})))

(defn- open-wm-live-rel ^xtdb.vector.RelationReader [^IRelationWriter rel, retain?]
(let [out-cols (ArrayList.)]
(try
Expand All @@ -78,7 +93,7 @@

(deftype LiveTable [^BufferAllocator allocator, ^ObjectStore obj-store, ^String table-name
^IRelationWriter live-rel, ^:unsynchronized-mutable ^LiveHashTrie live-trie
^IVectorWriter iid-wtr, ^IVectorWriter legacy-iid-wtr, ^IVectorWriter system-from-wtr
^IVectorWriter iid-wtr, ^IVectorWriter system-from-wtr
^IVectorWriter put-wtr, ^IVectorWriter put-valid-from-wtr, ^IVectorWriter put-valid-to-wtr, ^IVectorWriter put-doc-wtr
^IVectorWriter delete-wtr, ^IVectorWriter delete-valid-from-wtr, ^IVectorWriter delete-valid-to-wtr
^IVectorWriter evict-wtr]
Expand All @@ -89,11 +104,10 @@
(reify ILiveTableTx
(docWriter [_] put-doc-wtr)

(logPut [_ iid legacy-iid valid-from valid-to write-doc!]
(logPut [_ iid valid-from valid-to write-doc!]
(.startRow live-rel)

(.writeBytes iid-wtr (ByteBuffer/wrap iid))
(.writeLong legacy-iid-wtr legacy-iid)
(.writeBytes iid-wtr iid)
(.writeLong system-from-wtr system-from-µs)

(.startStruct put-wtr)
Expand All @@ -106,9 +120,8 @@

(swap! !transient-trie #(.add ^LiveHashTrie % (dec (.getPosition (.writerPosition live-rel))))))

(logDelete [_ iid legacy-iid valid-from valid-to]
(.writeBytes iid-wtr (ByteBuffer/wrap iid))
(.writeLong legacy-iid-wtr legacy-iid)
(logDelete [_ iid valid-from valid-to]
(.writeBytes iid-wtr iid)
(.writeLong system-from-wtr system-from-µs)

(.startStruct delete-wtr)
Expand All @@ -120,9 +133,8 @@

(swap! !transient-trie #(.add ^LiveHashTrie % (dec (.getPosition (.writerPosition live-rel))))))

(logEvict [_ iid legacy-iid]
(.writeBytes iid-wtr (ByteBuffer/wrap iid))
(.writeLong legacy-iid-wtr legacy-iid)
(logEvict [_ iid]
(.writeBytes iid-wtr iid)
(.writeLong system-from-wtr system-from-µs)

(.writeNull evict-wtr nil)
Expand All @@ -134,8 +146,10 @@
(openWatermark [_ retain?]
(locking this-table
(let [wm-live-rel (open-wm-live-rel live-rel retain?)
col-types (live-rel->col-types wm-live-rel)
wm-live-trie (.compactLogs ^LiveHashTrie @!transient-trie)]
(reify ILiveTableWatermark
(columnTypes [_] col-types)
(liveRelation [_] wm-live-rel)
(liveTrie [_] wm-live-trie)

Expand All @@ -151,15 +165,24 @@
(close [_]))))

(finishChunk [_ chunk-idx]
(when-let [bufs (trie/live-trie->bufs allocator (-> live-trie (.compactLogs)) (vw/rel-wtr->rdr live-rel))]
(let [chunk-idx-str (util/->lex-hex-string chunk-idx)]
(trie/write-trie-bufs! obj-store (format "tables/%s/chunks" table-name) chunk-idx-str bufs))))
(let [live-rel-rdr (vw/rel-wtr->rdr live-rel)]
(when-let [bufs (trie/live-trie->bufs allocator (-> live-trie (.compactLogs)) live-rel-rdr)]
(let [chunk-idx-str (util/->lex-hex-string chunk-idx)
!fut (trie/write-trie-bufs! obj-store (format "tables/%s/chunks" table-name) chunk-idx-str bufs)
table-metadata (MapEntry/create table-name
{:col-types (live-rel->col-types live-rel-rdr)
:row-count (.rowCount live-rel-rdr)})]
(-> !fut
(util/then-apply (fn [_] table-metadata)))))))

(openWatermark [this retain?]
(locking this
(let [wm-live-rel (open-wm-live-rel live-rel retain?)
col-types (live-rel->col-types wm-live-rel)
wm-live-trie (.compactLogs live-trie)]

(reify ILiveTableWatermark
(columnTypes [_] col-types)
(liveRelation [_] wm-live-rel)
(liveTrie [_] wm-live-trie)

Expand All @@ -182,12 +205,12 @@
op-wtr (.writerForName rel "op")
put-wtr (.writerForField op-wtr trie/put-field)
delete-wtr (.writerForField op-wtr trie/delete-field)]
(LiveTable. allocator object-store table-name rel
(LiveHashTrie/emptyTrie (.getVector iid-wtr))
iid-wtr (.writerForName rel "xt$legacy_iid") (.writerForName rel "xt$system_from")
put-wtr (.structKeyWriter put-wtr "xt$valid_from") (.structKeyWriter put-wtr "xt$valid_to") (.structKeyWriter put-wtr "xt$doc")
delete-wtr (.structKeyWriter delete-wtr "xt$valid_from") (.structKeyWriter delete-wtr "xt$valid_to")
(.writerForField op-wtr trie/evict-field)))))
(->LiveTable allocator object-store table-name rel
(LiveHashTrie/emptyTrie (.getVector iid-wtr))
iid-wtr (.writerForName rel "xt$system_from")
put-wtr (.structKeyWriter put-wtr "xt$valid_from") (.structKeyWriter put-wtr "xt$valid_to") (.structKeyWriter put-wtr "xt$doc")
delete-wtr (.structKeyWriter delete-wtr "xt$valid_from") (.structKeyWriter delete-wtr "xt$valid_to")
(.writerForField op-wtr trie/evict-field)))))

(defrecord LiveIndex [^BufferAllocator allocator, ^ObjectStore object-store, ^Map tables]
ILiveIndex
Expand Down Expand Up @@ -221,6 +244,7 @@
(util/->jfn (fn [_] (.openWatermark live-table false)))))

(reify ILiveIndexWatermark
(allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %)))
(liveTable [_ table-name] (.get wms table-name))

AutoCloseable
Expand All @@ -236,21 +260,27 @@
(.put wms table-name (.openWatermark live-table true)))

(reify ILiveIndexWatermark
(allColumnTypes [_] (update-vals wms #(.columnTypes ^ILiveTableWatermark %)))

(liveTable [_ table-name] (.get wms table-name))

AutoCloseable
(close [_] (util/close wms)))))

(finishChunk [_ chunk-idx]
@(CompletableFuture/allOf (->> (for [^ILiveTable table (.values tables)]
(.finishChunk table chunk-idx))
(let [futs (->> (for [^ILiveTable table (.values tables)]
(.finishChunk table chunk-idx))

(remove nil?)
(into-array CompletableFuture))]

(remove nil?)
@(CompletableFuture/allOf futs)

(into-array CompletableFuture)))
(util/close tables)
(.clear tables)

(util/close tables)
(.clear tables))
(-> (into {} (keep deref) futs)
(util/rethrowing-cause))))

AutoCloseable
(close [_]
Expand All @@ -262,7 +292,7 @@
opts))

(defmethod ig/init-key :xtdb.indexer/live-index [_ {:keys [allocator object-store]}]
(LiveIndex. allocator object-store (HashMap.)))
(->LiveIndex allocator object-store (HashMap.)))

(defmethod ig/halt-key! :xtdb.indexer/live-index [_ live-idx]
(util/close live-idx))
Loading

0 comments on commit e33367f

Please sign in to comment.