Skip to content

Commit

Permalink
Merge pull request #3880 from austb/pdb-5691/main/benchmark-delay-com…
Browse files Browse the repository at this point in the history
…mands

(PDB-5691) benchmark: space out command types
  • Loading branch information
austb authored Sep 26, 2023
2 parents 3203dae + 75c90ed commit 632f383
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 46 deletions.
152 changes: 108 additions & 44 deletions src/puppetlabs/puppetdb/cli/benchmark.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@
[puppetlabs.puppetdb.utils :as utils :refer [println-err]]
[puppetlabs.kitchensink.core :as kitchensink]
[puppetlabs.puppetdb.client :as client]
[puppetlabs.puppetdb.random :refer [random-string random-bool random-sha1]]
[puppetlabs.puppetdb.random :refer [safe-sample-normal random-string random-bool random-sha1]]
[puppetlabs.puppetdb.time :as time :refer [now]]
[puppetlabs.puppetdb.archive :as archive]
[clojure.core.async :refer [go go-loop <! <!! chan] :as async]
[clojure.core.async :refer [go go-loop <! <!! >!! chan] :as async]
[taoensso.nippy :as nippy]
[puppetlabs.i18n.core :refer [trs]]
[puppetlabs.puppetdb.nio :refer [get-path]])
(:import
[clojure.core.async.impl.protocols Buffer UnblockingBuffer]
[java.nio.file.attribute FileAttribute]
[java.nio.file Files OpenOption]
[java.util ArrayDeque]))
(clojure.core.async.impl.protocols Buffer UnblockingBuffer)
(java.nio.file.attribute FileAttribute)
(java.nio.file Files OpenOption)
(java.util ArrayDeque)
(java.util.concurrent RejectedExecutionException)))

