Skip to content

Commit

Permalink
Removed CheckedHerd and improve logs in PastureCoordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko committed Mar 13, 2024
1 parent 0fda531 commit 8a55c41
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static com.playtika.shepherd.inernal.CheckedHerd.checked;
import static com.playtika.shepherd.inernal.utils.BytesUtils.toBytes;
import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -33,7 +32,7 @@ public void shouldBalanceStaticHerd() {
ByteBuffer cow1 = ByteBuffer.wrap(new byte[]{1});
ByteBuffer cow2 = ByteBuffer.wrap(new byte[]{0});

Herd herd = checked(new Herd() {
Herd herd = new Herd() {
@Override
public Population getPopulation() {
return new Population(List.of(cow1, cow2), -1);
Expand All @@ -42,7 +41,7 @@ public Population getPopulation() {
@Override
public void reset() {
}
});
};


LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -121,7 +120,7 @@ public void shouldBalanceDynamicHerd() {
List<ByteBuffer> population = new CopyOnWriteArrayList<>(List.of(cow1, cow2));
AtomicInteger version = new AtomicInteger(1);

Herd herd = checked(new Herd() {
Herd herd = new Herd() {
@Override
public Population getPopulation() {
return new Population(population, version.get());
Expand All @@ -130,7 +129,7 @@ public Population getPopulation() {
@Override
public void reset() {
}
});
};

LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
Expand Down
3 changes: 1 addition & 2 deletions kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.List;
import java.util.Map;

import static com.playtika.shepherd.inernal.CheckedHerd.checked;
import static com.playtika.shepherd.serde.SerDeUtils.BYTE_BUFFER_DE_SER;
import static com.playtika.shepherd.serde.SerDeUtils.getSerDe;

Expand Down Expand Up @@ -51,7 +50,7 @@ private <Breed> Pasture addBreedingPasture(Herd<Breed> herd, SerDe<Breed> serDe
.setGroupId(herd.getName())
.setProperties(properties)
.setRebalanceListener(pullHerd)
.setHerd(checked(pullHerd))
.setHerd(pullHerd)
.build();

pullHerd.setPastureShepherd(pastureShepherd);
Expand Down
3 changes: 1 addition & 2 deletions kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Map;
import java.util.function.Supplier;

import static com.playtika.shepherd.inernal.CheckedHerd.checked;
import static com.playtika.shepherd.inernal.utils.CacheUtils.memoize;
import static com.playtika.shepherd.serde.SerDeUtils.BYTE_BUFFER_DE_SER;
import static com.playtika.shepherd.serde.SerDeUtils.getSerDe;
Expand Down Expand Up @@ -55,7 +54,7 @@ private <Breed> Pasture<Breed> addBreedingPasture(String herdName, SerDe<Breed>
.setGroupId(herdName)
.setProperties(properties)
.setRebalanceListener(pushHerd)
.setHerd(checked(pushHerd))
.setHerd(pushHerd)
.build();

pushHerd.setPastureShepherd(pastureShepherd);
Expand Down
33 changes: 0 additions & 33 deletions kafka/src/main/java/com/playtika/shepherd/inernal/CheckedHerd.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.playtika.shepherd.inernal.ProtocolHelper.decompress;
import static com.playtika.shepherd.inernal.ProtocolHelper.deserializeAssignment;
Expand Down Expand Up @@ -179,7 +180,21 @@ protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
Population population = herd.getPopulation();
leaderElected = leaderId;

logger.info("Will rebalance population: [{}]", toBytes(population.getSheep()));
if(logger.isDebugEnabled()) {
logger.debug("""
Will rebalance population of size=[{}] among members count=[{}],
members=[{}],
population=[{}]""",
population.getSheep().size(), allMemberMetadata.size(),
allMemberMetadata.stream().map(JoinGroupResponseMember::memberId).collect(Collectors.joining(", ")),
toBytes(population.getSheep()));
} else {
logger.info("""
Will rebalance population of size=[{}] among members count=[{}],
members=[{}]""",
population.getSheep().size(), allMemberMetadata.size(),
allMemberMetadata.stream().map(JoinGroupResponseMember::memberId).collect(Collectors.joining(", ")));
}

return assignor.performAssignment(leaderId, protocol,
population.getSheep(), population.getVersion(),
Expand Down

0 comments on commit 8a55c41

Please sign in to comment.