diff --git a/docs/ops.html b/docs/ops.html index 61007a30b8f64..f804061007033 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4318,7 +4318,7 @@

Try to send messages to the `tieredTopic` topic to roll the log segment:

-
$ bin/kafka-producer-perf-test.sh --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
+
$ bin/kafka-producer-perf-test.sh --bootstrap-server localhost:9092 --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1

Then, after the active segment is rolled, the old segment should be moved to the remote storage and get deleted. This can be verified by checking the remote log directory configured above. For example: diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index acfe4790d731c..4a93541c7cbcf 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -91,7 +91,7 @@ def start_cmd(self, node): cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\"; " % (get_log4j_config_param(node), get_log4j_config_for_tools(node)) cmd += "KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \ - "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args + "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --command-property bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args self.security_config.setup_node(node) if self.security_config.security_protocol != SecurityConfig.PLAINTEXT: diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index d6ed1d0a4ef5f..fa21432b5870a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -81,7 +81,7 @@ void start(String[] args) throws IOException { System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); } boolean isSteadyState = false; - stats = new Stats(config.numRecords, isSteadyState); + stats = new Stats(config.numRecords, config.reportingInterval, isSteadyState); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -101,7 +101,7 @@ record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); if ((isSteadyState = config.warmupRecords > 0) && i == config.warmupRecords) { - steadyStateStats = new Stats(config.numRecords - config.warmupRecords, isSteadyState); + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.reportingInterval, isSteadyState); stats.suppressPrinting(); } cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); @@ -131,7 +131,7 @@ record = new ProducerRecord<>(config.topicName, payload); steadyStateStats.printTotal(); } } else { - // Make sure all messages are sent before printing out the stats and the metrics + // Make sure all records are sent before printing out the stats and the metrics // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py // expects this class to work with older versions of the client jar that don't support flush(). producer.flush(); @@ -177,7 +177,7 @@ static byte[] generateRandomPayload(Integer recordSize, List payloadByte } else if (payloadMonotonic) { payload = Long.toString(recordValue).getBytes(StandardCharsets.UTF_8); } else { - throw new IllegalArgumentException("no payload File Path or record Size or payload-monotonic option provided"); + throw new IllegalArgumentException("No payload file, record size or payload-monotonic option provided."); } return payload; } @@ -221,7 +221,7 @@ static List readPayloadFile(String payloadFilePath, String payloadDelimi } } - System.out.println("Number of messages read: " + payloadByteList.size()); + System.out.println("Number of records read: " + payloadByteList.size()); } return payloadByteList; @@ -230,24 +230,34 @@ static List readPayloadFile(String payloadFilePath, String payloadDelimi /** Get the command-line argument parser. */ static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers - .newArgumentParser("producer-performance") + .newArgumentParser("kafka-producer-perf-test") .defaultHelp(true) .description("This tool is used to verify the producer performance. To enable transactions, " + - "you can specify a transaction id or set a transaction duration using --transaction-duration-ms. " + - "There are three ways to specify the transaction id: set transaction.id= via --producer-props, " + - "set transaction.id= in the config file via --producer.config, or use --transaction-id ."); + "you can specify a transactional id or set a transaction duration using --transaction-duration-ms. " + + "There are three ways to specify the transactional id: set transactional.id= via --command-property, " + + "set transactional.id= in the config file via --command-config, or use --transactional-id ."); + + parser.addArgument("--bootstrap-server") + .action(store()) + .required(false) + .type(String.class) + .metavar("BOOTSTRAP-SERVERS") + .dest("bootstrapServers") + .help("The server(s) to connect to. This config takes precedence over bootstrap.servers specified " + + "via --command-property or --command-config."); MutuallyExclusiveGroup payloadOptions = parser .addMutuallyExclusiveGroup() .required(true) - .description("either --record-size or --payload-file must be specified but not both."); + .description("Note that you must provide exactly one of --record-size, --payload-file " + + "or --payload-monotonic."); parser.addArgument("--topic") .action(store()) .required(true) .type(String.class) .metavar("TOPIC") - .help("produce messages to this topic"); + .help("Produce records to this topic."); parser.addArgument("--num-records") .action(store()) @@ -255,7 +265,7 @@ static ArgumentParser argParser() { .type(Long.class) .metavar("NUM-RECORDS") .dest("numRecords") - .help("number of messages to produce"); + .help("Number of records to produce."); payloadOptions.addArgument("--record-size") .action(store()) @@ -263,7 +273,7 @@ static ArgumentParser argParser() { .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") - .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file " + + .help("Record size in bytes. Note that you must provide exactly one of --record-size, --payload-file " + "or --payload-monotonic."); payloadOptions.addArgument("--payload-file") @@ -272,17 +282,17 @@ static ArgumentParser argParser() { .type(String.class) .metavar("PAYLOAD-FILE") .dest("payloadFile") - .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + - "Payloads will be read from this file and a payload will be randomly selected when sending messages. " + - "Note that you must provide exactly one of --record-size or --payload-file or --payload-monotonic."); + .help("File to read the record payloads from. This works only for UTF-8 encoded text files. " + + "Payloads will be read from this file and a payload will be randomly selected when sending records. " + + "Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic."); payloadOptions.addArgument("--payload-monotonic") .action(storeTrue()) .type(Boolean.class) .metavar("PAYLOAD-MONOTONIC") .dest("payloadMonotonic") - .help("payload is monotonically increasing integer. Note that you must provide exactly one of --record-size " + - "or --payload-file or --payload-monotonic."); + .help("Payload is a monotonically increasing integer. Note that you must provide exactly one of --record-size, " + + "--payload-file or --payload-monotonic."); parser.addArgument("--payload-delimiter") .action(store()) @@ -291,8 +301,7 @@ static ArgumentParser argParser() { .metavar("PAYLOAD-DELIMITER") .dest("payloadDelimiter") .setDefault("\\n") - .help("provides delimiter to be used when --payload-file is provided. " + - "Defaults to new line. " + + .help("Provides the delimiter to be used when --payload-file is provided. Defaults to new line. " + "Note that this parameter will be ignored if --payload-file is not provided."); parser.addArgument("--throughput") @@ -300,16 +309,26 @@ static ArgumentParser argParser() { .required(true) .type(Double.class) .metavar("THROUGHPUT") - .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling."); + .help("Throttle maximum record throughput to *approximately* THROUGHPUT records/sec. Set this to -1 to disable throttling."); parser.addArgument("--producer-props") - .nargs("+") - .required(false) - .metavar("PROP-NAME=PROP-VALUE") - .type(String.class) - .dest("producerConfig") - .help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " + - "These configs take precedence over those passed via --producer.config."); + .nargs("+") + .required(false) + .metavar("PROP-NAME=PROP-VALUE") + .type(String.class) + .dest("producerConfig") + .help("(DEPRECATED) Kafka producer related configuration properties like client.id. " + + "These configs take precedence over those passed via --command-config or --producer.config. " + + "This option will be removed in a future version. Use --command-property instead."); + + parser.addArgument("--command-property") + .nargs("+") + .required(false) + .metavar("PROP-NAME=PROP-VALUE") + .type(String.class) + .dest("commandProperties") + .help("Kafka producer related configuration properties like client.id. " + + "These configs take precedence over those passed via --command-config or --producer.config."); parser.addArgument("--producer.config") .action(store()) @@ -317,46 +336,64 @@ static ArgumentParser argParser() { .type(String.class) .metavar("CONFIG-FILE") .dest("producerConfigFile") - .help("producer config properties file."); + .help("(DEPRECATED) Producer config properties file. " + + "This option will be removed in a future version. Use --command-config instead."); + + parser.addArgument("--command-config") + .action(store()) + .required(false) + .type(String.class) + .metavar("CONFIG-FILE") + .dest("commandConfigFile") + .help("Producer config properties file."); parser.addArgument("--print-metrics") .action(storeTrue()) .type(Boolean.class) .metavar("PRINT-METRICS") .dest("printMetrics") - .help("print out metrics at the end of the test."); + .help("Print out metrics at the end of the test."); parser.addArgument("--transactional-id") - .action(store()) - .required(false) - .type(String.class) - .metavar("TRANSACTIONAL-ID") - .dest("transactionalId") - .help("The transactional id to use. This config takes precedence over the transactional.id " + - "specified via --producer.config or --producer-props. Note that if the transactional id " + - "is not specified while --transaction-duration-ms is provided, the default value for the " + - "transactional id will be performance-producer- followed by a random uuid."); + .action(store()) + .required(false) + .type(String.class) + .metavar("TRANSACTIONAL-ID") + .dest("transactionalId") + .help("The transactional id to use. This config takes precedence over the transactional.id " + + "specified via --command-property or --command-config. Note that if the transactional id " + + "is not specified while --transaction-duration-ms is provided, the default value for the " + + "transactional id will be performance-producer- followed by a random uuid."); parser.addArgument("--transaction-duration-ms") - .action(store()) - .required(false) - .type(Long.class) - .metavar("TRANSACTION-DURATION") - .dest("transactionDurationMs") - .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. " + - "The value should be greater than 0. If the transactional id is specified via --producer-props, " + - "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " + - "the default value will be 3000."); + .action(store()) + .required(false) + .type(Long.class) + .metavar("TRANSACTION-DURATION") + .dest("transactionDurationMs") + .help("The maximum duration of each transaction. The commitTransaction will be called after this time has elapsed. " + + "The value should be greater than 0. If the transactional id is specified via --command-property, " + + "--command-config or --transactional-id but --transaction-duration-ms is not specified, " + + "the default value will be 3000."); parser.addArgument("--warmup-records") - .action(store()) - .required(false) - .type(Long.class) - .metavar("WARMUP-RECORDS") - .dest("warmupRecords") - .setDefault(0L) - .help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " + - "An additional summary line will be printed describing the steady-state statistics. (default: 0)."); + .action(store()) + .required(false) + .type(Long.class) + .metavar("WARMUP-RECORDS") + .dest("warmupRecords") + .setDefault(0L) + .help("The number of records to treat as warmup. These initial records will not be included in steady-state statistics. " + + "An additional summary line will be printed describing the steady-state statistics."); + + parser.addArgument("--reporting-interval") + .action(store()) + .required(false) + .type(Long.class) + .metavar("INTERVAL-MS") + .dest("reportingInterval") + .setDefault(5_000L) + .help("Interval in milliseconds at which to print progress info."); return parser; } @@ -381,7 +418,7 @@ static class Stats { private final boolean isSteadyState; private boolean suppressPrint; - public Stats(long numRecords, boolean isSteadyState) { + public Stats(long numRecords, long reportingInterval, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -394,7 +431,7 @@ public Stats(long numRecords, boolean isSteadyState) { this.windowTotalLatency = 0; this.windowBytes = 0; this.totalLatency = 0; - this.reportingInterval = 5000; + this.reportingInterval = reportingInterval; this.isSteadyState = isSteadyState; this.suppressPrint = false; } @@ -529,6 +566,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { } static final class ConfigPostProcessor { + final String bootstrapServers; final String topicName; final long numRecords; final long warmupRecords; @@ -540,9 +578,11 @@ static final class ConfigPostProcessor { final Long transactionDurationMs; final boolean transactionsEnabled; final List payloadByteList; + final long reportingInterval; public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOException, ArgumentParserException { Namespace namespace = parser.parseArgs(args); + this.bootstrapServers = namespace.getString("bootstrapServers"); this.topicName = namespace.getString("topic"); this.numRecords = namespace.getLong("numRecords"); this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0); @@ -550,33 +590,56 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept this.throughput = namespace.getDouble("throughput"); this.payloadMonotonic = namespace.getBoolean("payloadMonotonic"); this.shouldPrintMetrics = namespace.getBoolean("printMetrics"); + this.reportingInterval = namespace.getLong("reportingInterval"); List producerConfigs = namespace.getList("producerConfig"); String producerConfigFile = namespace.getString("producerConfigFile"); + List commandProperties = namespace.getList("commandProperties"); + String commandConfigFile = namespace.getString("commandConfigFile"); String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); if (numRecords <= 0) { - throw new ArgumentParserException("--num-records should be greater than zero", parser); + throw new ArgumentParserException("--num-records should be greater than zero.", parser); } if (warmupRecords >= numRecords) { throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); } if (recordSize != null && recordSize <= 0) { - throw new ArgumentParserException("--record-size should be greater than zero", parser); + throw new ArgumentParserException("--record-size should be greater than zero.", parser); + } + if (bootstrapServers == null && commandProperties == null && producerConfigs == null && producerConfigFile == null && commandConfigFile == null) { + throw new ArgumentParserException("At least one of --bootstrap-server, --command-property, --producer-props, --producer.config or --command-config must be specified.", parser); + } + if (commandProperties != null && producerConfigs != null) { + throw new ArgumentParserException("--command-property and --producer-props cannot be specified together.", parser); } - if (producerConfigs == null && producerConfigFile == null) { - throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); + if (commandConfigFile != null && producerConfigFile != null) { + throw new ArgumentParserException("--command-config and --producer.config cannot be specified together.", parser); } if (transactionDurationMsArg != null && transactionDurationMsArg <= 0) { - throw new ArgumentParserException("--transaction-duration-ms should be greater than zero", parser); + throw new ArgumentParserException("--transaction-duration-ms should be greater than zero.", parser); + } + if (reportingInterval <= 0) { + throw new ArgumentParserException("--reporting-interval should be greater than zero.", parser); } // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. String payloadDelimiter = namespace.getString("payloadDelimiter").equals("\\n") ? "\n" : namespace.getString("payloadDelimiter"); this.payloadByteList = readPayloadFile(payloadFilePath, payloadDelimiter); - this.producerProps = readProps(producerConfigs, producerConfigFile); + if (producerConfigs != null) { + System.out.println("Option --producer-props has been deprecated and will be removed in a future version. Use --command-property instead."); + commandProperties = producerConfigs; + } + if (producerConfigFile != null) { + System.out.println("Option --producer.config has been deprecated and will be removed in a future version. Use --command-config instead."); + commandConfigFile = producerConfigFile; + } + this.producerProps = readProps(commandProperties, commandConfigFile); + if (bootstrapServers != null) { + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } // setup transaction related configs this.transactionsEnabled = transactionDurationMsArg != null || transactionIdArg != null diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 7217c2101a00a..007a22a80202c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -188,7 +188,7 @@ public void testNumberOfCallsForSendAndClose() throws IOException { "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(5)).send(any(), any()); verify(producerMock, times(1)).close(); @@ -205,7 +205,7 @@ public void testEnableTransaction() throws IOException { "--throughput", "100", "--record-size", "100", "--transactional-id", "foobar", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(1)).beginTransaction(); verify(producerMock, times(1)).commitTransaction(); @@ -225,7 +225,7 @@ public void testNumberOfSuccessfulSendAndClose() throws IOException { "--num-records", "10", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); @@ -246,7 +246,7 @@ public void testNumberOfFailedSendAndClose() throws IOException { "--num-records", "10", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); @@ -263,7 +263,7 @@ public void testMutuallyExclusiveGroup() { "--throughput", "100", "--record-size", "100", "--payload-monotonic", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser1 = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser1.parseArgs(args1)); assertEquals("argument --payload-monotonic: not allowed with argument --record-size", thrown.getMessage()); @@ -274,7 +274,7 @@ public void testMutuallyExclusiveGroup() { "--throughput", "100", "--payload-file", "abc.txt", "--payload-monotonic", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser2 = ProducerPerformance.argParser(); thrown = assertThrows(ArgumentParserException.class, () -> parser2.parseArgs(args2)); assertEquals("argument --payload-monotonic: not allowed with argument --payload-file", thrown.getMessage()); @@ -287,8 +287,8 @@ public void testUnexpectedArg() { "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", - "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--record-size", "100", + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); assertEquals("unrecognized arguments: '--test'", thrown.getMessage()); @@ -301,7 +301,7 @@ public void testFractionalThroughput() { "--num-records", "5", "--throughput", "1.25", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); assertDoesNotThrow(() -> parser.parseArgs(args)); } @@ -354,7 +354,7 @@ public void testGenerateRandomPayloadException() { SplittableRandom random = new SplittableRandom(0); IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L)); - assertEquals("no payload File Path or record Size or payload-monotonic option provided", thrown.getMessage()); + assertEquals("No payload file, record size or payload-monotonic option provided.", thrown.getMessage()); } @Test @@ -380,14 +380,14 @@ public void testDefaultClientId() throws Exception { @Test public void testStatsInitializationWithLargeNumRecords() { long numRecords = Long.MAX_VALUE; - assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, false)); + assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000L, false)); } @Test public void testStatsCorrectness() throws Exception { ExecutorService singleThreaded = Executors.newSingleThreadExecutor(); final long numRecords = 1000000; - ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, false); + ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000L, false); for (long i = 0; i < numRecords; i++) { final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats, null); CompletableFuture.runAsync(() -> callback.onCompletion(null, null), singleThreaded); @@ -412,11 +412,12 @@ public void testConfigPostProcessor() throws IOException, ArgumentParserExceptio "--throughput", "100", "--record-size", "100", "--print-metrics", - "--producer-props", "bootstrap.servers=localhost:9000", + "--bootstrap-server", "localhost:9000", "--transactional-id", "foobar", "--transaction-duration-ms", "5000", }; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.bootstrapServers); assertEquals("Hello-Kafka", configs.topicName); assertEquals(5, configs.numRecords); assertEquals(100, configs.throughput); @@ -438,28 +439,28 @@ public void testInvalidConfigPostProcessor() { "--num-records", "5", "--throughput", "100", "--record-size", "100"}; - assertEquals("Either --producer-props or --producer.config must be specified.", - assertThrows(ArgumentParserException.class, - () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidProducerProps)).getMessage()); + assertEquals("At least one of --bootstrap-server, --command-property, --producer-props, --producer.config or --command-config must be specified.", + assertThrows(ArgumentParserException.class, + () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidProducerProps)).getMessage()); String[] invalidTransactionDurationMs = new String[]{ "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000", + "--bootstrap-server", "localhost:9000", "--transaction-duration-ms", "0"}; - assertEquals("--transaction-duration-ms should be greater than zero", - assertThrows(ArgumentParserException.class, - () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidTransactionDurationMs)).getMessage()); + assertEquals("--transaction-duration-ms should be greater than zero.", + assertThrows(ArgumentParserException.class, + () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidTransactionDurationMs)).getMessage()); String[] invalidNumRecords = new String[]{ "--topic", "Hello-Kafka", "--num-records", "-5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; - assertEquals("--num-records should be greater than zero", + "--bootstrap-server", "localhost:9000"}; + assertEquals("--num-records should be greater than zero.", assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidNumRecords)).getMessage()); @@ -468,10 +469,72 @@ public void testInvalidConfigPostProcessor() { "--num-records", "5", "--throughput", "100", "--record-size", "-100", - "--producer-props", "bootstrap.servers=localhost:9000"}; - assertEquals("--record-size should be greater than zero", + "--bootstrap-server", "localhost:9000"}; + assertEquals("--record-size should be greater than zero.", assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidRecordSize)).getMessage()); + + String[] invalidReportingInterval = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--reporting-interval", "0", + "--bootstrap-server", "localhost:9000"}; + assertEquals("--reporting-interval should be greater than zero.", + assertThrows(ArgumentParserException.class, + () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidReportingInterval)).getMessage()); + } + + @Test + public void testBootstrapServer() throws IOException, ArgumentParserException { + ArgumentParser parser = ProducerPerformance.argParser(); + String[] args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000"}; + ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--command-property", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9001", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--command-property", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9001", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--producer-props", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } @Test @@ -482,7 +545,7 @@ public void testNoTransactionRelatedConfigs() throws IOException, ArgumentParser "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertFalse(configs.transactionsEnabled); assertNull(configs.transactionDurationMs); @@ -490,14 +553,15 @@ public void testNoTransactionRelatedConfigs() throws IOException, ArgumentParser } @Test - public void testEnableTransactionByProducerProps() throws IOException, ArgumentParserException { + public void testEnableTransactionByProducerProperty() throws IOException, ArgumentParserException { ArgumentParser parser = ProducerPerformance.argParser(); String[] args = new String[]{ "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000", "transactional.id=foobar"}; + "--bootstrap-server", "localhost:9000", + "--command-property", "transactional.id=foobar"}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -513,8 +577,8 @@ public void testEnableTransactionByTransactionId() throws IOException, ArgumentP "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer.config", producerConfigFile.getAbsolutePath(), - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000", + "--command-config", producerConfigFile.getAbsolutePath()}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -525,8 +589,55 @@ public void testEnableTransactionByTransactionId() throws IOException, ArgumentP "--num-records", "5", "--throughput", "100", "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--command-config", producerConfigFile.getAbsolutePath(), + "--command-property", "transactional.id=hello_kafka"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertTrue(configs.transactionsEnabled); + assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); + assertEquals("hello_kafka", configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--transactional-id", "kafka_hello", + "--bootstrap-server", "localhost:9000", + "--command-config", producerConfigFile.getAbsolutePath(), + "--command-property", "transactional.id=hello_kafka"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertTrue(configs.transactionsEnabled); + assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); + assertEquals("kafka_hello", configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + + Utils.delete(producerConfigFile); + } + + @Test + public void testEnableTransactionByTransactionIdDeprecated() throws IOException, ArgumentParserException { + File producerConfigFile = createTempFile("transactional.id=foobar"); + ArgumentParser parser = ProducerPerformance.argParser(); + String[] args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--producer.config", producerConfigFile.getAbsolutePath()}; + ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertTrue(configs.transactionsEnabled); + assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); + assertEquals("foobar", configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", "--producer.config", producerConfigFile.getAbsolutePath(), - "--producer-props", "bootstrap.servers=localhost:9000", "transactional.id=hello_kafka"}; + "--producer-props", "transactional.id=hello_kafka"}; configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -538,8 +649,9 @@ public void testEnableTransactionByTransactionId() throws IOException, ArgumentP "--throughput", "100", "--record-size", "100", "--transactional-id", "kafka_hello", + "--bootstrap-server", "localhost:9000", "--producer.config", producerConfigFile.getAbsolutePath(), - "--producer-props", "bootstrap.servers=localhost:9000", "transactional.id=hello_kafka"}; + "--producer-props", "transactional.id=hello_kafka"}; configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -548,6 +660,31 @@ public void testEnableTransactionByTransactionId() throws IOException, ArgumentP Utils.delete(producerConfigFile); } + @Test + public void testEnsureDeprecatedAndModernArgumentsNotBothSpecified() throws IOException { + File producerConfigFile = createTempFile("bootstrap.servers=localhost:9000"); + String[] args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--producer.config", producerConfigFile.getAbsolutePath(), + "--command-config", producerConfigFile.getAbsolutePath()}; + ArgumentParser parser = ProducerPerformance.argParser(); + assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, args)); + + String[] args2 = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9090", + "--command-property", "bootstrap.servers=localhost:9090"}; + assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, args2)); + + Utils.delete(producerConfigFile); + } + @Test public void testEnableTransactionByTransactionDurationMs() throws IOException, ArgumentParserException { ArgumentParser parser = ProducerPerformance.argParser(); @@ -557,7 +694,7 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A "--throughput", "100", "--record-size", "100", "--transaction-duration-ms", "5000", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(5000, configs.transactionDurationMs); @@ -566,28 +703,28 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A } @Test - public void testWarmupRecordsFractionalValue() throws Exception { + public void testWarmupRecordsFractionalValue() { String[] args = new String[] { "--topic", "Hello-Kafka", "--num-records", "10", "--warmup-records", "1.5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); thrown.printStackTrace(); } @Test - public void testWarmupRecordsString() throws Exception { + public void testWarmupRecordsString() { String[] args = new String[] { "--topic", "Hello-Kafka", "--num-records", "10", "--warmup-records", "foo", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); thrown.printStackTrace(); @@ -607,7 +744,7 @@ public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { "--warmup-records", "2", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); @@ -630,7 +767,7 @@ public void testWarmupNegativeRecordsNormalTest() throws IOException { "--warmup-records", "-1", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any());