Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch version from int to long. #26

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public interface PastureListener<Breed> {
/**
* Invoked when new subpopulation assigned to this pasture
*/
void assigned(List<Breed> population, int version, int generation, boolean isLeader);
void assigned(List<Breed> population, long version, int generation, boolean isLeader);

/**
* Invoked on first phase of rebalance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public interface Herd<Breed> {

void reset();

record Population<Breed>(Breed[] population, int version) {
record Population<Breed>(Breed[] population, long version) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ public interface Shepherd<Breed> {
* @return true if it will cause rebalance, false if population will be ignored
*/

boolean setPopulation(Breed[] population, int version);
boolean setPopulation(Breed[] population, long version);

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

Expand Down Expand Up @@ -112,7 +112,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
int ver1 = nextVersion(0, versioned);

AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
AtomicInteger version1 = new AtomicInteger();
AtomicLong version1 = new AtomicLong();
Pasture pasture1 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
logPopulation(1, population, version, isLeader);
cows1.set(population);
Expand All @@ -122,7 +122,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
pasture1.start();

AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
AtomicInteger version2 = new AtomicInteger();
AtomicLong version2 = new AtomicLong();
Pasture pasture2 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
logPopulation(2, population, version, isLeader);
cows2.set(population);
Expand Down Expand Up @@ -253,7 +253,7 @@ public void shouldBalanceBreedingStaticHerd() {
});
}

private static void logPopulation(int pastureIndex, List<ByteBuffer> population, int version, boolean isLeader) {
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, long version, boolean isLeader) {
logger.info("Assigned to pasture{} leader={} version={} [{}]", pastureIndex, isLeader, version, toBytes(population));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

Expand Down Expand Up @@ -111,7 +111,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
int ver1 = nextVersion(0, versioned);

AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
AtomicInteger version1 = new AtomicInteger();
AtomicLong version1 = new AtomicLong();
String herdName = versioned ? "push-dynamic-group-versioned" : "push-dynamic-group";
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
logPopulation(1, population, version, isLeader);
Expand All @@ -122,7 +122,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
pasture1.start();

AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
AtomicInteger version2 = new AtomicInteger();
AtomicLong version2 = new AtomicLong();
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
logPopulation(2, population, version, isLeader);
cows2.set(population);
Expand Down Expand Up @@ -326,7 +326,7 @@ public void shouldBalanceBreedingStaticHerd() {
});
}

