Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/nameservice protocol #858

Merged
merged 21 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0f38184
Merge remote-tracking branch 'origin/main' into refactor/nameservice-…
zonotope Aug 20, 2024
ccdbb78
consolidate nameservice core and protocol namespaces
zonotope Aug 20, 2024
df37226
add read-bytes method to ByteStore; ByteStore implementation for S3
zonotope Aug 20, 2024
2817725
add storage-backed nameservice record
zonotope Aug 20, 2024
639e778
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Aug 20, 2024
8a28340
supply branch directly to address method
zonotope Aug 20, 2024
cabc02c
fluree.db.nameservice.stored -> fluree.db.nameservice.storage-backed
zonotope Aug 20, 2024
32ee230
include exception in error message
zonotope Aug 21, 2024
f783a24
make ByteStore protocol work directly on paths instead of addresses
zonotope Aug 23, 2024
0ef572c
pass branch directly instead of in options for memory store
zonotope Aug 23, 2024
12c676b
extract record filename from address and write to it directly
zonotope Aug 23, 2024
542df90
remove file, s3, and memory nameservices in favor of storage-backed
zonotope Aug 23, 2024
fa837d1
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Aug 24, 2024
ebda876
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Aug 30, 2024
d9eacc3
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Sep 5, 2024
b8a707b
Merge remote-tracking branch 'origin/refactor/store-protocol' into re…
zonotope Sep 10, 2024
3433da4
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Sep 16, 2024
c8ee83a
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Sep 28, 2024
668d6a2
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Oct 6, 2024
1809392
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Oct 11, 2024
1c898a6
Merge branch 'refactor/store-protocol' into refactor/nameservice-prot…
zonotope Oct 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/clj/fluree/db/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
[fluree.db.ledger :as ledger]
[fluree.db.util.log :as log]
[fluree.db.query.range :as query-range]
[fluree.db.nameservice.core :as nameservice]
[fluree.db.nameservice :as nameservice]
[fluree.db.connection :refer [notify-ledger]]
[fluree.db.json-ld.credential :as cred]
[fluree.db.reasoner :as reasoner]
Expand Down
2 changes: 1 addition & 1 deletion src/clj/fluree/db/api/transact.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[fluree.db.query.fql.parse :as q-parse]
[fluree.db.transact :as tx]
[fluree.db.ledger.json-ld :as jld-ledger]
[fluree.db.nameservice.core :as nameservice]
[fluree.db.nameservice :as nameservice]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.core :as util :refer [catch* try*]]
[fluree.db.util.context :as ctx-util]
Expand Down
14 changes: 5 additions & 9 deletions src/clj/fluree/db/conn/file.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[fluree.db.serde.json :refer [json-serde]]
[fluree.db.util.bytes :as bytes]
[fluree.db.util.json :as json]
[fluree.db.nameservice.filesystem :as ns-filesystem]
[fluree.db.nameservice.storage-backed :as storage-ns]
[fluree.db.storage :as storage]
[fluree.db.storage.file :as file-storage])
#?(:clj (:import (java.io Writer))))
Expand Down Expand Up @@ -97,11 +97,6 @@
(binding [*out* w]
(pr (connection/printer-map conn)))))

(defn default-file-nameservice
"Returns file nameservice or will throw if storage-path generates an exception."
[path]
(ns-filesystem/initialize path))

