Performance testing tool #11
Conversation
The producer sends one record per second, and the consumer prints the latency after receiving it. When bin/performance.sh is executed, it downloads Kafka if the kafka_2.13-2.8.0 folder is not present.
The tool records bytes read/written per second as well as latency, and prints them every second. Implemented configuration flags: --brokers, --bootstrapServers, --topic, --topicConfigs, --producers, --consumers. The producer thread sleeps for 1 ms after each record it sends, to avoid records getting stuck.
Users can specify the number of producers/consumers, the record size, and topic configurations. The tool outputs throughput (MB/second) and latency for each producer/consumer.
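The send loop described above can be sketched as follows. This is a minimal illustration under stated assumptions: `send()` is a stand-in for a real Kafka producer call, and `recordSize` is an assumed 1 KiB record size, not the PR's actual code.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Minimal sketch of the pacing described above: send a record, note its
// latency, then sleep 1 ms so records do not pile up in the send buffer.
public class PacedSendSketch {
  static final int recordSize = 1024;            // assumed record size (bytes)
  static final LongAdder bytesSent = new LongAdder();

  static void send(byte[] payload) {             // stand-in for producer.send()
    bytesSent.add(payload.length);
  }

  public static void main(String[] args) throws InterruptedException {
    byte[] payload = new byte[recordSize];
    long maxLatencyNs = 0;
    for (int i = 0; i < 10; ++i) {
      long start = System.nanoTime();
      send(payload);
      maxLatencyNs = Math.max(maxLatencyNs, System.nanoTime() - start);
      TimeUnit.MILLISECONDS.sleep(1);            // 1 ms pause to avoid stuck records
    }
    System.out.println(bytesSent.sum());         // 10 records * 1024 bytes
  }
}
```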
Thanks for this patch.
Note that we have introduced the Gradle application plugin, so we can leverage it to run our application.
See https://docs.gradle.org/current/userguide/application_plugin.html for more details
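For reference, a minimal build-script sketch of what the application plugin provides; the main class name here is an assumption, not the PR's actual entry point:

```groovy
plugins {
    id 'application'
}

application {
    // hypothetical entry point of the performance tool
    mainClass = 'org.astraea.performance.Performance'
}
```

With this in place, `./gradlew run --args='--brokers …'` starts the tool without a hand-written launcher script.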
Thank you for the guidance.
Warm up consumers to make the metrics more accurate. Split producer and consumer startup out of main. Add tests for consumer and producer creation.
Fixed the naming issues and rewrote the main program logic.
Updated the code to use ThreadPool and TopicAdmin.
Use an embedded Kafka cluster for testing.
app/src/test/java/org/astraea/performance/FakeComponentFactory.java
Thanks for the update. Some of the formatting in README.md seems to be broken.
Change the unit test environment from fake producers/consumers to real Kafka producers/consumers connected to an embedded Kafka cluster. Raise the Consumer `poll()` timeout in the consumer thread to 10 seconds.
app/src/main/java/org/astraea/performance/ComponentFactory.java
1. Change Metric.java into a thread-safe object, and add a new method that updates all data in one step to prevent inconsistency. 2. Consumers with the same group id should subscribe to the same topic, so the consumers created by the componentFactory share one group id and subscribe to the same topics. Likewise, the producers created by the componentFactory send to the same topic.
Updates:
- Made Metric thread-safe
- Ensured Metric data consistency
- The component factory creates consumers/producers using the same topic
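The shared-topic/shared-group idea can be sketched like this; the class shape and names are illustrative, not the PR's actual ComponentFactory API ("group.id" is the real Kafka consumer config key):

```java
import java.util.List;
import java.util.Properties;

// Illustrative sketch: the factory fixes one topic and one group id at
// construction time, so every consumer it builds joins the same group and
// every producer targets the same topic.
public class FactorySketch {
  private final String topic;
  private final String groupId;

  public FactorySketch(String topic, String groupId) {
    this.topic = topic;
    this.groupId = groupId;
  }

  // All consumers share this group id.
  public Properties consumerProps() {
    Properties props = new Properties();
    props.put("group.id", groupId);
    return props;
  }

  // All consumers subscribe to, and all producers send to, this topic.
  public List<String> topics() {
    return List.of(topic);
  }
}
```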
@chinghongfang Thanks for the update; a few small issues below.
Also, please compare the old perf tool with the new one to see whether latency and throughput differ. Thanks.
static ThreadPool.Executor consumerExecutor(Consumer consumer, Metrics metrics) {
  return new ThreadPool.Executor() {
    @Override
    public void execute() throws InterruptedException {
`InterruptedException` is not used.
byte[] payload = new byte[param.recordSize];
return new ThreadPool.Executor() {
  @Override
  public void execute() throws InterruptedException {
`InterruptedException` is unused.
Thanks. I will check through the code again for other unused declarations.
validateWith = ArgumentUtil.NotEmptyString.class)
String JMXAddress = "0.0.0.0@0";

public Map<String, Object> perfProps() {
This parameter needs to be fed to the producer, consumer, and admin.
OK, understood. I will revise it.
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionerName);
prop.put("jmx_servers", JMXAddress);
Consider what should happen here when JMXAddress is not present.
This parameter is meant for the partitioner, and only a custom partitioner uses it. I would like to leave the check to the partitioner, letting it decide whether to accept a missing JMXAddress.
I did not express myself clearly.
Imagine the scenario ahead: we will probably compare the poison partitioner with other partitioners. In other words, we will set the jmx address for the former but not for the latter. So this code should check whether a jmx address exists to decide which partitioner to use.
Of course this can be handled in the next PR; in that case we should not introduce the jmx address parameter for now and deal with it in the next PR.
Understood. I will leave the jmx address out for now and handle it later.
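Once the parameter returns, the reviewer's suggestion could look roughly like the sketch below. The custom partitioner class name (PoisonPartitioner) is hypothetical; "partitioner.class" is the real Kafka producer config key.

```java
import java.util.Optional;
import java.util.Properties;

// Hedged sketch of the suggestion above: pick the partitioner based on
// whether a jmx address was supplied.
public class PartitionerChoiceSketch {
  static Properties producerProps(Optional<String> jmxAddress) {
    Properties props = new Properties();
    jmxAddress.ifPresent(
        addr -> {
          // jmx address present: use the custom (e.g. poison) partitioner
          props.put("partitioner.class", "org.astraea.partitioner.PoisonPartitioner");
          props.put("jmx_servers", addr);
        });
    // jmx address absent: leave "partitioner.class" unset so Kafka's default applies
    return props;
  }

  public static void main(String[] args) {
    System.out.println(producerProps(Optional.empty()).isEmpty());  // true
  }
}
```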
private long num;
private long max;
private long min;
private final LongAdder bytes;
Since all public methods are already synchronized, LongAdder seems unnecessary here.
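The point can be sketched as follows: once every public method synchronizes on the same monitor, a plain long works in place of the LongAdder, and all fields move in one atomic step. Field names follow the snippet above; the method names are illustrative.

```java
// Sketch: with every public method synchronized on the same monitor, a
// plain long replaces the LongAdder, and num/max/min/bytes are updated
// together, so readers never observe a half-updated state.
public class MetricsSketch {
  private long num;
  private long max = Long.MIN_VALUE;
  private long min = Long.MAX_VALUE;
  private long bytes;                    // plain long instead of LongAdder

  public synchronized void record(long latency, long byteCount) {
    ++num;
    max = Math.max(max, latency);
    min = Math.min(min, latency);
    bytes += byteCount;                  // guarded by the same lock as the rest
  }

  public synchronized long num() { return num; }
  public synchronized long max() { return max; }
  public synchronized long bytes() { return bytes; }
}
```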
Just to confirm: if records are sent at intervals, how can we be sure the test measures the cluster's throughput ceiling?
The old version allows the value to be either specified or generated automatically. I suggest the new version do the same for convenience.
Formal tests rely more on elapsed time. Time-based validation matters (for example, verifying that throughput stays stable after running for an hour), so I recommend a required time setting; a record-count option is secondary, though nice to have.
I recommend using random payload content so that throughput measurements under compression are not distorted.
We cannot be sure whether the ceiling is reached.
OK, that is better than using a fixed default.
Understood, I will revise it.
I see! At the time I was focused on minimizing data-processing time and overlooked the effect of compression.
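The random-payload suggestion can be sketched as below (`recordSize` is an assumed parameter name):

```java
import java.util.Random;

// Sketch of the suggestion above: random bytes keep a compressing producer
// from shrinking an all-zero payload to almost nothing, which would make
// measured throughput look far better than it really is.
public class RandomPayloadSketch {
  public static byte[] randomPayload(int recordSize, Random rng) {
    byte[] payload = new byte[recordSize];
    rng.nextBytes(payload);              // near-incompressible content
    return payload;
  }

  public static void main(String[] args) {
    System.out.println(randomPayload(1024, new Random()).length);  // 1024
  }
}
```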
@chinghongfang The items discussed above can be handled in follow-up PRs; record them in an existing issue or open new ones. I would like to merge this PR first and then remove the old tool.
OK, then I will fix the failed tests first.
@chinghongfang Please post the follow-up topics we discussed to the issues, otherwise we will definitely forget them.
Performance testing program (phase 1) #7
A tool for quickly running performance tests.
Usage:
(the brokers option should be replaced with the broker address given after starting docker)
The functionality is as described in Performance testing program (phase 1) #7.
Known issues:
The script currently checks whether the Kafka library exists; if the kafka_2.13-2.8.0 folder is absent, it is downloaded.
The producer currently pauses for 1 millisecond after sending each record; there may be a better way to send.
The files are currently placed under the app folder; I am not sure whether that is appropriate.