diff --git a/common/src/main/java/org/astraea/common/cost/Dispersion.java b/common/src/main/java/org/astraea/common/cost/Dispersion.java index 800ebb1c00..bc4c87e170 100644 --- a/common/src/main/java/org/astraea/common/cost/Dispersion.java +++ b/common/src/main/java/org/astraea/common/cost/Dispersion.java @@ -53,6 +53,29 @@ static Dispersion cov() { }; } + /** + * Obtain standard deviation from a series of values. + * + * + */ + static Dispersion standardDeviation() { + return numbers -> { + // special case: no number + if (numbers.isEmpty()) return 0; + var numSummary = numbers.stream().mapToDouble(Number::doubleValue).summaryStatistics(); + var numVariance = + numbers.stream() + .mapToDouble(Number::doubleValue) + .map(score -> score - numSummary.getAverage()) + .map(score -> score * score) + .summaryStatistics() + .getSum(); + return Math.sqrt(numVariance / numbers.size()); + }; + } + /** * Processing a series of values via a specific statistics method. * diff --git a/common/src/main/java/org/astraea/common/partitioner/RoundRobin.java b/common/src/main/java/org/astraea/common/partitioner/RoundRobin.java index 0bdb99755f..fc8ed838ac 100644 --- a/common/src/main/java/org/astraea/common/partitioner/RoundRobin.java +++ b/common/src/main/java/org/astraea/common/partitioner/RoundRobin.java @@ -42,6 +42,30 @@ static RoundRobin smooth(Map scores) { */ Optional next(Set availableTargets); + /** + * Given initial key-score pair, it will output a preferred key with the highest current weight. + * The current weight of the chosen key will decrease the sum of effective weight. And all current + * weight will increment by its effective weight. It may result in "higher score with higher + * chosen rate". For example: + * + *

||========================================||============================================|| + * ||------------ Broker in cluster ------------||------------- Effective weight -------------|| + * ||------------------Broker1------------------||-------------------- 5 ---------------------|| + * ||------------------Broker2------------------||-------------------- 1 ---------------------|| + * ||------------------Broker3------------------||-------------------- 1 ---------------------|| + * ||===========================================||============================================|| + * + *

||===================||=======================||===============||======================|| + * ||--- Request Number ---|| Before current weight || Target Broker || After current weight || + * ||----------1-----------||------ {5, 1, 1} ------||----Broker1----||----- {-2, 1, 1} -----|| + * ||----------2-----------||------ {3, 2, 2} ------||----Broker1----||----- {-4, 2, 2} -----|| + * ||----------3-----------||------ {1, 3, 3} ------||----Broker2----||----- { 1,-4, 3} -----|| + * ||----------4-----------||------ {6,-3, 4} ------||----Broker1----||----- {-1,-3, 4} -----|| + * ||----------5-----------||------ {4,-2, 5} ------||----Broker3----||----- { 4,-2,-2} -----|| + * ||----------6-----------||------ {9,-1,-1} ------||----Broker1----||----- { 2,-1,-1} -----|| + * ||----------7-----------||------ {7, 0, 0} ------||----Broker1----||----- { 0, 0, 0} -----|| + * ||======================||=======================||===============||======================|| + */ class SmoothRoundRobin implements RoundRobin { private final Map effectiveScores; diff --git a/common/src/main/java/org/astraea/common/partitioner/RoundRobinKeeper.java b/common/src/main/java/org/astraea/common/partitioner/RoundRobinKeeper.java new file mode 100644 index 0000000000..7a4e68ee79 --- /dev/null +++ b/common/src/main/java/org/astraea/common/partitioner/RoundRobinKeeper.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.partitioner; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.common.Lazy; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.NodeInfo; + +public class RoundRobinKeeper { + private final AtomicInteger next = new AtomicInteger(0); + final int[] roundRobin; + final Duration roundRobinLease; + volatile long timeToUpdateRoundRobin = -1; + + private RoundRobinKeeper(int preLength, Duration roundRobinLease) { + this.roundRobin = new int[preLength]; + this.roundRobinLease = roundRobinLease; + } + + static RoundRobinKeeper of(int preLength, Duration roundRobinLease) { + return new RoundRobinKeeper(preLength, roundRobinLease); + } + + synchronized void tryToUpdate(ClusterInfo clusterInfo, Lazy> costToScore) { + if (System.currentTimeMillis() >= timeToUpdateRoundRobin) { + var roundRobin = RoundRobin.smooth(costToScore.get()); + var ids = + clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet()); + // TODO: make ROUND_ROBIN_LENGTH configurable ??? + IntStream.range(0, this.roundRobin.length) + .forEach(index -> this.roundRobin[index] = roundRobin.next(ids).orElse(-1)); + timeToUpdateRoundRobin = System.currentTimeMillis() + roundRobinLease.toMillis(); + } + } + + int next() { + return roundRobin[ + next.getAndUpdate(previous -> previous >= roundRobin.length - 1 ? 0 : previous + 1)]; + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/SmoothWeightCal.java b/common/src/main/java/org/astraea/common/partitioner/SmoothWeightCal.java new file mode 100644 index 0000000000..b7c8d6fdae --- /dev/null +++ b/common/src/main/java/org/astraea/common/partitioner/SmoothWeightCal.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.partitioner; + +import java.util.ArrayList; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.astraea.common.Lazy; +import org.astraea.common.cost.Dispersion; + +public final class SmoothWeightCal { + private final double UPPER_LIMIT_OFFSET_RATIO = 0.1; + private final Dispersion dispersion = Dispersion.standardDeviation(); + private Map currentEffectiveWeightResult; + Lazy> effectiveWeightResult = Lazy.of(); + + SmoothWeightCal(Map effectiveWeight) { + this.effectiveWeightResult.get( + () -> + effectiveWeight.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 1.0))); + this.currentEffectiveWeightResult = effectiveWeightResult.get(); + } + + /** + * Update effective weight. + * + * @param brokerScore Broker Score. + */ + synchronized void refresh(Supplier> brokerScore) { + this.effectiveWeightResult = + Lazy.of( + () -> { + var score = brokerScore.get(); + var avgScore = score.values().stream().mapToDouble(i -> i).average().orElse(1.0); + var offsetRatioOfBroker = + score.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> (entry.getValue() - avgScore) / avgScore)); + // If the average offset of all brokers from the cluster is greater than 0.1, it is + // unbalanced. + var balance = + dispersion.calculate(new ArrayList<>(score.values())) + > UPPER_LIMIT_OFFSET_RATIO * avgScore; + this.currentEffectiveWeightResult = + this.currentEffectiveWeightResult.entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + entry -> { + var offsetRatio = offsetRatioOfBroker.get(entry.getKey()); + var weight = + balance + ? entry.getValue() * (1 - offsetRatio) + : entry.getValue(); + return Math.max(weight, 0.1); + })); + + return this.currentEffectiveWeightResult; + }); + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/SmoothWeightRoundRobinDispatcher.java b/common/src/main/java/org/astraea/common/partitioner/SmoothWeightRoundRobinDispatcher.java new file mode 100644 index 0000000000..c85c8ff11a --- /dev/null +++ b/common/src/main/java/org/astraea/common/partitioner/SmoothWeightRoundRobinDispatcher.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.partitioner; + +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.astraea.common.Configuration; +import org.astraea.common.Utils; +import org.astraea.common.admin.BrokerTopic; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.cost.NeutralIntegratedCost; +import org.astraea.common.metrics.collector.MetricCollector; + +public class SmoothWeightRoundRobinDispatcher extends Dispatcher { + private static final int ROUND_ROBIN_LENGTH = 400; + private static final String JMX_PORT = "jmx.port"; + public static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease"; + private final ConcurrentLinkedDeque unusedPartitions = new ConcurrentLinkedDeque<>(); + private final MetricCollector metricCollector = + MetricCollector.builder().interval(Duration.ofMillis(1500)).build(); + private final NeutralIntegratedCost neutralIntegratedCost = new NeutralIntegratedCost(); + private SmoothWeightCal smoothWeightCal; + private RoundRobinKeeper roundRobinKeeper; + private Function> jmxPortGetter = (id) -> Optional.empty(); + + @Override + public int partition(String topic, byte[] key, byte[] value, ClusterInfo clusterInfo) { + var partitionLeaders = clusterInfo.replicaLeaders(topic); + // just return first partition if there is no available partitions + if (partitionLeaders.isEmpty()) return 0; + + // just return the only one available partition + if (partitionLeaders.size() == 1) return partitionLeaders.get(0).partition(); + + var targetPartition = unusedPartitions.poll(); + refreshPartitionMetaData(clusterInfo, topic); + Supplier> supplier = + () -> + // fetch the latest beans for each node + neutralIntegratedCost.brokerCost(clusterInfo, metricCollector.clusterBean()).value(); + + smoothWeightCal.refresh(supplier); + + if (targetPartition == null) { + roundRobinKeeper.tryToUpdate(clusterInfo, smoothWeightCal.effectiveWeightResult); + var target = roundRobinKeeper.next(); + + var candidate = + target < 0 ? partitionLeaders : clusterInfo.replicaLeaders(BrokerTopic.of(target, topic)); + candidate = candidate.isEmpty() ? partitionLeaders : candidate; + + targetPartition = candidate.get((int) (Math.random() * candidate.size())).partition(); + } + + return targetPartition; + } + + @Override + public void close() { + metricCollector.close(); + } + + @Override + public void configure(Configuration configuration) { + configure( + configuration.integer(JMX_PORT), + PartitionerUtils.parseIdJMXPort(configuration), + configuration + .string(ROUND_ROBIN_LEASE_KEY) + .map(Utils::toDuration) + // The duration of updating beans is 4 seconds, so + // the default duration of updating RR is 4 seconds. + .orElse(Duration.ofSeconds(4))); + } + + void configure( + Optional jmxPortDefault, + Map customJmxPort, + Duration roundRobinLease) { + this.jmxPortGetter = id -> Optional.ofNullable(customJmxPort.get(id)).or(() -> jmxPortDefault); + this.neutralIntegratedCost.fetcher().ifPresent(metricCollector::addFetcher); + this.roundRobinKeeper = RoundRobinKeeper.of(ROUND_ROBIN_LENGTH, roundRobinLease); + this.smoothWeightCal = + new SmoothWeightCal<>( + customJmxPort.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, ignore -> 1.0))); + } + + @Override + public void onNewBatch(String topic, int prevPartition) { + unusedPartitions.add(prevPartition); + } + + private void refreshPartitionMetaData(ClusterInfo clusterInfo, String topic) { + clusterInfo.availableReplicas(topic).stream() + .filter(p -> !metricCollector.listIdentities().contains(p.nodeInfo().id())) + .forEach( + node -> { + if (!metricCollector.listIdentities().contains(node.nodeInfo().id())) { + jmxPortGetter + .apply(node.nodeInfo().id()) + .ifPresent( + port -> + metricCollector.registerJmx( + node.nodeInfo().id(), + InetSocketAddress.createUnresolved(node.nodeInfo().host(), port))); + } + }); + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/StrictCostDispatcher.java b/common/src/main/java/org/astraea/common/partitioner/StrictCostDispatcher.java index 4c3bcbd4de..fae404fd43 100644 --- a/common/src/main/java/org/astraea/common/partitioner/StrictCostDispatcher.java +++ b/common/src/main/java/org/astraea/common/partitioner/StrictCostDispatcher.java @@ -21,15 +21,13 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.astraea.common.Configuration; +import org.astraea.common.Lazy; import org.astraea.common.Utils; import org.astraea.common.admin.BrokerTopic; import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.NodeInfo; import org.astraea.common.cost.BrokerCost; import org.astraea.common.cost.HasBrokerCost; import org.astraea.common.cost.NodeLatencyCost; @@ -52,24 +50,16 @@ */ public class StrictCostDispatcher extends Dispatcher { static final int ROUND_ROBIN_LENGTH = 400; - - public static final String JMX_PORT = "jmx.port"; - public static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease"; - + static final String JMX_PORT = "jmx.port"; + static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease"; // visible for testing final MetricCollector metricCollector = MetricCollector.builder().interval(Duration.ofMillis(1500)).build(); - Duration roundRobinLease = Duration.ofSeconds(4); - + private Duration roundRobinLease = Duration.ofSeconds(4); HasBrokerCost costFunction = new NodeLatencyCost(); Function> jmxPortGetter = (id) -> Optional.empty(); - - final int[] roundRobin = new int[ROUND_ROBIN_LENGTH]; - - final AtomicInteger next = new AtomicInteger(0); - - volatile long timeToUpdateRoundRobin = -1; + RoundRobinKeeper roundRobinKeeper; void tryToUpdateFetcher(ClusterInfo clusterInfo) { // register new nodes to metric collector @@ -104,11 +94,13 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster tryToUpdateFetcher(clusterInfo); - tryToUpdateRoundRobin(clusterInfo); + roundRobinKeeper.tryToUpdate( + clusterInfo, + Lazy.of( + () -> + costToScore(costFunction.brokerCost(clusterInfo, metricCollector.clusterBean())))); - var target = - roundRobin[ - next.getAndUpdate(previous -> previous >= roundRobin.length - 1 ? 0 : previous + 1)]; + var target = roundRobinKeeper.next(); // TODO: if the topic partitions are existent in fewer brokers, the target gets -1 in most cases var candidate = @@ -117,20 +109,6 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster return candidate.get((int) (Math.random() * candidate.size())).partition(); } - synchronized void tryToUpdateRoundRobin(ClusterInfo clusterInfo) { - if (System.currentTimeMillis() >= timeToUpdateRoundRobin) { - var roundRobin = - RoundRobin.smooth( - costToScore(costFunction.brokerCost(clusterInfo, metricCollector.clusterBean()))); - var ids = - clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet()); - // TODO: make ROUND_ROBIN_LENGTH configurable ??? - IntStream.range(0, ROUND_ROBIN_LENGTH) - .forEach(index -> this.roundRobin[index] = roundRobin.next(ids).orElse(-1)); - timeToUpdateRoundRobin = System.currentTimeMillis() + roundRobinLease.toMillis(); - } - } - /** * The value of cost returned from cost function is conflict to score, since the higher cost * represents lower score. This helper reverses the cost by subtracting the cost from "max cost". @@ -170,6 +148,7 @@ public void configure(Configuration config) { if (!metricCollector.listIdentities().contains(-1)) metricCollector.registerLocalJmx(-1); this.costFunction.fetcher().ifPresent(metricCollector::addFetcher); + this.roundRobinKeeper = RoundRobinKeeper.of(ROUND_ROBIN_LENGTH, roundRobinLease); } /** diff --git a/common/src/main/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobin.java b/common/src/main/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobin.java deleted file mode 100644 index f619e23173..0000000000 --- a/common/src/main/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobin.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.partitioner.smooth; - -import java.time.Duration; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.astraea.common.Lazy; -import org.astraea.common.admin.ClusterInfo; - -/** - * Given initial key-score pair, it will output a preferred key with the highest current weight. The - * current weight of the chosen key will decrease the sum of effective weight. And all current - * weight will increment by its effective weight. It may result in "higher score with higher chosen - * rate". For example: - * - *

||========================================||============================================|| - * ||------------ Broker in cluster ------------||------------- Effective weight -------------|| - * ||------------------Broker1------------------||-------------------- 5 ---------------------|| - * ||------------------Broker2------------------||-------------------- 1 ---------------------|| - * ||------------------Broker3------------------||-------------------- 1 ---------------------|| - * ||===========================================||============================================|| - * - *

||===================||=======================||===============||======================|| - * ||--- Request Number ---|| Before current weight || Target Broker || After current weight || - * ||----------1-----------||------ {5, 1, 1} ------||----Broker1----||----- {-2, 1, 1} -----|| - * ||----------2-----------||------ {3, 2, 2} ------||----Broker1----||----- {-4, 2, 2} -----|| - * ||----------3-----------||------ {1, 3, 3} ------||----Broker2----||----- { 1,-4, 3} -----|| - * ||----------4-----------||------ {6,-3, 4} ------||----Broker1----||----- {-1,-3, 4} -----|| - * ||----------5-----------||------ {4,-2, 5} ------||----Broker3----||----- { 4,-2,-2} -----|| - * ||----------6-----------||------ {9,-1,-1} ------||----Broker1----||----- { 2,-1,-1} -----|| - * ||----------7-----------||------ {7, 0, 0} ------||----Broker1----||----- { 0, 0, 0} -----|| - * ||======================||=======================||===============||======================|| - */ -public final class SmoothWeightRoundRobin { - private final Lazy effectiveWeightResult = Lazy.of(); - private Map currentWeight; - private final Map> brokersIDofTopic = new HashMap<>(); - private final double upperLimitOffsetRatio = 0.1; - - public SmoothWeightRoundRobin(Map effectiveWeight) { - effectiveWeightResult.get( - () -> - new EffectiveWeightResult( - effectiveWeight.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 1.0)))); - currentWeight = - effectiveWeight.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 0.0)); - } - - /** - * Update effective weight. - * - * @param brokerScore Broker Score. - */ - public synchronized void init(Supplier> brokerScore) { - effectiveWeightResult.get( - () -> { - var avgScore = - brokerScore.get().values().stream().mapToDouble(i -> i).average().getAsDouble(); - var offsetRatioOfBroker = - brokerScore.get().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, entry -> (entry.getValue() - avgScore) / avgScore)); - // If the average offset of all brokers from the cluster is greater than 0.1, it is - // unbalanced. - var balance = - standardDeviationImperative(avgScore, brokerScore.get()) - > upperLimitOffsetRatio * avgScore; - var effectiveWeights = - this.effectiveWeightResult.get().effectiveWeight.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> { - var offsetRatio = offsetRatioOfBroker.get(entry.getKey()); - var weight = - balance ? entry.getValue() * (1 - offsetRatio) : entry.getValue(); - return Math.max(weight, 0.0); - })); - return new EffectiveWeightResult(effectiveWeights); - }, - Duration.ofSeconds(10)); - } - - /** - * Get the preferred ID, and update the state. - * - * @return the preferred ID - */ - public synchronized int getAndChoose(String topic, ClusterInfo clusterInfo) { - // TODO Update brokerID with ClusterInfo frequency. - var brokerID = - brokersIDofTopic.computeIfAbsent( - topic, - e -> - clusterInfo.replicaLeaders(topic).stream() - .map(replicaInfo -> replicaInfo.nodeInfo().id()) - .collect(Collectors.toList())); - this.currentWeight = - this.currentWeight.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - brokerID.contains(e.getKey()) - ? e.getValue() - + effectiveWeightResult.get().effectiveWeight.get(e.getKey()) - : e.getValue())); - var maxID = - this.currentWeight.entrySet().stream() - .filter(entry -> brokerID.contains(entry.getKey())) - .max(Comparator.comparingDouble(Map.Entry::getValue)) - .map(Map.Entry::getKey) - .orElse(0); - this.currentWeight = - this.currentWeight.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - e.getKey().equals(maxID) - ? e.getValue() - - effectiveWeightResult.get().effectiveWeightSum(brokerID) - : e.getValue())); - return maxID; - } - - public static class EffectiveWeightResult { - private final Map effectiveWeight; - - EffectiveWeightResult(Map effectiveWeight) { - this.effectiveWeight = effectiveWeight; - } - - double effectiveWeightSum(List brokerID) { - return effectiveWeight.entrySet().stream() - .filter(entry -> brokerID.contains(entry.getKey())) - .mapToDouble(Map.Entry::getValue) - .sum(); - } - } - - private static double standardDeviationImperative( - double avgMetrics, Map metrics) { - var variance = new AtomicReference<>(0.0); - metrics - .values() - .forEach( - metric -> - variance.updateAndGet(v -> v + (metric - avgMetrics) * (metric - avgMetrics))); - return Math.sqrt(variance.get() / metrics.size()); - } -} diff --git a/common/src/main/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinDispatcher.java b/common/src/main/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinDispatcher.java deleted file mode 100644 index 4c7ac5dee1..0000000000 --- a/common/src/main/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinDispatcher.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.partitioner.smooth; - -import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.astraea.common.Configuration; -import org.astraea.common.Lazy; -import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.NodeInfo; -import org.astraea.common.admin.Replica; -import org.astraea.common.cost.NeutralIntegratedCost; -import org.astraea.common.metrics.collector.MetricCollector; -import org.astraea.common.partitioner.Dispatcher; - -public class SmoothWeightRoundRobinDispatcher extends Dispatcher { - private final ConcurrentLinkedDeque unusedPartitions = new ConcurrentLinkedDeque<>(); - private final ConcurrentMap topicCounter = new ConcurrentHashMap<>(); - private final MetricCollector metricCollector = - MetricCollector.builder() - .interval(Duration.ofSeconds(1)) - .expiration(Duration.ofSeconds(10)) - .build(); - private final Optional jmxPortDefault = Optional.empty(); - private final Map jmxPorts = new TreeMap<>(); - - private final Lazy smoothWeightRoundRobinCal = Lazy.of(); - - private final NeutralIntegratedCost neutralIntegratedCost = new NeutralIntegratedCost(); - - private List partitions; - - public static final String JMX_PORT = "jmx.port"; - - @Override - protected int partition(String topic, byte[] key, byte[] value, ClusterInfo clusterInfo) { - var targetPartition = unusedPartitions.poll(); - refreshPartitionMetaData(clusterInfo, topic); - Supplier> supplier = - () -> - // fetch the latest beans for each node - neutralIntegratedCost.brokerCost(clusterInfo, metricCollector.clusterBean()).value(); - // just return first partition if there is no available partitions - if (partitions.isEmpty()) return 0; - - // just return the only one available partition - if (partitions.size() == 1) return partitions.iterator().next().partition(); - - if (targetPartition == null) { - var smooth = smoothWeightRoundRobinCal.get(() -> new SmoothWeightRoundRobin(supplier.get())); - smooth.init(supplier); - var targetBroker = smooth.getAndChoose(topic, clusterInfo); - var targetPartitions = - clusterInfo.availableReplicas(topic).stream() - .filter(r -> r.nodeInfo().id() == targetBroker) - .collect(Collectors.toUnmodifiableList()); - targetPartition = - targetPartitions - .get(nextValue(topic, clusterInfo, targetBroker) % targetPartitions.size()) - .partition(); - } - - return targetPartition; - } - - @Override - public void close() { - metricCollector.close(); - super.close(); - } - - @Override - public void configure(Configuration configuration) { - // seeks for custom jmx ports. - configuration.entrySet().stream() - .filter(e -> e.getKey().startsWith("broker.")) - .filter(e -> e.getKey().endsWith(JMX_PORT)) - .forEach( - e -> - jmxPorts.put( - Integer.parseInt(e.getKey().split("\\.")[1]), Integer.parseInt(e.getValue()))); - neutralIntegratedCost.fetcher().ifPresent(metricCollector::addFetcher); - } - - @Override - protected void onNewBatch(String topic, int prevPartition) { - unusedPartitions.add(prevPartition); - } - - int jmxPort(int id) { - if (jmxPorts.containsKey(id)) return jmxPorts.get(id); - return jmxPortDefault.orElseThrow( - () -> new NoSuchElementException("broker: " + id + " does not have jmx port")); - } - - private int nextValue(String topic, ClusterInfo clusterInfo, int targetBroker) { - return topicCounter - .computeIfAbsent(topic, k -> new BrokerNextCounter(clusterInfo)) - .brokerCounter - .get(targetBroker) - .getAndIncrement(); - } - - private void refreshPartitionMetaData(ClusterInfo clusterInfo, String topic) { - partitions = clusterInfo.availableReplicas(topic); - partitions.stream() - .filter(p -> !metricCollector.listIdentities().contains(p.nodeInfo().id())) - .forEach( - p -> - metricCollector.registerJmx( - p.nodeInfo().id(), - InetSocketAddress.createUnresolved( - p.nodeInfo().host(), jmxPort(p.nodeInfo().id())))); - } - - private static class BrokerNextCounter { - private final Map brokerCounter; - - BrokerNextCounter(ClusterInfo clusterInfo) { - brokerCounter = - clusterInfo.nodes().stream() - .collect(Collectors.toMap(NodeInfo::id, node -> new AtomicInteger(0))); - } - } -} diff --git a/common/src/test/java/org/astraea/common/cost/DispersionTest.java b/common/src/test/java/org/astraea/common/cost/DispersionTest.java index 4b85661967..2bcb030cad 100644 --- a/common/src/test/java/org/astraea/common/cost/DispersionTest.java +++ b/common/src/test/java/org/astraea/common/cost/DispersionTest.java @@ -33,4 +33,16 @@ void testCorrelationCoefficient() { Assertions.assertFalse(Double.isNaN(score)); Assertions.assertEquals(0.0, score); } + + @Test + void standardDeviation() { + var dispersion = Dispersion.standardDeviation(); + var scores = List.of(8, 8, 4, 4); + Assertions.assertEquals(2, dispersion.calculate(scores)); + + var zeroScores = List.of(0.0, 0.0, 0.0); + var score = dispersion.calculate(zeroScores); + Assertions.assertFalse(Double.isNaN(score)); + Assertions.assertEquals(0.0, score); + } } diff --git a/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java b/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java index 810c40200b..31cdd7f0a4 100644 --- a/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java @@ -27,13 +27,11 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.Cluster; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.astraea.common.Configuration; @@ -41,16 +39,21 @@ import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.producer.Metadata; import org.astraea.common.producer.Producer; import org.astraea.common.producer.Record; import org.astraea.common.producer.Serializer; import org.astraea.it.RequireSingleBrokerCluster; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class DispatcherTest extends RequireSingleBrokerCluster { + private final String SMOOTH_ROUND_ROBIN = + "org.astraea.common.partitioner.SmoothWeightRoundRobinDispatcher"; + private final String STRICT_ROUND_ROBIN = "org.astraea.common.partitioner.StrictCostDispatcher"; @Test void testUpdateClusterInfo() { @@ -74,7 +77,7 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster @Test void testNullKey() { var count = new AtomicInteger(); - var dispatcher = + Dispatcher dispatcher = new Dispatcher() { @Override public int partition(String topic, byte[] key, byte[] value, ClusterInfo clusterInfo) { @@ -93,12 +96,13 @@ public void configure(Configuration config) { dispatcher.configure(Map.of("a", "b")); Assertions.assertEquals(1, count.get()); - dispatcher.partition( - "t", null, null, null, null, new Cluster("aa", List.of(), List.of(), Set.of(), Set.of())); + dispatcher.partition("t", null, null, ClusterInfoBuilder.builder().build()); + Assertions.assertEquals(2, count.get()); } - @RepeatedTest(5) - void multipleThreadTest() throws IOException { + @ParameterizedTest + @ValueSource(strings = {SMOOTH_ROUND_ROBIN, STRICT_ROUND_ROBIN}) + void multipleThreadTest(String className) throws IOException { var topicName = "address"; createTopic(topicName); var key = "tainan"; @@ -109,7 +113,7 @@ void multipleThreadTest() throws IOException { Producer.builder() .keySerializer(Serializer.STRING) .configs( - initProConfig().entrySet().stream() + put(initProConfig(), className).entrySet().stream() .collect( Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()))) .build()) { @@ -152,8 +156,9 @@ void multipleThreadTest() throws IOException { } } - @Test - void interdependentTest() throws IOException { + @ParameterizedTest + @ValueSource(strings = {SMOOTH_ROUND_ROBIN, STRICT_ROUND_ROBIN}) + void interdependentTest(String className) throws IOException { var topicName = "address"; createTopic(topicName); var key = "tainan"; @@ -164,7 +169,7 @@ void interdependentTest() throws IOException { Producer.builder() .keySerializer(Serializer.STRING) .configs( - initProConfig().entrySet().stream() + put(initProConfig(), className).entrySet().stream() .collect( Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()))) .build()) { @@ -247,14 +252,19 @@ private org.apache.kafka.clients.producer.Producer instanceOfProduce : null; } + private Properties put(Properties properties, String name) { + properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, name); + return properties; + } + private Properties initProConfig() throws IOException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.put(ProducerConfig.CLIENT_ID_CONFIG, "id1"); - // TODO: add smooth dispatch to this test - props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, StrictCostDispatcher.class.getName()); + props.put( + ProducerConfig.PARTITIONER_CLASS_CONFIG, SmoothWeightRoundRobinDispatcher.class.getName()); props.put("producerID", 1); var file = new File( diff --git a/common/src/test/java/org/astraea/common/partitioner/SmoothWeightCalTest.java b/common/src/test/java/org/astraea/common/partitioner/SmoothWeightCalTest.java new file mode 100644 index 0000000000..144afa3829 --- /dev/null +++ b/common/src/test/java/org/astraea/common/partitioner/SmoothWeightCalTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.partitioner; + +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SmoothWeightCalTest { + @Test + void testRefresh() { + var smoothWeight = new SmoothWeightCal<>(Map.of(1, 5.0, 2, 3.0, 3, 1.0)); + + var effectiveWeightResult = smoothWeight.effectiveWeightResult.get(); + Assertions.assertEquals(1.0, effectiveWeightResult.get(1)); + Assertions.assertEquals(1.0, effectiveWeightResult.get(2)); + Assertions.assertEquals(1.0, effectiveWeightResult.get(3)); + + smoothWeight.refresh(() -> Map.of(1, 6.0, 2, 3.0, 3, 6.0)); + effectiveWeightResult = smoothWeight.effectiveWeightResult.get(); + + Assertions.assertEquals(0.8, effectiveWeightResult.get(1)); + Assertions.assertEquals(1.4, effectiveWeightResult.get(2)); + Assertions.assertEquals(0.8, effectiveWeightResult.get(3)); + + smoothWeight.refresh(() -> Map.of(1, 10.0, 2, 1.0, 3, 10.0)); + effectiveWeightResult = smoothWeight.effectiveWeightResult.get(); + + Assertions.assertEquals(0.45714285714285713, effectiveWeightResult.get(1)); + Assertions.assertEquals(2.6, effectiveWeightResult.get(2)); + Assertions.assertEquals(0.45714285714285713, effectiveWeightResult.get(3)); + } + + @Test + void testLazyEffectiveWeightResult() { + var smoothWeight = new SmoothWeightCal<>(Map.of(1, 5.0, 2, 3.0, 3, 1.0)); + + var effectiveWeightResult = smoothWeight.effectiveWeightResult.get(); + Assertions.assertEquals(1.0, effectiveWeightResult.get(1)); + Assertions.assertEquals(1.0, effectiveWeightResult.get(2)); + Assertions.assertEquals(1.0, effectiveWeightResult.get(3)); + + smoothWeight.refresh(() -> Map.of(1, 10.0, 2, 3.0, 3, 10.0)); + smoothWeight.refresh(() -> Map.of(1, 6.0, 2, 3.0, 3, 6.0)); + effectiveWeightResult = smoothWeight.effectiveWeightResult.get(); + + Assertions.assertEquals(0.8, effectiveWeightResult.get(1)); + Assertions.assertEquals(1.4, effectiveWeightResult.get(2)); + Assertions.assertEquals(0.8, effectiveWeightResult.get(3)); + } +} diff --git a/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherPerfTest.java b/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherPerfTest.java index ab496bd50c..aedbdee2b0 100644 --- a/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherPerfTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherPerfTest.java @@ -94,7 +94,7 @@ void test() { .collect(Collectors.groupingBy(i -> i)); var keys = - Arrays.stream(dispatcher.roundRobin) + Arrays.stream(dispatcher.roundRobinKeeper.roundRobin) .boxed() .collect(Collectors.groupingBy(i -> i)) .keySet(); diff --git a/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherTest.java b/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherTest.java index f7ad5041d2..0e3559222b 100644 --- a/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/StrictCostDispatcherTest.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.common.Configuration; +import org.astraea.common.Lazy; import org.astraea.common.Utils; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; @@ -273,24 +274,26 @@ void testInvalidCostToScore() { @Test void testRoundRobinLease() { try (var dispatcher = new StrictCostDispatcher()) { - dispatcher.configure( Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s"))); - Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease); + Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinKeeper.roundRobinLease); - dispatcher.tryToUpdateRoundRobin(ClusterInfo.empty()); - var t = dispatcher.timeToUpdateRoundRobin; + dispatcher.roundRobinKeeper.tryToUpdate(ClusterInfo.empty(), Lazy.of(Map::of)); + var t = dispatcher.roundRobinKeeper.timeToUpdateRoundRobin; var rr = - Arrays.stream(dispatcher.roundRobin).boxed().collect(Collectors.toUnmodifiableList()); + Arrays.stream(dispatcher.roundRobinKeeper.roundRobin) + .boxed() + .collect(Collectors.toUnmodifiableList()); Assertions.assertEquals(StrictCostDispatcher.ROUND_ROBIN_LENGTH, rr.size()); // the rr is not updated yet - dispatcher.tryToUpdateRoundRobin(ClusterInfo.empty()); + dispatcher.roundRobinKeeper.tryToUpdate(ClusterInfo.empty(), Lazy.of(Map::of)); IntStream.range(0, rr.size()) - .forEach(i -> Assertions.assertEquals(rr.get(i), dispatcher.roundRobin[i])); + .forEach( + i -> Assertions.assertEquals(rr.get(i), dispatcher.roundRobinKeeper.roundRobin[i])); Utils.sleep(Duration.ofSeconds(3)); - dispatcher.tryToUpdateRoundRobin(ClusterInfo.empty()); + dispatcher.roundRobinKeeper.tryToUpdate(ClusterInfo.empty(), Lazy.of(Map::of)); // rr is updated already - Assertions.assertNotEquals(t, dispatcher.timeToUpdateRoundRobin); + Assertions.assertNotEquals(t, dispatcher.roundRobinKeeper.timeToUpdateRoundRobin); } } diff --git a/common/src/test/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java b/common/src/test/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java deleted file mode 100644 index d7e67b801e..0000000000 --- a/common/src/test/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.partitioner.smooth; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.astraea.common.FutureUtils; -import org.astraea.common.Header; -import org.astraea.common.Utils; -import org.astraea.common.admin.Admin; -import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.NodeInfo; -import org.astraea.common.admin.Replica; -import org.astraea.common.consumer.Consumer; -import org.astraea.common.consumer.ConsumerConfigs; -import org.astraea.common.consumer.Deserializer; -import org.astraea.common.producer.Producer; -import org.astraea.common.producer.Record; -import org.astraea.common.producer.Serializer; -import org.astraea.it.RequireBrokerCluster; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -public class SmoothWeightRoundRobinDispatchTest extends RequireBrokerCluster { - private final String brokerList = bootstrapServers(); - private final Admin admin = Admin.of(bootstrapServers()); - - private Map initProConfig() { - return Map.of( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - brokerList, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - ByteArraySerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - ByteArraySerializer.class.getName(), - ProducerConfig.CLIENT_ID_CONFIG, - "id1", - ProducerConfig.PARTITIONER_CLASS_CONFIG, - SmoothWeightRoundRobinDispatcher.class.getName(), - "producerID", - "1", - "broker.0.jmx.port", - String.valueOf(jmxServiceURL().getPort()), - "broker.1.jmx.port", - String.valueOf(jmxServiceURL().getPort()), - "broker.2.jmx.port", - String.valueOf(jmxServiceURL().getPort())); - } - - @Test - void testPartitioner() { - var topicName = "address"; - admin.creator().topic(topicName).numberOfPartitions(10).run().toCompletableFuture().join(); - var key = "tainan"; - var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); - try (var producer = - Producer.builder() - .keySerializer(Serializer.STRING) - .configs( - initProConfig().entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))) - .build()) { - var i = 0; - while (i < 300) { - var metadata = - producer - .send( - Record.builder() - .topic(topicName) - .key(key) - .timestamp(timestamp) - .headers(List.of(header)) - .build()) - .toCompletableFuture() - .join(); - assertEquals(topicName, metadata.topic()); - assertEquals(timestamp, metadata.timestamp()); - i++; - } - } - Utils.sleep(Duration.ofSeconds(1)); - try (var consumer = - Consumer.forTopics(Set.of(topicName)) - .bootstrapServers(bootstrapServers()) - .config( - ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, - ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) - .keyDeserializer(Deserializer.STRING) - .build()) { - var records = consumer.poll(Duration.ofSeconds(20)); - var recordsCount = records.size(); - while (recordsCount < 300) { - recordsCount += consumer.poll(Duration.ofSeconds(20)).size(); - } - assertEquals(300, recordsCount); - var record = records.iterator().next(); - assertEquals(topicName, record.topic()); - assertEquals("tainan", record.key()); - assertEquals(1, record.headers().size()); - var actualHeader = record.headers().iterator().next(); - assertEquals(header.key(), actualHeader.key()); - Assertions.assertArrayEquals(header.value(), actualHeader.value()); - } - } - - @Test - void testMultipleProducer() { - var topicName = "addr"; - admin.creator().topic(topicName).numberOfPartitions(10).run().toCompletableFuture().join(); - var key = "tainan"; - var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); - - FutureUtils.sequence( - IntStream.range(0, 10) - .mapToObj( - ignored -> - CompletableFuture.runAsync( - producerThread( - Producer.builder() - .keySerializer(Serializer.STRING) - .configs( - initProConfig().entrySet().stream() - .collect( - Collectors.toMap( - e -> e.getKey(), e -> e.getValue()))) - .build(), - topicName, - key, - header, - timestamp))) - .collect(Collectors.toUnmodifiableList())) - .join(); - - try (var consumer = - Consumer.forTopics(Set.of(topicName)) - .bootstrapServers(bootstrapServers()) - .config( - ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, - ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) - .keyDeserializer(Deserializer.STRING) - .build()) { - var records = consumer.poll(Duration.ofSeconds(20)); - var recordsCount = records.size(); - while (recordsCount < 1000) { - recordsCount += consumer.poll(Duration.ofSeconds(20)).size(); - } - assertEquals(1000, recordsCount); - var record = records.iterator().next(); - assertEquals(topicName, record.topic()); - assertEquals("tainan", record.key()); - assertEquals(1, record.headers().size()); - var actualHeader = record.headers().iterator().next(); - assertEquals(header.key(), actualHeader.key()); - Assertions.assertArrayEquals(header.value(), actualHeader.value()); - } - } - - @Test - void testJmxConfig() throws IOException { - var props = initProConfig(); - var file = - new File( - SmoothWeightRoundRobinDispatchTest.class.getResource("").getPath() - + "PartitionerConfigTest"); - try (var fileWriter = new FileWriter(file)) { - fileWriter.write( - "broker.0.jmx.port=" - + jmxServiceURL().getPort() - + "\n" - + "broker.1.jmx.port=" - + jmxServiceURL().getPort() - + "\n" - + "broker.2.jmx.port=" - + jmxServiceURL().getPort() - + "\n"); - fileWriter.flush(); - } - var topicName = "addressN"; - admin.creator().topic(topicName).numberOfPartitions(10).run().toCompletableFuture().join(); - var key = "tainan"; - var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); - - try (var producer = - Producer.builder() - .keySerializer(Serializer.STRING) - .configs( - props.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))) - .build()) { - var metadata = - producer - .send( - Record.builder() - .topic(topicName) - .key(key) - .timestamp(timestamp) - .headers(List.of(header)) - .build()) - .toCompletableFuture() - .join(); - assertEquals(topicName, metadata.topic()); - assertEquals(timestamp, metadata.timestamp()); - } - } - - private Runnable producerThread( - Producer producer, String topic, String key, Header header, long timeStamp) { - return () -> { - try (producer) { - var i = 0; - while (i <= 99) { - producer.send( - Record.builder() - .topic(topic) - .key(key) - .timestamp(timeStamp) - .headers(List.of(header)) - .build()); - i++; - } - producer.flush(); - } - }; - } - - @Test - public void testGetAndChoose() { - var topic = "test"; - var smoothWeight = new SmoothWeightRoundRobin(Map.of(1, 5.0, 2, 3.0, 3, 1.0)); - var node1 = Mockito.mock(NodeInfo.class); - Mockito.when(node1.id()).thenReturn(1); - var re1 = - Replica.builder().topic(topic).partition(0).nodeInfo(node1).path("/tmp/aa").buildLeader(); - - var node2 = Mockito.mock(NodeInfo.class); - Mockito.when(node2.id()).thenReturn(2); - var re2 = - Replica.builder().topic(topic).partition(1).nodeInfo(node2).path("/tmp/aa").buildLeader(); - - var node3 = Mockito.mock(NodeInfo.class); - Mockito.when(node3.id()).thenReturn(3); - var re3 = - Replica.builder().topic(topic).partition(2).nodeInfo(node3).path("/tmp/aa").buildLeader(); - var testCluster = ClusterInfo.of("fake", List.of(node1, node2, node3), List.of(re1, re2, re3)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(2, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(3, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(2, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(3, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - } -} diff --git a/common/src/test/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinTest.java b/common/src/test/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinTest.java deleted file mode 100644 index 8f816527fe..0000000000 --- a/common/src/test/java/org/astraea/common/partitioner/smooth/SmoothWeightRoundRobinTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.partitioner.smooth; - -import java.util.List; -import java.util.Map; -import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.NodeInfo; -import org.astraea.common.admin.Replica; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class SmoothWeightRoundRobinTest { - @Test - void testGetAndChoose() { - var topic = "test"; - var smoothWeight = new SmoothWeightRoundRobin(Map.of(1, 5.0, 2, 3.0, 3, 1.0)); - var testCluster = clusterInfo(); - - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(2, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(3, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(2, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(3, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - } - - @Test - void testPartOfBrokerGetAndChoose() { - var topic = "test"; - var smoothWeight = new SmoothWeightRoundRobin(Map.of(1, 5.0, 2, 3.0, 3, 1.0, 4, 1.0)); - var testCluster = clusterInfo(); - - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(2, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(3, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(2, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(3, smoothWeight.getAndChoose(topic, testCluster)); - Assertions.assertEquals(1, smoothWeight.getAndChoose(topic, testCluster)); - } - - ClusterInfo clusterInfo() { - var nodes = - List.of( - NodeInfo.of(1, "host", 1111), - NodeInfo.of(2, "host", 1111), - NodeInfo.of(3, "host", 1111)); - return ClusterInfo.of( - "fake", - nodes, - List.of( - Replica.builder() - .topic("test") - .partition(1) - .nodeInfo(nodes.get(0)) - .path("/tmp/aa") - .buildLeader(), - Replica.builder() - .topic("test") - .partition(2) - .nodeInfo(nodes.get(1)) - .path("/tmp/aa") - .buildLeader(), - Replica.builder() - .topic("test") - .partition(3) - .nodeInfo(nodes.get(2)) - .path("/tmp/aa") - .buildLeader())); - } -} diff --git a/docs/dispatcher/smooth_dispatcher.md b/docs/dispatcher/smooth_dispatcher.md index dd6352c44e..b32bc09eef 100644 --- a/docs/dispatcher/smooth_dispatcher.md +++ b/docs/dispatcher/smooth_dispatcher.md @@ -18,7 +18,7 @@ class demo{ void initConfig(){ Properties properties = new Properties(); - properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, org.astraea.common.partitioner.smooth.SmoothWeightRoundRobinDispatcher); + properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, org.astraea.common.partitioner.SmoothWeightRoundRobinDispatcher); } } ```