private static void logPopulation(int pastureIndex, List<ByteBuffer> population, int version, boolean isLeader) {
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, long version, boolean isLeader) {
logger.info("Assigned to pasture{} leader={} version={} [{}]", pastureIndex, isLeader, version, toBytes(population));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Expand All @@ -37,7 +36,7 @@ public void shouldBalanceStaticHerd() {
Herd herd = checked(new Herd() {
@Override
public Population getPopulation() {
return new Population(Set.of(cow1, cow2), -1);
return new Population(List.of(cow1, cow2), -1);
}

@Override
Expand All @@ -49,7 +48,7 @@ public void reset() {
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
@Override
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
logger.info("Assigned cows1 [{}]", toBytes(population));
cows1.addAll(population);
}
Expand Down Expand Up @@ -78,7 +77,7 @@ public void cleanup() {
LinkedBlockingQueue<ByteBuffer> cows2 = new LinkedBlockingQueue<>();
PastureListener<ByteBuffer> rebalanceListener2 = new PastureListener<>() {
@Override
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
logger.info("Assigned cows2 [{}]", toBytes(population));
cows2.addAll(population);
}
Expand Down Expand Up @@ -119,8 +118,7 @@ public void shouldBalanceDynamicHerd() {

ByteBuffer cow1 = ByteBuffer.wrap(new byte[]{1});
ByteBuffer cow2 = ByteBuffer.wrap(new byte[]{0});
Set<ByteBuffer> population = ConcurrentHashMap.newKeySet();
population.addAll(List.of(cow1, cow2));
List<ByteBuffer> population = new CopyOnWriteArrayList<>(List.of(cow1, cow2));
AtomicInteger version = new AtomicInteger(1);

Herd herd = checked(new Herd() {
Expand All @@ -137,7 +135,7 @@ public void reset() {
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
@Override
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
logger.info("Assigned cows1 [{}]", toBytes(population));
cows1.addAll(population);
}
Expand All @@ -161,7 +159,7 @@ public void cleanup() {
LinkedBlockingQueue<ByteBuffer> cows2 = new LinkedBlockingQueue<>();
PastureListener<ByteBuffer> rebalanceListener2 = new PastureListener<>() {
@Override
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
logger.info("Assigned cows2 [{}]", toBytes(population));
cows2.addAll(population);
}
Expand Down
5 changes: 2 additions & 3 deletions kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -76,7 +75,7 @@ static class PullHerd<Breed> implements com.playtika.shepherd.inernal.Herd, Past
@Override
public Population getPopulation() {
Herd.Population<Breed> population = herd.getPopulation();
return new Population(new HashSet<>(serDe.serialize(List.of(population.population()))), population.version());
return new Population(serDe.serialize(List.of(population.population())), population.version());
}

@Override
Expand Down Expand Up @@ -107,7 +106,7 @@ public void rebalanceHerd() {
}

@Override
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
pastureListener.assigned(serDe.deserialize(population), version, generation, isLeader);
}

Expand Down
12 changes: 5 additions & 7 deletions kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.playtika.shepherd;

import com.playtika.shepherd.common.PastureListener;
import com.playtika.shepherd.common.push.Farm;
import com.playtika.shepherd.common.push.Pasture;
import com.playtika.shepherd.common.PastureListener;
import com.playtika.shepherd.common.push.Shepherd;
import com.playtika.shepherd.inernal.Herd;
import com.playtika.shepherd.inernal.PastureShepherd;
Expand All @@ -13,10 +13,8 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.playtika.shepherd.inernal.CheckedHerd.checked;
Expand Down Expand Up @@ -83,21 +81,21 @@ static final class PushHerd<Breed> implements Herd, Pasture<Breed>, Shepherd<Bre
private Population snapshot;
private Population latest;

private int assignedVersion = Integer.MIN_VALUE;
private long assignedVersion = Long.MIN_VALUE;

PushHerd(PastureListener<Breed> pastureListener, SerDe<Breed> serDe) {
this.pastureListener = pastureListener;
this.serDe = serDe;
}

@Override
public synchronized boolean setPopulation(Breed[] population, int version) {
public synchronized boolean setPopulation(Breed[] population, long version) {
//Ignore outdated non-static version
if(version >=0 && version <= assignedVersion){
return false;
}

Supplier<Set<ByteBuffer>> latest = memoize(() -> new HashSet<>(serDe.serialize(Arrays.asList(population))));
Supplier<List<ByteBuffer>> latest = memoize(() -> serDe.serialize(Arrays.asList(population)));
if(this.snapshot == null
|| version >= 0 && version > this.snapshot.getVersion()
|| version < 0 && !this.snapshot.getSheep().equals(latest.get())){
Expand Down Expand Up @@ -136,7 +134,7 @@ public synchronized void reset() {
}

@Override
public synchronized void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
public synchronized void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
this.pastureListener.assigned(serDe.deserialize(population), version, generation, isLeader);
this.assignedVersion = version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,6 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;

public class Assignment {

private final String leader;
private final List<ByteBuffer> assigned;
private final int version;

public Assignment(String leader, int version, List<ByteBuffer> assigned) {
this.leader = leader;
this.assigned = assigned;
this.version = version;
}

public String getLeader() {
return leader;
}

public List<ByteBuffer> getAssigned() {
return assigned;
}

public int getVersion() {
return version;
}

@Override
public boolean equals(Object obj){
Assignment assignment = (Assignment) obj;
return Objects.equals(leader, assignment.leader)
&& Objects.equals(assigned, assignment.assigned);
}

public record Assignment(String leader, long version, List<ByteBuffer> assigned) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;

public interface Assignor {
Map<String, ByteBuffer> performAssignment(
String leaderId, String protocol, Set<ByteBuffer> population, int version,
String leaderId, String protocol, List<ByteBuffer> population, long version,
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
Assignment newAssignment = deserializeAssignment(decompress(memberAssignment));
if(logger.isDebugEnabled()){
logger.debug("Received new assignment version: {}, generation: {}, memberId: {}\nassigned: [{}]",
newAssignment.getVersion(), generation, memberId, toBytes(newAssignment.getAssigned()));
newAssignment.version(), generation, memberId, toBytes(newAssignment.assigned()));
} else {
logger.info("Received new assignment version: {}, generation: {}, memberId: {}",
newAssignment.getVersion(), generation, memberId);
newAssignment.version(), generation, memberId);
}
// At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
// error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
Expand All @@ -203,7 +203,7 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
rejoinRequested = false;
assignmentSnapshot = newAssignment;
lastCompletedGenerationId = generation;
listener.assigned(newAssignment.getAssigned(), newAssignment.getVersion(), generation, isLeader(newAssignment));
listener.assigned(newAssignment.assigned(), newAssignment.version(), generation, isLeader(newAssignment));
}

@Override
Expand Down Expand Up @@ -245,7 +245,7 @@ public boolean isLeaderElected() {
}

private boolean isLeader(Assignment assignment) {
return memberId().equals(assignment.getLeader());
return memberId().equals(assignment.leader());
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package com.playtika.shepherd.inernal;

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.List;

public class Population {

private final Set<ByteBuffer> sheep;
private final int version;
private final List<ByteBuffer> sheep;
private final long version;

public Population(Set<ByteBuffer> sheep, int version) {
public Population(List<ByteBuffer> sheep, long version) {
this.sheep = sheep;
this.version = version;
}

public Set<ByteBuffer> getSheep() {
public List<ByteBuffer> getSheep() {
return sheep;
}

public int getVersion() {
public long getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class ProtocolHelper {

public static final Schema ASSIGNMENT_V0 = new Schema(
new Field(LEADER_KEY_NAME, Type.STRING),
new Field(VERSION_KEY_NAME, Type.INT32),
new Field(VERSION_KEY_NAME, Type.INT64),
new Field(ASSIGNED_KEY_NAME, new ArrayOf(BYTES)));
public static final int COMPRESSION_LEVEL = 17;

Expand All @@ -48,9 +48,9 @@ public static ByteBuffer decompress(ByteBuffer buffer) {

public static ByteBuffer serializeAssignment(Assignment assignment) {
Struct struct = new Struct(ASSIGNMENT_V0);
struct.set(LEADER_KEY_NAME, assignment.getLeader());
struct.set(VERSION_KEY_NAME, assignment.getVersion());
struct.set(ASSIGNED_KEY_NAME, assignment.getAssigned().toArray());
struct.set(LEADER_KEY_NAME, assignment.leader());
struct.set(VERSION_KEY_NAME, assignment.version());
struct.set(ASSIGNED_KEY_NAME, assignment.assigned().toArray());

ByteBuffer buffer = ByteBuffer.allocate(ASSIGNMENT_V0.sizeOf(struct));
ASSIGNMENT_V0.write(buffer, struct);
Expand All @@ -61,7 +61,7 @@ public static ByteBuffer serializeAssignment(Assignment assignment) {
public static Assignment deserializeAssignment(ByteBuffer buffer) {
Struct struct = ASSIGNMENT_V0.read(buffer);
String leader = struct.getString(LEADER_KEY_NAME);
int version = struct.getInt(VERSION_KEY_NAME);
long version = struct.getLong(VERSION_KEY_NAME);
List<ByteBuffer> assigned = new ArrayList<>();
for (Object element : struct.getArray(ASSIGNED_KEY_NAME)) {
assigned.add((ByteBuffer) element);
Expand Down
Loading
Loading