Skip to content

Commit

Permalink
Expose internal statistics of GreedyBalancer via JMX (#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyparrot authored Nov 9, 2022
1 parent b807963 commit dba74ba
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -39,6 +40,12 @@ static Builder builder() {
return new Builder();
}

/**
* @return a String indicate the name of this execution. This information is used for debug and
* logging usage.
*/
String executionId();

/**
* @return the cluster cost function for this problem.
*/
Expand Down Expand Up @@ -81,6 +88,7 @@ static Builder builder() {

class Builder {

private String executionId = "noname-" + UUID.randomUUID();
private HasClusterCost clusterCostFunction;
private List<HasMoveCost> moveCostFunction = List.of(HasMoveCost.EMPTY);
private BiPredicate<ClusterCost, ClusterCost> clusterConstraint =
Expand All @@ -91,6 +99,17 @@ class Builder {
private Predicate<String> topicFilter = ignore -> true;
private final Map<String, String> config = new HashMap<>();

/**
* Set a String that represents the execution of this algorithm. This information is typically
* used for debugging and logging usage.
*
* @return this
*/
public Builder executionId(String id) {
this.executionId = id;
return this;
}

/**
* Specify the cluster cost function to use. It implemented specific logic to evaluate if a
* rebalance plan is worth using at certain performance/resource usage aspect
Expand Down Expand Up @@ -199,6 +218,11 @@ public Builder config(String key, String value) {

public AlgorithmConfig build() {
return new AlgorithmConfig() {
@Override
public String executionId() {
return executionId;
}

@Override
public HasClusterCost clusterCostFunction() {
return clusterCostFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -31,6 +33,7 @@
import org.astraea.common.balancer.log.ClusterLogAllocation;
import org.astraea.common.balancer.tweakers.ShuffleTweaker;
import org.astraea.common.cost.ClusterCost;
import org.astraea.common.metrics.jmx.MBeanRegister;

/**
* A single-state hill-climbing algorithm. It discovers rebalance solution by tweaking the cluster
Expand All @@ -51,6 +54,7 @@ public class GreedyBalancer implements Balancer {
private final int minStep;
private final int maxStep;
private final int iteration;
private final AtomicInteger run = new AtomicInteger();

public GreedyBalancer(AlgorithmConfig algorithmConfig) {
this.config = algorithmConfig;
Expand Down Expand Up @@ -80,6 +84,8 @@ public GreedyBalancer(AlgorithmConfig algorithmConfig) {
@Override
public Optional<Plan> offer(
ClusterInfo<Replica> currentClusterInfo, Map<Integer, Set<String>> brokerFolders) {
Jmx jmx = new Jmx();

final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var metrics = config.metricSource().get();
final var clusterCostFunction = config.clusterCostFunction();
Expand Down Expand Up @@ -114,6 +120,8 @@ public Optional<Plan> offer(
ClusterLogAllocation.of(ClusterInfo.masked(currentClusterInfo, config.topicFilter()));
var currentPlan = Optional.<Balancer.Plan>empty();
while (true) {
jmx.nextIteration();
jmx.newCost(currentCost.value());
var newPlan = next.apply(currentAllocation, currentCost);
if (newPlan.isEmpty()) break;
currentPlan = newPlan;
Expand All @@ -122,4 +130,40 @@ public Optional<Plan> offer(
}
return currentPlan;
}

// visible for test
static long minLongDouble(long lhs, long rhs) {
double l = Double.longBitsToDouble(lhs);
double r = Double.longBitsToDouble(rhs);
double out = (Double.isNaN(r) || l < r) ? l : r;
return Double.doubleToRawLongBits(out);
}

private class Jmx {

private final LongAdder currentIteration = new LongAdder();
private final LongAccumulator currentMinCost =
new LongAccumulator(GreedyBalancer::minLongDouble, Double.doubleToRawLongBits(Double.NaN));

Jmx() {
final var runId = run.getAndIncrement();
MBeanRegister.local()
.setDomainName("astraea.balancer")
.addProperty("id", config.executionId())
.addProperty("algorithm", GreedyBalancer.class.getSimpleName())
.addProperty("run", Integer.toString(runId))
.addAttribute("Iteration", Long.class, currentIteration::sum)
.addAttribute(
"MinCost", Double.class, () -> Double.longBitsToDouble(currentMinCost.get()))
.register();
}

void nextIteration() {
currentIteration.increment();
}

void newCost(double cost) {
currentMinCost.accumulate(Double.doubleToRawLongBits(cost));
}
}
}
197 changes: 197 additions & 0 deletions common/src/main/java/org/astraea/common/metrics/jmx/MBeanRegister.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.jmx;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanConstructorInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanOperationInfo;
import javax.management.ObjectName;
import org.astraea.common.Utils;

/**
* A helper class for construct & register a Dynamic MBean.
*
* @see <a href="https://docs.oracle.com/cd/E19206-01/816-4178/6madjde4k/index.html">Dynamic MBean
* Introduction</a>
* @see <a href="https://docs.oracle.com/cd/E19206-01/816-4178/6madjde4l/index.html">Implementing a
* Dynamic MBean</a>
*/
public class MBeanRegister {

public static LocalRegister local() {
return new LocalRegister();
}

public static class LocalRegister {

// TODO: At this moment, this builder support readonly attribute only.
// Enhance it if you need extra features.

private LocalRegister() {}

private String domainName;
private final Map<String, String> properties = new Hashtable<>();
private String description = "";
private final List<MBeanAttributeInfo> attributeInfo = new ArrayList<>();
private final Map<String, Supplier<?>> attributes = new ConcurrentHashMap<>();

/**
* The domain name of this MBean.
*
* @return this.
*/
public LocalRegister setDomainName(String domainName) {
this.domainName = domainName;
return this;
}

/**
* Put a new property of this MBean.
*
* @return this.
*/
public LocalRegister addProperty(String key, String value) {
this.properties.put(key, value);
return this;
}

/**
* Description of this Mbean.
*
* @return this.
*/
public LocalRegister setDescription(String description) {
this.description = description;
return this;
}

/**
* Declare a new attribute of this Mbean.
*
* @param name the name of this attribute.
* @param attributeClass the type of this attribute.
* @param source where to retrieve the attribute value. Note that this {@link Supplier} must be
* thread-safe.
* @return this.
*/
public <T> LocalRegister addAttribute(
String name, Class<T> attributeClass, Supplier<T> source) {
Objects.requireNonNull(name);
Objects.requireNonNull(attributeClass);
Objects.requireNonNull(source);
attributeInfo.add(
new MBeanAttributeInfo(name, attributeClass.getName(), "", true, false, false));
attributes.compute(
name,
(attribute, original) -> {
if (original != null)
throw new IllegalArgumentException(
"This attribute " + attribute + " already registered");
else return source;
});
return this;
}

private DynamicMBean buildMBean() {
final MBeanInfo mBeanInfo =
new MBeanInfo(
Object.class.getName(),
description,
attributeInfo.toArray(MBeanAttributeInfo[]::new),
new MBeanConstructorInfo[0],
new MBeanOperationInfo[0],
new MBeanNotificationInfo[0]);
final var defensiveCopy = Map.copyOf(attributes);

return new DynamicMBean() {
@Override
public MBeanInfo getMBeanInfo() {
return mBeanInfo;
}

@Override
public Object getAttribute(String s) throws AttributeNotFoundException {
var attribute = defensiveCopy.get(s);
if (attribute == null)
throw new AttributeNotFoundException("Cannot find attribute: " + s);
else return attribute.get();
}

@Override
public AttributeList getAttributes(String[] strings) {
// Some consideration of the contract of this method
// 1. An error in accessing specific attribute doesn't interrupt the whole read operation.
// 2. Attribute with an error during the access, its value will be excluded from the
// result.
var list = new AttributeList();

for (int i = 0; i < strings.length; i++) {
try {
list.add(i, new Attribute(strings[i], getAttribute(strings[i])));
} catch (Exception ignore) {
// swallow
}
}

return list;
}

@Override
public void setAttribute(Attribute attribute) {
throw new UnsupportedOperationException(
MBeanRegister.class.getName() + " doesn't support writable attribute");
}

@Override
public AttributeList setAttributes(AttributeList attributeList) {
throw new UnsupportedOperationException(
MBeanRegister.class.getName() + " doesn't support writable attribute");
}

@Override
public Object invoke(String s, Object[] objects, String[] strings) {
throw new UnsupportedOperationException(
MBeanRegister.class.getName() + " doesn't support invoke operation");
}
};
}

/** Build this Mbean, and register it to the local JVM MBean server. */
public void register() {
Utils.packException(
() -> {
var name = new ObjectName(domainName, new Hashtable<>(properties));
var mBean = buildMBean();
ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
});
}
}
}
Loading

0 comments on commit dba74ba

Please sign in to comment.