From 8d82ad8e9a56152676786a26df3606ea583268d6 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Fri, 13 May 2022 16:47:14 +0800 Subject: [PATCH 1/9] swrr --- .../main/java/org/astraea/cost/Periodic.java | 15 +++ .../SmoothWeightRoundRobin.java | 97 +++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java diff --git a/app/src/main/java/org/astraea/cost/Periodic.java b/app/src/main/java/org/astraea/cost/Periodic.java index c3756c2acd..f903af5327 100644 --- a/app/src/main/java/org/astraea/cost/Periodic.java +++ b/app/src/main/java/org/astraea/cost/Periodic.java @@ -25,4 +25,19 @@ protected Value tryUpdate(Supplier updater) { protected long currentTime() { return System.currentTimeMillis(); } + + /** + * Updates the value interval second. + * + * @param updater Methods that need to be updated regularly. + * @param interval Required interval. + * @return an object of type Value created from the parameter value. + */ + protected Value tryUpdate(Supplier updater, int interval) { + if (Utils.overSecond(lastUpdate, interval)) { + value = updater.get(); + lastUpdate = currentTime(); + } + return value; + } } diff --git a/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java b/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java new file mode 100644 index 0000000000..a4831f34dc --- /dev/null +++ b/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java @@ -0,0 +1,97 @@ +package org.astraea.partitioner.smoothPartitioner; + +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.astraea.cost.Periodic; +import org.astraea.cost.brokersMetrics.CostUtils; + +/** + * Given initial key-score pair, it will output a preferred key with the highest current weight. The + * current weight of the chosen key will decrease the sum of effective weight. And all current + * weight will increment by its effective weight. It may result in "higher score with higher chosen + * rate". For example: + * + *

||========================================||============================================|| + * ||------------ Broker in cluster ------------||------------- Effective weight -------------|| + * ||------------------Broker1------------------||-------------------- 5 ---------------------|| + * ||------------------Broker2------------------||-------------------- 1 ---------------------|| + * ||------------------Broker3------------------||-------------------- 1 ---------------------|| + * ||===========================================||============================================|| + * + *

||===================||=======================||===============||======================|| + * ||--- Request Number ---|| Before current weight || Target Broker || After current weight || + * ||----------1-----------||------ {5, 1, 1} ------||----Broker1----||----- {-2, 1, 1} -----|| + * ||----------2-----------||------ {3, 2, 2} ------||----Broker1----||----- {-4, 2, 2} -----|| + * ||----------3-----------||------ {1, 3, 3} ------||----Broker2----||----- { 1,-4, 3} -----|| + * ||----------4-----------||------ {6,-3, 4} ------||----Broker1----||----- {-1,-3, 4} -----|| + * ||----------5-----------||------ {4,-2, 5} ------||----Broker3----||----- { 4,-2,-2} -----|| + * ||----------6-----------||------ {9,-1,-1} ------||----Broker1----||----- { 2,-1,-1} -----|| + * ||----------7-----------||------ {7, 0, 0} ------||----Broker1----||----- { 0, 0, 0} -----|| + * ||======================||=======================||===============||======================|| + */ +public final class SmoothWeightRoundRobin extends Periodic { + private Map effectiveWeight; + private double effectiveWeightSum; + public Map currentWeight; + + public SmoothWeightRoundRobin() {} + + public SmoothWeightRoundRobin(Map effectiveWeight) { + init(effectiveWeight); + } + + public synchronized void init(Map brokerScore) { + tryUpdate( + () -> { + if (effectiveWeight == null) { + this.effectiveWeight = new ConcurrentHashMap<>(brokerScore); + this.effectiveWeight.replaceAll( + (k, v) -> (double) Math.round(100 * (1.0 / brokerScore.size())) / 100.0); + this.currentWeight = new ConcurrentHashMap<>(brokerScore); + this.currentWeight.replaceAll((k, v) -> 0.0); + this.effectiveWeightSum = + this.effectiveWeight.values().stream().mapToDouble(i -> i).sum(); + } else { + var zCurrentLoad = CostUtils.ZScore(brokerScore); + this.effectiveWeight.replaceAll( + (k, v) -> { + var zLoad = zCurrentLoad.get(k); + var score = + Math.round( + 10000 + * (v + - (zLoad.isNaN() ? 0.0 : zLoad) + * 0.01 + / this.effectiveWeight.size())) + / 10000.0; + if (score > 1.0) { + return 1.0; + } else if (score < 0.0) { + return 0.0; + } + return score; + }); + this.effectiveWeightSum = + this.effectiveWeight.values().stream().mapToDouble(i -> i).sum(); + } + return null; + }, + 10); + } + + /** + * Get the preferred ID, and update the state. + * + * @return the preferred ID + */ + public int getAndChoose() { + var maxID = + this.currentWeight.entrySet().stream() + .max(Comparator.comparingDouble(Map.Entry::getValue)) + .orElseGet(() -> Map.entry(0, 0.0)) + .getKey(); + this.currentWeight.computeIfPresent(maxID, (ID, value) -> value - effectiveWeightSum); + return maxID; + } +} From 12df7ca892908b4d0aeb76abc187843a9cfdaf4e Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Fri, 13 May 2022 17:12:07 +0800 Subject: [PATCH 2/9] tryUpdata --- app/.attach_pid1330935 | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 app/.attach_pid1330935 diff --git a/app/.attach_pid1330935 b/app/.attach_pid1330935 new file mode 100644 index 0000000000..e69de29bb2 From 896ada60f34e3195aef4ad4869eba480c55b764c Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Fri, 13 May 2022 17:27:33 +0800 Subject: [PATCH 3/9] remove null file --- app/{.attach_pid1330935 => .attach_pid1349210} | 0 .../java/org/astraea/cost/brokersMetrics/MemoryCostTest.java | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename app/{.attach_pid1330935 => .attach_pid1349210} (100%) diff --git a/app/.attach_pid1330935 b/app/.attach_pid1349210 similarity index 100% rename from app/.attach_pid1330935 rename to app/.attach_pid1349210 diff --git a/app/src/test/java/org/astraea/cost/brokersMetrics/MemoryCostTest.java b/app/src/test/java/org/astraea/cost/brokersMetrics/MemoryCostTest.java index 538b0c800c..a100a7fd16 100644 --- a/app/src/test/java/org/astraea/cost/brokersMetrics/MemoryCostTest.java +++ b/app/src/test/java/org/astraea/cost/brokersMetrics/MemoryCostTest.java @@ -81,7 +81,7 @@ public List availablePartitions(String topic) { } }; scores = memoryCost.brokerCost(clusterInfo2).value(); - Assertions.assertEquals(0.53, scores.get(1)); + Assertions.assertEquals(0.52, scores.get(1)); Assertions.assertEquals(0.52, scores.get(2)); Assertions.assertEquals(0.45, scores.get(3)); } From 7d08a55b9436366804213236cf11c5df31ea95e2 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Fri, 13 May 2022 18:24:50 +0800 Subject: [PATCH 4/9] add EffectiveWeightResult --- app/.attach_pid1365286 | 0 .../SmoothWeightRoundRobin.java | 47 +++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) create mode 100644 app/.attach_pid1365286 diff --git a/app/.attach_pid1365286 b/app/.attach_pid1365286 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java b/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java index a4831f34dc..3b7ee44d00 100644 --- a/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java +++ b/app/src/main/java/org/astraea/partitioner/smoothPartitioner/SmoothWeightRoundRobin.java @@ -1,8 +1,8 @@ package org.astraea.partitioner.smoothPartitioner; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.astraea.cost.Periodic; import org.astraea.cost.brokersMetrics.CostUtils; @@ -30,13 +30,11 @@ * ||----------7-----------||------ {7, 0, 0} ------||----Broker1----||----- { 0, 0, 0} -----|| * ||======================||=======================||===============||======================|| */ -public final class SmoothWeightRoundRobin extends Periodic { - private Map effectiveWeight; - private double effectiveWeightSum; +public final class SmoothWeightRoundRobin + extends Periodic { + private EffectiveWeightResult effectiveWeightResult; public Map currentWeight; - public SmoothWeightRoundRobin() {} - public SmoothWeightRoundRobin(Map effectiveWeight) { init(effectiveWeight); } @@ -44,17 +42,17 @@ public SmoothWeightRoundRobin(Map effectiveWeight) { public synchronized void init(Map brokerScore) { tryUpdate( () -> { - if (effectiveWeight == null) { - this.effectiveWeight = new ConcurrentHashMap<>(brokerScore); - this.effectiveWeight.replaceAll( + Map effectiveWeight; + if (effectiveWeightResult == null) { + effectiveWeight = new HashMap<>(brokerScore); + effectiveWeight.replaceAll( (k, v) -> (double) Math.round(100 * (1.0 / brokerScore.size())) / 100.0); - this.currentWeight = new ConcurrentHashMap<>(brokerScore); + this.currentWeight = new HashMap<>(brokerScore); this.currentWeight.replaceAll((k, v) -> 0.0); - this.effectiveWeightSum = - this.effectiveWeight.values().stream().mapToDouble(i -> i).sum(); } else { var zCurrentLoad = CostUtils.ZScore(brokerScore); - this.effectiveWeight.replaceAll( + effectiveWeight = this.effectiveWeightResult.effectiveWeight; + effectiveWeight.replaceAll( (k, v) -> { var zLoad = zCurrentLoad.get(k); var score = @@ -63,7 +61,7 @@ public synchronized void init(Map brokerScore) { * (v - (zLoad.isNaN() ? 0.0 : zLoad) * 0.01 - / this.effectiveWeight.size())) + / effectiveWeight.size())) / 10000.0; if (score > 1.0) { return 1.0; @@ -72,10 +70,8 @@ public synchronized void init(Map brokerScore) { } return score; }); - this.effectiveWeightSum = - this.effectiveWeight.values().stream().mapToDouble(i -> i).sum(); } - return null; + return new EffectiveWeightResult(effectiveWeight); }, 10); } @@ -85,13 +81,26 @@ public synchronized void init(Map brokerScore) { * * @return the preferred ID */ - public int getAndChoose() { + public synchronized int getAndChoose() { + var effective = effectiveWeightResult.effectiveWeight; + this.currentWeight.replaceAll((k, v) -> v + effective.get(k)); var maxID = this.currentWeight.entrySet().stream() .max(Comparator.comparingDouble(Map.Entry::getValue)) .orElseGet(() -> Map.entry(0, 0.0)) .getKey(); - this.currentWeight.computeIfPresent(maxID, (ID, value) -> value - effectiveWeightSum); + this.currentWeight.computeIfPresent( + maxID, (ID, value) -> value - effectiveWeightResult.effectiveWeightSum); return maxID; } + + public static class EffectiveWeightResult { + private final Map effectiveWeight; + private final double effectiveWeightSum; + + EffectiveWeightResult(Map effectiveWeight) { + this.effectiveWeight = effectiveWeight; + this.effectiveWeightSum = effectiveWeight.values().stream().mapToDouble(i -> i).sum(); + } + } } From fbb773a77dbd9c8116935366d45f19f4c1cbf280 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Fri, 13 May 2022 19:09:26 +0800 Subject: [PATCH 5/9] resolve bug --- app/src/test/java/org/astraea/cost/broker/MemoryCostTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/test/java/org/astraea/cost/broker/MemoryCostTest.java b/app/src/test/java/org/astraea/cost/broker/MemoryCostTest.java index f8d2cb4d39..95760900fb 100644 --- a/app/src/test/java/org/astraea/cost/broker/MemoryCostTest.java +++ b/app/src/test/java/org/astraea/cost/broker/MemoryCostTest.java @@ -81,7 +81,7 @@ public List availablePartitions(String topic) { } }; scores = memoryCost.brokerCost(clusterInfo2).value(); - Assertions.assertEquals(0.52, scores.get(1)); + Assertions.assertEquals(0.53, scores.get(1)); Assertions.assertEquals(0.52, scores.get(2)); Assertions.assertEquals(0.45, scores.get(3)); } From fc6b7951d2b35edc9cd1032e42b141a60d09a6cc Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Sat, 14 May 2022 18:54:39 +0800 Subject: [PATCH 6/9] normalization --- .../org/astraea/cost/broker/CostUtils.java | 6 +++++ .../smooth/SmoothWeightRoundRobin.java | 22 ++++++------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/app/src/main/java/org/astraea/cost/broker/CostUtils.java b/app/src/main/java/org/astraea/cost/broker/CostUtils.java index bc1234ea60..8706204a47 100644 --- a/app/src/main/java/org/astraea/cost/broker/CostUtils.java +++ b/app/src/main/java/org/astraea/cost/broker/CostUtils.java @@ -43,4 +43,10 @@ public static Map TScore(Map metrics) { return score; })); } + + public static Map normalize(Map score) { + var sum = score.values().stream().mapToDouble(d -> d).sum(); + return score.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getKey() / sum)); + } } diff --git a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java index f9998b69ef..d1867f914d 100644 --- a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java +++ b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java @@ -45,30 +45,22 @@ public synchronized void init(Map brokerScore) { Map effectiveWeight; if (effectiveWeightResult == null) { effectiveWeight = new HashMap<>(brokerScore); - effectiveWeight.replaceAll( - (k, v) -> (double) Math.round(100 * (1.0 / brokerScore.size())) / 100.0); + effectiveWeight.replaceAll((k, v) -> 1.0); this.currentWeight = new HashMap<>(brokerScore); this.currentWeight.replaceAll((k, v) -> 0.0); } else { - var zCurrentLoad = CostUtils.ZScore(brokerScore); + var normalizationLoad = CostUtils.normalize(brokerScore); effectiveWeight = this.effectiveWeightResult.effectiveWeight; effectiveWeight.replaceAll( (k, v) -> { - var zLoad = zCurrentLoad.get(k); - var score = - Math.round( - 10000 - * (v - - (zLoad.isNaN() ? 0.0 : zLoad) - * 0.01 - / effectiveWeight.size())) - / 10000.0; - if (score > 1.0) { + var nLoad = normalizationLoad.get(k); + var weight = v * (nLoad.isNaN() ? 1.0 : ((nLoad + 1) > 0 ? nLoad + 1 : 0.1)); + if (weight > 2.0) { return 1.0; - } else if (score < 0.0) { + } else if (weight < 0.0) { return 0.0; } - return score; + return weight; }); } return new EffectiveWeightResult(effectiveWeight); From da3c495382d18b762588ceeb5dcf9a2c713a36d7 Mon Sep 17 00:00:00 2001 From: wycccccc <43372856+wycccccc@users.noreply.github.com> Date: Sun, 15 May 2022 03:08:09 +0800 Subject: [PATCH 7/9] Update SmoothWeightRoundRobin.java --- .../org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java index d1867f914d..1ac9293fa6 100644 --- a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java +++ b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java @@ -56,7 +56,7 @@ public synchronized void init(Map brokerScore) { var nLoad = normalizationLoad.get(k); var weight = v * (nLoad.isNaN() ? 1.0 : ((nLoad + 1) > 0 ? nLoad + 1 : 0.1)); if (weight > 2.0) { - return 1.0; + return 2.0; } else if (weight < 0.0) { return 0.0; } From e4b156ec42db6279a73877a221e04608c31a78bb Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Mon, 16 May 2022 21:21:54 +0800 Subject: [PATCH 8/9] read-only --- app/src/main/java/org/astraea/Utils.java | 4 + .../main/java/org/astraea/cost/Periodic.java | 9 ++ .../smooth/SmoothWeightRoundRobin.java | 132 ++++++++++-------- 3 files changed, 86 insertions(+), 59 deletions(-) diff --git a/app/src/main/java/org/astraea/Utils.java b/app/src/main/java/org/astraea/Utils.java index 733c31c2d7..68ada88267 100644 --- a/app/src/main/java/org/astraea/Utils.java +++ b/app/src/main/java/org/astraea/Utils.java @@ -141,6 +141,10 @@ public static boolean overSecond(long lastTime, int second) { return (lastTime + Duration.ofSeconds(second).toMillis()) < System.currentTimeMillis(); } + public static boolean overSecond(long lastTime, Duration second) { + return (lastTime + second.toMillis()) < System.currentTimeMillis(); + } + public static void sleep(Duration duration) { try { TimeUnit.MILLISECONDS.sleep(duration.toMillis()); diff --git a/app/src/main/java/org/astraea/cost/Periodic.java b/app/src/main/java/org/astraea/cost/Periodic.java index f903af5327..34f33b6514 100644 --- a/app/src/main/java/org/astraea/cost/Periodic.java +++ b/app/src/main/java/org/astraea/cost/Periodic.java @@ -1,5 +1,6 @@ package org.astraea.cost; +import java.time.Duration; import java.util.function.Supplier; import org.astraea.Utils; @@ -40,4 +41,12 @@ protected Value tryUpdate(Supplier updater, int interval) { } return value; } + + protected Value tryUpdate(Supplier updater, Duration interval) { + if (Utils.overSecond(lastUpdate, interval)) { + value = updater.get(); + lastUpdate = currentTime(); + } + return value; + } } diff --git a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java index d1867f914d..ead3a58672 100644 --- a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java +++ b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java @@ -1,8 +1,10 @@ package org.astraea.partitioner.smooth; +import java.time.Duration; import java.util.Comparator; -import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; + import org.astraea.cost.Periodic; import org.astraea.cost.broker.CostUtils; @@ -31,68 +33,80 @@ * ||======================||=======================||===============||======================|| */ public final class SmoothWeightRoundRobin - extends Periodic { - private EffectiveWeightResult effectiveWeightResult; - public Map currentWeight; + extends Periodic { + private EffectiveWeightResult effectiveWeightResult; + public Map currentWeight; - public SmoothWeightRoundRobin(Map effectiveWeight) { - init(effectiveWeight); - } + public SmoothWeightRoundRobin(Map effectiveWeight) { + effectiveWeightResult = + new EffectiveWeightResult( + effectiveWeight.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 1.0))); + currentWeight = + effectiveWeight.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 0.0)); + } - public synchronized void init(Map brokerScore) { - tryUpdate( - () -> { - Map effectiveWeight; - if (effectiveWeightResult == null) { - effectiveWeight = new HashMap<>(brokerScore); - effectiveWeight.replaceAll((k, v) -> 1.0); - this.currentWeight = new HashMap<>(brokerScore); - this.currentWeight.replaceAll((k, v) -> 0.0); - } else { - var normalizationLoad = CostUtils.normalize(brokerScore); - effectiveWeight = this.effectiveWeightResult.effectiveWeight; - effectiveWeight.replaceAll( - (k, v) -> { - var nLoad = normalizationLoad.get(k); - var weight = v * (nLoad.isNaN() ? 1.0 : ((nLoad + 1) > 0 ? nLoad + 1 : 0.1)); - if (weight > 2.0) { - return 1.0; - } else if (weight < 0.0) { - return 0.0; - } - return weight; - }); - } - return new EffectiveWeightResult(effectiveWeight); - }, - 10); - } + public synchronized void init(Map brokerScore) { + effectiveWeightResult = + tryUpdate( + () -> { + var normalizationLoad = CostUtils.normalize(brokerScore); + return new EffectiveWeightResult( + this.effectiveWeightResult.effectiveWeight.entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey(), + entry -> { + var nLoad = normalizationLoad.get(entry.getKey()); + var weight = + entry.getValue() + * (nLoad.isNaN() + ? 1.0 + : ((nLoad + 1) > 0 ? nLoad + 1 : 0.1)); + if (weight > 2.0) return 2.0; + return Math.max(weight, 0.0); + }))); + }, + Duration.ofSeconds(10)); + } - /** - * Get the preferred ID, and update the state. - * - * @return the preferred ID - */ - public synchronized int getAndChoose() { - var effective = effectiveWeightResult.effectiveWeight; - this.currentWeight.replaceAll((k, v) -> v + effective.get(k)); - var maxID = - this.currentWeight.entrySet().stream() - .max(Comparator.comparingDouble(Map.Entry::getValue)) - .orElseGet(() -> Map.entry(0, 0.0)) - .getKey(); - this.currentWeight.computeIfPresent( - maxID, (ID, value) -> value - effectiveWeightResult.effectiveWeightSum); - return maxID; - } + /** + * Get the preferred ID, and update the state. + * + * @return the preferred ID + */ + public synchronized int getAndChoose() { + this.currentWeight = + this.currentWeight.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue() + effectiveWeightResult.effectiveWeight.get(e.getKey()))); + var maxID = + this.currentWeight.entrySet().stream() + .max(Comparator.comparingDouble(Map.Entry::getValue)) + .map(Map.Entry::getKey) + .orElse(0); + this.currentWeight = + this.currentWeight.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + e.getKey().equals(maxID) + ? e.getValue() - effectiveWeightResult.effectiveWeightSum + : e.getValue())); + return maxID; + } - public static class EffectiveWeightResult { - private final Map effectiveWeight; - private final double effectiveWeightSum; + public static class EffectiveWeightResult { + private final Map effectiveWeight; + private final double effectiveWeightSum; - EffectiveWeightResult(Map effectiveWeight) { - this.effectiveWeight = effectiveWeight; - this.effectiveWeightSum = effectiveWeight.values().stream().mapToDouble(i -> i).sum(); + EffectiveWeightResult(Map effectiveWeight) { + this.effectiveWeight = effectiveWeight; + this.effectiveWeightSum = effectiveWeight.values().stream().mapToDouble(i -> i).sum(); + } } - } } From 687fa05470b0f424c4e67987a8ef00bca20193d3 Mon Sep 17 00:00:00 2001 From: wycccccc <493172422@qq.com> Date: Mon, 16 May 2022 21:29:14 +0800 Subject: [PATCH 9/9] spotless --- .../smooth/SmoothWeightRoundRobin.java | 141 +++++++++--------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java index ead3a58672..5608f42450 100644 --- a/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java +++ b/app/src/main/java/org/astraea/partitioner/smooth/SmoothWeightRoundRobin.java @@ -4,7 +4,6 @@ import java.util.Comparator; import java.util.Map; import java.util.stream.Collectors; - import org.astraea.cost.Periodic; import org.astraea.cost.broker.CostUtils; @@ -33,80 +32,80 @@ * ||======================||=======================||===============||======================|| */ public final class SmoothWeightRoundRobin - extends Periodic { - private EffectiveWeightResult effectiveWeightResult; - public Map currentWeight; + extends Periodic { + private EffectiveWeightResult effectiveWeightResult; + public Map currentWeight; - public SmoothWeightRoundRobin(Map effectiveWeight) { - effectiveWeightResult = - new EffectiveWeightResult( - effectiveWeight.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 1.0))); - currentWeight = - effectiveWeight.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 0.0)); - } + public SmoothWeightRoundRobin(Map effectiveWeight) { + effectiveWeightResult = + new EffectiveWeightResult( + effectiveWeight.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 1.0))); + currentWeight = + effectiveWeight.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, ignored -> 0.0)); + } - public synchronized void init(Map brokerScore) { - effectiveWeightResult = - tryUpdate( - () -> { - var normalizationLoad = CostUtils.normalize(brokerScore); - return new EffectiveWeightResult( - this.effectiveWeightResult.effectiveWeight.entrySet().stream() - .collect( - Collectors.toMap( - entry -> entry.getKey(), - entry -> { - var nLoad = normalizationLoad.get(entry.getKey()); - var weight = - entry.getValue() - * (nLoad.isNaN() - ? 1.0 - : ((nLoad + 1) > 0 ? nLoad + 1 : 0.1)); - if (weight > 2.0) return 2.0; - return Math.max(weight, 0.0); - }))); - }, - Duration.ofSeconds(10)); - } + public synchronized void init(Map brokerScore) { + effectiveWeightResult = + tryUpdate( + () -> { + var normalizationLoad = CostUtils.normalize(brokerScore); + return new EffectiveWeightResult( + this.effectiveWeightResult.effectiveWeight.entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey(), + entry -> { + var nLoad = normalizationLoad.get(entry.getKey()); + var weight = + entry.getValue() + * (nLoad.isNaN() + ? 1.0 + : ((nLoad + 1) > 0 ? nLoad + 1 : 0.1)); + if (weight > 2.0) return 2.0; + return Math.max(weight, 0.0); + }))); + }, + Duration.ofSeconds(10)); + } - /** - * Get the preferred ID, and update the state. - * - * @return the preferred ID - */ - public synchronized int getAndChoose() { - this.currentWeight = - this.currentWeight.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue() + effectiveWeightResult.effectiveWeight.get(e.getKey()))); - var maxID = - this.currentWeight.entrySet().stream() - .max(Comparator.comparingDouble(Map.Entry::getValue)) - .map(Map.Entry::getKey) - .orElse(0); - this.currentWeight = - this.currentWeight.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - e.getKey().equals(maxID) - ? e.getValue() - effectiveWeightResult.effectiveWeightSum - : e.getValue())); - return maxID; - } + /** + * Get the preferred ID, and update the state. + * + * @return the preferred ID + */ + public synchronized int getAndChoose() { + this.currentWeight = + this.currentWeight.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue() + effectiveWeightResult.effectiveWeight.get(e.getKey()))); + var maxID = + this.currentWeight.entrySet().stream() + .max(Comparator.comparingDouble(Map.Entry::getValue)) + .map(Map.Entry::getKey) + .orElse(0); + this.currentWeight = + this.currentWeight.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + e.getKey().equals(maxID) + ? e.getValue() - effectiveWeightResult.effectiveWeightSum + : e.getValue())); + return maxID; + } - public static class EffectiveWeightResult { - private final Map effectiveWeight; - private final double effectiveWeightSum; + public static class EffectiveWeightResult { + private final Map effectiveWeight; + private final double effectiveWeightSum; - EffectiveWeightResult(Map effectiveWeight) { - this.effectiveWeight = effectiveWeight; - this.effectiveWeightSum = effectiveWeight.values().stream().mapToDouble(i -> i).sum(); - } + EffectiveWeightResult(Map effectiveWeight) { + this.effectiveWeight = effectiveWeight; + this.effectiveWeightSum = effectiveWeight.values().stream().mapToDouble(i -> i).sum(); } + } }