Skip to content
Merged
14 changes: 6 additions & 8 deletions docs/streams/developer-guide/app-reset-tool.html
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,18 @@ <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-ru
<p>The tool accepts the following parameters:</p>
<pre><code class="language-text">Option (* = required) Description
--------------------- -----------
* --application-id &lt;String: id&gt; The Kafka Streams application ID
* --application-id &lt;String: id&gt; REQUIRED: The Kafka Streams application ID
(application.id).
--bootstrap-server &lt;String: server to REQUIRED unless --bootstrap-servers
connect to&gt; (deprecated) is specified. The server
(s) to connect to. The broker list
string in the form HOST1:PORT1,HOST2:
PORT2.
--bootstrap-server &lt;String: server to The server(s) to connect to.
connect to&gt; The broker list string in the form HOST1:PORT1,HOST2:PORT2.
(default: localhost:9092)
--by-duration &lt;String: urls&gt; Reset offsets to offset by duration from
current timestamp. Format: &#39;PnDTnHnMnS&#39;
--config-file &lt;String: file name&gt; (Deprecated) Property file containing configs to be
--config-file &lt;String: file name&gt; (Deprecated) Property file containing configs to be
passed to admin clients and embedded consumer.
This option will be removed in a future version.
Use --command-config instead.
--command-config &lt;String: file name&gt; Config properties file to be passed to admin clients
--command-config &lt;String: file name&gt; Config properties file to be passed to admin clients
and embedded consumer.
--dry-run Display the actions that would be
performed without executing the reset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ static ArgumentParser argParser() {
.action(store())
.required(false)
.type(String.class)
.metavar("BOOTSTRAP-SERVERS")
.dest("bootstrapServers")
.metavar("BOOTSTRAP-SERVER")
.dest("bootstrapServer")
.help("The server(s) to connect to. This config takes precedence over bootstrap.servers specified " +
"via --command-property or --command-config.");

Expand Down Expand Up @@ -566,7 +566,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}

static final class ConfigPostProcessor {
final String bootstrapServers;
final String bootstrapServer;
final String topicName;
final long numRecords;
final long warmupRecords;
Expand All @@ -582,7 +582,7 @@ static final class ConfigPostProcessor {

public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOException, ArgumentParserException {
Namespace namespace = parser.parseArgs(args);
this.bootstrapServers = namespace.getString("bootstrapServers");
this.bootstrapServer = namespace.getString("bootstrapServer");
this.topicName = namespace.getString("topic");
this.numRecords = namespace.getLong("numRecords");
this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0);
Expand All @@ -608,7 +608,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
if (recordSize != null && recordSize <= 0) {
throw new ArgumentParserException("--record-size should be greater than zero.", parser);
}
if (bootstrapServers == null && commandProperties == null && producerConfigs == null && producerConfigFile == null && commandConfigFile == null) {
if (bootstrapServer == null && commandProperties == null && producerConfigs == null && producerConfigFile == null && commandConfigFile == null) {
throw new ArgumentParserException("At least one of --bootstrap-server, --command-property, --producer-props, --producer.config or --command-config must be specified.", parser);
}
if (commandProperties != null && producerConfigs != null) {
Expand Down Expand Up @@ -637,8 +637,8 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
commandConfigFile = producerConfigFile;
}
this.producerProps = readProps(commandProperties, commandConfigFile);
if (bootstrapServers != null) {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
if (bootstrapServer != null) {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
}
// setup transaction related configs
this.transactionsEnabled = transactionDurationMsArg != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ private static class StreamsResetterOptions extends CommandDefaultOptions {

public StreamsResetterOptions(String[] args) {
super(args);
applicationIdOption = parser.accepts("application-id", "The Kafka Streams application ID (application.id).")
applicationIdOption = parser.accepts("application-id", "REQUIRED: The Kafka Streams application ID (application.id).")
.withRequiredArg()
.ofType(String.class)
.describedAs("id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public void testConfigPostProcessor() throws IOException, ArgumentParserExceptio
"--transaction-duration-ms", "5000",
};
ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args);
assertEquals("localhost:9000", configs.bootstrapServers);
assertEquals("localhost:9000", configs.bootstrapServer);
assertEquals("Hello-Kafka", configs.topicName);
assertEquals(5, configs.numRecords);
assertEquals(100, configs.throughput);
Expand Down