(defn default-lru-cache
[cache-max-mb]
(let [cache-size (conn-cache/memory->cache-size cache-max-mb)]
Expand All @@ -116,12 +111,13 @@
(go
(let [conn-id (str (random-uuid))
state (connection/blank-state)
nameservices* (util/sequential
(or nameservices (default-file-nameservice storage-path)))
lru-cache-atom* (or lru-cache-atom
(default-lru-cache cache-max-mb))
store* (or store
(file-storage/open storage-path))]
(file-storage/open storage-path))
nameservices* (-> nameservices
(or (storage-ns/start "fluree:file://" store* true))
util/sequential)]
;; TODO - need to set up monitor loops for async chans
(map->FileConnection {:id conn-id
:store store*
Expand Down
7 changes: 4 additions & 3 deletions src/clj/fluree/db/conn/ipfs.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
[clojure.core.async :as async :refer [chan]]
[fluree.db.serde.json :refer [json-serde]]
[fluree.db.nameservice.ipns :as ns-ipns]
[fluree.db.nameservice.filesystem :as ns-filesystem]
[fluree.db.nameservice.storage-backed :as storage-ns]
[fluree.db.storage.file :as file-storage]
[fluree.db.conn.cache :as conn-cache]
[fluree.db.storage :as storage])
#?(:clj (:import (java.io Writer))))
Expand Down Expand Up @@ -95,8 +96,8 @@

(defn default-file-nameservice
[{:keys [path base-address sync?]}]
(ns-filesystem/initialize path {:base-address base-address
:sync? sync?}))
(let [ns-store (file-storage/open path)]
(storage-ns/start base-address ns-store sync?)))

(defn connect
"Creates a new IPFS connection.
Expand Down
19 changes: 2 additions & 17 deletions src/clj/fluree/db/conn/memory.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
(:require [clojure.core.async :as async :refer [go]]
[fluree.db.indexer.storage :as index-storage]
[fluree.db.index :as index]
[fluree.db.nameservice.memory :as ns-memory]
[fluree.db.nameservice.storage-backed :as storage-ns]
[fluree.db.util.core :as util]
[fluree.db.util.log :as log :include-macros true]
[fluree.db.connection :as connection]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.conn.cache :as conn-cache]
[fluree.json-ld :as json-ld]
[fluree.db.storage :as storage]
[fluree.db.storage.memory :as memory-storage]
#?(:cljs [fluree.db.platform :as platform]))
Expand Down Expand Up @@ -87,11 +86,6 @@
(binding [*out* w]
(pr (connection/printer-map conn)))))

(defn default-memory-nameservice
"Returns memory nameservice"
[store]
(ns-memory/initialize store))

(defn connect
"Creates a new memory connection."
[{:keys [parallelism lru-cache-atom cache-max-mb defaults nameservices]}]
Expand All @@ -101,16 +95,7 @@
mem-store (memory-storage/create)
nameservices* (util/sequential
(or nameservices
;; TODO: We should not reach inside the storage
;; implementation to reuse the contents atom
;; because that breaks the abstraction and
;; implicitly couples components that should be
;; independent. We should change the memory
;; nameservice to either use a storage
;; implementation explicitly, or to use an atom
;; independent of the data held in commit and
;; index storage.
(default-memory-nameservice (:contents mem-store))))
(storage-ns/start "fluree:memory://" mem-store true)))
cache-size (conn-cache/memory->cache-size cache-max-mb)
lru-cache-atom (or lru-cache-atom (atom (conn-cache/create-lru-cache
cache-size)))]
Expand Down
21 changes: 7 additions & 14 deletions src/clj/fluree/db/conn/s3.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns fluree.db.conn.s3
(:require [cognitect.aws.client.api :as aws]
[clojure.string :as str]
[fluree.db.nameservice.s3 :as ns-s3]
[fluree.db.nameservice.storage-backed :as storage-ns]
[clojure.core.async :as async :refer [go]]
[fluree.db.conn.cache :as conn-cache]
[fluree.db.connection :as connection]
Expand Down Expand Up @@ -91,28 +91,21 @@
(binding [*out* w]
(pr (connection/printer-map conn))))

(defn default-S3-nameservice
"Returns S3 nameservice or will throw if storage-path generates an exception."
[s3-client s3-bucket s3-prefix]
(ns-s3/initialize s3-client s3-bucket s3-prefix))

