Skip to content


NB Experimental: new bulk pstr format to replace ppstr concept
Browse files Browse the repository at this point in the history
  - [#276] Support easy+efficient generic un/pack caching.
    - In particular: support cache-friendly server->client event buffering.
  - [#16] Support client->server buffering with arb cbs.
  - Read+write performance.
  - Bandwidth efficiency.
  - Flexibility for possible future extensions/changes.
  - Human-readable on the wire (handy for debugging, etc.).
  - Lay groundwork for possible broadcast API fn.

  - Would involve a breaking incompatibility with old clients.
  • Loading branch information
ptaoussanis committed Oct 26, 2016
1 parent c7dfdad commit be9f009
Showing 1 changed file with 106 additions and 115 deletions.
221 changes: 106 additions & 115 deletions src/taoensso/sente.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -196,65 +196,6 @@
;; * Payloads are packed for client<->server transit.
;; * Packing includes ->str encoding, and may incl. wrapping to carry cb info.

(do ; Experimental ; TODO [#276] [Ref. 7a82137c]
;; Main motivation: to help eliminate repacking waste during broadcasts.
;; Un/packing would be a lot easier to efficiently cache in general if we
;; moved cb-uuid *out* of encoded payload as part of + prefix. An easy
;; change, but would require breaking compatibility with older clients. As
;; an interim solution, choosing to cache only non-cb un/packing (the nb
;; case since it affects broadcasting).
;; In future, may (instead/also?) like to support a proper broadcast API
;; mechanism that just generates a single payload before looping. That
;; *could* cause difficulties with buffering; need to think on it some.
;; For example, might (?) be preferable to change buffered format to a
;; delimited string (or payload) of individually packed payloads?
(def ^:private unpack-cached (enc/memoize* 128 nil interfaces/unpack))
(def ^:private unpack-uncached interfaces/unpack)
(def ^:private pack-cached (enc/memoize* 128 nil interfaces/pack))
(def ^:private pack-uncached interfaces/pack))

;; (comment (enc/pr-edn ["encoded1" "encoded2"]))

(defn- unpack "prefixed-pstr->[clj ?cb-uuid]"
[packer prefixed-pstr]
(have? string? prefixed-pstr)
(let [wrapped? (enc/str-starts-with? prefixed-pstr "+")
pstr (subs prefixed-pstr 1)
(if wrapped?
(unpack-uncached packer pstr) ; Ref. 7a82137c
(unpack-cached packer pstr))

(catch #?(:clj Throwable :cljs :default) t
(debugf "Bad package: %s (%s)" pstr t)
[:chsk/bad-package pstr]))

[clj ?cb-uuid] (if wrapped? clj [clj nil])
?cb-uuid (if (= 0 ?cb-uuid) :ajax-cb ?cb-uuid)]

(tracef "Unpacking: %s -> %s" prefixed-pstr [clj ?cb-uuid])
[clj ?cb-uuid]))

(defn- pack "clj->prefixed-pstr"
([packer clj]
;; "-" prefix => unwrapped (has no callback)
(let [pstr (str "-" (pack-cached packer clj))]
(tracef "Packing (unwrapped): %s -> %s" clj pstr)

([packer clj ?cb-uuid]
;; "+" prefix => wrapped (has callback), Ref. 7a82137c
(let [?cb-uuid (if (= ?cb-uuid :ajax-cb) 0 ?cb-uuid)
wrapped-clj (if ?cb-uuid [clj ?cb-uuid] [clj])
pstr (str "+" (pack-uncached packer wrapped-clj))]
(tracef "Packing (wrapped): %s -> %s" wrapped-clj pstr)

(deftype EdnPacker []
(pack [_ x] (enc/pr-edn x))
Expand All @@ -267,19 +208,6 @@
(have #(satisfies? interfaces/IPacker %) x)))

;; Possibility if/when we update pack+broadcast formats to better support
;; efficient auto caching? ; TODO
;; (deftype CachedPacker [cached-pack-fn cached-unpack-fn]
;; interfaces/IPacker
;; (pack [_ x] (cached-pack-fn x))
;; (unpack [_ x] (cached-unpack-fn x)))
;; (defn- cached-packer [packer]
;; (CachedPacker.
;; (enc/memoize* 128 nil (fn [x] (interfaces/pack packer x)))
;; (enc/memoize* 128 nil (fn [x] (interfaces/unpack packer x)))))

(require '[taoensso.sente.packers.transit :as transit])
Expand All @@ -289,10 +217,70 @@
unpack interfaces/unpack
data {:a :A :b :B :c "hello world"}]

(enc/qb 10000
(enc/qb 1e3
(let [pk default-edn-packer] (unpack pk (pack pk data)))
(let [pk default-transit-json-packer] (unpack pk (pack pk data))))))

(do ; Experimental
(def ^:private cached-unpack (enc/memoize* 128 nil interfaces/unpack))
(def ^:private cached-pack (enc/memoize* 128 nil interfaces/pack)))

(defn- unpack
"\"<pstr-len>?<,cb-uuid>:<pstr>...\" -> [[<clj> ?<cb-uuid>] ...]"
[packer bulk-pstr] ; bpstr
(have? string? bulk-pstr)
(let [+prefix? (enc/str-starts-with? bulk-pstr "+")]
(if (or +prefix? (enc/str-starts-with? bulk-pstr "-"))
;; TODO Temporary back compatibility
(let [pstr (subs bulk-pstr 1)
clj (enc/catching (interfaces/unpack packer pstr) _
[:chsk/bad-package pstr])

[clj ?cb-uuid] (if +prefix? clj [clj nil])
?cb-uuid (if (= ?cb-uuid 0) :ajax-cb ?cb-uuid)]
[[clj ?cb-uuid]])

(loop [v [] idx 0]
(if-let [^long split-idx (enc/str-?index bulk-pstr ":" idx)]
(let [prefix (subs bulk-pstr idx split-idx)
[-len ?cb-uuid] (str/split prefix #",")
len (enc/as-nat-int -len)
start-idx (inc split-idx)
end-idx (+ start-idx len)

?cb-uuid (if (= ?cb-uuid 0) :ajax-cb ?cb-uuid)
(when-not (zero? len)
(let [pstr (subs bulk-pstr start-idx end-idx)]
(enc/catching (cached-unpack packer pstr) _
[:chsk/bad-package pstr])))]

(recur (conj v [clj ?cb-uuid]) end-idx))

(comment (enc/qb 1e4 (unpack default-edn-packer "0:2:__10,uuid:__________")))

(defn- pack [packer xs]
(fn [acc [x ?cb-uuid]]
(let [?cb-uuid (if (= ?cb-uuid :ajax-cb) 0 ?cb-uuid)
pstr (cached-pack packer x)]

(enc/sb-append acc
(count pstr)
(when-let [s ?cb-uuid] (str "," s)) ":" pstr))))


(unpack default-edn-packer (pack default-edn-packer [[:chsk/ws-ping]]))
(enc/qb 1e4
(unpack default-edn-packer
(pack default-edn-packer [["foo" nil] [{:a :A :b :B} "uuid"]]))))

;;;; Server API

(def ^:private next-idx! (enc/idx-fn))
Expand Down Expand Up @@ -358,7 +346,7 @@
{:lp-timeout-ms lp-timeout-ms
:default-client-side-ajax-timeout-ms max-ms}))))

(let [packer (do #_cache-packer (coerce-packer packer))
(let [packer (coerce-packer packer)
ch-recv (chan recv-buf-or-n)

Expand Down Expand Up @@ -466,13 +454,14 @@
(have? vector? buffered-evs)
(have? set? ev-uuids)

(let [buffered-evs-ppstr (pack packer buffered-evs)]
(tracef "buffered-evs-ppstr: %s" buffered-evs-ppstr)
(let [unpacked (mapv vector buffered-evs)
buffered-evs-bpstr (pack packer unpacked)]
(tracef "buffered-evs-bpstr: %s" buffered-evs-bpstr)
(case conn-type
:ws (send-buffered-server-evs>ws-clients! conns_
uid buffered-evs-ppstr upd-conn!)
uid buffered-evs-bpstr upd-conn!)
:ajax (send-buffered-server-evs>ajax-clients! conns_
uid buffered-evs-ppstr))))))]
uid buffered-evs-bpstr))))))]

(if (= ev [:chsk/close]) ; Currently undocumented
Expand Down Expand Up @@ -538,16 +527,18 @@
(fn [server-ch websocket?]
(assert (not websocket?))
(let [params (get ring-req :params)
ppstr (get params :ppstr)
bpstr (or
(get params :ppstr) ; TODO Temp
(get params :bpstr))
client-id (get params :client-id)
[clj has-cb?] (unpack packer ppstr)
[[clj has-cb?]] (unpack packer bpstr)
(let [replied?_ (atom false)]
(fn [resp-clj] ; Any clj form
(when (compare-and-set! replied?_ false true)
(tracef "Chsk send (ajax post reply): %s" resp-clj)
(interfaces/sch-send! server-ch websocket?
(pack packer resp-clj)))))]
(pack packer [[resp-clj]])))))]

(put-server-event-msg>ch-recv! ch-recv
(merge ev-msg-const
Expand Down Expand Up @@ -597,7 +588,7 @@
[:chsk/handshake [uid csrf-token]]
[:chsk/handshake [uid csrf-token ?handshake-data]])]
(interfaces/sch-send! server-ch websocket?
(pack packer handshake-ev))))]
(pack packer [[handshake-ev]]))))]

