Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate BeanColloctor into Partitioner #138

Merged
merged 32 commits into from
Jan 5, 2022

Conversation

wycccccc
Copy link
Collaborator

@wycccccc wycccccc commented Dec 3, 2021

繼續 #113 的完善
將BeanColloctor整合進了Partitioner中。
簡化了使用者配置Jmx addresses的行為,不再需要將NodeID與Jmx addresses一同配置。
Code更加乾淨了。

@wycccccc
Copy link
Collaborator Author

wycccccc commented Dec 3, 2021

還有一些小問題需要請教 #137
一些程式中的print()會在bug解決後刪除
NodeLoadClient還會再補充一些測試。

@wycccccc wycccccc mentioned this pull request Dec 3, 2021
@wycccccc
Copy link
Collaborator Author

wycccccc commented Dec 5, 2021

#137

另一個角度是說 這個問題先緩一緩 先把重構的部份處理好再回來

這個問題應該緩不了,因為我們的測試環境就是一個host上有三個node。算是上述提到的半個情景,我有嘗試解決,現在可以支持同一個host上有多個node的情況。.
程式的基本框架已經完成,之後的其他功能會在合併後通過其他pr添加。

import org.astraea.metrics.HasBeanObject;

/** create a single BeanCollector to avoid waste of resources */
public class BeanCollectorFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我不太確定是否還需要用這種方式,之前用這個是因為要考慮"cluster"的分組問題,不過現在BeanCollector並沒有cluster的概念,BeanCollector就是維護一個jmx server and metrics的關係。換言之,我們可以簡單用static memebr來處理"singleton"就好

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果目前做法對效能沒有太大影響,我想可以暫時保留這種處理方式,一方面是如果改用static memebr可能還是需要一個類似邏輯的程式判斷BeanCollector何時應該關閉。
另一方面,我未來是打算將BeanCollector能夠根據實際的情況進行客制化創建(例如可以存儲不同時間量的數據,或是根據面對jmxserver的數量決定要開設幾條線程處理它們)。這部分我還沒想好具體要怎麼做,但是一個Factory可以保證我生產出想要的BeanCollector

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所以現在partitioner和BeanCollector的關係為何?是什麼情況會共用什麼時候又不會共用?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用同樣的config創建多個producer時候這些producer是共用的。用不同的config這樣就是不同的了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不同的config

這個可能也要開個議題追蹤一下,因為太容易有不同的config 但是面對同一個叢集的狀態

Copy link
Collaborator Author

@wycccccc wycccccc Dec 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這樣聽起來不太秒,我直接改成維護一個 static BeanCollector的形式。已完成。

@wycccccc
Copy link
Collaborator Author

wycccccc commented Dec 10, 2021

緊接着 #112 進行討論
我試圖創建一個Map<producer,partitioner>來將producer與partitoner進行匹配方便控制partitioner中的state。
我盡力想通過producer調出partitioner,或是通過partitioner匹配到producer。但似乎都做不到。
現在我們的使用者將需要多配置一個參數producerID在producerConfig中。
每一個producerID都需要唯一,幫助進行partitioner與producer的匹配。

使用者目前實現dependency的方法為。

var dependencyClient = new DependencyClient();
dependencyClient.initializeDependency(props);
dependencyClient.beginDependency(props);
producer.send();
dependencyClient.finishDependency(props);

如果這種做法可行的話我會再補齊注釋。