(defn connect
"Create a new S3 connection."
[{:keys [defaults parallelism s3-endpoint s3-bucket s3-prefix lru-cache-atom
cache-max-mb serializer nameservices]
:or {serializer (json-serde)} :as _opts}]
(go
(let [aws-opts (cond-> {:api :s3}
s3-endpoint (assoc :endpoint-override s3-endpoint))
client (aws/client aws-opts)
conn-id (str (random-uuid))
(let [conn-id (str (random-uuid))
state (connection/blank-state)
nameservices* (util/sequential
(or nameservices (default-S3-nameservice client s3-bucket s3-prefix)))
s3-store (s3-storage/open s3-bucket s3-prefix s3-endpoint)
nameservices* (-> nameservices
(or (storage-ns/start "fluree:s3://" s3-store true))
util/sequential)
cache-size (conn-cache/memory->cache-size cache-max-mb)
lru-cache-atom (or lru-cache-atom
(atom (conn-cache/create-lru-cache cache-size)))
s3-store (s3-storage/open s3-bucket s3-prefix s3-endpoint)]
(atom (conn-cache/create-lru-cache cache-size)))]
(map->S3Connection {:id conn-id
:store s3-store
:state state
Expand Down
2 changes: 1 addition & 1 deletion src/clj/fluree/db/json_ld/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
[fluree.db.util.log :as log]
[fluree.db.query.api :as query-api]
[fluree.db.query.range :as query-range]
[fluree.db.nameservice.core :as nameservice]
[fluree.db.nameservice :as nameservice]
[fluree.db.connection :refer [notify-ledger]]
[fluree.db.json-ld.credential :as cred]
[fluree.db.reasoner :as reasoner]
Expand Down
2 changes: 1 addition & 1 deletion src/clj/fluree/db/json_ld/migrate/sid.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[fluree.db.json-ld.iri :as iri]
[fluree.db.json-ld.reify :as reify]
[fluree.db.ledger.json-ld :as jld-ledger]
[fluree.db.nameservice.core :as nameservice]
[fluree.db.nameservice :as nameservice]
[fluree.db.query.exec.update :as update]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.core :as util :refer [get-first get-first-id get-first-value]]
Expand Down
5 changes: 2 additions & 3 deletions src/clj/fluree/db/ledger/json_ld.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
[fluree.db.json-ld.reify :as jld-reify]
[clojure.string :as str]
[fluree.db.util.core :as util :refer [get-first get-first-value]]
[fluree.db.nameservice.proto :as ns-proto]
[fluree.db.nameservice.core :as nameservice]
[fluree.db.nameservice :as nameservice]
[fluree.db.connection :as connection :refer [register-ledger release-ledger]]
[fluree.db.json-ld.commit-data :as commit-data]
[fluree.json-ld :as json-ld]
Expand Down Expand Up @@ -380,7 +379,7 @@
[conn db-alias commit-map]
(or (get-first-value commit-map const/iri-alias)
(->> (connection/-nameservices conn)
(some #(ns-proto/-alias % db-alias)))))
(some #(nameservice/-alias % db-alias)))))

;; TODO - once we have a different delimiter than `/` for branch/t-value this can simplified
(defn address->alias
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,97 @@
(ns fluree.db.nameservice.core
(:refer-clojure :exclude [exists?])
(ns fluree.db.nameservice
(:refer-clojure :exclude [-lookup exists?])
(:require [clojure.string :as str]
[fluree.db.connection :as connection]
[fluree.db.nameservice.proto :as ns-proto]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.log :as log]))

#?(:clj (set! *warn-on-reflection* true))

(defprotocol iNameService
(-lookup [nameservice ledger-address] [nameservice ledger-alias opts] "Performs lookup operation on ledger alias and returns map of latest commit and other metadata")
(-sync? [nameservice] "Indicates if nameservice updates should be performed synchronously, before commit is finalized. Failure will cause commit to fail")
(-close [nameservice] "Closes all resources for this nameservice")
(-alias [nameservice ledger-address] "Given a ledger address, returns ledger's default alias name else nil, if not avail")
(-address [nameservice ledger-alias branch] "Returns full nameservice address/iri which will get published in commit. If 'private', return nil."))

(defprotocol Publisher
(-push [nameservice commit-data] "Pushes new commit to nameservice."))

(defprotocol Publication
(-subscribe [nameservice ledger-alias callback] "Creates a subscription to nameservice(s) for ledger events. Will call callback with event data as received.")
(-unsubscribe [nameservice ledger-alias] "Unsubscribes to nameservice(s) for ledger events"))

(defn full-address
[prefix ledger-alias]
(str prefix ledger-alias))

(defn ns-record
"Generates nameservice metadata map for JSON storage. For now, since we only
have a single branch possible, always sets default-branch. Eventually will
need to merge changes from different branches into existing metadata map"
[ns-address {address "address", alias "alias", branch "branch", :as commit-jsonld}]
(let [branch-iri (str ns-address "(" branch ")")]
{"@context" "https://ns.flur.ee/ledger/v1"
"@id" ns-address
"defaultBranch" branch-iri
"ledgerAlias" alias
"branches" [{"@id" branch-iri
"address" address
"commit" commit-jsonld}]}))

(defn commit-address-from-record
[record branch]
(let [branch-iri (if branch
(str (get record "@id") "(" branch ")")
(get record "defaultBranch"))]
(->> (get record "branches")
(some #(when (= (get % "@id") branch-iri)
(get % "address"))))))

(defn address-path
[address]
(let [[_ _ path] (str/split address #":")]
(subs path 2)))

(defn address->alias
[ledger-address]
(-> ledger-address
address-path
(str/split #"/")
(->> (drop-last 2) ; branch-name, head
(str/join #"/"))))

(defn extract-branch
"Splits a given namespace address into its nameservice and branch parts.
Returns two-tuple of [nameservice branch].
If no branch is found, returns nil as branch value and original ns-address as the nameservice."
[ns-address]
(if (str/ends-with? ns-address ")")
(let [[_ ns branch] (re-matches #"(.*)\((.*)\)" ns-address)]
[ns branch])
[ns-address nil]))

(defn resolve-address
"Resolves a provided namespace address, which might be relative or absolute,
into three parts returned as a map:
- :alias - ledger alias
- :branch - branch (or nil if default)
- :address - absolute namespace address (including branch if provided)
If 'branch' parameter is provided will always use it as the branch regardless
of if a branch is specificed in the ns-address."
[base-address ns-address branch]
(let [[ns-address* extracted-branch] (extract-branch ns-address)
branch* (or branch extracted-branch)
absolute? (str/starts-with? ns-address base-address)
[ns-address** alias] (if absolute?
[ns-address* (subs ns-address* (count base-address))]
[(str base-address ns-address*) ns-address*])]
{:alias alias
:branch branch*
:address (if branch*
(str ns-address** "(" branch* ")")
ns-address*)}))

(defn nameservices
[conn]
(connection/-nameservices conn))
Expand All @@ -19,7 +103,7 @@
(defn ns-address
"Returns async channel"
[nameservice ledger-alias branch]
(ns-proto/-address nameservice ledger-alias {:branch branch}))
(-address nameservice ledger-alias branch))

(defn addresses
"Retrieve address for each nameservices based on a relative ledger-alias.
Expand Down Expand Up @@ -60,10 +144,10 @@
(go-try
(loop [nameservices* nameservices]
(when-let [ns (first nameservices*)]
(let [sync? (ns-proto/-sync? ns)]
(let [sync? (-sync? ns)]
(if sync?
(<? (ns-proto/-push ns json-ld-commit))
(ns-proto/-push ns json-ld-commit))
(<? (-push ns json-ld-commit))
(-push ns json-ld-commit))
(recur (rest nameservices*))))))))

(defn lookup-commit
Expand All @@ -74,7 +158,7 @@
(go-try
(loop [nameservices* nameservices]
(when-let [ns (first nameservices*)]
(let [commit-address (<? (ns-proto/-lookup ns ledger-address))]
(let [commit-address (<? (-lookup ns ledger-address))]
(if commit-address
commit-address
(recur (rest nameservices*)))))))))
Expand Down Expand Up @@ -108,7 +192,7 @@
(go-try
(loop [nameservices* nameservices]
(if-let [ns (first nameservices*)]
(let [exists? (<? (ns-proto/-lookup ns ledger-alias))]
(let [exists? (<? (-lookup ns ledger-alias))]
(if exists?
true
(recur (rest nameservices*))))
Expand All @@ -129,7 +213,7 @@
(go-try
(loop [nameservices* nameservices]
(when-let [ns (first nameservices*)]
(<? (ns-proto/-subscribe ns ledger-alias callback))
(<? (-subscribe ns ledger-alias callback))
(recur (rest nameservices*)))))))

(defn unsubscribe-ledger
Expand All @@ -139,5 +223,5 @@
(go-try
(loop [nameservices* nameservices]
(when-let [ns (first nameservices*)]
(<? (ns-proto/-unsubscribe ns ledger-alias))
(<? (-unsubscribe ns ledger-alias))
(recur (rest nameservices*)))))))
Loading