Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Sensor to MetricCollector #1171

Merged
merged 14 commits into from
Dec 6, 2022
4 changes: 4 additions & 0 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.astraea.common.cost.ReplicaSizeCost;
import org.astraea.common.metrics.collector.Fetcher;
import org.astraea.common.metrics.collector.MetricCollector;
import org.astraea.common.metrics.collector.MetricSensors;

class BalancerHandler implements Handler {

Expand Down Expand Up @@ -171,6 +172,7 @@ public CompletionStage<Response> post(Channel channel) {
var bestPlan =
metricContext(
fetchers,
request.configBuilder.get().build().clusterCostFunction().sensors(),
(metricSource) ->
Balancer.create(
request.balancerClasspath,
Expand Down Expand Up @@ -228,12 +230,14 @@ public CompletionStage<Response> post(Channel channel) {

private Optional<Balancer.Plan> metricContext(
Collection<Fetcher> fetchers,
Collection<MetricSensors> metricSensors,
Function<Supplier<ClusterBean>, Optional<Balancer.Plan>> execution) {
// TODO: use a global metric collector when we are ready to enable long-run metric sampling
// https://github.com/skiptests/astraea/pull/955#discussion_r1026491162
try (var collector = MetricCollector.builder().interval(sampleInterval).build()) {
freshJmxAddresses().forEach(collector::registerJmx);
fetchers.forEach(collector::addFetcher);
metricSensors.forEach(collector::addMetricSensors);
return execution.apply(collector::clusterBean);
}
}
Expand Down
12 changes: 12 additions & 0 deletions common/src/main/java/org/astraea/common/cost/CostFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package org.astraea.common.cost;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.astraea.common.Configuration;
import org.astraea.common.metrics.Sensor;
import org.astraea.common.metrics.collector.Fetcher;
import org.astraea.common.metrics.collector.MetricSensors;

/**
* It is meaningless to implement this interface. Instead, we should implement interfaces like
Expand All @@ -42,4 +46,12 @@ public interface CostFunction {
default Optional<Fetcher> fetcher() {
return Optional.empty();
}

/**
* @return the {@link Sensor} and the type of {@link org.astraea.common.metrics.stats.Stat} name
* to use.
*/
default Collection<MetricSensors> sensors() {
return List.of();
}
}
12 changes: 12 additions & 0 deletions common/src/main/java/org/astraea/common/cost/HasClusterCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package org.astraea.common.cost;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.metrics.collector.Fetcher;
import org.astraea.common.metrics.collector.MetricSensors;

@FunctionalInterface
public interface HasClusterCost extends CostFunction {
Expand All @@ -35,6 +37,11 @@ static HasClusterCost of(Map<HasClusterCost, Double> costAndWeight) {
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toUnmodifiableList()));
var sensors =
costAndWeight.keySet().stream()
.flatMap(x -> x.sensors().stream())
.collect(Collectors.toList());

return new HasClusterCost() {
@Override
public ClusterCost clusterCost(ClusterInfo<Replica> clusterInfo, ClusterBean clusterBean) {
Expand All @@ -50,6 +57,11 @@ public ClusterCost clusterCost(ClusterInfo<Replica> clusterInfo, ClusterBean clu
public Optional<Fetcher> fetcher() {
return fetcher;
}

@Override
public Collection<MetricSensors> sensors() {
return sensors;
}
};
}

Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/astraea/common/metrics/Sensor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public interface Sensor<V> {
/**
* Get the statistic by the given `metricName`.
*
* @param metricName key to get the measurement
* @param statName key to get the measurement
* @return the value calculated by the corresponding `Stat`
*/
V measure(String metricName);
V measure(String statName);

Map<String, Stat<V>> metrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public synchronized void record(V value) {
}

@Override
public V measure(String metricName) {
return stats.get(metricName).measure();
public V measure(String statName) {
return stats.get(statName).measure();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.astraea.common.metrics.MBeanClient;

public final class LogMetrics {
public static final String DOMAIN_NAME = "kafka.log";
public static final String LOG_TYPE = "Log";

public enum LogCleanerManager implements EnumInfo {
UNCLEANABLE_BYTES("uncleanable-bytes"),
Expand Down Expand Up @@ -59,7 +61,7 @@ public List<Gauge> fetch(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.log")
.domainName(DOMAIN_NAME)
.property("type", "LogCleanerManager")
.property("logDirectory", "*")
.property("name", metricName)
Expand Down Expand Up @@ -140,8 +142,8 @@ public List<Gauge> fetch(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.log")
.property("type", "Log")
.domainName(DOMAIN_NAME)
.property("type", LOG_TYPE)
.property("topic", "*")
.property("partition", "*")
.property("name", metricName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import org.astraea.common.metrics.MBeanClient;

public final class ServerMetrics {
public static final String DOMAIN_NAME = "kafka.server";

public static List<AppInfo> appInfo(MBeanClient client) {
return client
.queryBeans(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "app-info")
.property("id", "*")
.build())
Expand Down Expand Up @@ -104,7 +105,7 @@ public Histogram fetch(MBeanClient mBeanClient) {
return new Histogram(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "ZooKeeperClientMetrics")
.property("name", metricName)
.build()));
Expand Down Expand Up @@ -170,7 +171,7 @@ public Meter fetch(MBeanClient mBeanClient) {
return new Meter(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "SessionExpireListener")
.property("name", metricName)
.build()));
Expand Down Expand Up @@ -223,7 +224,7 @@ public static HasGauge<String> clusterId(MBeanClient mBeanClient) {
return () ->
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "KafkaServer")
.property("name", CLUSTER_ID)
.build());
Expand All @@ -241,7 +242,7 @@ public Gauge fetch(MBeanClient mBeanClient) {
return new Gauge(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "KafkaServer")
.property("name", metricName)
.build()));
Expand Down Expand Up @@ -320,7 +321,7 @@ public Gauge fetch(MBeanClient mBeanClient) {
return new Gauge(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "DelayedOperationPurgatory")
.property("delayedOperation", metricName)
.property("name", "PurgatorySize")
Expand Down Expand Up @@ -384,7 +385,7 @@ public List<Topic.Meter> fetch(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "BrokerTopicMetrics")
.property("topic", "*")
.property("name", this.metricName())
Expand Down Expand Up @@ -517,7 +518,7 @@ public Meter fetch(MBeanClient mBeanClient) {
return new Meter(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "BrokerTopicMetrics")
.property("name", this.metricName())
.build()));
Expand Down Expand Up @@ -578,7 +579,7 @@ public Gauge fetch(MBeanClient mBeanClient) {
return new Gauge(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", "ReplicaManager")
.property("name", metricName)
.build()));
Expand Down Expand Up @@ -632,17 +633,14 @@ public static class Socket {
public static SocketMetric socket(MBeanClient mBeanClient) {
return new SocketMetric(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.server")
.property("type", METRIC_TYPE)
.build()));
BeanQuery.builder().domainName(DOMAIN_NAME).property("type", METRIC_TYPE).build()));
}

public static List<SocketListenerMetric> socketListener(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", METRIC_TYPE)
.property(PROP_LISTENER, "*")
.build())
Expand All @@ -656,7 +654,7 @@ public static List<SocketNetworkProcessorMetric> socketNetworkProcessor(
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", METRIC_TYPE)
.property(PROP_LISTENER, "*")
.property(PROP_NETWORK_PROCESSOR, "*")
Expand All @@ -670,7 +668,7 @@ public static List<Client> client(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.server")
.domainName(DOMAIN_NAME)
.property("type", METRIC_TYPE)
.property(PROP_LISTENER, "*")
.property(PROP_NETWORK_PROCESSOR, "*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,24 @@ default void addFetcher(Fetcher fetcher) {
addFetcher(fetcher, (i0, i1) -> {});
}

/**
* Add multiple {@link MetricSensors} for real-time statistics
*
* @param metricSensors to statistical data
*/
void addMetricSensors(MetricSensors metricSensors);

/** Register a JMX server. */
void registerJmx(int identity, InetSocketAddress socketAddress);

/** Register the JMX server on this JVM instance. */
void registerLocalJmx(int identity);

/**
* @return the current registered metricsSensors.
*/
Collection<MetricSensors> listMetricsSensors();

/**
* @return the current registered fetchers.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -44,8 +45,8 @@
import org.astraea.common.metrics.MBeanClient;

public class MetricCollectorImpl implements MetricCollector {

private final Map<Integer, MBeanClient> mBeanClients = new ConcurrentHashMap<>();
private final Collection<MetricSensors> metricSensors = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Map.Entry<Fetcher, BiConsumer<Integer, Exception>>> fetchers =
new CopyOnWriteArrayList<>();
private final ScheduledExecutorService executorService;
Expand Down Expand Up @@ -93,10 +94,19 @@ public MetricCollectorImpl(
// for each fetcher, perform the fetching and store the metrics
for (var fetcher : fetchers) {
try {
beans
var beans = fetcher.getKey().fetch(mBeanClients.get(identity.id));
this.beans
.computeIfAbsent(
identity.id, ignored -> new ConcurrentLinkedQueue<>())
.addAll(fetcher.getKey().fetch(mBeanClients.get(identity.id)));
.addAll(beans);
for (var metricSensor : metricSensors)
metricSensor
.record(identity.id, beans)
.forEach(
(key, value) ->
this.beans
.computeIfAbsent(key, ignore -> new ArrayList<>())
.addAll(value));
} catch (NoSuchElementException e) {
// MBeanClient can throw NoSuchElementException if the result of query
// is empty
Expand All @@ -120,6 +130,11 @@ public void addFetcher(Fetcher fetcher, BiConsumer<Integer, Exception> noSuchMet
this.fetchers.add(Map.entry(fetcher, noSuchMetricHandler));
}

@Override
public void addMetricSensors(MetricSensors metricSensors) {
this.metricSensors.add(metricSensors);
}

@Override
public void registerJmx(int identity, InetSocketAddress socketAddress) {
this.registerJmx(
Expand All @@ -144,6 +159,11 @@ public void registerLocalJmx(int identity) {
+ " with the local JMX server. But this id is already registered");
}

@Override
public Collection<MetricSensors> listMetricsSensors() {
return this.metricSensors;
}

@Override
public Collection<Fetcher> listFetchers() {
return fetchers.stream().map(Map.Entry::getKey).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.collector;

import java.util.Collection;
import java.util.Map;
import org.astraea.common.metrics.HasBeanObject;

@FunctionalInterface
public interface MetricSensors {

/**
* @param identity broker id or producer/consumer id
* @param beans a collection of {@link HasBeanObject}
* @return The collection of "HasBeanObject" generated after the custom statistical method of
* CostFunction.
*/
Map<Integer, Collection<? extends HasBeanObject>> record(
int identity, Collection<? extends HasBeanObject> beans);
}
Loading