public void close() {
synchronized (lock) {
if (count == 1) {
beanCollector.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這樣設計會有問題,例如當同一個JVM關掉最後一個partitioner,然後又開一個新的,這樣就會用到已經被關閉的beanCollector

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修復該問題,現在關掉後再創建不會再用到已關閉的beanCollector

@@ -25,7 +25,7 @@ public static Builder builder() {
return new Builder();
}

static class Builder {
public static class Builder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

請使用builder()取代開放此建構

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個地方我不太知道改用哪種寫法。如果刪掉public在我build它的時候會出現Cannot be accessed from outside package的錯誤。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BeanCollector.build() 這樣就會建立Builder了

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021-12-11 15-51-06 的螢幕擷圖

去掉pubilc後它會回報上述錯誤,我是不知道這個錯誤該怎麼避免。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

喔喔 那是我看錯了 你改得沒錯 在別的package要存取的話

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的,沒問題就行

@wycccccc
Copy link
Collaborator Author

如果producerID的做法沒有問題的話,會有以下一個問題
現在的producerID為了程序正常運轉是必須需要設置的,但是實際上只有當用dependency的時候才需要這個參數。
為了讓使用者更自由確定是否設置producerID,還需要加一套producerID的防呆機制。
我先將這個問題記錄在這裡,如果確定producerID是可行的未來方向。我再開始處理這個問題。

}

public void close() {
FACTORY.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在多個partitioner共用的狀況下,close並不會釋放任何東西。但實際上的確有東西要釋放掉,那就是beanCollector.register()執行完後的物件,該物件連結了在beanCollector裡面的資源,那就是呼叫註冊的執行緒必須去釋放的東西

因此這邊的邏輯應該要改成 搜集執行緒過程建立的Unregister,然後在這裡把它釋放掉就好

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

請見#150,現在不需要去追蹤FACTORY,而是把自己拿到的Receivers斷掉就好

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有修改成斷掉維護Receiversthread pool,這樣有算把資源釋放掉嘛。

@chia7712
Copy link
Contributor

如果producerID的做法沒有問題的話,會有以下一個問題
現在的producerID為了程序正常運轉是必須需要設置的,但是實際上只有當用dependency的時候才需要這個參數。
為了讓使用者更自由確定是否設置producerID,還需要加一套producerID的防呆機制。
我先將這個問題記錄在這裡,如果確定producerID是可行的未來方向。我再開始處理這個問題。

麻煩開另一個議題來追蹤,這應該跟這隻PR的關係不大

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 因為現在演算法過程用的資料結構太難閱讀,我希望看如何調整BeanCollector讓你不需要用那麼複雜的方式來保存資料,也希望能簡化你這隻在多執行緒和關閉等議題上變得比較輕鬆

麻煩先看一下我回的意見,以及#150,謝謝

this.nodeMetadataCollection.add(
new NodeMetadata(entry.getKey(), createNodeMetrics(entry.getKey(), entry.getValue())));
this.beanCollector = FACTORY.beanCollector();
beanCollector
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我看了一下你的用法,重新調整了BeanCollector (see #150),在新版的用法下,這段可以改成

var receivers = List.of(
           beanCollector
           .register()
           .host(entry.getKey())
           .port(Integer.parseInt(entry.getValue()))
           .metricsGetter(KafkaMetrics.BrokerTopic.BytesOutPerSec::fetch)
           .build(),
           beanCollector
           .register()
           .host(entry.getKey())
           .port(Integer.parseInt(entry.getValue()))
           .metricsGetter(KafkaMetrics.BrokerTopic.BytesInPerSec::fetch)
           .build())

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已完成修改。

for (NodeMetadata nodeMetadata : nodeMetadataCollection) {
NodeMetrics nodeMetrics = nodeMetadata.getNodeMetrics();
Utils.close(nodeMetrics.getKafkaMetricClient());
for (Map.Entry<Integer, Map.Entry<String, Integer>> hostPort : addresses.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

麻煩看看能否以Receiver -> List<HasBeanObject>的資料結構來重新整理這段,現在這隻PR用的這些資料結構太難閱讀(因為層層圈套),如果無法單純用Receiver -> List<HasBeanObject>這樣的方式來處理,那我們再討論一下該如何調整APIs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我寫的時候也很艱辛,像是在玩一個邏輯積木。我會嘗試用新方法去實現,應該不會再變成這樣。今天需要做論文簡報,明天睡前應該可以改好。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以將複雜的資料結構寫成一個新的class,給各個變數新的名字,一來好閱讀二來也好維護。另外也建議多寫一些comment幫助理解現在的要用的資料結構

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有學到,不過使用Receiver後資料結構得以簡化不少,目前應該保持Map還可以接受。

}

public void close() {
FACTORY.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

請見#150,現在不需要去追蹤FACTORY,而是把自己拿到的Receivers斷掉就好

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 感謝更新,一樣幾個重要的設計討論請看一下

for (Map.Entry<String, Integer> entry : jmxAddresses.entrySet()) {
var mbeanClient = MBeanClient.jndi(entry.getKey(), entry.getValue());
BiFunction<String, Integer, MBeanClient> clientCreator = (host, port) -> mbeanClient;
var beanCollector =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

現在BeanCollector已經不佔用資源,而是作為一個“共同“儲存媒介給各個Receiver,換言之,跟現在的BeanCollectorFactory角色有點重疊,另外BeanCollector已經是thread-safe,這邊在用lock有點多餘

.interval(Duration.ofSeconds(1))
.numberOfObjectsPerNode(10)
.clientCreator(clientCreator)
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊每次都建立新的BeanCollector會使得不同的receiver不能共享bean objects,除非你是故意要達到這樣的效果,否則應該要由同一個BeanCollector來取得Receiver

ThreadPool.builder()
.runnables(
receiversList.stream()
.map(receiver -> ((Runnable) receiver::current))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一樣需要執行緒嗎?如果要仰賴額外執行緒來取得bean,又會掉入之前處理多個partitioner共用下要如何優雅關閉的問題

.metricsGetter(KafkaMetrics.BrokerTopic.BytesOutPerSec::fetch)
.build());
count.put(entry, 1);
} else {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

出現了一個問題,我按照這樣的形式創建我的receiver們,可是獲得的receiver獲取的metrics都會變成第一個receiver設置的metrics。
2021-12-17 01-55-08 的螢幕擷圖
我不確定是我創建的方式有問題,還是這是一個隱蔽的bug。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你是跑哪一個test出現這個問題?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我很快看了一下,這邊用的資料結構是Map<String, Receiver>,這樣在有多個brokers的時候就會撞到key重複。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

學長指的是nodeLoadClient中的Map<String, Receiver>嘛。這個應該不會是問題的所在,我跑的是partitionerTest,因為我在創建這個資料結構前,在我一開始拿到receiverList的時候,我就直接對其中的兩個receiver,拿取他們的hasBeanObject進行對比。
發現兩個receiver記錄的metrics是一樣的。這代表問題應該在receiverFactory創建receiver中就已經出現。所以我在懷疑是我創建的方式有問題還是其他原因。
上圖我截圖的代碼是我用來抓取receiverList中每個receiverhasBeanObjectmetricsname

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的,我了解問題了,請看#154

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已測試完畢,現在它能正常運行了。

@wycccccc
Copy link
Collaborator Author

上述的建議都已修改,現在代碼應該精煉了不少,再麻煩學長review。

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 現在程式碼裡面很多可以用Map.forEach來改寫,麻煩看一下

}
}
public synchronized void brokerHashMap(Map<Integer, Double> poissonMap) {
poissonMap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    poissonMap.forEach(
        (key, value) -> {
          if (!brokerHashMap.containsKey(key)) {
            brokerHashMap.put(key, new int[] {(int) ((1 - value) * 20), 0});
          } else {
            brokerHashMap.put(key, new int[] {(int) ((1 - value) * 20), brokerHashMap.get(key)[1]});
          }
        });

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已完成提到的所有修改,麻煩再看一下給予意見。

*/
public List<Receiver> receiversList(Map<String, Integer> jmxAddresses) {
synchronized (lock) {
jmxAddresses
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

麻煩用jmxAddresses.forEach((host, port) -> {來處理

public BrokersWeight(LoadPoisson loadPoisson) {
this.loadPoisson = loadPoisson;
}
private static Map<Integer, int[]> brokerHashMap = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這種用法容易產生誤會, 因為使用者必須“建立BrokersWeight”這個物件,但這個物件內部的資料卻是static...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊我這樣做的原因,是因為我想讓不同的partitioner可以共享一個權重,這樣新的partitioner一進來就會按目前集群的狀況來塞資料。或許我可以把它搬進partitioer中。將BrokerWeight變為一個內部類,然後在partitioner中維護一個static BrokersWeight,這樣感覺就比較自然。

Copy link
Collaborator Author

@wycccccc wycccccc Dec 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我又思考了一下,所有partitioner都用一個權重好像也沒有明顯的優勢。反而會在維護權重上產生不少問題。所以我直接移除了static

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc thanks for updating code. a couple of comments below. Also, I'm waiting the response about producer id

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class receiverFactoryTest extends RequireBrokerCluster {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReceiverFactoryTest


@Test
void testSingleton() {
initProConfig();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned object is not used so I guess this method is not necessary


private int avgLoadCount(Map<Integer, Integer> overLoadCount) {
var avgLoadCount =
overLoadCount.values().stream().mapToDouble(Integer::doubleValue).average().getAsDouble();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verLoadCount.values().stream().mapToDouble(Integer::doubleValue).average().orElse(0);

return ans;
}

// visible for test
public long factorial(long number) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is used for testing only, it should be package-private rather than public modifier

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

了解,相關的其他地方也已一並修改。

int x = nodeLoadClient.getBinOneCount(entry.getValue());
poissonMap.put(entry.getKey(), doPoisson(lambda, x));
}
public synchronized HashMap<Integer, Double> allPoisson(Map<Integer, Integer> overLoadCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it requires synchronization? You should make overLoadCount thread-safe instead.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sor,我沒有修改到,現在的架構確實已經不需要了,已修正。

e.printStackTrace();
}
return null;
private static final ReceiverFactory FACTORY = new ReceiverFactory();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RECEIVER_FACTORY

for (Map.Entry<Integer, Map.Entry<String, Integer>> hostPort : addresses.entrySet()) {
List<Receiver> matchingNode;
if (!hostPort.getValue().getKey().matches(regex)) {
var localhost =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to check localhost?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

應該不是loaclhost,而是host。
這裡是因為我發現測試中啟動的kafka抓到的host是host name,而不是它的ip地址。
那時候一直在解決這個問題,所以心中一直想着loaclhost就取成了這個名。
已修改名稱。

n.stream().findAny().get().port(),
hostPort.getValue().getValue()))
.anyMatch(n -> n.equals(true)))
|| Objects.equals(nodeIDReceiver.size(), 0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should evaluate Objects.equals(nodeIDReceiver.size(), 0) first since it is cheaper.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有學到!

var addresses =
cluster.nodes().stream()
.collect(
Collectors.toMap(node -> node.id(), node -> Map.entry(node.host(), node.port())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Node::id

receiverList.stream()
.filter(
receiver ->
((nodeIDReceiver.values().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use helper to handle this duplicate code

Copy link
Collaborator Author

@wycccccc wycccccc Dec 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

雖然它們之間長的很像但是實際上是在比較不同的東西,所以用不了helper。
不過現在我有改變一下思路,進而拿掉了那些duplicate code。


for (Map.Entry<Integer, Map.Entry<String, Integer>> hostPort : addresses.entrySet()) {
List<Receiver> matchingNode;
var host = "-1.-1.-1.-1";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這看起來是要把hostname轉成IP address,麻煩用個helper method來處理

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修改。

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 感謝更新,針對message order的設計還有一些疑問,請看一下


private SmoothWeightPartitioner partitionerOfProducer(KafkaProducer producer)
throws NoSuchFieldException, IllegalAccessException {
var field = producer.getClass().getDeclaredField("partitioner");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

麻煩把這個動作放到Utils裡面

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已移動到Utils

smoothWeightPartitioner.finishDependency();
}

private SmoothWeightPartitioner partitionerOfProducer(KafkaProducer producer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaProducer<?, ?>

Objects.requireNonNull(mapAddress, "You must configure jmx_servers correctly.");

nodeLoadClient = new NodeLoadClient(mapAddress);
dependencyManager = new DependencyManager();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這可以直接初始化就好
private final DependencyManager dependencyManager = new DependencyManager();

} catch (UnknownHostException e) {
e.printStackTrace();
}
assert overLoadCount != null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

為何不是拋出錯誤而是要用assert?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修改成拋出錯誤,類似問題也都已修改。


assert overLoadCount != null;
brokerHashMap(loadPoisson.allPoisson(overLoadCount));
AtomicReference<Map.Entry<Integer, int[]>> maxWeightServer = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊用Map就好

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊我只需要找到一個最大權重的server就好,感覺資料結構上Map沒有Map.Entry好處理。


private NodeLoadClient nodeLoadClient;
private DependencyManager dependencyManager;
private int targetPartition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個特別的參數需要註解

public class DependencyClient {
private final SmoothWeightPartitioner smoothWeightPartitioner;

public DependencyClient(KafkaProducer kafkaProducer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可否讓這個建構式交由SmoothWeightPartitioner處理?也就是使用者只要認SmoothWeightPartitioner就好,而不是用特別去記這個看起來不容易察覺關聯性的名字?例如

SmoothWeightPartitioner.startGroup(producer);
try {
...
}  finally {
    SmoothWeightPartitioner.endGroup(producer);
}

this.smoothWeightPartitioner = partitionerOfProducer(kafkaProducer);
}

public synchronized void initializeDependency() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

承上,為何需要有READY?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

備用,現在Dependency大概只是能夠按要求動的程度,留個坑以後好補些邏輯做優化。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

除非必要,否則APi越簡潔越好

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好,已經移除。

field.setAccessible(true);
return (SmoothWeightPartitioner) field.get(producer);
}

Copy link
Collaborator Author

@wycccccc wycccccc Dec 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

現在user使用Utils中的partitionerOfProducermethod調用到producer對應的partitioner。
以下為範例代碼

KafkaProducer producer = new KafkaProducer(props);
 
SmoothWeightPartitioner smoothWeightPartitioner = Utils.partitionerOfProducer(producer);
  try{
     dependencyClient.beginDependency();
     producer.send();
  } finally{
      dependencyClient.finishDependency();
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SmoothWeightPartitioner smoothWeightPartitioner = Utils.partitionerOfProducer(producer);

理想上使用者只需要認SmoothWeightPartitioner這隻class就好,一來方便二來好記憶,因此會建議改成:

var orderControl = SmoothWeightPartitioner.orderControl(producer);
try {
  orderControl.begin();
  producer.send();
  producer.send();
  producer.send();
} finally {
  orderControl.finish();
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好,已修改成上述案例樣式。

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 感謝更新,還有一些設計討論,請看一下

* @param fieldName reflected field name.
* @return Required field.
*/
public static Field reflectionField(Object object, String fieldName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  public static Object requireField(Object object, String fieldName) {
    try {
      var field = object.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(object);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
  }

.collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue));
try (var producer = Producer.builder().configs(props).build()) {
SmoothWeightPartitioner smoothWeightPartitioner =
SmoothWeightPartitioner.orderControl(producer.kafkaProducer());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊也是前後兩行的呼叫可以得知,拿到smoothWeightPartitioner後馬上要呼叫beginDependency,那麼我們就可以把這兩行合併在一起

另外也要考慮是否要建議一個新的介面來避免直接使用SmoothWeightPartitioner

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有建立了一個interface DependencyClient來避免使用者能直接調用到partitioner。然後是合併的問題,這樣如果使用者在一個程式里對一個producer反覆開關dependency不就會創建很多沒有用的object了。還是說為了讓使用者方便,這種程度的資源浪費是可以接受的?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一個producer反覆開關dependency不就會創建很多沒有用的object了。還是說為了讓使用者方便,這種程度的資源浪費是可以接受的?

別太小看現代JVM了 而且相比這支程式要處理的邏輯 這個簡單的物件開關不會是大事情

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

了解,已經修改完畢,現在它們被整合到了一起。

@@ -11,6 +12,8 @@

void close();

KafkaProducer kafkaProducer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaProducer<Key, Value> kafkaProducer();

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc a coupe of major questions below.

@@ -0,0 +1,5 @@
package org.astraea.partitioner.smoothPartitioner;

public interface DependencyClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

麻煩加上註解

return (DependencyClient) Utils.requireField(producer, "partitioner");
}

public synchronized void finishDependency() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override

}

public static synchronized DependencyClient beginDependency(KafkaProducer<?, ?> producer) {
dependencyManager.beginDependency();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果同時有多個執行緒同時在呼叫beginDependencyfinishDependency,這樣會執行緒安全嗎

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前的話,這樣會因為錯誤的狀態轉換而報錯,我將在下個PR一併優化該問題。

@wycccccc
Copy link
Collaborator Author

wycccccc commented Jan 5, 2022

我提議能夠先將此PR meger,因為我已基於這支PR完成了partitoner v1.0.0,添加了不少新東西。再在此push顯得過於臃腫,不太適合繼續在此幹活了。

@chia7712
Copy link
Contributor

chia7712 commented Jan 5, 2022

我提議能夠先將此PR meger,因為我已基於這支PR完成了partitoner v1.0.0,添加了不少新東西。再在此push顯得過於臃腫,不太適合繼續在此幹活了。

麻煩把整個討論串中提到的題目都先開好issue

@chia7712 chia7712 merged commit d6ab715 into opensource4you:main Jan 5, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants