Skip to content

Commit

Permalink
[WIP] Ensure History Changes only if Resource Changes
Browse files Browse the repository at this point in the history
This change introduces a new transaction command called "keep" that just
means: "Keep the resource of it is not changed in between." Unlike
"put", "keep" will not introduce a new history entry. The command "keep"
is always conditional because the detection of no changes will be done
outside of the transaction.

This is work in progress, because two things are missing: the detection
of no changes inside the transaction for normal "put" commands and the
retry if "keep" will fail.

Closes: #777
  • Loading branch information
alexanderkiel committed Jun 26, 2023
1 parent d5e17ff commit 5e1b05b
Show file tree
Hide file tree
Showing 30 changed files with 813 additions and 280 deletions.
23 changes: 23 additions & 0 deletions .github/scripts/patient-identical-update.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash -e

#
# This script tests that an update without changes of the resource content
# doesn't create a new history entry.
#

SCRIPT_DIR="$(dirname "$(readlink -f "$0")")"
. "$SCRIPT_DIR/util.sh"

BASE="http://localhost:8080/fhir"
PATIENT_IDENTIFIER="X79746011X"
PATIENT=$(curl -sH "Accept: application/fhir+json" "$BASE/Patient?identifier=$PATIENT_IDENTIFIER" | jq -r '.entry[0].resource')
ID="$(echo "$PATIENT" | jq -r .id)"
VERSION_ID="$(echo "$PATIENT" | jq -r .meta.versionId)"
RESULT=$(curl -sXPUT -H "Content-Type: application/fhir+json" -d "$PATIENT" "$BASE/Patient/$ID")
RESULT_VERSION_ID="$(echo "$RESULT" | jq -r .meta.versionId)"

test "versionId" "$RESULT_VERSION_ID" "$VERSION_ID"

HISTORY_TOTAL=$(curl -sH "Accept: application/fhir+json" "$BASE/Patient/$ID/_history" | jq -r '.total')

test "history total" "$HISTORY_TOTAL" "1"
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,9 @@ jobs:
- name: Patient Everything
run: .github/scripts/patient-everything.sh

- name: Patient Identical Update
run: .github/scripts/patient-identical-update.sh

not-enforcing-referential-integrity-test:
needs: build
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -1519,6 +1522,9 @@ jobs:
- name: Patient Everything
run: .github/scripts/patient-everything.sh

- name: Patient Identical Update
run: .github/scripts/patient-identical-update.sh

- name: Docker Stats
run: docker stats --no-stream

Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ target
classes
.cpcache
.cache
blaze-load-tests/node_modules
.nrepl-port
9 changes: 8 additions & 1 deletion modules/db-tx-log/src/blaze/db/tx_log/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


(s/def :blaze.db.tx-cmd/op
#{"create" "put" "delete"})
#{"create" "put" "keep" "delete"})


(s/def :blaze.db.tx-cmd/type
Expand Down Expand Up @@ -73,6 +73,13 @@
:blaze.db.tx-cmd/if-none-match]))


(defmethod tx-cmd "keep" [_]
(s/keys :req-un [:blaze.db.tx-cmd/op
:blaze.db.tx-cmd/type
:blaze.resource/id
:blaze.db.tx-cmd/if-match]))


(defmethod tx-cmd "delete" [_]
(s/keys :req-un [:blaze.db.tx-cmd/op
:blaze.db.tx-cmd/type
Expand Down
2 changes: 1 addition & 1 deletion modules/db/src/blaze/db/impl/db.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
node)

(-as-of [_ t]
(assert (<= ^long t ^long basis-t))
(assert (<= ^long t ^long basis-t) (format "(<= %d %d)" t basis-t))
(Db. node kv-store basis-t t))

(-basis-t [_]
Expand Down
9 changes: 9 additions & 0 deletions modules/db/src/blaze/db/node/transaction.clj
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@
(assoc :if-none-match (prepare-if-none-match precond)))}))


