Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Sensor to MetricCollector #1171

Merged
merged 14 commits into from
Dec 6, 2022

Conversation

qoo332001
Copy link
Collaborator

@qoo332001 qoo332001 commented Nov 26, 2022

此PR結合MetricCollector,ClusterBeanSensor的功能,讓CostFunction在計算分數時可以透過ClusterBean拿到Sensor統計一段時間內的值

  • 新增一個介面MetricSensors,內有一個方法用來處理一種Metrics要如何做統計,以及回傳自行產生的HasBeanObject
  • MetricCollector中新增 addMetricSensors(),用來增加現有的MetricSensors
  • MetricCollector更新metrics到beans的同時會呼叫各個MetricSensors中的record方法,並取回由該方法產生的HasBeanObject儲存到beans
  • ClusterBean中新增兩個方法分別可以用來取得TopicPartitionReplica level 與 broker level透過Sensor計算出來的統計資料
    • statisticsByReplica
    • statisticsByNode

目前如何統計使用Sensor統計這些HasBeanObject會完全交給CostFunction來決定,因此如果需要使用自行統計後的值會需要先實做一個方法CostFunction#sensors(),這個方法會需要回傳Collection,每個MetricsSensors會紀錄"一種Metrics"(ex. BytesInPerSec, Size),每一種Metrics可能會有多個Sensors(看metrics種類,有可能是一個replica對應一個Sensor或是一個Broker對應一個Sensor),MetricsSensors中有一個方法record需要實做,每當MetricsCollector那邊收集到新的metrics時,會將identitiy與其收集到的一包metrics送進MetricsSensors#record進行自訂的統計,統計後會回傳一個CostFUnction中自行產生的,統計後的HasBeanObject,最後存到MetricCollector中,當CostFunction要取用時,可以透過ClusterBean來獲取

一個實做MetricsSensors#record的範例:

    @Override
    public Collection<MetricSensors> sensors() {
        return List.of(
                new MetricSensors() {
                    final Map<TopicPartitionReplica, Sensor<Double>> sensors = new HashMap<>();

                    @Override
                    public Map<Integer, Collection<? extends HasBeanObject>> record(
                            int identity, Collection<? extends HasBeanObject> beans) {
                        var statisticalBeans = new HashMap<TopicPartitionReplica, HasBeanObject>();
                        beans.forEach(
                                bean -> {
                                    if (bean != null) {
                                        if (bean.beanObject().domainName().equals(LogMetrics.DOMAIN_NAME)
                                                && bean.beanObject().properties().get("type").equals(LogMetrics.LOG_TYPE)) {
                                            var tpr =
                                                    TopicPartitionReplica.of(
                                                            bean.beanObject().properties().get("topic"),
                                                            Integer.parseInt(bean.beanObject().properties().get("partition")),
                                                            identity);
                                            sensors
                                                    .computeIfAbsent(
                                                            tpr,
                                                            ignore ->
                                                                    new SensorBuilder<Double>()
                                                                            .addStat(Avg.AVG_KEY, Avg.of())
                                                                            .build())
                                                    .record(
                                                            Double.valueOf(
                                                                    bean.beanObject().attributes().get("Value").toString()));
                                            statisticalBeans.put(tpr, bean);
                                        }
                                    }
                                });
                        return Map.of(
                                identity,
                                sensors.entrySet().stream()
                                        .map(
                                                sensor -> {
                                                    var bean = statisticalBeans.get(sensor.getKey()).beanObject();
                                                    return new SizeStatisticalBean(
                                                            new BeanObject(
                                                                    bean.domainName(),
                                                                    bean.properties(),
                                                                    Map.of("Value", sensor.getValue().measure(Avg.AVG_KEY)),
                                                                    bean.createdTimestamp()));
                                                })
                                        .collect(Collectors.toList()));
                    }
                });
    }

CostFunction會回傳的自訂HasBeanObject,實做如下:

  public class SizeStatisticalBean implements HasBeanObject{
      BeanObject beanObject;
      SizeStatisticalBean(BeanObject beanObject){
          this.beanObject = beanObject;
      }
      @Override
      public BeanObject beanObject() {
          return beanObject;
      }

      public String topic() {
          return beanObject().properties().get("topic");
      }

      public int partition() {
          return Integer.parseInt(beanObject().properties().get("partition"));
      }

      public double value(){
          return Double.parseDouble(beanObject().attributes().get("Value").toString());
      }
  }

@chia7712
Copy link
Contributor

@qoo332001 終於看到這個功能,很棒。整個專案目前最重要的項目就是如何提升metrics的品質,如同11/25 很多人在詢問我們是透過哪些線索來“處理負載平衡”

@qoo332001 qoo332001 requested a review from chia7712 December 2, 2022 09:11
@qoo332001 qoo332001 requested a review from chia7712 December 2, 2022 14:04
@qoo332001 qoo332001 requested a review from chia7712 December 4, 2022 10:22
Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

麻煩接著試試看用你新增的方法來套用到 cost function,自我檢驗一下

@qoo332001
Copy link
Collaborator Author

麻煩接著試試看用你新增的方法來套用到 cost function,自我檢驗一下

另外開一PR做對嗎

@chia7712
Copy link
Contributor

chia7712 commented Dec 6, 2022

另外開一PR做對嗎

yep

@qoo332001 qoo332001 merged commit c3858aa into opensource4you:main Dec 6, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants