From 1af75574cef77066d5c3f97695c429b4585ea5b9 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 31 Jul 2024 18:30:49 -0700 Subject: [PATCH] tests: Add a simple pausable idempotent producer using Java API --- tests/java/verifiers/pom.xml | 5 + .../src/main/java/idempotency/App.java | 189 ++++++++++++++++++ .../verifiers/idempotency_load_generator.py | 147 ++++++++++++++ 3 files changed, 341 insertions(+) create mode 100644 tests/java/verifiers/src/main/java/idempotency/App.java create mode 100644 tests/rptest/transactions/verifiers/idempotency_load_generator.py diff --git a/tests/java/verifiers/pom.xml b/tests/java/verifiers/pom.xml index f30ce4af35ce5..f678f19c4b2df 100644 --- a/tests/java/verifiers/pom.xml +++ b/tests/java/verifiers/pom.xml @@ -30,6 +30,11 @@ kafka-clients 3.0.2 + + org.slf4j + slf4j-log4j12 + 1.7.12 + ${buildDir} diff --git a/tests/java/verifiers/src/main/java/idempotency/App.java b/tests/java/verifiers/src/main/java/idempotency/App.java new file mode 100644 index 0000000000000..dc5fe05786519 --- /dev/null +++ b/tests/java/verifiers/src/main/java/idempotency/App.java @@ -0,0 +1,189 @@ +package io.vectorized.idempotency; + +import static spark.Spark.*; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import java.lang.String; +import java.lang.Thread; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.Semaphore; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import spark.*; + +// A simple pausable idempotent producer for sanity testing a Java based +// producer from ducktape. 
This is a not a verfier but rather a simple pausable +// load generating utility that can start, pause and resume a single idempotency +// session using a single producer on demand. +// +// Supported REST APIs +// - /start-producer - start a new or resume existing idempotent producer +// session +// - /pause-producer - pauses the producer +// - /stop-producer - stops the producer +public class App { + + static Logger logger = Logger.getLogger(App.class); + + static void setupLogging() { + org.apache.log4j.BasicConfigurator.configure(); + Logger.getRootLogger().setLevel(Level.WARN); + Logger.getLogger("io.vectorized").setLevel(Level.DEBUG); + Logger.getLogger("org.apache.kafka").setLevel(Level.DEBUG); + } + + public static class Params { + public String brokers; + public String topic; + public long partitions; + } + + public static class Progress { + public long num_produced; + }; + + static Producer createIdempotentProducer(String brokers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + props.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.LINGER_MS_CONFIG, 0); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + return new KafkaProducer(props); + } + + public static class JsonTransformer implements ResponseTransformer { + private Gson gson = new Gson(); + + @Override + public String render(Object model) { + return gson.toJson(model); + } + } + + volatile Params params = null; + volatile Thread produceThread = null; + volatile Semaphore sem = new Semaphore(1, true); + volatile boolean started = false; + volatile 
boolean stopped = false; + volatile Exception ex = null; + volatile long counter = 0; + + void produceLoop() { + var random = new Random(); + Producer idempotentProducer + = createIdempotentProducer(this.params.brokers); + while (!stopped) { + try { + sem.acquire(); + long partition + = random.longs(0, this.params.partitions).findFirst().getAsLong(); + String kv = Long.toString(counter); + ProducerRecord record + = new ProducerRecord<>(this.params.topic, kv, kv); + idempotentProducer.send(record).get(); + } catch (Exception e) { + ex = e; + logger.error("Exception in produce loop: ", e); + } finally { + sem.release(); + } + counter++; + } + idempotentProducer.close(); + } + + void run() throws Exception { + + port(8080); + + get("/ping", (req, res) -> { + res.status(200); + return ""; + }); + + get("/progress", (req, res) -> { + Progress progress = new Progress(); + progress.num_produced = counter; + return progress; + }, new JsonTransformer()); + + post("/start-producer", (req, res) -> { + if (this.started && !this.stopped) { + logger.info("Producer already started. 
unpausing it."); + if (this.sem.availablePermits() == 0) { + this.sem.release(); + } + res.status(200); + return ""; + } + logger.info("Starting producer"); + try { + this.params = (new Gson()).fromJson(req.body(), Params.class); + this.produceThread = new Thread(() -> { this.produceLoop(); }); + this.produceThread.start(); + + } catch (Exception e) { + logger.error("Exception starting produce thread ", e); + throw e; + } + this.started = true; + this.stopped = false; + res.status(200); + return ""; + }); + + post("/pause-producer", (req, res) -> { + if (!this.started) { + logger.info("Pause failed, not started."); + res.status(500); + return ""; + } + logger.info("Pausing producer"); + this.sem.acquire(); + res.status(200); + return ""; + }); + + post("/stop-producer", (req, res) -> { + logger.info("Stopping producer"); + this.stopped = true; + if (this.sem.availablePermits() == 0) { + this.sem.release(); + } + try { + if (produceThread != null) { + produceThread.join(); + } + } catch (Exception e) { + logger.error("Exception stopping producer", e); + throw e; + } + + if (ex != null) { + System.exit(1); + } + + res.status(200); + return ""; + }); + } + + public static void main(String[] args) throws Exception { + setupLogging(); + new App().run(); + } +} diff --git a/tests/rptest/transactions/verifiers/idempotency_load_generator.py b/tests/rptest/transactions/verifiers/idempotency_load_generator.py new file mode 100644 index 0000000000000..76e9b5e14708a --- /dev/null +++ b/tests/rptest/transactions/verifiers/idempotency_load_generator.py @@ -0,0 +1,147 @@ +# Copyright 2024 Redpanda Data, Inc. 
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.services.service import Service
from rptest.util import wait_until
import requests
import sys

OUTPUT_LOG = "/opt/remote/var/pausable_idempotent_producer.log"

# Port the Java application listens on.
HTTP_PORT = 8080

# Timeout for requests the application answers immediately
# (ping, progress, start).
REQUEST_TIMEOUT_SEC = 30

# Timeout for requests that wait for the in-flight produce request to finish
# (pause, stop). NOTE(review): chosen to exceed the Kafka client's default
# delivery timeout; confirm against the producer configuration in use.
BLOCKING_REQUEST_TIMEOUT_SEC = 180


class IdempotencyClientFailure(Exception):
    """Raised when the producer's log contains an exception."""
    pass


class PausableIdempotentProducer(Service):
    """Ducktape service wrapping the Java pausable idempotent producer.

    The service runs the Java application on a single node and drives it
    through its REST API: start (or resume), pause, stop and progress.
    """

    logs = {
        "pausable_idempotent_producer_stdout_stderr": {
            "path": OUTPUT_LOG,
            "collect_default": True
        }
    }

    def __init__(self, context, redpanda):
        super(PausableIdempotentProducer, self).__init__(context, num_nodes=1)
        self._redpanda = redpanda
        # Set by start_node(); the REST helpers need it to build URLs.
        self._node = None

    def _url(self, endpoint):
        """Return the URL of `endpoint` on the node running the application."""
        if self._node is None:
            raise RuntimeError(
                "pausable_idempotent_producer service is not started")
        return f"http://{self._node.account.hostname}:{HTTP_PORT}/{endpoint}"

    def _post(self, endpoint, timeout_sec, **kwargs):
        """POST to `endpoint` and raise unless the answer is a 200."""
        r = requests.post(self._url(endpoint), timeout=timeout_sec, **kwargs)
        if r.status_code != 200:
            raise Exception(
                f"unexpected status code: {r.status_code} content: {r.content}"
            )

    def is_alive(self, node):
        """Return True when the control script reports the process running."""
        result = node.account.ssh_output(
            "bash /opt/remote/control/alive.sh pausable_idempotent_producer")
        result = result.decode("utf-8")
        return "YES" in result

    def is_ready(self):
        """Return True once the application answers /ping."""
        try:
            self.remote_ping()
            return True
        except requests.exceptions.RequestException:
            # Covers refused connections as well as timeouts; both mean the
            # HTTP server is not serving yet.
            return False

    def raise_on_violation(self, node):
        """Raise IdempotencyClientFailure if the log mentions an exception."""
        self.logger.info(
            f"Scanning node {node.account.hostname} log for violations...")

        # Drain the command output before raising so the remote stream is
        # not abandoned half-read.
        violations = list(
            node.account.ssh_capture(
                f"grep -e Exception {OUTPUT_LOG} || true"))
        if violations:
            raise IdempotencyClientFailure(violations[0])

    def start_node(self, node, timeout_sec=10):
        """Launch the application and wait until it is alive and serving."""
        node.account.ssh(
            "bash /opt/remote/control/start.sh pausable_idempotent_producer \"java -cp /opt/verifiers/verifiers.jar io.vectorized.idempotency.App\""
        )
        wait_until(
            lambda: self.is_alive(node),
            timeout_sec=timeout_sec,
            backoff_sec=1,
            err_msg=
            f"pausable_idempotent_producer service {node.account.hostname} failed to start within {timeout_sec} sec",
            retry_on_exc=False)
        self._node = node
        wait_until(
            lambda: self.is_ready(),
            timeout_sec=timeout_sec,
            backoff_sec=1,
            err_msg=
            f"pausable_idempotent_producer service {node.account.hostname} failed to become ready within {timeout_sec} sec",
            retry_on_exc=False)

    def stop_node(self, node):
        """Stop the process, then fail if its log recorded an exception."""
        node.account.ssh(
            "bash /opt/remote/control/stop.sh pausable_idempotent_producer")
        self.raise_on_violation(node)

    def clean_node(self, node):
        pass

    def wait_node(self, node, timeout_sec=sys.maxsize):
        """Block until the process is no longer alive."""
        wait_until(
            lambda: not (self.is_alive(node)),
            timeout_sec=timeout_sec,
            backoff_sec=1,
            err_msg=
            f"pausable_idempotent_producer service {node.account.hostname} failed to stop within {timeout_sec} sec",
            retry_on_exc=False)
        return True

    def remote_ping(self):
        """GET /ping; raise unless the application answers 200."""
        r = requests.get(self._url("ping"), timeout=REQUEST_TIMEOUT_SEC)
        if r.status_code != 200:
            raise Exception(f"unexpected status code: {r.status_code}")

    def start_producer(self, topic, partitions):
        """Start a producer session, or resume the current one if paused."""
        self._post("start-producer",
                   REQUEST_TIMEOUT_SEC,
                   json={
                       "brokers": self._redpanda.brokers(),
                       "topic": topic,
                       "partitions": partitions,
                   })

    def get_progress(self):
        """Return the raw HTTP response of GET /progress."""
        return requests.get(self._url("progress"),
                            timeout=REQUEST_TIMEOUT_SEC)

    def ensure_progress(self, expected=1000, timeout_sec=10):
        """Wait until the reported num_produced is at least `expected`."""
        def check_progress():
            r = self.get_progress()
            if r.status_code != 200:
                return False
            output = r.json()
            self._redpanda.logger.debug(f"progress response: {output}")
            return output["num_produced"] >= expected

        wait_until(
            check_progress,
            timeout_sec=timeout_sec,
            backoff_sec=1,
            err_msg=
            f"pausable_idempotent_producer service {self._node.account.hostname} failed to make progress in {timeout_sec} sec",
            retry_on_exc=False)

    def pause_producer(self):
        """Pause the producer; raise unless the application answers 200."""
        self._post("pause-producer", BLOCKING_REQUEST_TIMEOUT_SEC)

    def stop_producer(self):
        """Stop the producer; raise unless the application answers 200."""
        self._post("stop-producer", BLOCKING_REQUEST_TIMEOUT_SEC)