(defmethod prepare-op :keep
[_ [_ type id if-match]]
{:blaze.db/tx-cmd
{:op "keep"
:type type
:id id
:if-match if-match}})


(defmethod prepare-op :delete
[_ [_ type id]]
{:blaze.db/tx-cmd
Expand Down
13 changes: 13 additions & 0 deletions modules/db/src/blaze/db/node/tx_indexer/verify.clj
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@
(update-in [:stats tid :total] inc-0))))))


(defn- verify-tx-cmd-keep-msg [type id if-match]
(format "verify-tx-cmd :keep %s/%s if-match: %d" type id if-match))


(defmethod verify-tx-cmd "keep"
[db-before _ res {:keys [type id if-match]}]
(log/trace (verify-tx-cmd-keep-msg type id if-match))
(with-open [_ (prom/timer duration-seconds "verify-keep")]
(if-not (= if-match (:t (d/resource-handle db-before type id)))
(throw-anom (precondition-failed-anomaly if-match type id))
res)))


(defmethod verify-tx-cmd "delete"
[db-before t res {:keys [type id]}]
(log/trace "verify-tx-cmd :delete" (str type "/" id))
Expand Down
5 changes: 5 additions & 0 deletions modules/db/src/blaze/db/node/validation.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
[(name (fhir-spec/fhir-type resource)) id])


(defmethod extract-type-id :keep
[[_ type id]]
[type id])


(defmethod extract-type-id :delete
[[_ type id]]
[type id])
Expand Down
9 changes: 8 additions & 1 deletion modules/db/src/blaze/db/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@

(defmethod put-precond-op :if-match [_]
(s/cat :op #{:if-match}
:t :blaze.db/t))
:ts (s/+ :blaze.db/t)))


(defmethod put-precond-op :if-none-match [_]
Expand All @@ -91,6 +91,13 @@
:precondition (s/? :blaze.db.tx-op.put/precondition)))


(defmethod tx-op :keep [_]
(s/cat :op #{:keep}
:type :fhir.resource/type
:id :blaze.resource/id
:t :blaze.db/t))


(defmethod tx-op :delete [_]
(s/cat :op #{:delete}
:type :fhir.resource/type
Expand Down
8 changes: 8 additions & 0 deletions modules/db/test/blaze/db/node/transaction_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@
(given (tx/prepare-ops context [[:put {:fhir/type :fhir/Patient :id "0"} [:if-match 4]]])
[0 0 :if-match] := 4)))

(testing "keep"
(given (tx/prepare-ops context [[:keep "Patient" "0" 1]])
[0 0 :op] := "keep"
[0 0 :type] := "Patient"
[0 0 :id] := "0"
[0 0 :if-match] := 1
[1] := {}))

(testing "delete"
(given (tx/prepare-ops context [[:delete "Patient" "0"]])
[0 0 :op] := "delete"
Expand Down
30 changes: 30 additions & 0 deletions modules/db/test/blaze/db/node/tx_indexer/verify_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,36 @@
[4 1 ss-tu/decode-key] := {:t 2}
[4 2 ss-tu/decode-val] := {:total 1 :num-changes 2})))))

(testing "keeping a non-existing patient fails"
(with-system [{:blaze.db/keys [node]} config]

(given (verify/verify-tx-cmds
(d/db node) 1
[{:op "keep" :type "Patient" :id "0" :if-match 1}])
::anom/category := ::anom/conflict
::anom/message := "Precondition `W/\"1\"` failed on `Patient/0`."
:http/status := 412)))

(testing "keeping a non-matching patient fails"
(with-system-data [{:blaze.db/keys [node]} config]
[[[:put patient-0]]
[[:put patient-0]]]

(given (verify/verify-tx-cmds
(d/db node) 1
[{:op "keep" :type "Patient" :id "0" :if-match 1}])
::anom/category := ::anom/conflict
::anom/message := "Precondition `W/\"1\"` failed on `Patient/0`."
:http/status := 412)))

