-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Interdependent Message #553
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 有一個跟設計有關的建議,請看一下
另外可以把一些跟PR無關的變更拿掉,這樣我們可以集中討論
好啊 可是這些改動大多都和測試相關。少了這些測試會動不了。我先把相關改動切成小pr? |
這樣很好 |
衝突已解決,再麻煩review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 不好意思,在設計上我有幾個較大的想法,請看一下
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我將其按照上述討論修改簡化,額外的議題我會再開issue討論。還有一個小問題想要確認一下。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc Thanks for this patch. there are some minor comments. PTAL
上述conversation已跟進,後續議題開設在 #591,再麻煩學長review。 |
我好好的修改了一下,確保了所有Dispatcher都能正確使用Interdependent功能,再麻煩review。 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 一樣有設計上的問題,請再多思考一下
已更新,現在一個producer的每一個thread都會有其自己的Interdependent。 |
@wycccccc 不好意思,麻煩修正一下衝突 |
衝突已解決 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 一樣針對設計的部分還有多執行緒的用法有一些疑問,請看一下
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 程式碼風格還有一些建議,請看一下
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/astraea/common/partitioner/Dispatcher.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 感謝更新,應該是最後幾個建議了,感謝耐心
valueBytes == null ? new byte[0] : valueBytes, | ||
CLUSTER_CACHE.computeIfAbsent(cluster, ignored -> ClusterInfo.of(cluster))); | ||
interdependent.targetPartitions = target; | ||
return interdependent.isInterdependent ? interdependent.targetPartitions : target; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這邊的判斷是多餘的,同一個執行緒下 interdependent.targetPartitions
和 target
的值會是相同的
* @param producer Kafka producer | ||
*/ | ||
static void endInterdependent(Producer<Key, Value> producer) { | ||
THREAD_LOCAL.get().targetPartitions = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用remove
吧,這樣之後如果加入更多狀態也不需要新增額外的初始化邏輯
|
||
@Test | ||
void multipleThreadTest() { | ||
var admin = Admin.of(bootstrapServers()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try-release
* @param attribute attribute name | ||
* @return attribute | ||
*/ | ||
public static Object reflectionAttribute(Object object, String attribute) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
麻煩移除這個方法並改用Utils.member
已经修正完毕,感谢学长耐心的review |
@chinghongfang 你方便幫忙測試下這隻的效能嗎? |
沒問題,結果我再紀錄在 doc/dispatcher/experiments 裡面。 |
#541 實作
目前挑選target partition的邏輯爲選擇log size最小的Broker,再選中該Broker中log size最小的partition。
target partition在選擇後不會改變直到關閉Interdependent function。再此開啓會重新選擇target partition。不開啓則爲正常所選擇Dispatcher功能。
在Interface中新增了
jmxAddress()
以在Interface中獲取jmx address。修改測試中的
RequireBrokerCluster
讓其能重新設置NUMBER_OF_BROKERS。原因在於叢集中的三個節點都會共用一臺機器,導致獲取到的Broker metrics無法鑑別是在哪一個節點上。考慮到正常情況應該是一個節點一臺機器,故爲了保證測試正常進行,讓其能夠修改NUMBER_OF_BROKERS。