Skip to content

Commit

Permalink
tests: Add a simple pausable idempotent producer using Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Aug 6, 2024
1 parent 3c36e7c commit 1af7557
Show file tree
Hide file tree
Showing 3 changed files with 341 additions and 0 deletions.
5 changes: 5 additions & 0 deletions tests/java/verifiers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<artifactId>kafka-clients</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>
</dependencies>
<build>
<directory>${buildDir}</directory>
Expand Down
189 changes: 189 additions & 0 deletions tests/java/verifiers/src/main/java/idempotency/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package io.vectorized.idempotency;

import static spark.Spark.*;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.lang.String;
import java.lang.Thread;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import spark.*;

// A simple pausable idempotent producer for sanity testing a Java based
// producer from ducktape. This is a not a verfier but rather a simple pausable
// load generating utility that can start, pause and resume a single idempotency
// session using a single producer on demand.
//
// Supported REST APIs
// - /start-producer - start a new or resume existing idempotent producer
// session
// - /pause-producer - pauses the producer
// - /stop-producer - stops the producer
public class App {

static Logger logger = Logger.getLogger(App.class);

static void setupLogging() {
org.apache.log4j.BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger("io.vectorized").setLevel(Level.DEBUG);
Logger.getLogger("org.apache.kafka").setLevel(Level.DEBUG);
}

public static class Params {
public String brokers;
public String topic;
public long partitions;
}

public static class Progress {
public long num_produced;
};

static Producer<String, String> createIdempotentProducer(String brokers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new KafkaProducer<String, String>(props);
}

public static class JsonTransformer implements ResponseTransformer {
private Gson gson = new Gson();

@Override
public String render(Object model) {
return gson.toJson(model);
}
}

volatile Params params = null;
volatile Thread produceThread = null;
volatile Semaphore sem = new Semaphore(1, true);
volatile boolean started = false;
volatile boolean stopped = false;
volatile Exception ex = null;
volatile long counter = 0;

void produceLoop() {
var random = new Random();
Producer<String, String> idempotentProducer
= createIdempotentProducer(this.params.brokers);
while (!stopped) {
try {
sem.acquire();
long partition
= random.longs(0, this.params.partitions).findFirst().getAsLong();
String kv = Long.toString(counter);
ProducerRecord<String, String> record
= new ProducerRecord<>(this.params.topic, kv, kv);
idempotentProducer.send(record).get();
} catch (Exception e) {
ex = e;
logger.error("Exception in produce loop: ", e);
} finally {
sem.release();
}
counter++;
}
idempotentProducer.close();
}

void run() throws Exception {

port(8080);

get("/ping", (req, res) -> {
res.status(200);
return "";
});

get("/progress", (req, res) -> {
Progress progress = new Progress();
progress.num_produced = counter;
return progress;
}, new JsonTransformer());

post("/start-producer", (req, res) -> {
if (this.started && !this.stopped) {
logger.info("Producer already started. unpausing it.");
if (this.sem.availablePermits() == 0) {
this.sem.release();
}
res.status(200);
return "";
}
logger.info("Starting producer");
try {
this.params = (new Gson()).fromJson(req.body(), Params.class);
this.produceThread = new Thread(() -> { this.produceLoop(); });
this.produceThread.start();

} catch (Exception e) {
logger.error("Exception starting produce thread ", e);
throw e;
}
this.started = true;
this.stopped = false;
res.status(200);
return "";
});

post("/pause-producer", (req, res) -> {
if (!this.started) {
logger.info("Pause failed, not started.");
res.status(500);
return "";
}
logger.info("Pausing producer");
this.sem.acquire();
res.status(200);
return "";
});

post("/stop-producer", (req, res) -> {
logger.info("Stopping producer");
this.stopped = true;
if (this.sem.availablePermits() == 0) {
this.sem.release();
}
try {
if (produceThread != null) {
produceThread.join();
}
} catch (Exception e) {
logger.error("Exception stopping producer", e);
throw e;
}

if (ex != null) {
System.exit(1);
}

res.status(200);
return "";
});
}

public static void main(String[] args) throws Exception {
setupLogging();
new App().run();
}
}
147 changes: 147 additions & 0 deletions tests/rptest/transactions/verifiers/idempotency_load_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.services.service import Service
from rptest.util import wait_until
import requests
import sys

