Skip to content

Commit

Permalink
long-index-transduce (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaudgeiser authored Jul 13, 2023
1 parent 82f9e07 commit acb68ec
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
20 changes: 20 additions & 0 deletions src/exoscale/vinyl/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,26 @@
([db f val record-type items opts]
(long-range-transduce db nil f val record-type items opts)))

(defn long-index-transduce
"A transducer over large indices.
Results are reduced into an accumulator with the help of the reducing function
`f`. The accumulator is initiated to `init`. `clojure.core.reduced` is honored.
Obviously, this approach does away with any consistency guarantees usually
offered by FDB.
Results being accumulated in memory, this also means that care must be
taken with the accumulator."
[db xform f init index-name scan-type ^TupleRange range opts]
(let [props (scan-properties opts)
scan-type (as-scan-type scan-type)
index (-> db get-metadata (metadata-index index-name))]
(continuation-traversing-transduce
db xform f init
(fn [^FDBRecordStore store ^bytes cont]
(.scanIndex store index scan-type range cont props))
cursor/apply-transduce)))

(defn long-query-reduce
"A reducer over large queries. Accepts queries as per `execute-query`. Results
are reduced into an accumulator with the help of the reducing function `f`.
Expand Down
32 changes: 24 additions & 8 deletions test/exoscale/vinyl/reindex_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,48 @@
(into {}
(map (juxt #(second (.getKey %)) #(tuple/get-long (.getValue %))) entries)))))

(defn- long-scan-refcounting-index
([]
(long-scan-refcounting-index tuple/all))
([range]
(let [entries @(store/long-index-transduce *db* identity conj [] "refcount_index" ::store/by-group range {})]
(into {}
(map (juxt #(second (.getKey %)) #(tuple/get-long (.getValue %))) entries)))))

(deftest reindex-test
(is (= (refcounting-frequencies)
(scan-refcounting-index)))
(scan-refcounting-index)
(long-scan-refcounting-index)))

(store/reindex *db* "refcount_index" {::store/progress-log-interval 1})
(store/reindex *db* "refcount_index" {::store/progress-log-interval 1})

(is (= (refcounting-frequencies)
(scan-refcounting-index))))
(scan-refcounting-index)
(long-scan-refcounting-index))))

(deftest scan-index-test
(store/insert-record *db* (p/object->record {:bucket "bucket" :path "path" :size 2}))

(is (= (assoc (refcounting-frequencies) "bucket" 1)
(scan-refcounting-index)))
(scan-refcounting-index)
(long-scan-refcounting-index)))
(store/delete-record *db* :Invoice [1 1])
(is (= (assoc (refcounting-frequencies #{1}) "bucket" 1)
(scan-refcounting-index)))
(scan-refcounting-index)
(long-scan-refcounting-index)))
(store/delete-record *db* :Invoice [1 2])
(is (= (assoc (refcounting-frequencies #{1 2}) "bucket" 1)
(scan-refcounting-index)))
(scan-refcounting-index)
(long-scan-refcounting-index)))
(store/delete-record *db* :Invoice [3 3])
(is (= (assoc (refcounting-frequencies #{1 2 3}) "bucket" 1)
(scan-refcounting-index)))
(scan-refcounting-index)
(long-scan-refcounting-index)))
(store/delete-record *db* :Invoice [4 4])
(is (= (assoc (refcounting-frequencies #{1 2 3 4}) "bucket" 1)
(scan-refcounting-index (tuple/all-of ["refcount"]))))
(scan-refcounting-index (tuple/all-of ["refcount"]))
(long-scan-refcounting-index (tuple/all-of ["refcount"]))))
(is (= #{"p1" "p2"}
(into #{} (map #(second (.getKey %))
@(store/scan-index *db* "refcount_index" ::store/by-group
Expand All @@ -59,4 +74,5 @@
@(store/scan-index *db* "refcount_index" ::store/by-group
(tuple/all-of ["zero"]) nil {::store/list? true})))))
(is (= {"bucket" 1}
(scan-refcounting-index (tuple/all-of ["refcount"])))))
(scan-refcounting-index (tuple/all-of ["refcount"]))
(long-scan-refcounting-index (tuple/all-of ["refcount"])))))

0 comments on commit acb68ec

Please sign in to comment.