(testing "keeping a matching patient"
(with-system-data [{:blaze.db/keys [node]} config]
[[[:put patient-0]]]

(is (empty? (verify/verify-tx-cmds
(d/db node) 1
[{:op "keep" :type "Patient" :id "0" :if-match 1}])))))

(testing "deleting a patient from an empty store"
(with-system [{:blaze.db/keys [node]} config]
(given (verify/verify-tx-cmds
Expand Down
17 changes: 17 additions & 0 deletions modules/db/test/blaze/db/node/validation_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@


(deftest validate-ops-test
(testing "single keep"
(is (nil? (validation/validate-ops [[:keep "Patient" "0" 1]]))))

(testing "duplicate keep"
(given (validation/validate-ops [[:keep "Patient" "0" 1]
[:keep "Patient" "0" 2]])
::anom/category := ::anom/incorrect
:cognitect.anomalies/message := "Duplicate resource `Patient/0`.",
:fhir/issue := "invariant"))

(testing "single delete"
(is (nil? (validation/validate-ops [[:delete "Patient" "0"]]))))

Expand All @@ -24,4 +34,11 @@
[:delete "Patient" "0"]])
::anom/category := ::anom/incorrect
:cognitect.anomalies/message := "Duplicate resource `Patient/0`.",
:fhir/issue := "invariant"))

(testing "duplicate keep/delete"
(given (validation/validate-ops [[:keep "Patient" "0" 1]
[:delete "Patient" "0"]])
::anom/category := ::anom/incorrect
:cognitect.anomalies/message := "Duplicate resource `Patient/0`.",
:fhir/issue := "invariant")))
54 changes: 26 additions & 28 deletions modules/interaction/src/blaze/interaction/create.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
https://www.hl7.org/fhir/http.html#create"
(:require
[blaze.anomaly :as ba]
[blaze.anomaly :as ba :refer [if-ok]]
[blaze.async.comp :as ac]
[blaze.db.api :as d]
[blaze.db.spec]
Expand Down Expand Up @@ -37,9 +37,7 @@
(ba/incorrect
(resource-type-mismatch-msg type body)
:fhir/issue "invariant"
:fhir/operation-outcome "MSG_RESOURCE_TYPE_MISMATCH")

:else body))
:fhir/operation-outcome "MSG_RESOURCE_TYPE_MISMATCH")))


(defn- create-op [resource conditional-clauses]
Expand Down Expand Up @@ -70,31 +68,31 @@
(fn [{{{:fhir.resource/keys [type]} :data} ::reitit/match
:keys [headers body]
:as request}]
(let [id (iu/luid context)
conditional-clauses (conditional-clauses headers)]
(-> (ac/completed-future (validate-resource type body))
(ac/then-apply #(assoc % :id id))
(ac/then-compose
#(d/transact node [(create-op % conditional-clauses)]))
;; it's important to switch to the executor here, because otherwise
;; the central indexing thread would execute response building.
(ac/then-apply-async identity executor)
(ac/then-compose
(fn [db-after]
(if-let [handle (d/resource-handle db-after type id)]
(response/build-response
(response-context request db-after) nil handle)
(let [handle (first (d/type-query db-after type conditional-clauses))]
(if-ok [_ (validate-resource type body)]
(let [id (iu/luid context)
conditional-clauses (conditional-clauses headers)
tx-op (create-op (assoc body :id id) conditional-clauses)]
(-> (d/transact node [tx-op])
;; it's important to switch to the executor here, because otherwise
;; the central indexing thread would execute response building.
(ac/then-apply-async identity executor)
(ac/then-compose
(fn [db-after]
(if-let [handle (d/resource-handle db-after type id)]
(response/build-response
(response-context request db-after) handle handle)))))
(ac/exceptionally
(fn [e]
(cond-> e
(ba/not-found? e)
(assoc
::anom/category ::anom/fault
::anom/message (resource-content-not-found-msg e)
:fhir/issue "incomplete"))))))))
(response-context request db-after) tx-op nil handle)
(let [handle (first (d/type-query db-after type conditional-clauses))]
(response/build-response
(response-context request db-after) tx-op handle handle)))))
(ac/exceptionally
(fn [e]
(cond-> e
(ba/not-found? e)
(assoc
::anom/category ::anom/fault
::anom/message (resource-content-not-found-msg e)
:fhir/issue "incomplete"))))))
ac/completed-future)))


