Skip to content

Commit

Permalink
Add KafkaPullFarm. Switch to Push and Pull approach
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko committed Jan 29, 2024
1 parent ad9981d commit 4e0e641
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 117 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ Thereby it inherits well-known Kafka heartbeat mechanics and its config paramete
```java

/* Specify Kafka servers (will be used for rebalance only) */
KafkaFarm kafkaFarm = new KafkaFarm(kafkaBootstrapServers);
KafkaPushFarm kafkaPushFarm = new KafkaPushFarm(kafkaBootstrapServers);

/* Join the pasture to herd of specific name and class */
Pasture<String> skyNet = kafkaFarm.addBreedingPasture("SkyNet", String.class,
Pasture<String> skyNet = kafkaPushFarm.addBreedingPasture("SkyNet", String.class,
/* listener that will be use to update local assignment to this pasture */
(population, version, generation, isLeader) -> {
logger.info("Assigned leader={} version={} [{}]", isLeader, version, population);
});

/* set global population that will be distributed among all members of this herd */
skyNet.getShepherd().setPopulation(population, version.intValue())

skyNet.start();
```

### Modules
Expand Down
10 changes: 0 additions & 10 deletions common/src/main/java/com/playtika/shepherd/common/Pasture.java

This file was deleted.

11 changes: 0 additions & 11 deletions common/src/main/java/com/playtika/shepherd/common/Shepherd.java

This file was deleted.

31 changes: 31 additions & 0 deletions common/src/main/java/com/playtika/shepherd/common/pull/Farm.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.playtika.shepherd.common.pull;

import com.playtika.shepherd.common.PastureListener;

import java.nio.ByteBuffer;

/**
* Farm has many herds distributed evenly by pastures
*/
public interface Farm {

/**
* Here we come to Farm with our Pasture to graze specific Herd on it.
*
* @param herd Herd we want to inhabit this pasture
* @param pastureListener Will listen for animals from Herd that will be assigned to our Pasture
* @return Shepherd allows to set Herd population
*/
Pasture addPasture(Herd<ByteBuffer> herd, PastureListener<ByteBuffer> pastureListener);

/**
* Here we come to Farm with our Pasture to graze specific Breeding Herd on it.
*
* @param herd Herd we want to inhabit this pasture
* @param breedClass Only elements of this class accepted in this herd
* @param pastureListener Will listen for animals from Herd that will be assigned to our Pasture
* @return Shepherd allows to set Herd population
*/
<Breed> Pasture addBreedingPasture(Herd<Breed> herd, Class<Breed> breedClass, PastureListener<Breed> pastureListener);

}
17 changes: 17 additions & 0 deletions common/src/main/java/com/playtika/shepherd/common/pull/Herd.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.playtika.shepherd.common.pull;

/**
* Provides population
* @param <Breed>
*/
public interface Herd<Breed> {

String getName();

Population<Breed> getPopulation();

void reset();

record Population<Breed>(Breed[] population, int version) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.playtika.shepherd.common.pull;

import java.time.Duration;

public interface Pasture {

Shepherd getShepherd();

void start();

void close(Duration timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.playtika.shepherd.common.pull;

/**
* Allows to rebalance population among pastures
*/
public interface Shepherd {

void rebalanceHerd();

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package com.playtika.shepherd.common;
package com.playtika.shepherd.common.push;

import com.playtika.shepherd.common.PastureListener;

import java.nio.ByteBuffer;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.playtika.shepherd.common.push;

import java.time.Duration;

public interface Pasture<Breed> {

Shepherd<Breed> getShepherd();

/**
* Set population via Shepherd before calling this method
*/
void start();

void close(Duration timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.playtika.shepherd.common.push;

/**
* Allows to set herd population
* @param <Breed>
*/
public interface Shepherd<Breed> {

/**
*
* @param population
* @param version
* @return true if it will cause rebalance, false if population will be ignored
*/

boolean setPopulation(Breed[] population, int version);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.playtika.shepherd;

import com.playtika.shepherd.common.Pasture;
import com.playtika.shepherd.common.push.Pasture;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -32,17 +32,17 @@ public static void main(String[] args) {

Consumer<String, String> consumer = createConsumer();

KafkaFarm kafkaFarm = new KafkaFarm(kafkaContainer.getBootstrapServers());
KafkaPushFarm kafkaPushFarm = new KafkaPushFarm(kafkaContainer.getBootstrapServers());


Pasture<String> skyNet = kafkaFarm.addBreedingPasture("SkyNet", String.class,
Pasture<String> skyNet = kafkaPushFarm.addBreedingPasture("SkyNet", String.class,
(population, version, generation, isLeader) -> {
logger.info("Assigned leader={} version={} [{}]", isLeader, version, population);
});


System.out.println("****************************************************");
System.out.println("* Joined the Kafka Farm: \n"+kafkaFarm);
System.out.println("* Joined the Kafka Farm: \n"+ kafkaPushFarm);
System.out.println("* Herd name: SkyNet");
System.out.println("****************************************************");

Expand Down
Loading

0 comments on commit 4e0e641

Please sign in to comment.