Skip to content

Commit

Permalink
Move ->iid from indexer to xtdb.trie
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Aug 14, 2023
1 parent 9f82e10 commit 0d98bd5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
35 changes: 7 additions & 28 deletions core/src/main/clojure/xtdb/indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
[xtdb.operator.scan :as scan]
[xtdb.rewrite :refer [zmatch]]
[xtdb.sql :as sql]
[xtdb.trie :as trie]
[xtdb.tx-producer :as txp]
[xtdb.types :as types]
[xtdb.util :as util]
Expand All @@ -23,12 +24,10 @@
java.lang.AutoCloseable
java.nio.ByteBuffer
(java.nio.channels ClosedByInterruptException)
java.security.MessageDigest
(java.time Instant ZoneId)
(java.util Arrays)
(java.util.concurrent CompletableFuture PriorityBlockingQueue TimeUnit)
(java.util.concurrent.locks StampedLock)
(java.util.function Consumer Supplier)
(java.util.function Consumer)
(org.apache.arrow.memory BufferAllocator)
(org.apache.arrow.vector BitVector)
(org.apache.arrow.vector.complex DenseUnionVector ListVector)
Expand Down Expand Up @@ -65,26 +64,6 @@
(^org.apache.arrow.vector.complex.DenseUnionVector indexOp [^long tx-op-idx]
"returns a tx-ops-vec of more operations (mostly for `:call`)"))

(def ^:private ^java.lang.ThreadLocal !msg-digest
(ThreadLocal/withInitial
(reify Supplier
(get [_]
(MessageDigest/getInstance "SHA-256")))))

(defn- ->iid ^bytes [eid]
(ByteBuffer/wrap
(if (uuid? eid)
(util/uuid->bytes eid)

(let [^bytes eid-bytes (cond
(string? eid) (.getBytes (str "s" eid))
(keyword? eid) (.getBytes (str "k" eid))
(integer? eid) (.getBytes (str "i" eid))
:else (throw (UnsupportedOperationException. (pr-str (class eid)))))]
(-> ^MessageDigest (.get !msg-digest)
(.digest eid-bytes)
(Arrays/copyOfRange 0 16))))))

(defn- ->put-indexer ^xtdb.indexer.OpIndexer [^RowCounter row-counter, ^ILiveIndexTx live-idx-tx,
^IVectorReader tx-ops-rdr, ^Instant system-time]
(let [put-leg (.legReader tx-ops-rdr :put)
Expand Down Expand Up @@ -126,7 +105,7 @@
{:valid-from (util/micros->instant valid-from)
:valid-to (util/micros->instant valid-to)})))

(.logPut live-table (->iid eid) valid-from valid-to #(.copyRow doc-copier tx-op-idx))
(.logPut live-table (trie/->iid eid) valid-from valid-to #(.copyRow doc-copier tx-op-idx))
(.addRows row-counter 1))

nil))))
Expand Down Expand Up @@ -154,7 +133,7 @@
:valid-to (util/micros->instant valid-to)})))

(-> (.liveTable live-idx-tx table)
(.logDelete (->iid eid) valid-from valid-to))
(.logDelete (trie/->iid eid) valid-from valid-to))

(.addRows row-counter 1))

Expand All @@ -171,7 +150,7 @@
eid (.getObject id-rdr tx-op-idx)]

(-> (.liveTable live-idx-tx table)
(.logEvict (->iid eid)))
(.logEvict (trie/->iid eid)))

(.addRows row-counter 1))

Expand Down Expand Up @@ -325,7 +304,7 @@
{:valid-from (util/micros->instant valid-from)
:valid-to (util/micros->instant valid-to)})))

(.logPut live-idx-table (->iid eid) valid-from valid-to #(.copyRow live-idx-table-copier idx))))
(.logPut live-idx-table (trie/->iid eid) valid-from valid-to #(.copyRow live-idx-table-copier idx))))

(.addRows row-counter row-count))))))

Expand Down Expand Up @@ -437,7 +416,7 @@
live-table (.liveTable live-idx-tx txs-table)
doc-writer (.docWriter live-table)]

(.logPut live-table (->iid tx-id) system-time-µs util/end-of-time-μs
(.logPut live-table (trie/->iid tx-id) system-time-µs util/end-of-time-μs
(fn write-doc! []
(.startStruct doc-writer)
(doto (.structKeyWriter doc-writer "xt$id" :i64)
Expand Down
29 changes: 25 additions & 4 deletions core/src/main/clojure/xtdb/trie.clj
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
(ns xtdb.trie
(:require [xtdb.buffer-pool]
[xtdb.metadata :as meta]
[xtdb.object-store]
[xtdb.types :as types]
[xtdb.util :as util]
[xtdb.vector.writer :as vw]
[xtdb.metadata :as meta])
[xtdb.vector.writer :as vw])
(:import (java.nio ByteBuffer)
java.security.MessageDigest
(java.util Arrays)
(java.util.concurrent.atomic AtomicInteger)
(java.util.function IntConsumer)
(java.util.function IntConsumer Supplier)
(java.util.stream IntStream)
java.security.MessageDigest
(org.apache.arrow.memory BufferAllocator)
(org.apache.arrow.vector VectorSchemaRoot)
(org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
org.apache.arrow.vector.types.UnionMode
(org.apache.arrow.vector.types.pojo ArrowType$Union Schema)
(xtdb.object_store ObjectStore)
(xtdb.trie HashTrie HashTrie$Node LiveHashTrie LiveHashTrie$Leaf)
(xtdb.vector IVectorReader RelationReader)))

(def ^:private ^java.lang.ThreadLocal !msg-digest
(ThreadLocal/withInitial
(reify Supplier
(get [_]
(MessageDigest/getInstance "SHA-256")))))

(defn ->iid ^ByteBuffer [eid]
(if (uuid? eid)
(util/uuid->byte-buffer eid)
(ByteBuffer/wrap
(let [^bytes eid-bytes (cond
(string? eid) (.getBytes (str "s" eid))
(keyword? eid) (.getBytes (str "k" eid))
(integer? eid) (.getBytes (str "i" eid))
:else (throw (UnsupportedOperationException. (pr-str (class eid)))))]
(-> ^MessageDigest (.get !msg-digest)
(.digest eid-bytes)
(Arrays/copyOfRange 0 16))))))

(def ^org.apache.arrow.vector.types.pojo.Schema trie-schema
(Schema. [(types/->field "nodes" (ArrowType$Union. UnionMode/Dense (int-array (range 3))) false
(types/col-type->field "nil" :null)
Expand Down

0 comments on commit 0d98bd5

Please sign in to comment.