-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make the code clear. #113
Make the code clear. #113
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc We can check the error later. a couple of comments below.
|
||
public interface SafeMetadata { | ||
/** @return The nodeID. */ | ||
public String getNodeID(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove public
as it is the default access modifier.
@@ -18,7 +17,7 @@ | |||
private final JMXServiceURL serviceURL; | |||
private final MBeanClient mBeanClient; | |||
private final String nodeID; | |||
private HashMap<String, Double> metricsValues; | |||
private final NodeMetadata nodeMetadata; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Could you change
NodeMetadata
to interface (i.e theSafeMetadata
can be removed)? NodeMetrics
should implement theNodeMetadata
interface.NodeMetrics
should be renamed toNodeClient
as it has a memberMBeanClient
.NodeMetrics
should implement theAutoClosable
interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 幾個意見請看一下,另外
- 請把
get
set
等前綴盡量拿掉 有點太囉唆了 - 請盡量用java 11的var來宣告變數,這樣可以簡化
} | ||
|
||
public double totalBytesPerSec() { | ||
return metricsValues.get("BytesInPerSec") + metricsValues.get("BytesOutPerSec"); | ||
public MBeanClient getKafkaMetricClient() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個方法還需要曝露出來嗎?或者說mBeanClient
還需要給外部存取嗎?
} | ||
interface NodeMetadata { | ||
/** @return The nodeID. */ | ||
public String getNodeID(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
請移除多餘的public
因為那已經是預設的了
/** @return The nodeID. */ | ||
public String getNodeID(); | ||
/** @return The Sum of node InputPerSec and OutputPerSec. */ | ||
public double getTotalBytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
可否把get
這個前綴拿掉?我相信totalBytes
已經很清楚了
private double totalBytes; | ||
private int overLoadCount; | ||
|
||
NodeClient(String ID, String address) throws IOException { | ||
argumentTargetMetrics.add("BytesInPerSec"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個看起來都是常數,麻煩弄成static member,此外請用KafkaMetrics
裡面的enum來取代常數字串
@@ -13,23 +12,26 @@ | |||
import org.astraea.metrics.kafka.KafkaMetrics; | |||
|
|||
/** Responsible for connecting jmx according to the received address */ | |||
public class NodeMetrics { | |||
public class NodeClient implements NodeMetadata, AutoCloseable { | |||
private final String JMX_URI_FORMAT = "service:jmx:rmi:///jndi/rmi://" + "%s" + "/jmxrmi"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這應該要是static member
this.overLoadCount = count; | ||
} | ||
|
||
public void setTotalBytes(double bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個方法需要暴露出去嗎?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
應該還是需要不然test不太好做。
public void close() { | ||
MBeanClient mBeanClient = getKafkaMetricClient(); | ||
try { | ||
mBeanClient.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Utils.close
感謝review,上述提到的細節都有改進。 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 感謝更新,幾個意見請看一下
public void close() { | ||
try { | ||
Utils.close(mBeanClient); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個catch是多餘的,因為Utils.close
已經做了類似的操作
nodeID = ID; | ||
if (Pattern.compile("^service:").matcher(address).find()) | ||
serviceURL = new JMXServiceURL(address); | ||
else serviceURL = new JMXServiceURL(createJmxUrl(address)); | ||
mBeanClient = new MBeanClient(serviceURL); | ||
metricsValues = new HashMap(); | ||
totalBytes = 0.0; | ||
overLoadCount = 0; | ||
} | ||
|
||
public String createJmxUrl(String address) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個邏輯看起來很間單,有必要獨立成一個method嗎?
private final JMXServiceURL serviceURL; | ||
private final MBeanClient mBeanClient; | ||
private final String nodeID; | ||
private HashMap<String, Double> metricsValues; | ||
private Collection<String> argumentTargetMetrics = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個應該要改成static member,例如:
private static final Collection<String> BYTES_METRICS = Set.of(KafkaMetrics.BrokerTopic.BytesOutPerSec.toString(), KafkaMetrics.BrokerTopic.BytesOutPerSec.toString());
@@ -18,38 +18,40 @@ public BrokersWeight(LoadPoisson loadPoisson) { | |||
} | |||
|
|||
/** Change the weight of the node according to the current Poisson. */ | |||
public synchronized void setBrokerHashMap() { | |||
HashMap<String, Double> poissonMap = loadPoisson.setAllPoisson(); | |||
public synchronized void setBrokerWeightHashMap() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BrokersWeight
這個物件看起來就是去操作某個static member?然後該物件又是只有LinkPartitioner
需要,這樣還需要用一個static member來維護嗎?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BrokersWeight這個物件看起來就是去操作某個static member?然後該物件又是只有LinkPartitioner需要,這樣還需要用一個static member來維護嗎?
學長的意思是用一個object來維護對嘛。這樣感覺似乎的確直接放在linkpartitioner
之中就可以了,那我把BrokersWeight
直接整個拿掉。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 感謝更新,還有一些細節可以再調整,麻煩根據下面意見重新審視一下PR
* Record the current weight of each node according to Poisson calculation and the weight after | ||
* partitioner calculation. | ||
*/ | ||
private static HashMap<String, int[]> brokerWeightHashMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這邊還需要用static嗎?從架構上來說ThreadSafeSmoothPartitioner
已經是singleton了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
另外命名上可否不要把HashMap
包含進來?看起來有點詭異@@
@@ -122,6 +124,39 @@ public void configure(Map<String, ?> configs) { | |||
} | |||
pool = ThreadPool.builder().executor(nodeLoadClient).build(); | |||
} | |||
/** Change the weight of the node according to the current Poisson. */ | |||
public synchronized void setBrokerWeightHashMap() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個命名應該要反應實做,例如updatePoisson
} | ||
} | ||
|
||
public synchronized int getAllWeight() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
盡量學習用lambda,在數學運算上會比較好閱讀(假如沒有明顯效能考量的話),例如
brokerWeightHashMap.values().stream().mapToInt(vs -> vs[0]).sum()
return brokerWeightHashMap; | ||
} | ||
|
||
public synchronized void setCurrentBrokerHashMap(HashMap<String, int[]> currentBrokerHashMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我沒看到有針對此項目的測試,因此可以將它改成private
public LoadPoisson(NodeLoadClient nodeLoadClient) { | ||
this.nodeLoadClient = nodeLoadClient; | ||
} | ||
private HashMap<String, Double> allPoissonMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
除非必要,不然可以用介面宣告就好,
private final Map<String, Double> allPoissonMap = new HashMap<>();
} catch (Exception e) { | ||
e.printStackTrace(); | ||
} | ||
for (NodeClient nodeClient : nodeClientCollection) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nodeClientCollection.forEach(NodeClient::close);
我結合了上述說到的點,再排查了下相關程式碼,將相關問題儘量做了修改。還有問題我再繼續跟進,麻煩學長了。 |
var current = count.get(configs); | ||
if (current == 1) { | ||
try { | ||
nodeLoadClient.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我已將singleton 轉移到了nodeloadclient上。但是有一個小疑問。我嘗試在這個位置執行pool.close();
關閉我的線程池。可是放在這裡會進入一個很奇怪的無限迴圈。我不太清楚導致這樣的原因。是我關閉線程的方式不恰當嘛。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
另一個方式是避免掉這個作法,可以考慮增加hook到JVM裡面,讓關閉的動作在JVM結束時才呼叫 (https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Runtime.html#addShutdownHook(java.lang.Thread))
當然這樣一個副作用是說會有idle client的風險,不過考量那個情境應該不多,應該可以先不用在意
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
學到了,已添加
import java.util.concurrent.TimeUnit; | ||
import org.astraea.concurrent.ThreadPool; | ||
import org.astraea.metrics.jmx.MBeanClient; | ||
|
||
public class NodeLoadClient implements ThreadPool.Executor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如剛剛電話討論,麻煩將此物件抽出來放到org.astraea.metrics
,並使其與partitioner保持獨立(例如不能有LoadPoisson
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我徹底的對代碼進行了重構,現在BeanCollector
可以存儲所讀取所有metrics。有關之前提到的BeanCollector
build我會放在之後的pr里再更新。
目前還有幾個問題,想向學長確認和求助。
|
||
private Set<NodeMetrics> allNodes(int index) { | ||
return allNodes.computeIfAbsent(index, ignored -> Collections.synchronizedSet(new HashSet<>())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我用這個方法替換了原來的new ConcurrentSkipListSet<>
來保證並發。原來的set並發的判定機制似乎是匹配輸入的host。而相同host下的metrics會因為這個機制被吃掉。所以進行了替換。但我不確定這樣保證並發是否合適。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
原本的做法是比對host
and port
,請看下面的範例
new ConcurrentSkipListSet<>(
Comparator.comparing(Node::host).thenComparing(Node::port)));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
調用addClient
方法當線程中已經被創建Set的時候則無法新添加相同host的其它NodeMetrics。我替換成其他Set的時候就可以正常添加執行了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
無法新添加相同host的其它NodeMetrics
port有不一樣嗎?你有像我上述的方式去加Comparator
嗎?可否寫個單元測試驗證一下
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
相同host和port,上述應該就是原先學長寫的範例吧。我有之前就有寫unit test, 在BeanCollectorTest
中。一直失敗,花了我超級久才定位到了使這個點出了問題。我搜索了一些有關這個Set的資料做了上述推測。理論上應該是這個問題,因為我改成這樣它就正常了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我發現原因了,是我把addClient方法改掉了,我讓他接受MBeanClient獲取host與port,而不是直接接受jmxSercerURL,所以才出現每次都只add一個的情況。我再去修改,感謝支援。
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, LinkPartitioner.class.getName()); | ||
props.put("jmx_servers", jmxServiceURL() + "@0"); | ||
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SmoothWeightPartitioner.class.getName()); | ||
props.put("jmx_servers", jmxServiceURL() + ""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我們測試環境下生成的jmx address太長了service:jmx:rmi://127.0.0.1/stub/rO0ABXNyAC5qYXZheC5tYW5hZ2VtZW50LnJlbW90ZS5ybWkuUk1JU2VydmVySW1wbF9TdHViAAAAAAAAAAICAAB4cgAaamF2YS5ybWkuc2VydmVyLlJlbW90ZVN0dWLp/tzJi+FlGgIAAHhyABxqYXZhLnJtaS5zZXJ2ZXIuUmVtb3RlT2JqZWN002G0kQxhMx4DAAB4cHc3AApVbmljYXN0UmVmAA4xOTIuMTY4LjMxLjEzMQAAmN8iljUQhzGYEjICnUMAAAF9bArTp4ABAHg=
我不太清楚它長成這樣的原因。
目前我們的beancollector
是用host+por來做為key的。我有嘗試過用nodeID作為key進行替換。但實做後發現Cluster
信息無法在config拿到,而之後在partition中再配置它感覺也有點奇怪,所以最後還是改回了用host做key。
這個長長的key會導致out of bounds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jmx的參數要求應該是只要host + port就好,你可以參考@garyparrot 開發的MetricExplorer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用host+por來做為key的
這個方法應該是比較直覺,也可以對應使用者輸入的參數 (jmx address and port)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wycccccc 感謝更新,幾個建議請看一下
} | ||
|
||
/** @return the monitored host/port */ | ||
public Set<Map.Entry<String, Integer>> nodesID() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
給Map.Entry<String, Integer>
一個介面吧,例如就叫做Node,然後放在BeanCollector
底下,並且讓NodeMetrics
去繼承
|
||
private Set<NodeMetrics> allNodes(int index) { | ||
return allNodes.computeIfAbsent(index, ignored -> Collections.synchronizedSet(new HashSet<>())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
原本的做法是比對host
and port
,請看下面的範例
new ConcurrentSkipListSet<>(
Comparator.comparing(Node::host).thenComparing(Node::port)));
.build(); | ||
} | ||
|
||
private Set<NodeMetrics> allNodes(int index) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
可否改成nodes
? 因為內部變數已經有一個allNodes
* @return create a new nodeLoadClient if there is no matched nodeLoadClient (checked by | ||
* comparator). Otherwise, it returns the existent nodeLoadClient. | ||
*/ | ||
public NodeLoadClient getOrCreate(Class<NodeLoadClient> clz, Map<String, ?> configs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clz
的用途是什麼?
Objects.requireNonNull( | ||
jmxAddress, "You must configure jmx_servers correctly.(JmxAddress@NodeID)"); | ||
var nodeLoadClient = new NodeLoadClient(jmxAddress); | ||
var proxy = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
現在還需要建立唯一的NodeLoadClient
嗎?我們不是已經用BeanCollector
管理所有metrics連線了嗎?這樣其他物件要建立幾個都無所謂了吧?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這個問題我有思考過,因為我們還是需要確保BeanCollector唯一?既然這樣就讓NodeLoadClient創建BeanCollector然後繼續讓他保持唯一我感覺也是一種解決辦法。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
直覺BeanCollector
保持唯一會比較簡單,因為他本身根據host + port去保存的特性,後面所有NodeLoadClient應該都可以放心地放更多metric client進去
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好的,我再思考下怎麼做。
這個PR經過了太多的修改,顯得十分混亂,整理思路重新開始才是上策。 |
已完成 #138 關閉此pr |
刪除了不必要的資料保存,現在所有從jmx中抓取到的metrics都將只保存在NodeMetadata中。
同時增加
SafeMetadata
interface,保證該object下read-only。更改了一下程式邏輯,現在possion的計算也將隨每次metrics refresh進行。這樣就不用每次partition都算一次(因為數據不更新算出來也不會變,乾脆直接讓partition獲得possion的值來計算權重),進而提升了一些效能。