Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KafkaPullFarm. Switch to Push and Pull approach #25

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ Thereby it inherits well-known Kafka heartbeat mechanics and its config paramete
```java

/* Specify Kafka servers (will be used for rebalance only) */
KafkaFarm kafkaFarm = new KafkaFarm(kafkaBootstrapServers);
KafkaPushFarm kafkaPushFarm = new KafkaPushFarm(kafkaBootstrapServers);

/* Join the pasture to herd of specific name and class */
Pasture<String> skyNet = kafkaFarm.addBreedingPasture("SkyNet", String.class,
Pasture<String> skyNet = kafkaPushFarm.addBreedingPasture("SkyNet", String.class,
/* listener that will be use to update local assignment to this pasture */
(population, version, generation, isLeader) -> {
logger.info("Assigned leader={} version={} [{}]", isLeader, version, population);
});

/* set global population that will be distributed among all members of this herd */
skyNet.getShepherd().setPopulation(population, version.intValue())

skyNet.start();
```

### Modules
Expand Down
10 changes: 0 additions & 10 deletions common/src/main/java/com/playtika/shepherd/common/Pasture.java

This file was deleted.

11 changes: 0 additions & 11 deletions common/src/main/java/com/playtika/shepherd/common/Shepherd.java

This file was deleted.

31 changes: 31 additions & 0 deletions common/src/main/java/com/playtika/shepherd/common/pull/Farm.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.playtika.shepherd.common.pull;

import com.playtika.shepherd.common.PastureListener;

import java.nio.ByteBuffer;

/**
* Farm has many herds distributed evenly by pastures
*/
public interface Farm {

/**
* Here we come to Farm with our Pasture to graze specific Herd on it.
*
* @param herd Herd we want to inhabit this pasture
* @param pastureListener Will listen for animals from Herd that will be assigned to our Pasture
* @return Shepherd allows to set Herd population
*/
Pasture addPasture(Herd<ByteBuffer> herd, PastureListener<ByteBuffer> pastureListener);

/**
* Here we come to Farm with our Pasture to graze specific Breeding Herd on it.
*
* @param herd Herd we want to inhabit this pasture
* @param breedClass Only elements of this class accepted in this herd
* @param pastureListener Will listen for animals from Herd that will be assigned to our Pasture
* @return Shepherd allows to set Herd population
*/
<Breed> Pasture addBreedingPasture(Herd<Breed> herd, Class<Breed> breedClass, PastureListener<Breed> pastureListener);

}
17 changes: 17 additions & 0 deletions common/src/main/java/com/playtika/shepherd/common/pull/Herd.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.playtika.shepherd.common.pull;

/**
* Provides population
* @param <Breed>
*/
public interface Herd<Breed> {

String getName();

Population<Breed> getPopulation();

void reset();

record Population<Breed>(Breed[] population, int version) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.playtika.shepherd.common.pull;

import java.time.Duration;

public interface Pasture {

Shepherd getShepherd();

void start();

void close(Duration timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.playtika.shepherd.common.pull;

/**
* Allows to rebalance population among pastures
*/
public interface Shepherd {

void rebalanceHerd();

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package com.playtika.shepherd.common;
package com.playtika.shepherd.common.push;

import com.playtika.shepherd.common.PastureListener;

import java.nio.ByteBuffer;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.playtika.shepherd.common.push;

import java.time.Duration;

public interface Pasture<Breed> {

Shepherd<Breed> getShepherd();

/**
* Set population via Shepherd before calling this method
*/
void start();

void close(Duration timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.playtika.shepherd.common.push;

/**
* Allows to set herd population
* @param <Breed>
*/
public interface Shepherd<Breed> {

/**
*
* @param population
* @param version
* @return true if it will cause rebalance, false if population will be ignored
*/

boolean setPopulation(Breed[] population, int version);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.playtika.shepherd;

import com.playtika.shepherd.common.Pasture;
import com.playtika.shepherd.common.push.Pasture;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -32,17 +32,17 @@ public static void main(String[] args) {

Consumer<String, String> consumer = createConsumer();

KafkaFarm kafkaFarm = new KafkaFarm(kafkaContainer.getBootstrapServers());
KafkaPushFarm kafkaPushFarm = new KafkaPushFarm(kafkaContainer.getBootstrapServers());


Pasture<String> skyNet = kafkaFarm.addBreedingPasture("SkyNet", String.class,
Pasture<String> skyNet = kafkaPushFarm.addBreedingPasture("SkyNet", String.class,
(population, version, generation, isLeader) -> {
logger.info("Assigned leader={} version={} [{}]", isLeader, version, population);
});


System.out.println("****************************************************");
System.out.println("* Joined the Kafka Farm: \n"+kafkaFarm);
System.out.println("* Joined the Kafka Farm: \n"+ kafkaPushFarm);
System.out.println("* Herd name: SkyNet");
System.out.println("****************************************************");

Expand Down
Loading
Loading