kafka & hornbill: modify a number of consumer configs
- WARNING: there are false positives for :poll-skip. Please just ignore them.
- Default to 2 topics * 2 partitions (2t * 2p)
- Set consumer timeouts (rebalance.timeout.ms, session.timeout.ms, max.poll.interval.ms and poll-ms) to 3000 ms. This keeps polling active; see the sketch below this list.
- Add group info to :poll/:subscription history entries
- Set test time to 360 s and the final poll time to 60 s
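
A minimal sketch of how these 3000 ms values fit together on the consumer side (Kafka Java client via Clojure interop; the consumer itself is assumed to be built as in the client.clj diff further down):

(import '(java.time Duration))

;; poll-ms, session.timeout.ms, max.poll.interval.ms and rebalance.timeout.ms are
;; all 3000 ms, so a single poll may block for up to 3 s, and the client must call
;; poll again within roughly 3 s or the broker evicts it from the group.
(defn poll-once!
  "Polls a KafkaConsumer once with the 3000 ms poll-ms used by the workload,
  returning a seq of ConsumerRecord objects (or nil if nothing arrived)."
  [consumer]
  (seq (.poll consumer (Duration/ofMillis 3000))))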
Commelina committed Jul 16, 2024
1 parent 6d9d771 commit 2bc041c
Showing 5 changed files with 55 additions and 25 deletions.
16 changes: 9 additions & 7 deletions docker/control-hornbill/Dockerfile
@@ -27,10 +27,10 @@ CMD /init-ssh-control.sh && \
--db hornbill \
--workload queue \
--sub-via subscribe \
--time-limit 180 \
--final-time-limit 120 \
--time-limit 360 \
--final-time-limit 60 \
--key-dist uniform \
--key-count 32 \
--key-count 4 \
##<<<
# This value is enough; a larger one can exhaust elle.
--concurrency 10 \
@@ -57,9 +57,13 @@ CMD /init-ssh-control.sh && \
##<<<
# We allow the producer to send messages in batches, but the
# batches should not be too large.
--producer-linger-ms 2000 \
--producer-linger-ms 3000 \
--batch-max-bytes 10240 \
##>>>
# Auto-reset the offset to earliest to prevent "offset does not start from 0"
##<<<
--auto-offset-reset earliest \
##>>>
#-------------- txns options --------------#
##<<<
# We do not support txns and idempotence for now.
@@ -69,7 +73,7 @@ CMD /init-ssh-control.sh && \
--no-server-idempotence \
##>>>
#-------------- nemesis options --------------#
--nemesis all \
--nemesis none \
--nemesis-interval 15 \
--crash-clients true \
--tcpdump \
@@ -79,9 +83,7 @@ CMD /init-ssh-control.sh && \
##<<<
# The following opts are client defaults (3.7.0), but we
# specify them explicitly here for clarity.
--auto-offset-reset latest \
--enable-auto-commit \
--enable-server-auto-create-topics \
--isolation-level read_uncommitted \
--acks all && \
##>>>
20 changes: 14 additions & 6 deletions docker/control-kafka/Dockerfile
@@ -59,14 +59,14 @@ CMD /init-ssh.sh && \
--db hstream \
--workload queue \
--sub-via subscribe \
--time-limit 180 \
--final-time-limit 120 \
--time-limit 360 \
--final-time-limit 60 \
--key-dist uniform \
--key-count 32 \
--key-count 4 \
##<<<
# This value is enough; a larger one can exhaust elle.
--concurrency 10 \
--rate 10 \
--rate 20 \
##>>>
##<<<
# When writes of a key exceed the limit, the generator will pick
@@ -86,6 +86,16 @@ CMD /init-ssh.sh && \
# It may be turned on in the future to perform more tests...
--retries 0 \
##>>>
##<<<
# We allow the producer to send messages in batches, but the
# batches should not be too large.
--producer-linger-ms 3000 \
--batch-max-bytes 10240 \
##>>>
# Auto-reset the offset to earliest to prevent "offset does not start from 0"
##<<<
--auto-offset-reset earliest \
##>>>
#-------------- txns options --------------#
##<<<
# We do not support txns and idempotence for now.
@@ -105,9 +115,7 @@ CMD /init-ssh.sh && \
##<<<
# The following opts are client defaults (3.7.0), but we
# specify them explicitly here for clarity.
--auto-offset-reset latest \
--enable-auto-commit \
--enable-server-auto-create-topics \
--isolation-level read_uncommitted \
--acks all && \
##>>>
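For reference, a sketch of the Kafka client settings that the producer/consumer flags shared by the two Dockerfiles above presumably translate to. The constant names come from the Apache Kafka 3.x Java client; the exact flag-to-config mapping inside the test harness is an assumption.

(import '(org.apache.kafka.clients.producer ProducerConfig)
        '(org.apache.kafka.clients.consumer ConsumerConfig))

;; Producer side: allow batching, but keep the batches small.
(def assumed-producer-overrides
  {ProducerConfig/LINGER_MS_CONFIG  "3000"    ; --producer-linger-ms 3000
   ProducerConfig/BATCH_SIZE_CONFIG "10240"   ; --batch-max-bytes 10240 (assumed mapping)
   ProducerConfig/ACKS_CONFIG       "all"})   ; --acks all

;; Consumer side: start from the earliest offset so reads begin at offset 0.
(def assumed-consumer-overrides
  {ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"            ; --auto-offset-reset earliest
   ConsumerConfig/ISOLATION_LEVEL_CONFIG   "read_uncommitted"})  ; --isolation-level read_uncommitted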
9 changes: 8 additions & 1 deletion src/jepsen/hstream/kafka/client.clj
@@ -10,6 +10,7 @@
(java.time Duration)
(java.util Properties)
(java.util.concurrent ExecutionException)
(org.apache.kafka.clients CommonClientConfigs)
(org.apache.kafka.clients.admin Admin
AdminClientConfig
NewTopic)
@@ -93,11 +94,17 @@
300

ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG
6000 ; Bounded by server
3000 ; Bounded by server

ConsumerConfig/CONNECTIONS_MAX_IDLE_MS_CONFIG
60000

ConsumerConfig/MAX_POLL_INTERVAL_MS_CONFIG
3000

CommonClientConfigs/REBALANCE_TIMEOUT_MS_CONFIG
3000

; ConsumerConfig/DEFAULT_ISOLATION_LEVEL
; ???
}
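For context, a rough sketch of how a config map like the one above typically becomes a KafkaConsumer through java.util.Properties. The actual construction code in client.clj is outside this diff, so the wiring, group id, and deserializers below are assumptions.

(ns example.consumer
  (:import (java.util Properties)
           (org.apache.kafka.clients.consumer ConsumerConfig KafkaConsumer)
           (org.apache.kafka.common.serialization ByteArrayDeserializer)))

(defn ->props
  "Copies a Clojure map of Kafka config names to values into a Properties object."
  ^Properties [m]
  (let [props (Properties.)]
    (doseq [[k v] m]
      (.put props k (str v)))
    props))

(defn example-consumer
  "Builds a KafkaConsumer with the 3000 ms timeouts from the map above."
  [bootstrap-servers]
  (KafkaConsumer.
    (->props
      {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG        bootstrap-servers
       ConsumerConfig/GROUP_ID_CONFIG                 "jepsen-example"  ; assumed group id
       ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG   (.getName ByteArrayDeserializer)
       ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG (.getName ByteArrayDeserializer)
       ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG       3000
       ConsumerConfig/MAX_POLL_INTERVAL_MS_CONFIG     3000})))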
2 changes: 1 addition & 1 deletion src/jepsen/hstream/kafka/workload/list_append.clj
@@ -22,7 +22,7 @@

(def partition-count
"How many partitions per topic?"
4)
2)

(defn k->topic
"Turns a logical key into a topic."
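The body of k->topic is cut off by the diff. A hypothetical sketch of the usual key-to-topic/partition mapping (the real implementation in this repository may differ) shows why partition-count 2 together with --key-count 4 gives the "2t * 2p" layout from the commit message:

(def partition-count
  "How many partitions per topic?"
  2)

(defn k->topic
  "Turns a logical key into a topic (hypothetical sketch)."
  [k]
  (str "t" (quot k partition-count)))

(defn k->partition
  "Turns a logical key into a partition within its topic (hypothetical sketch)."
  [k]
  (mod k partition-count))

;; With --key-count 4, keys 0..3 land on:
;;   0 -> ["t0" 0]   1 -> ["t0" 1]   2 -> ["t1" 0]   3 -> ["t1" 1]
;; i.e. 2 topics x 2 partitions.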
33 changes: 23 additions & 10 deletions src/jepsen/hstream/kafka/workload/queue.clj
@@ -179,15 +179,15 @@

(def partition-count
"How many partitions per topic?"
4)
2)

(def replication-factor
"What replication factor should we use for each topic?"
3)

(def poll-ms
"How long should we poll for, in ms?"
100)
3000)

(defn k->topic
"Turns a logical key into a topic."
@@ -480,7 +480,10 @@

(catch UnknownServerException e#
(assoc ~op :type :info, :error [:unknown-server-exception
(.getMessage e#)]))
(str (.getMessage e#) " "
     (.getCause e#) " "
     (pr-str (mapv str (.getStackTrace e#))))]))

(catch TimeoutException _#
(assoc ~op :type :info, :error :kafka-timeout))
@@ -652,7 +655,10 @@
topics
(rc/logging-rebalance-listener rebalance-log))
(rc/subscribe! consumer topics))
(assoc op :type :ok))

(let [subs (doall (seq (.subscription consumer)))
subs (into [] subs)]
(assoc op :type :ok :subs subs)))

; Apply poll/send transactions
(:poll, :send, :txn)
@@ -677,10 +683,17 @@
; https://stackoverflow.com/questions/45195010/meaning-of-sendoffsetstotransaction-in-kafka-0-11,
; commitSync is more intended for non-transactional
; workflows.
(when (and (#{:poll :txn} (:f op))
(not (:txn test))
(:subscribe (:sub-via test)))
(try (.commitSync consumer)
(if (and (#{:poll :txn} (:f op))
(not (:txn test))
(:subscribe (:sub-via test)))
;; FIXME: hardcoded timeout for `commitSync` and `.position`
(try (.commitSync consumer (java.time.Duration/ofMillis 2000))
(let [tps (doall (seq (.assignment consumer)))
poss (doall (map #(.position consumer % (java.time.Duration/ofMillis 1000)) tps))
tups (doall (map #(vector (.toString %1) %2) tps poss))
meta (.toString (.groupMetadata consumer))
]
(assoc op :type :ok :cur-offsets tups :group-meta meta))
; If we crash during commitSync *outside* a
; transaction, it might be that we poll()ed some
; values in this txn which Kafka will think we
@@ -689,8 +702,8 @@
; the reads, but note the lack of commit.
(catch RuntimeException e
(assoc op :type :ok
:error [:consumer-commit (.getMessage e)]))))
(assoc op :type :ok)))))))))
:error [:consumer-commit (.getMessage e)])))
(assoc op :type :ok))))))))))

(teardown! [this test])

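With these changes, completed operations in the Jepsen history carry the extra debugging fields added above. Roughly (the values are illustrative, and the exact toString formats of TopicPartition and ConsumerGroupMetadata come from the Kafka client):

;; :subscribe ops now record the consumer's current subscription:
{:type :ok, :f :subscribe, :subs ["t0" "t1"]}

;; Non-transactional :poll/:txn ops over subscribe-based assignment now record
;; the positions after commitSync and the consumer group metadata:
{:type :ok
 :f    :poll
 :cur-offsets [["t0-0" 17] ["t0-1" 9]]   ; [TopicPartition position] pairs
 :group-meta  "ConsumerGroupMetadata(groupId=jepsen-group, ...)"}  ; abbreviated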