(if (str/blank? client-id)
(let [err-msg "Client's Ring request doesn't have a client id. Does your server have the necessary keyword Ring middleware (`wrap-params` & `wrap-keyword-params`)?"]
Expand Down Expand Up @@ -635,7 +626,7 @@
;; ->client (should auto-close conn if it's
;; gone dead).
(interfaces/sch-send! server-ch websocket?
(pack packer :chsk/ws-ping)))
(pack packer [[:chsk/ws-ping]])))
(recur udt-t1))))))

;; Ajax handshake/poll
Expand All @@ -659,20 +650,20 @@
;; (assert (= _sch server-ch))
;; Appears to still be the active sch
(interfaces/sch-send! server-ch websocket?
(pack packer :chsk/timeout))))))))))
(pack packer [[:chsk/timeout]]))))))))))

(fn [server-ch websocket? req-ppstr]
(fn [server-ch websocket? req-bpstr]
(assert websocket?)
(upd-conn! :ws uid client-id)
(let [[clj ?cb-uuid] (unpack packer req-ppstr)]
(let [[[clj ?cb-uuid]] (unpack packer req-bpstr)]
(receive-event-msg! clj ; Should be ev
(when ?cb-uuid
(fn reply-fn [resp-clj] ; Any clj form
(tracef "Chsk send (ws reply): %s" resp-clj)
;; true iff apparent success:
(interfaces/sch-send! server-ch websocket?
(pack packer resp-clj ?cb-uuid)))))))
(pack packer [[resp-clj ?cb-uuid]])))))))

