Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Jul 19, 2023
1 parent 6a4c2b9 commit 6e2aaf1
Show file tree
Hide file tree
Showing 9 changed files with 1,043 additions and 365 deletions.
1 change: 0 additions & 1 deletion core/src/main/clojure/xtdb/indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
(defn- ->iid ^bytes [eid]
(if (uuid? eid)
(util/uuid->bytes eid)

(let [^bytes eid-bytes (cond
(string? eid) (.getBytes (str "s" eid))
(keyword? eid) (.getBytes (str "k" eid))
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
;; TODO metadata
(types/col-type->field "leaf" '[:struct {page-idx :i32}]))]))

(defn ->leaf-obj-key [table-name chunk-idx]
(format "tables/%s/chunks/leaf-c%s.arrow" table-name chunk-idx))

(defn ->trie-obj-key [table-name chunk-idx]
(format "tables/%s/chunks/trie-c%s.arrow" table-name chunk-idx))

(defn- write-trie!
^java.util.concurrent.CompletableFuture [^BufferAllocator allocator, ^ObjectStore obj-store,
^String table-name, ^String chunk-idx,
Expand All @@ -99,7 +105,7 @@
copier (vw/->rel-copier leaf-rel-wtr leaf-rel)]

(-> (.putObject obj-store
(format "tables/%s/chunks/leaf-c%s.arrow" table-name chunk-idx)
(->leaf-obj-key table-name chunk-idx)
(util/build-arrow-ipc-byte-buffer leaf-vsr :file
(fn [write-batch!]
(.accept trie
Expand Down Expand Up @@ -141,7 +147,7 @@
(fn [_]
(.syncRowCount trie-rel-wtr)
(.putObject obj-store
(format "tables/%s/chunks/trie-c%s.arrow" table-name chunk-idx)
(->trie-obj-key table-name chunk-idx)
(util/root->arrow-ipc-byte-buffer trie-vsr :file))))

(.whenComplete (reify BiConsumer
Expand Down Expand Up @@ -288,7 +294,10 @@
put-wtr (.writerForField op-wtr put-field)
delete-wtr (.writerForField op-wtr delete-field)]
(LiveTable. allocator object-store table-name rel
(LiveTrie/emptyTrie (TrieKeys. (.getVector iid-wtr)))
#_(LiveTrie/emptyTrie (TrieKeys. (.getVector iid-wtr)))
(.build (doto (LiveTrie/builder (TrieKeys. (.getVector iid-wtr)))
(.setPageLimit 4)
(.setLogLimit 4)))
iid-wtr (.writerForName rel "xt$system_from")
put-wtr (.structKeyWriter put-wtr "xt$valid_from") (.structKeyWriter put-wtr "xt$valid_to") (.structKeyWriter put-wtr "xt$doc")
delete-wtr (.structKeyWriter delete-wtr "xt$valid_from") (.structKeyWriter delete-wtr "xt$valid_to")
Expand Down
476 changes: 452 additions & 24 deletions core/src/main/clojure/xtdb/operator/scan.clj

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions core/src/main/clojure/xtdb/types.clj
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@
(into #{} (map field->col-type))
(apply merge-col-types)))

;; strict
(defn union-type->col-type [union-field]
[:union (into #{} (map field->col-type (.getChildren union-field)))])

(defn col-type->union-type ^org.apache.arrow.vector.types.pojo.Field [col-name [_ col-types]]
(apply ->field col-name (.getType Types$MinorType/DENSEUNION) false
(map col-type->field col-types)))

;;; number

(defmethod arrow-type->col-type ArrowType$Int [^ArrowType$Int arrow-type]
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/clojure/xtdb/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@
(.putLong (.getLeastSignificantBits uuid)))]
(.array bb)))

(defn bytes->uuid ^UUID [^bytes bytes]
(let [bb (ByteBuffer/wrap bytes)]
(UUID. (.getLong bb) (.getLong bb))))

(defn ->lex-hex-string
"Turn a long into a lexicographically-sortable hex string by prepending the length"
[^long l]
Expand Down
107 changes: 104 additions & 3 deletions core/src/main/clojure/xtdb/vector/writer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
(org.apache.arrow.memory BufferAllocator)
(org.apache.arrow.vector BigIntVector BitVector DecimalVector DateDayVector DateMilliVector DurationVector ExtensionTypeVector FixedSizeBinaryVector Float4Vector Float8Vector IntVector IntervalDayVector IntervalMonthDayNanoVector IntervalYearVector NullVector PeriodDuration SmallIntVector TimeMicroVector TimeMilliVector TimeNanoVector TimeSecVector TimeStampVector TinyIntVector ValueVector VarBinaryVector VarCharVector VectorSchemaRoot)
(org.apache.arrow.vector.complex DenseUnionVector ListVector StructVector)
(org.apache.arrow.vector.types Types$MinorType)
(org.apache.arrow.vector.types.pojo ArrowType$List ArrowType$Struct ArrowType$Union Field FieldType)
xtdb.api.protocols.ClojureForm
(xtdb.types IntervalDayTime IntervalMonthDayNano IntervalYearMonth)
Expand Down Expand Up @@ -609,8 +610,10 @@
(aset copier-mapping src-type-id
;; HACK to make things work for named duv legs
(if-not (= child-field-name (types/col-type->field-name col-type))
(.rowCopier (.writerForField dest-col child-field)
(.getVectorByType src-vec src-type-id))
(do #_(prn "child-field-name "child-field-name)
#_(prn "col-type-name" (types/col-type->field-name col-type))
(.rowCopier (.writerForField dest-col child-field)
(.getVectorByType src-vec src-type-id)))
(.rowCopier (.writerForType dest-col col-type)
(.getVectorByType src-vec src-type-id))))))

Expand Down Expand Up @@ -810,7 +813,6 @@
(apply types/merge-col-types))])
(into {})))


(defn ->vec-writer
(^xtdb.vector.IVectorWriter [^BufferAllocator allocator, col-name]
(->writer (-> (types/->field col-name types/dense-union-type false)
Expand All @@ -820,6 +822,20 @@
(->writer (-> (types/col-type->field col-name col-type)
(.createVector allocator)))))

;; HACK to not squash union types
(defn ->strict-vec-writer
(^xtdb.vector.IVectorWriter [^BufferAllocator allocator, col-name]
(->writer (-> (types/->field col-name types/dense-union-type false)
(.createVector allocator))))

(^xtdb.vector.IVectorWriter [^BufferAllocator allocator, col-name, col-type]
(->writer (-> (if (types/union? col-type)
;; FIXME names are not preserved in the transformation, so this creates unnessary legs
;; (types/col-type->union-type col-name col-type)
(types/->field col-name types/dense-union-type false)
(types/col-type->field col-name col-type))
(.createVector allocator)))))

(defn ->rel-copier ^xtdb.vector.IRowCopier [^IRelationWriter rel-wtr, ^RelationReader in-rel]
(let [wp (.writerPosition rel-wtr)
copiers (vec (concat (for [^IVectorReader in-vec in-rel]
Expand Down Expand Up @@ -867,6 +883,19 @@
(populate-with-absents pos))
(->vec-writer allocator col-name col-type)))))))

(writerForName [this col-name col-type strict]
(if-not strict
(.writerForName this col-name col-type)
(.computeIfAbsent writers col-name
(reify Function
(apply [_ col-name]
(let [pos (.getPosition wp)]
(if (pos? pos)
(doto (->strict-vec-writer allocator col-name (types/merge-col-types col-type :absent))
(populate-with-absents pos))
(->strict-vec-writer allocator col-name col-type))))))))


(rowCopier [this in-rel] (->rel-copier this in-rel))

(iterator [_] (.iterator (.entrySet writers)))
Expand All @@ -875,6 +904,69 @@
(close [this]
(run! util/try-close (vals this))))))

(defn strict-field->col-type [^org.apache.arrow.vector.types.pojo.Field field]
(if (= (.getType field) (.getType Types$MinorType/DENSEUNION))
(types/union-type->col-type field)
(types/field->col-type field)))

(defn ->strict-rel-copier ^xtdb.vector.IRowCopier [^IRelationWriter rel-wtr, ^RelationReader in-rel]
(let [wp (.writerPosition rel-wtr)
copiers (vec (concat (for [^IVectorReader in-vec in-rel]
(.rowCopier in-vec (.writerForName rel-wtr (.getName in-vec)
;; HACK to not squash union types
(strict-field->col-type (.getField in-vec)))))

(for [absent-col-name (set/difference (set (keys rel-wtr))
(into #{} (map #(.getName ^IVectorReader %)) in-rel))
:let [!writer (delay
(-> (.writerForName rel-wtr absent-col-name)
(.writerForType :absent)))]]
(reify IRowCopier
(copyRow [_ _src-idx]
(let [pos (.getPosition wp)]
(.writeNull ^IVectorWriter @!writer nil)
pos))))))]
(reify IRowCopier
(copyRow [_ src-idx]
(let [pos (.getPositionAndIncrement wp)]
(doseq [^IRowCopier copier copiers]
(.copyRow copier src-idx))
pos)))))

(defn ->strict-rel-writer ^xtdb.vector.IRelationWriter [^BufferAllocator allocator]
(let [writers (LinkedHashMap.)
wp (IVectorPosition/build)]
(reify IRelationWriter
(writerPosition [_] wp)

(endRow [_] (.getPositionAndIncrement wp))

(writerForName [_ col-name]
(.computeIfAbsent writers col-name
(reify Function
(apply [_ col-name]
(doto (->strict-vec-writer allocator col-name)
(populate-with-absents (.getPosition wp)))))))

(writerForName [_ col-name col-type]
(.computeIfAbsent writers col-name
(reify Function
(apply [_ col-name]
(let [pos (.getPosition wp)]
(if (pos? pos)
(doto (->vec-writer allocator col-name (types/merge-col-types col-type :absent))
(populate-with-absents pos))
(->strict-vec-writer allocator col-name col-type)))))))

(rowCopier [this in-rel] (->strict-rel-copier this in-rel))

(iterator [_] (.iterator (.entrySet writers)))

AutoCloseable
(close [this]
(run! util/try-close (vals this))))))


(defn root->writer ^xtdb.vector.IRelationWriter [^VectorSchemaRoot root]
(let [writers (LinkedHashMap.)
wp (IVectorPosition/build)]
Expand Down Expand Up @@ -965,3 +1057,12 @@

(let [wp (.writerPosition dest-rel)]
(.setPosition wp (+ (.getPosition wp) (.rowCount src-rel)))))

(defn strict-append-rel [^IRelationWriter dest-rel, ^RelationReader src-rel]
(doseq [^IVectorReader src-col src-rel
:let [col-type (strict-field->col-type (.getField src-col))
^IVectorWriter vec-writer (.writerForName dest-rel (.getName src-col) col-type true)]]
(append-vec vec-writer src-col))

(let [wp (.writerPosition dest-rel)]
(.setPosition wp (+ (.getPosition wp) (.rowCount src-rel)))))
1 change: 1 addition & 0 deletions core/src/main/java/xtdb/vector/IRelationWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ default void syncRowCount() {

IVectorWriter writerForName(String name);
IVectorWriter writerForName(String name, Object colType);
IVectorWriter writerForName(String name, Object colType, Boolean strict);

IRowCopier rowCopier(RelationReader relation);

Expand Down
Loading

0 comments on commit 6e2aaf1

Please sign in to comment.