(defmethod ig/pre-init-spec :blaze.interaction/create [_]
Expand Down
40 changes: 23 additions & 17 deletions modules/interaction/src/blaze/interaction/transaction.clj
Original file line number Diff line number Diff line change
Expand Up @@ -331,31 +331,37 @@
(ac/completed-future (noop-entry db handle)))))))


(defn- keep? [[op]]
(identical? :keep op))


(defn- update-entry
[{:keys [db] :as context} type {:keys [num-changes id] :as handle}]
[{:keys [db] :as context} type tx-op old-handle {:keys [id] :as handle}]
(let [tx (d/tx db (:t handle))
vid (str (:blaze.db/t tx))]
vid (str (:blaze.db/t tx))
created (and (not (keep? tx-op))
(or (nil? old-handle) (identical? :delete (:op old-handle))))]
{:fhir/type :fhir.Bundle/entry
:response
(cond->
{:fhir/type :fhir.Bundle.entry/response
:status (if (= 1 num-changes) "201" "200")
:status (if created "201" "200")
:etag (str "W/\"" vid "\"")
:lastModified (:blaze.db.tx/instant tx)}
(= 1 num-changes)
created
(assoc :location (location context type id vid)))}))


(defmethod build-response-entry "PUT"
[{:keys [db return-preference] :as context}
_
{{:fhir/keys [type] :keys [id]} :resource}]
{{:fhir/keys [type] :keys [id]} :resource :keys [tx-op]}]
(let [type (name type)
handle (d/resource-handle db type id)]
[new-handle old-handle] (take 2 (d/instance-history db type id))]
(if (identical? :blaze.preference.return/representation return-preference)
(do-sync [resource (pull db handle)]
(assoc (update-entry context type handle) :resource resource))
(ac/completed-future (update-entry context type handle)))))
(do-sync [resource (pull db new-handle)]
(assoc (update-entry context type tx-op old-handle new-handle) :resource resource))
(ac/completed-future (update-entry context type tx-op old-handle new-handle)))))


(defmethod build-response-entry "DELETE"
Expand All @@ -372,9 +378,7 @@

(defn- build-response-entries* [{:keys [db] :as context} entries]
(with-open [batch-db (d/new-batch-db db)]
(->> entries
(map-indexed (partial build-response-entry (assoc context :db batch-db)))
doall)))
(into [] (map-indexed (partial build-response-entry (assoc context :db batch-db))) entries)))


(defn- build-response-entries [context entries]
Expand Down Expand Up @@ -519,15 +523,17 @@

(defmethod process-entries "transaction"
[{:keys [node executor] :as context} _ entries]
(let [writes (bundle/writes entries)]
(-> (if (empty? writes)
(d/sync node)
(d/transact node (bundle/tx-ops writes)))
(if-ok [entries (bundle/assoc-tx-ops (d/db node) entries)]
(-> (let [tx-ops (bundle/tx-ops entries)]
(if (empty? tx-ops)
(d/sync node)
(d/transact node tx-ops)))
;; it's important to switch to the executor here, because otherwise
;; the central indexing thread would execute response building.
(ac/then-compose-async
#(build-response-entries (assoc context :db %) entries)
executor))))
executor))
ac/completed-future))


(defn- process-context
Expand Down
Loading

0 comments on commit 5e1b05b

Please sign in to comment.