(defn try-load-file
"Attempt to read and parse the JSON in `file`. If this failed, an error is
Expand Down Expand Up @@ -283,44 +284,72 @@
[data-paths false])]
(kitchensink/mapvals #(some-> % (load-sample-data from-cp?)) data-paths))))

;; Delay sampler used when spacing out catalog and report commands. Kept
;; as its own var (an alias of safe-sample-normal) so that tests can
;; rebind it, e.g. to (constantly 0), to avoid slowing down test runs.
(def random-cmd-delay safe-sample-normal)

(defn start-command-sender
"Start a command sending process in the background. Reads host-state maps from
command-send-ch and sends commands to the puppetdb at base-url. Writes
::submitted to rate-monitor-ch for every command sent, or ::error if there was
a problem. Close command-send-ch to stop the background process."
[base-url command-send-ch rate-monitor-ch num-threads ssl-opts]
(let [fanout-commands-ch (chan)]
[base-url command-send-ch rate-monitor-ch num-threads ssl-opts sched max-q-size max-command-delay-ms]
(let [fanout-commands-ch (chan)
schedule (fn [ch v ^long delay] (utils/schedule sched #(>!! ch v) delay))]
;; fanout: given a single host state, emit 3 messages, one for each command.
;; This gives better parallelism for message submission.
(async/pipeline
1
fanout-commands-ch
(mapcat (fn [host-state]
(let [{:keys [host catalog report factset]} host-state]
(remove nil?
[(when catalog [:catalog host 9 catalog])
(when report [:report host 8 report])
(when factset [:factset host 5 factset])]))))
command-send-ch)
(go-loop [host-state (<! command-send-ch)]
(if-let [{:keys [host catalog report factset]} host-state]
;; randomize catalog and report delay, minimum 500ms matches the speed
;; at which puppetdb received commands for an empty catalog
(let [catalog-delay (random-cmd-delay 5000 3000 {:lowerb 500
:upperb max-command-delay-ms})
report-delay (+ catalog-delay
(random-cmd-delay 5000 3000 {:lowerb 500
:upperb max-command-delay-ms}))]
;; The go-loop needs to provide back-pressure on the command-send-ch
;; to ensure it never enqueues more commands to send than can be sent
;; before await-scheduler-shutdown's timeout. But it also needs to provide a
;; large enough queue that a steady-state (run-interval) run of benchmark
;; has enough queue space for delayed catalogs and reports and can still
;; process factsets as received.
(if (< max-q-size (-> sched .getQueue .size))
(do
(<! (async/timeout 50))
(recur host-state))
(do
(try
(when factset (schedule fanout-commands-ch [:factset host 5 factset] 0))
(when catalog (schedule fanout-commands-ch [:catalog host 9 catalog] catalog-delay))
(when report (schedule fanout-commands-ch [:report host 8 report] report-delay))
(catch RejectedExecutionException _
;; if the scheduler has been shut down, close the channel to allow benchmark to
;; terminate cleanly
(utils/await-scheduler-shutdown sched (* 2 max-command-delay-ms))
(async/close! fanout-commands-ch)))
(recur (<! command-send-ch)))))
(do
(utils/request-scheduler-shutdown sched false)
(utils/await-scheduler-shutdown sched (* 2 max-command-delay-ms))
(async/close! fanout-commands-ch))))

;; actual sender process
(async/pipeline-blocking
num-threads
rate-monitor-ch
(map (fn [[command host version payload]]
(let [submit-fn (case command
:catalog client/submit-catalog
:report client/submit-report
:factset client/submit-facts)]
(try
(submit-fn base-url host version payload ssl-opts)
::submitted
(catch Exception e
(do
(println-err (trs "Exception while submitting command: {0}" e))
(println-err (with-out-str (trace/print-stack-trace e))))
::error)))))
fanout-commands-ch)))
[(async/pipeline-blocking
num-threads
rate-monitor-ch
(map (fn [[command host version payload]]
(let [submit-fn (case command
:catalog client/submit-catalog
:report client/submit-report
:factset client/submit-facts)]
(try
(submit-fn base-url host version payload ssl-opts)
::submitted
(catch Exception e
(do
(println-err (trs "Exception while submitting command: {0}" e))
(println-err (with-out-str (trace/print-stack-trace e))))
::error)))))
fanout-commands-ch)
fanout-commands-ch]))

(defn start-rate-monitor
"Start a task which monitors the rate of messages on rate-monitor-ch and
Expand Down Expand Up @@ -485,7 +514,8 @@
(defn register-shutdown-hook!
  "Wraps the no-argument function f in a new Thread and registers that
  thread as a JVM shutdown hook on the current Runtime."
  [f]
  (let [runtime (Runtime/getRuntime)
        hook-thread (Thread. f)]
    (.addShutdownHook runtime hook-thread)))

;; The core.async processes and channels fit together like this:
;; The core.async processes and channels fit together like
;; this (square brackets indicate data being sent):
;;
;; (initial-hosts-ch: host-maps)
;; |
Expand All @@ -499,15 +529,24 @@
;; mult (simulation-write-ch: host-maps) ------------------------/
;; |
;; | (command-send-ch: host-maps)
;; |
;; [host-state]
;; |
;; v
;; command-sender
;; |
;; [factset, delayed catalog, delayed report]
;; |
;; v
;; (scheduler)
;; |
;; | (rate-monitor-ch: ::submitted or ::error)
;; v
;; rate-monitor
;;
;; It's all set up so that channel closes flow downstream (and upstream to the
;; producer). Closing simulation-read-ch shuts down everything.
;; producer). The command-sender is not a pipeline, so to shut down benchmark,
;; simulation-read-ch and fanout-ch must be closed and the scheduler shut down.

(defn benchmark
"Feeds commands to PDB as requested by args. Returns a map of :join, a
Expand All @@ -526,8 +565,23 @@
protocol (if ssl-host "https" "http")
ssl-opts (select-keys (:jetty config) [:ssl-cert :ssl-key :ssl-ca-cert])
base-url (utils/pdb-cmd-base-url pdb-host pdb-port :v1 protocol)
run-interval (-> (get options :runinterval 30) time/minutes)
run-interval (get options :runinterval 30)
run-interval-minutes (-> run-interval time/minutes)
simulation-threads 4

;; Pick a scheduler queue maximum that is large enough to handle the
;; runinterval command load, but small enough to provide backpressure in the
;; pipeline when run in nummsgs mode.
;; Each host will schedule two delayed commands in the next 30 seconds, so
;; the queue needs space for that many commands, plus a small (25%) buffer to
;; ensure it is able to process factsets as they come in.
hosts-per-sec (/ numhosts (* run-interval 60))
max-command-delay-ms 15000
max-delayed-seconds (* 2 (/ max-command-delay-ms 1000))
max-waiting-commands (* 2 hosts-per-sec max-delayed-seconds)
maximum-scheduler-queue (max 30
(* 1.25 max-waiting-commands))

commands-per-puppet-run (+ (if catalogs 1 0)
(if reports 1 0)
(if facts 1 0))
Expand Down Expand Up @@ -559,12 +613,20 @@
_rate-monitor-finished-ch (start-rate-monitor rate-monitor-ch
(-> 30 time/minutes)
commands-per-puppet-run)
command-sender-finished-ch (start-command-sender base-url
command-send-ch
rate-monitor-ch
threads
ssl-opts)
_ (start-simulation-loop numhosts run-interval nummsgs end-commands-in rand-perc
command-delay-scheduler (utils/scheduler 1)
;; ensures that commands which have already been scheduled are still
;; submitted after shutdown is requested, before we tear down the output channel
_ (.setExecuteExistingDelayedTasksAfterShutdownPolicy command-delay-scheduler true)

[command-sender-finished-ch fanout-ch] (start-command-sender base-url
command-send-ch
rate-monitor-ch
threads
ssl-opts
command-delay-scheduler
maximum-scheduler-queue
max-command-delay-ms)
_ (start-simulation-loop numhosts run-interval-minutes nummsgs end-commands-in rand-perc
simulation-threads simulation-write-ch simulation-read-ch)
join-fn (fn join-benchmark
([] (join-benchmark nil))
Expand All @@ -574,6 +636,8 @@
(async/alt!! t-ch false rate-monitor-ch true))))
stop-fn (fn stop-benchmark []
(async/close! simulation-read-ch)
(utils/request-scheduler-shutdown command-delay-scheduler true)
(async/close! fanout-ch)
(when-not (join-fn benchmark-shutdown-timeout)
(println-err (trs "Timed out while waiting for benchmark to stop."))
false))
Expand Down
5 changes: 3 additions & 2 deletions test/puppetlabs/puppetdb/cli/benchmark_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
;; This normally calls System/exit on a cli error;
;; we'd rather have the exception.
utils/try-process-cli (fn [body] (body))
benchmark/benchmark-shutdown-timeout tu/default-timeout-ms]
benchmark/benchmark-shutdown-timeout tu/default-timeout-ms
;; disable catalog/reports submission delay to avoid slowing down tests
benchmark/random-cmd-delay (constantly 0)]
(f submitted-records (benchmark/benchmark-wrapper cli-args)))))

(deftest progressing-timestamp-nummsgs
Expand Down Expand Up @@ -232,7 +234,6 @@
(stop)))))

(deftest rand-catalog-mutation-keys
(prn "running test")
(let [catalog {"certname" "host-1"
"catalog_uuid" "512d24ae-8999-4f12-bda0-1e5d57c0b5cc"
"producer" "puppet-primary-1"
Expand Down

0 comments on commit 632f383

Please sign in to comment.