:on-close ; We rely on `on-close` to trigger for _every_ conn!
(fn [server-ch websocket? _status]
Expand Down Expand Up @@ -894,7 +885,7 @@
(defn- receive-buffered-evs! [chs clj]
(tracef "receive-buffered-evs!: %s" clj)
(let [buffered-evs (have vector? clj)]
(doseq [ev buffered-evs]
(doseq [[ev _] buffered-evs]
(assert-event ev)
;; Should never receive :chsk/* events from server here:
(let [[id] ev] (assert (not= (namespace id) "chsk")))
Expand Down Expand Up @@ -995,7 +986,7 @@

;; TODO Buffer before sending (but honor `:flush?`)
(let [?cb-uuid (when ?cb-fn (enc/uuid-str 6))
ppstr (pack packer ev ?cb-uuid)]
bpstr (pack packer [[ev ?cb-uuid]])]

(when-let [cb-uuid ?cb-uuid]
(reset-in! cbs-waiting_ [cb-uuid] (have ?cb-fn))
Expand All @@ -1006,7 +997,7 @@
(cb-fn* :chsk/timeout)))))

(.send @socket_ ppstr)
(.send @socket_ bpstr)
(reset! udt-last-comms_ (enc/now-udt))
(catch :default e
Expand Down Expand Up @@ -1071,33 +1062,32 @@

(aset "onmessage" ; Nb receives both push & cb evs!
(fn [ws-ev]
(let [ppstr (enc/oget ws-ev "data")
(let [bpstr (enc/oget ws-ev "data")
unpacked (unpack packer bpstr)

;; `clj` may/not satisfy `event?` since
;; we also receive cb replies here. This
;; is why we prefix pstrs to indicate
;; whether they're wrapped or not
[clj ?cb-uuid] (unpack packer ppstr)]
;; we also receive cb replies here:
[[clj1 ?cb-uuid1]] unpacked]

(reset! udt-last-comms_ (enc/now-udt))

(when (handshake? clj)
(receive-handshake! :ws chsk clj)
(when (handshake? clj1)
(receive-handshake! :ws chsk clj1)
(reset! retry-count_ 0)

(when (= clj :chsk/ws-ping)
(when (= clj1 :chsk/ws-ping)
(put! (:<server chs) [:chsk/ws-ping])

(if-let [cb-uuid ?cb-uuid]
(if-let [cb-uuid ?cb-uuid1]
(if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_
(cb-fn clj)
(warnf "Cb reply w/o local cb-fn: %s" clj))
(let [buffered-evs clj]
(receive-buffered-evs! chs buffered-evs)))))))

(receive-buffered-evs! chs unpacked))))))

;; Fires repeatedly (on each connection attempt) while
;; server is down:
Expand Down Expand Up @@ -1207,7 +1197,7 @@
{:X-CSRF-Token csrf-token})

(let [ppstr (pack packer ev (when ?cb-fn :ajax-cb))]
(let [bpstr (pack packer [[ev (when ?cb-fn :ajax-cb)]])]
(merge params ; 1st (don't clobber impl.):
{:udt (enc/now-udt) ; Force uncached resp

Expand All @@ -1220,7 +1210,8 @@
;; implementation:
:client-id client-id

:ppstr ppstr}))})
:ppstr bpstr ; TODO Temp
:bpstr bpstr}))})

(fn ajax-cb [{:keys [?error ?content]}]
(if ?error
Expand All @@ -1231,9 +1222,9 @@
#(chsk-state->closed % :unexpected))
(when ?cb-fn (?cb-fn :chsk/error))))

(let [content ?content
resp-ppstr content
[resp-clj _] (unpack packer resp-ppstr)]
(let [content ?content
resp-bpstr content
[[resp-clj _]] (unpack packer resp-bpstr)]
(if ?cb-fn
(?cb-fn resp-clj)
(when (not= resp-clj :chsk/dummy-cb-200)
Expand Down Expand Up @@ -1299,26 +1290,26 @@

;; The Ajax long-poller is used only for events, never cbs:
(let [content ?content
ppstr content
[clj] (unpack packer ppstr)
handshake? (handshake? clj)]
(let [bpstr (have ?content)
unpacked (unpack packer bpstr)
[[clj1]] unpacked
handshake? (handshake? clj1)]

(when handshake?
(receive-handshake! :ajax chsk clj))
(receive-handshake! :ajax chsk clj1))

(swap-chsk-state! chsk #(assoc % :open? true))
(poll-fn 0) ; Repoll asap

(when-not handshake?
(when (= clj :chsk/timeout)
(when (= clj1 :chsk/timeout)
(when @debug-mode?_
(receive-buffered-evs! chs [[:debug/timeout]]))
(put! (:<server chs) [:debug/timeout]))

(let [buffered-evs clj] ; An application reply
(receive-buffered-evs! chs buffered-evs))))))))))))]
;; An application reply:
(receive-buffered-evs! chs unpacked)))))))))))]

(poll-fn 0)
Expand Down

0 comments on commit be9f009

Please sign in to comment.