From 8a55c410a5f468d45827efe9fa92b0dd8f8a1d57 Mon Sep 17 00:00:00 2001 From: skarpenko Date: Wed, 13 Mar 2024 18:26:05 +0200 Subject: [PATCH] Removed CheckedHerd and improve logs in PastureCoordinator --- .../shepherd/inernal/ShepherdTest.java | 9 +++-- .../com/playtika/shepherd/KafkaPullFarm.java | 3 +- .../com/playtika/shepherd/KafkaPushFarm.java | 3 +- .../shepherd/inernal/CheckedHerd.java | 33 ------------------- .../shepherd/inernal/PastureCoordinator.java | 17 +++++++++- 5 files changed, 22 insertions(+), 43 deletions(-) delete mode 100644 kafka/src/main/java/com/playtika/shepherd/inernal/CheckedHerd.java diff --git a/kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java b/kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java index 84f7bb3..7fcece3 100644 --- a/kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java +++ b/kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java @@ -14,7 +14,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; -import static com.playtika.shepherd.inernal.CheckedHerd.checked; import static com.playtika.shepherd.inernal.utils.BytesUtils.toBytes; import static java.time.Duration.ofSeconds; import static org.assertj.core.api.Assertions.assertThat; @@ -33,7 +32,7 @@ public void shouldBalanceStaticHerd() { ByteBuffer cow1 = ByteBuffer.wrap(new byte[]{1}); ByteBuffer cow2 = ByteBuffer.wrap(new byte[]{0}); - Herd herd = checked(new Herd() { + Herd herd = new Herd() { @Override public Population getPopulation() { return new Population(List.of(cow1, cow2), -1); @@ -42,7 +41,7 @@ public Population getPopulation() { @Override public void reset() { } - }); + }; LinkedBlockingQueue cows1 = new LinkedBlockingQueue<>(); @@ -121,7 +120,7 @@ public void shouldBalanceDynamicHerd() { List population = new CopyOnWriteArrayList<>(List.of(cow1, cow2)); AtomicInteger version = new AtomicInteger(1); - Herd herd = checked(new Herd() { + Herd herd = new Herd() { @Override public Population getPopulation() { return new Population(population, version.get()); @@ -130,7 +129,7 @@ public Population getPopulation() { @Override public void reset() { } - }); + }; LinkedBlockingQueue cows1 = new LinkedBlockingQueue<>(); PastureListener rebalanceListener1 = new PastureListener<>() { diff --git a/kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java b/kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java index b5b7ed9..3adcd65 100644 --- a/kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java +++ b/kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java @@ -15,7 +15,6 @@ import java.util.List; import java.util.Map; -import static com.playtika.shepherd.inernal.CheckedHerd.checked; import static com.playtika.shepherd.serde.SerDeUtils.BYTE_BUFFER_DE_SER; import static com.playtika.shepherd.serde.SerDeUtils.getSerDe; @@ -51,7 +50,7 @@ private Pasture addBreedingPasture(Herd herd, SerDe serDe .setGroupId(herd.getName()) .setProperties(properties) .setRebalanceListener(pullHerd) - .setHerd(checked(pullHerd)) + .setHerd(pullHerd) .build(); pullHerd.setPastureShepherd(pastureShepherd); diff --git a/kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java b/kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java index 32c698f..ffe26c8 100644 --- a/kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java +++ b/kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java @@ -17,7 +17,6 @@ import java.util.Map; import java.util.function.Supplier; -import static com.playtika.shepherd.inernal.CheckedHerd.checked; import static com.playtika.shepherd.inernal.utils.CacheUtils.memoize; import static com.playtika.shepherd.serde.SerDeUtils.BYTE_BUFFER_DE_SER; import static com.playtika.shepherd.serde.SerDeUtils.getSerDe; @@ -55,7 +54,7 @@ private Pasture addBreedingPasture(String herdName, SerDe .setGroupId(herdName) .setProperties(properties) .setRebalanceListener(pushHerd) - .setHerd(checked(pushHerd)) + .setHerd(pushHerd) .build(); pushHerd.setPastureShepherd(pastureShepherd); diff --git a/kafka/src/main/java/com/playtika/shepherd/inernal/CheckedHerd.java b/kafka/src/main/java/com/playtika/shepherd/inernal/CheckedHerd.java deleted file mode 100644 index 7d0a681..0000000 --- a/kafka/src/main/java/com/playtika/shepherd/inernal/CheckedHerd.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.playtika.shepherd.inernal; - -public class CheckedHerd implements Herd { - - private final Herd herd; - private boolean requested = false; - - public static Herd checked(Herd herd){ - return new CheckedHerd(herd); - } - - private CheckedHerd(Herd herd) { - this.herd = herd; - } - - @Override - public Population getPopulation(){ - if(requested){ - throw new IllegalStateException("Should be called only once on rebalance"); - } - try { - return herd.getPopulation(); - } finally { - requested = true; - } - } - - @Override - public void reset() { - herd.reset(); - requested = false; - } -} diff --git a/kafka/src/main/java/com/playtika/shepherd/inernal/PastureCoordinator.java b/kafka/src/main/java/com/playtika/shepherd/inernal/PastureCoordinator.java index 89575d1..721d5a4 100644 --- a/kafka/src/main/java/com/playtika/shepherd/inernal/PastureCoordinator.java +++ b/kafka/src/main/java/com/playtika/shepherd/inernal/PastureCoordinator.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static com.playtika.shepherd.inernal.ProtocolHelper.decompress; import static com.playtika.shepherd.inernal.ProtocolHelper.deserializeAssignment; @@ -179,7 +180,21 @@ protected Map onLeaderElected(String leaderId, Population population = herd.getPopulation(); leaderElected = leaderId; - logger.info("Will rebalance population: [{}]", toBytes(population.getSheep())); + if(logger.isDebugEnabled()) { + logger.debug(""" + Will rebalance population of size=[{}] among members count=[{}], + members=[{}], + population=[{}]""", + population.getSheep().size(), allMemberMetadata.size(), + allMemberMetadata.stream().map(JoinGroupResponseMember::memberId).collect(Collectors.joining(", ")), + toBytes(population.getSheep())); + } else { + logger.info(""" + Will rebalance population of size=[{}] among members count=[{}], + members=[{}]""", + population.getSheep().size(), allMemberMetadata.size(), + allMemberMetadata.stream().map(JoinGroupResponseMember::memberId).collect(Collectors.joining(", "))); + } return assignor.performAssignment(leaderId, protocol, population.getSheep(), population.getVersion(),