OUTPUT_LOG = "/opt/remote/var/pausable_idempotent_producer.log"


class IdempotencyClientFailure(Exception):
pass


class PausableIdempotentProducer(Service):
logs = {
"pausable_idempotent_producer_stdout_stderr": {
"path": OUTPUT_LOG,
"collect_default": True
}
}

def __init__(self, context, redpanda):
super(PausableIdempotentProducer, self).__init__(context, num_nodes=1)
self._redpanda = redpanda

def is_alive(self, node):
result = node.account.ssh_output(
"bash /opt/remote/control/alive.sh pausable_idempotent_producer")
result = result.decode("utf-8")
return "YES" in result

def is_ready(self):
try:
self.remote_ping()
return True
except requests.exceptions.ConnectionError:
return False

def raise_on_violation(self, node):
self.logger.info(
f"Scanning node {node.account.hostname} log for violations...")

for line in node.account.ssh_capture(
f"grep -e Exception {OUTPUT_LOG} || true"):
raise IdempotencyClientFailure(line)

def start_node(self, node, timeout_sec=10):
node.account.ssh(
f"bash /opt/remote/control/start.sh pausable_idempotent_producer \"java -cp /opt/verifiers/verifiers.jar io.vectorized.idempotency.App\""
)
wait_until(
lambda: self.is_alive(node),
timeout_sec=timeout_sec,
backoff_sec=1,
err_msg=
f"pausable_idempotent_producer service {node.account.hostname} failed to start within {timeout_sec} sec",
retry_on_exc=False)
self._node = node
wait_until(
lambda: self.is_ready(),
timeout_sec=timeout_sec,
backoff_sec=1,
err_msg=
f"pausable_idempotent_producer service {node.account.hostname} failed to become ready within {timeout_sec} sec",
retry_on_exc=False)

def stop_node(self, node):
node.account.ssh(
"bash /opt/remote/control/stop.sh pausable_idempotent_producer")
self.raise_on_violation(node)

def clean_node(self, node):
pass

def wait_node(self, node, timeout_sec=sys.maxsize):
wait_until(
lambda: not (self.is_alive(node)),
timeout_sec=timeout_sec,
backoff_sec=1,
err_msg=
f"pausable_idempotent_producer service {node.account.hostname} failed to stop within {timeout_sec} sec",
retry_on_exc=False)
return True

def remote_ping(self):
ip = self._node.account.hostname
r = requests.get(f"http://{ip}:8080/ping")
if r.status_code != 200:
raise Exception(f"unexpected status code: {r.status_code}")

def start_producer(self, topic, partitions):
ip = self._node.account.hostname
r = requests.post(f"http://{ip}:8080/start-producer",
json={
"brokers": self._redpanda.brokers(),
"topic": topic,
"partitions": partitions,
})
if r.status_code != 200:
raise Exception(
f"unexpected status code: {r.status_code} content: {r.content}"
)

def get_progress(self):
ip = self._node.account.hostname
return requests.get(f"http://{ip}:8080/progress")

def ensure_progress(self, expected=1000, timeout_sec=10):
def check_progress():
r = self.get_progress()
if r.status_code != 200:
return False
output = r.json()
self._redpanda.logger.debug(f"progress response: {output}")
return output["num_produced"] >= expected

wait_until(
check_progress,
timeout_sec=timeout_sec,
backoff_sec=1,
err_msg=
f"pausable_idempotent_producer service {self._node.account.hostname} failed to make progress in {timeout_sec} sec",
retry_on_exc=False)

def pause_producer(self):
ip = self._node.account.hostname
r = requests.post(f"http://{ip}:8080/pause-producer")
if r.status_code != 200:
raise Exception(
f"unexpected status code: {r.status_code} content: {r.content}"
)

def stop_producer(self):
ip = self._node.account.hostname
r = requests.post(f"http://{ip}:8080/stop-producer")
if r.status_code != 200:
raise Exception(
f"unexpected status code: {r.status_code} content: {r.content}"
)

0 comments on commit 1af7557

Please sign in to comment.