diff --git a/src/examples/java/io/nats/examples/ExampleUtils.java b/src/examples/java/io/nats/examples/ExampleUtils.java index e2c55d327..53418a616 100644 --- a/src/examples/java/io/nats/examples/ExampleUtils.java +++ b/src/examples/java/io/nats/examples/ExampleUtils.java @@ -15,9 +15,10 @@ import io.nats.client.*; import io.nats.client.StreamConfiguration.StorageType; +import io.nats.client.impl.JetStreamApiException; +import java.io.IOException; import java.time.Duration; -import java.util.concurrent.TimeoutException; public class ExampleUtils { public static Options createExampleOptions(String server, boolean allowReconnect) throws Exception { @@ -58,15 +59,20 @@ public void slowConsumerDetected(Connection conn, Consumer consumer) { } public static void createTestStream(JetStream js, String streamName, String subject) - throws TimeoutException, InterruptedException { + throws IOException, JetStreamApiException { + createTestStream(js, streamName, subject, StorageType.File); + } + + public static void createTestStream(JetStream js, String streamName, String subject, StorageType storageType) + throws IOException, JetStreamApiException { // Create a stream, here will use a file storage type, and one subject, // the passed subject. - StreamConfiguration sc = StreamConfiguration.builder(). - name(streamName). - storageType(StorageType.File). - subjects(new String[] { subject }). - build(); + StreamConfiguration sc = StreamConfiguration.builder() + .name(streamName) + .storageType(storageType) + .subjects(subject) + .build(); // Add or use an existing stream. StreamInfo si = js.addStream(sc); @@ -104,6 +110,15 @@ public static ExampleArgs readReplyArgs(String[] args, String usageString) { return readSubscribeArgs(args, usageString); } + public static ExampleArgs readConsumerArgs(String[] args, String usageString) { + ExampleArgs ea = new ExampleArgs(args, false, usageString); + if (ea.msgCount < 1 || ea.stream == null || ea.consumer == null) { + System.out.println("Stream name and consumer name are required to attach.\nSubject and message count are required.\n"); + usage(usageString); + } + return ea; + } + private static void usage(String usageString) { System.out.println(usageString); System.exit(-1); diff --git a/src/examples/java/io/nats/examples/NatsJsManage.java b/src/examples/java/io/nats/examples/NatsJsManage.java new file mode 100644 index 000000000..ff9902a7f --- /dev/null +++ b/src/examples/java/io/nats/examples/NatsJsManage.java @@ -0,0 +1,132 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples; + +import io.nats.client.*; + +import static io.nats.client.support.DebugUtil.printable; + +public class NatsJsManage { + + static final String usageString = "\nUsage: java NatsJsManage [-s server]\n" + + "\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n" + + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n" + + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n" + + "\nUse the URL for user/pass/token authentication.\n"; + + private static final String STREAM1 = "example-stream-1"; + private static final String STREAM2 = "example-stream-2"; + private static final String STRM1SUB1 = "strm1sub1"; + private static final String STRM1SUB2 = "strm1sub2"; + private static final String STRM2SUB1 = "strm2sub1"; + private static final String STRM2SUB2 = "strm2sub2"; + + public static void main(String[] args) { + ExampleArgs exArgs = new ExampleArgs(args, false, usageString); + + try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, false))) { + // Create a jetstream context. This hangs off the original connection + // allowing us to produce data to streams and consume data from + // jetstream consumers. + JetStream js = nc.jetStream(); + + action("Configure And Add Stream 1"); + StreamConfiguration sc1 = StreamConfiguration.builder() + .name(STREAM1) + .storageType(StreamConfiguration.StorageType.Memory) + .subjects(STRM1SUB1, STRM1SUB2) + .build(); + js.addStream(sc1); + + StreamInfo si = js.streamInfo(STREAM1); + printObject(si); + + action("Configure And Add Stream 2"); + StreamConfiguration sc2 = StreamConfiguration.builder() + .name(STREAM2) + .storageType(StreamConfiguration.StorageType.Memory) + .subjects(STRM2SUB1, STRM2SUB2) + .build(); + js.addStream(sc2); + + si = js.streamInfo(STREAM2); + printObject(si); + + action("Delete Stream 2"); + js.deleteStream(STREAM2); + + action("Delete Non-Existent Stream"); + try { + js.deleteStream(STREAM2); + } + catch (IllegalStateException ise) { + System.out.println(ise.getMessage()); + } + + action("Update Non-Existent Stream "); + try { + StreamConfiguration non = StreamConfiguration.builder() + .name(STREAM2) + .storageType(StreamConfiguration.StorageType.Memory) + .subjects(STRM2SUB1, STRM2SUB2) + .build(); + js.updateStream(non); + } + catch (IllegalStateException ise) { + System.out.println(ise.getMessage()); + } + + action("Configure And Add Consumer"); + ConsumerConfiguration cc = ConsumerConfiguration.builder() + .deliverSubject("strm1-deliver") + .durable("GQJ3IvWo") + .build(); + ConsumerInfo ci = js.addConsumer(STREAM1, cc); + printObject(ci); + si = js.streamInfo(STREAM1); + printObject(si); + + action("Make And Use Subscription"); + SubscribeOptions so = SubscribeOptions.builder() + .pullDirect(STREAM1, "GQJ3IvWo", 10) +// .configuration(STREAM1, cc) + .build(); + printObject(so); + + si = js.streamInfo(STREAM1); + printObject(si); + + JetStreamSubscription sub = js.subscribe(STRM1SUB1, so); + printObject(sub); + + action("List Consumers"); + ConsumerLister lister = js.newConsumerLister(STREAM1); + printObject(lister); + } + catch (Exception exp) { + action("EXCEPTION!!!"); + exp.printStackTrace(); + } + } + + private static void action(String label) { + System.out.println("================================================================================"); + System.out.println(label); + System.out.println("--------------------------------------------------------------------------------"); + } + + public static void printObject(Object o) { + System.out.println(printable(o.toString()) + "\n"); + } +} diff --git a/src/examples/java/io/nats/examples/NatsJsSubAttach.java b/src/examples/java/io/nats/examples/NatsJsSubAttach.java index f7d182f94..04a30e59e 100644 --- a/src/examples/java/io/nats/examples/NatsJsSubAttach.java +++ b/src/examples/java/io/nats/examples/NatsJsSubAttach.java @@ -31,12 +31,8 @@ public class NatsJsSubAttach { + "\nUse the URL for user/pass/token authentication.\n"; public static void main(String[] args) { - ExampleArgs exArgs = ExampleUtils.readSubscribeArgs(args, usageString); + ExampleArgs exArgs = ExampleUtils.readConsumerArgs(args, usageString); - if (exArgs.stream == null || exArgs.consumer == null) { - System.out.println("-stream and -consumer is required to attach."); - System.exit(1); - } System.out.printf("\nTrying to connect to %s, and listen to %s for %d messages.\n\n", exArgs.server, exArgs.subject, exArgs.msgCount); diff --git a/src/examples/java/io/nats/examples/NatsJsSubAttachDirect.java b/src/examples/java/io/nats/examples/NatsJsSubAttachDirect.java index 4492f89e6..4528f889e 100644 --- a/src/examples/java/io/nats/examples/NatsJsSubAttachDirect.java +++ b/src/examples/java/io/nats/examples/NatsJsSubAttachDirect.java @@ -31,12 +31,8 @@ public class NatsJsSubAttachDirect { + "\nUse the URL for user/pass/token authentication.\n"; public static void main(String[] args) { - ExampleArgs exArgs = ExampleUtils.readSubscribeArgs(args, usageString); + ExampleArgs exArgs = ExampleUtils.readConsumerArgs(args, usageString); - if (exArgs.stream == null || exArgs.consumer == null) { - System.out.println("-stream and -consumer is required to attach."); - System.exit(1); - } System.out.printf("\nTrying to connect to %s, and listen to %s for %d messages.\n\n", exArgs.server, exArgs.subject, exArgs.msgCount); @@ -60,7 +56,7 @@ public static void main(String[] args) { for(int i=1;i<=exArgs.msgCount;i++) { Message msg = sub.nextMessage(Duration.ofHours(1)); - System.out.printf("\nMessage Received:\n"); + System.out.println("\nMessage Received:"); if (msg.hasHeaders()) { System.out.println(" Headers:"); diff --git a/src/examples/java/io/nats/examples/benchmark/Sample.java b/src/examples/java/io/nats/examples/benchmark/Sample.java index 389f7d304..1331e8c9a 100644 --- a/src/examples/java/io/nats/examples/benchmark/Sample.java +++ b/src/examples/java/io/nats/examples/benchmark/Sample.java @@ -15,11 +15,12 @@ package io.nats.examples.benchmark; -import static io.nats.examples.benchmark.Utils.humanBytes; - import io.nats.client.Statistics; + import java.text.DecimalFormat; +import static io.nats.examples.benchmark.Utils.humanBytes; + public class Sample { int jobMsgCnt; long msgCnt; @@ -91,7 +92,7 @@ public double seconds() { } /** - * {@inheritDoc}. + * {@inheritDoc} */ @Override public String toString() { diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index 81814e699..d69921777 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -399,18 +399,17 @@ enum Status { * Gets a context for publishing and subscribing to subjects backed by Jetstream streams * and consumers. * @return a JetStream instance. - * @throws TimeoutException timed out verifying jetstream - * @throws InterruptedException the operation was interrupted + * @throws IOException various IO exception such as timeout or interruption */ - JetStream jetStream() throws InterruptedException, TimeoutException; + JetStream jetStream() throws IOException; /** * Gets a context for publishing and subscribing to subjects backed by Jetstream streams * and consumers. * @param options JetStream options. * @return a JetStream instance. - * @throws TimeoutException timed out verifying jetstream - * @throws InterruptedException the operation was interrupted + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption */ - JetStream jetStream(JetStreamOptions options) throws InterruptedException, TimeoutException; + JetStream jetStream(JetStreamOptions options) throws IOException; } diff --git a/src/main/java/io/nats/client/ConsumerConfiguration.java b/src/main/java/io/nats/client/ConsumerConfiguration.java index 55d4ee84e..55a49dd72 100644 --- a/src/main/java/io/nats/client/ConsumerConfiguration.java +++ b/src/main/java/io/nats/client/ConsumerConfiguration.java @@ -649,4 +649,24 @@ public void setMaxAckPending(long maxAckPending) { public long getMaxWaiting() { return maxWaiting; } + + @Override + public String toString() { + return "ConsumerConfiguration{" + + "durable='" + durable + '\'' + + ", deliverPolicy=" + deliverPolicy + + ", deliverSubject='" + deliverSubject + '\'' + + ", startSeq=" + startSeq + + ", startTime=" + startTime + + ", ackPolicy=" + ackPolicy + + ", ackWait=" + ackWait + + ", maxDeliver=" + maxDeliver + + ", filterSubject='" + filterSubject + '\'' + + ", replayPolicy=" + replayPolicy + + ", sampleFrequency='" + sampleFrequency + '\'' + + ", rateLimit=" + rateLimit + + ", maxWaiting=" + maxWaiting + + ", maxAckPending=" + maxAckPending + + '}'; + } } diff --git a/src/main/java/io/nats/client/ConsumerInfo.java b/src/main/java/io/nats/client/ConsumerInfo.java index c21dcafc3..56f0d4ff0 100644 --- a/src/main/java/io/nats/client/ConsumerInfo.java +++ b/src/main/java/io/nats/client/ConsumerInfo.java @@ -13,15 +13,13 @@ package io.nats.client; -import java.time.Instant; -import java.time.ZoneId; +import io.nats.client.impl.JsonUtils; +import io.nats.client.impl.JsonUtils.FieldType; + import java.time.ZonedDateTime; import java.util.regex.Matcher; import java.util.regex.Pattern; -import io.nats.client.impl.JsonUtils; -import io.nats.client.impl.JsonUtils.FieldType; - // TODO Add properties /** @@ -64,6 +62,14 @@ public long getConsumerSequence() { public long getStreamSequence() { return streamSeq; } + + @Override + public String toString() { + return "SequencePair{" + + "consumerSeq=" + consumerSeq + + ", streamSeq=" + streamSeq + + '}'; + } } private String stream; @@ -75,7 +81,7 @@ public long getStreamSequence() { private long numPending; private long numWaiting; private long numAckPending; - private long numRelivered; + private long numRedelivered; private static final String streamNameField = "stream_name"; private static final String nameField = "name"; @@ -103,7 +109,7 @@ public long getStreamSequence() { /** * Internal method to generate consumer information. - * @param json JSON represeenting the consumer information. + * @param json JSON representing the consumer information. */ public ConsumerInfo(String json) { Matcher m = streamNameRE.matcher(json); @@ -119,25 +125,17 @@ public ConsumerInfo(String json) { m = createdRE.matcher(json); if (m.find()) { - // Instant can parse rfc 3339... we're making a time zone assumption. - Instant inst = Instant.parse(m.group(1)); - this.created = ZonedDateTime.ofInstant(inst, ZoneId.systemDefault()); + this.created = JsonUtils.parseDateTime(m.group(1)); } - String s = JsonUtils.getJSONObject(configField, json); - if (s != null) { - this.configuration = new ConsumerConfiguration(s); - } - - s = JsonUtils.getJSONObject(deliveredField, json); - if (s != null) { - this.delivered = new SequencePair(s); - } - - s = JsonUtils.getJSONObject(ackFloorField, json); - if (s != null) { - this.ackFloor = new SequencePair(s); - } + String jsonObject = JsonUtils.getJSONObject(configField, json); + this.configuration = new ConsumerConfiguration(jsonObject); + + jsonObject = JsonUtils.getJSONObject(deliveredField, json); + this.delivered = new SequencePair(jsonObject); + + jsonObject = JsonUtils.getJSONObject(ackFloorField, json); + this.ackFloor = new SequencePair(jsonObject); m = numPendingRE.matcher(json); if (m.find()) { @@ -156,8 +154,7 @@ public ConsumerInfo(String json) { m = numRedeliveredRE.matcher(json); if (m.find()) { - // todo - double check - this.numRelivered = Long.parseLong(m.group(1)); + this.numRedelivered = Long.parseLong(m.group(1)); } } @@ -198,6 +195,22 @@ public long getNumAckPending() { } public long getRedelivered() { - return numRelivered; + return numRedelivered; + } + + @Override + public String toString() { + return "ConsumerInfo{" + + "stream='" + stream + '\'' + + ", name='" + name + '\'' + + ", " + configuration + + ", created=" + getCreationTime() + + ", " + delivered + + ", " + ackFloor + + ", numPending=" + numPending + + ", numWaiting=" + numWaiting + + ", numAckPending=" + numAckPending + + ", numRedelivered=" + numRedelivered + + '}'; } } diff --git a/src/main/java/io/nats/client/ConsumerLister.java b/src/main/java/io/nats/client/ConsumerLister.java new file mode 100644 index 000000000..77e3f687b --- /dev/null +++ b/src/main/java/io/nats/client/ConsumerLister.java @@ -0,0 +1,81 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import io.nats.client.impl.JsonUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ConsumerLister { + private int total; + private int offset; + private int limit; + private final List consumers; + + private static final Pattern totalRE = JsonUtils.buildPattern("total", JsonUtils.FieldType.jsonNumber); + private static final Pattern offsetRE = JsonUtils.buildPattern("offset", JsonUtils.FieldType.jsonNumber); + private static final Pattern limitRE = JsonUtils.buildPattern("limit", JsonUtils.FieldType.jsonNumber); + + public ConsumerLister(String json) { + Matcher m = totalRE.matcher(json); + if (m.find()) { + this.total = Integer.parseInt(m.group(1)); + } + + m = offsetRE.matcher(json); + if (m.find()) { + this.offset = Integer.parseInt(m.group(1)); + } + + m = limitRE.matcher(json); + if (m.find()) { + this.limit = Integer.parseInt(m.group(1)); + } + + this.consumers = new ArrayList<>(); + List consumersJson = JsonUtils.getJSONArray("consumer", json); + for (String j : consumersJson) { + this.consumers.add(new ConsumerInfo(j)); + } + } + + public int getTotal() { + return total; + } + + public int getOffset() { + return offset; + } + + public int getLimit() { + return limit; + } + + public List getConsumers() { + return consumers; + } + + @Override + public String toString() { + return "ConsumerLister{" + + "total=" + total + + ", offset=" + offset + + ", limit=" + limit + + ", " + consumers + + '}'; + } +} diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index 27d698e99..789c17f27 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -12,8 +12,10 @@ // limitations under the License. package io.nats.client; +import io.nats.client.impl.JetStreamApiException; + import java.io.IOException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; /** * JetStream context for creation and access to streams and consumers in NATS. @@ -23,204 +25,371 @@ public interface JetStream { /** * Represents the Jetstream Account Limits */ - public interface AccountLimits { + interface AccountLimits { /** * Gets the maximum amount of memory in the Jetstream deployment. * @return bytes */ - public long getMaxMemory(); + long getMaxMemory(); /** * Gets the maximum amount of storage in the Jetstream deployment. * @return bytes */ - public long getMaxStorage(); + long getMaxStorage(); /** * Gets the maximum number of allowed streams in the Jetstream deployment. * @return stream maximum count */ - public long getMaxStreams(); + long getMaxStreams(); /** * Gets the maximum number of allowed consumers in the Jetstream deployment. * @return consumer maximum count */ - public long getMaxConsumers(); + long getMaxConsumers(); } /** * The Jetstream Account Statistics */ - public interface AccountStatistics { + interface AccountStatistics { /** * Gets the amount of memory used by the Jetstream deployment. * @return bytes */ - public long getMemory(); + long getMemory(); /** * Gets the amount of storage used by the Jetstream deployment. * @return bytes */ - public long getStorage(); + long getStorage(); /** * Gets the number of streams used by the Jetstream deployment. * @return stream maximum count */ - public long getStreams(); + long getStreams(); /** * Gets the number of consumers used by the Jetstream deployment. * @return consumer maximum count */ - public long getConsumers(); - } + long getConsumers(); + } /** * Loads or creates a stream. * @param config the stream configuration to use. * @return stream information - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the configuration is missing or invalid + */ + StreamInfo addStream(StreamConfiguration config) throws IOException, JetStreamApiException; + + /** + * Updates an existing stream. + * @param config the stream configuration to use. + * @return stream information + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the configuration is missing or invalid + */ + StreamInfo updateStream(StreamConfiguration config) throws IOException, JetStreamApiException; + + /** + * Deletes an existing stream. + * @param streamName the stream name to use. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public StreamInfo addStream(StreamConfiguration config) throws TimeoutException, InterruptedException; + void deleteStream(String streamName) throws IOException, JetStreamApiException; + + /** + * Gets the info for an existing stream. + * @param streamName the stream name to use. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @return stream information + */ + StreamInfo streamInfo(String streamName) throws IOException, JetStreamApiException; + + /** + * Purge stream messages + * @param streamName the stream name to use. + * @return stream information + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + StreamInfo purgeStream(String streamName) throws IOException, JetStreamApiException; /** * Loads or creates a consumer. - * @param stream name of the stream + * @param streamName name of the stream * @param config the consumer configuration to use. - * @throws IOException if there are communcation issues with the NATS server - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data * @return consumer information. - */ - public ConsumerInfo addConsumer(String stream, ConsumerConfiguration config) throws TimeoutException, InterruptedException, IOException; + */ + ConsumerInfo addConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException; + + /** + * Deletes a consumer. + * @param streamName name of the stream + * @param consumer the name of the consumer. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + void deleteConsumer(String streamName, String consumer) throws IOException, JetStreamApiException; + + /** + * Return pages of ConsumerInfo objects + * @param streamName the name of the consumer. + * @return The consumer lister + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + ConsumerLister newConsumerLister(String streamName) throws IOException, JetStreamApiException; /** * Send a message to the specified subject and waits for a response from - * Jetstream. The message body will not be copied. The expected + * Jetstream. The default publish options will be used. + * The message body will not be copied. The expected * usage with string content is something like: - * + * *
      * nc = Nats.connect()
      * JetStream js = nc.JetStream()
-     * js.publish("destination", "message".getBytes("UTF-8"), publishOptions)
+     * js.publish("destination", "message".getBytes("UTF-8"))
      * 
- * + * * where the sender creates a byte array immediately before calling publish. - * - * See {@link #publish(String, byte[]) publish()} for more details on + * + * See {@link #publish(String, byte[]) publish()} for more details on * publish during reconnect. - * + * * @param subject the subject to send the message to * @param body the message body * @return The publish acknowledgement - * @throws IllegalStateException if the reconnect buffer is exceeded - * @throws IOException if there are communcation issues with the NATS server - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public PublishAck publish(String subject, byte[] body) throws IOException, InterruptedException, TimeoutException; + PublishAck publish(String subject, byte[] body) throws IOException, JetStreamApiException; /** * Send a message to the specified subject and waits for a response from * Jetstream. The message body will not be copied. The expected * usage with string content is something like: - * + * *
      * nc = Nats.connect()
      * JetStream js = nc.JetStream()
      * js.publish("destination", "message".getBytes("UTF-8"), publishOptions)
      * 
- * + * * where the sender creates a byte array immediately before calling publish. - * - * See {@link #publish(String, byte[]) publish()} for more details on + * + * See {@link #publish(String, byte[]) publish()} for more details on * publish during reconnect. - * + * * @param subject the subject to send the message to * @param body the message body * @param options publisher options * @return The publish acknowledgement - * @throws IllegalStateException if the reconnect buffer is exceeded - * @throws IOException if there are communcation issues with the NATS server - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted - */ - public PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, InternalError, TimeoutException, InterruptedException; + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, JetStreamApiException; /** * Send a message to the specified subject and waits for a response from - * Jetstream. The message body will not be copied. The expected + * Jetstream. The default publish options will be used. + * The message body will not be copied. The expected * usage with string content is something like: - * + * *
      * nc = Nats.connect()
      * JetStream js = nc.JetStream()
-     * js.publish("destination", "message".getBytes("UTF-8"), publishOptions)
+     * js.publish(message)
      * 
- * + * * where the sender creates a byte array immediately before calling publish. - * - * See {@link #publish(String, byte[]) publish()} for more details on + * + * See {@link #publish(String, byte[]) publish()} for more details on * publish during reconnect. - * + * * @param message the message to send * @return The publish acknowledgement - * @throws IllegalStateException if the reconnect buffer is exceeded - * @throws IOException if there are communcation issues with the NATS server - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public PublishAck publish(Message message) throws IOException, InterruptedException, TimeoutException; + PublishAck publish(Message message) throws IOException, JetStreamApiException; /** * Send a message to the specified subject and waits for a response from * Jetstream. The message body will not be copied. The expected * usage with string content is something like: - * + * *
      * nc = Nats.connect()
      * JetStream js = nc.JetStream()
-     * js.publish("destination", "message".getBytes("UTF-8"), publishOptions)
+     * js.publish(message, publishOptions)
      * 
- * + * * where the sender creates a byte array immediately before calling publish. - * - * See {@link #publish(String, byte[]) publish()} for more details on + * + * See {@link #publish(String, byte[]) publish()} for more details on * publish during reconnect. - * + * * @param message the message to send * @param options publisher options * @return The publish acknowledgement - * @throws IllegalStateException if the reconnect buffer is exceeded - * @throws IOException if there are communcation issues with the NATS server - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted - */ - public PublishAck publish(Message message, PublishOptions options) throws IOException, InternalError, TimeoutException, InterruptedException; + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + PublishAck publish(Message message, PublishOptions options) throws IOException, JetStreamApiException; + + /** + * Send a message to the specified subject but does not wait for a response from + * Jetstream. The default publish options will be used. + * The message body will not be copied. The expected + * usage with string content is something like: + * + *
+     * nc = Nats.connect()
+     * JetStream js = nc.JetStream()
+     * CompletableFuture<PublishAck> future =
+     *     js.publishAsync("destination", "message".getBytes("UTF-8"),)
+     * 
+ * + * where the sender creates a byte array immediately before calling publish. + * + * See {@link #publish(String, byte[]) publish()} for more details on + * publish during reconnect. + * + * The future me be completed with an exception, either + * an IOException covers various communication issues with the NATS server such as timeout or interruption + * - or - a JetStreamApiException the request had an error related to the data + * + * @param subject the subject to send the message to + * @param body the message body + * @return The future + */ + CompletableFuture publishAsync(String subject, byte[] body); + + /** + * Send a message to the specified subject but does not wait for a response from + * Jetstream. The message body will not be copied. The expected + * usage with string content is something like: + * + *
+     * nc = Nats.connect()
+     * JetStream js = nc.JetStream()
+     * CompletableFuture<PublishAck> future =
+     *     js.publishAsync("destination", "message".getBytes("UTF-8"), publishOptions)
+     * 
+ * + * where the sender creates a byte array immediately before calling publish. + * + * See {@link #publish(String, byte[]) publish()} for more details on + * publish during reconnect. + * + * The future me be completed with an exception, either + * an IOException covers various communication issues with the NATS server such as timeout or interruption + * - or - a JetStreamApiException the request had an error related to the data + * + * @param subject the subject to send the message to + * @param body the message body + * @param options publisher options + * @return The future + */ + CompletableFuture publishAsync(String subject, byte[] body, PublishOptions options); + + /** + * Send a message to the specified subject but does not wait for a response from + * Jetstream. The default publish options will be used. + * The message body will not be copied. The expected + * usage with string content is something like: + * + *
+     * nc = Nats.connect()
+     * JetStream js = nc.JetStream()
+     * CompletableFuture<PublishAck> future = js.publishAsync(message)
+     * 
+ * + * where the sender creates a byte array immediately before calling publish. + * + * See {@link #publish(String, byte[]) publish()} for more details on + * publish during reconnect. + * + * The future me be completed with an exception, either + * an IOException covers various communication issues with the NATS server such as timeout or interruption + * - or - a JetStreamApiException the request had an error related to the data + * + * @param message the message to send + * @return The future + */ + CompletableFuture publishAsync(Message message); + + /** + * Send a message to the specified subject but does not wait for a response from + * Jetstream. The message body will not be copied. The expected + * usage with string content is something like: + * + *
+     * nc = Nats.connect()
+     * JetStream js = nc.JetStream()
+     * CompletableFuture<PublishAck> future = js.publishAsync(message, publishOptions)
+     * 
+ * + * where the sender creates a byte array immediately before calling publish. + * + * See {@link #publish(String, byte[]) publish()} for more details on + * publish during reconnect. + * + * The future me be completed with an exception, either + * an IOException covers various communication issues with the NATS server such as timeout or interruption + * - or - a JetStreamApiException the request had an error related to the data + * + * @param message the message to publish + * @param options publisher options + * @return The future + */ + CompletableFuture publishAsync(Message message, PublishOptions options); /** - * Create a synchronous subscription to the specified subject with default options. - * - *

Use the {@link io.nats.client.Subscription#nextMessage(Duration) nextMessage} - * method to read messages for this subscription. - * - *

See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for - * information about creating an asynchronous subscription with callbacks. - * - * @param subject the subject to subscribe to - * @return an object representing the subscription - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted - * @throws IOException if there are communcation issues with the NATS server + * Create a synchronous subscription to the specified subject with default options. + * + *

Use the {@link io.nats.client.Subscription#nextMessage(Duration) nextMessage} + * method to read messages for this subscription. + * + *

See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for + * information about creating an asynchronous subscription with callbacks. + * + * @param subject the subject to subscribe to + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe(String subject) throws InterruptedException, TimeoutException, IOException; - + JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException; /** * Create a synchronous subscription to the specified subject. @@ -233,12 +402,12 @@ public interface AccountStatistics { * * @param subject the subject to subscribe to * @param options subscription options - * @return an object representing the subscription - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted - * @throws IOException if there are communcation issues with the NATS server + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe(String subject, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException; + JetStreamSubscription subscribe(String subject, SubscribeOptions options) throws IOException, JetStreamApiException; /** * Create a synchronous subscription to the specified subject. @@ -254,12 +423,12 @@ public interface AccountStatistics { * @param subject the subject to subscribe to * @param queue the queue group to join * @param options subscription options - * @return an object representing the subscription - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted - * @throws IOException if there are communcation issues with the NATS server + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe(String subject, String queue, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException; + JetStreamSubscription subscribe(String subject, String queue, SubscribeOptions options) throws IOException, JetStreamApiException; /** * Create a subscription to the specified subject under the control of the @@ -269,13 +438,12 @@ public interface AccountStatistics { * @param subject The subject to subscribe to * @param handler The target for the messages * @param dispatcher The dispatcher to handle this subscription - * @return The Subscription, so subscriptions may be later unsubscribed manually. - * @throws TimeoutException if communication with the NATS server timed out - * @throws InterruptedException if communication with the NATS was interrupted - * @throws IOException if there are communcation issues with the NATS server - * @throws IllegalStateException if the dispatcher was previously closed + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler) throws InterruptedException, TimeoutException, IOException; + JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException; /** * Create a subscription to the specified subject under the control of the @@ -286,13 +454,12 @@ public interface AccountStatistics { * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages * @param options The options for this subscription. - * @return The Subscription, so subscriptions may be later unsubscribed manually. - * @throws TimeoutException if communication with the NATS server timed out - * @throws InterruptedException if communication with the NATS was interrupted - * @throws IOException if there are communcation issues with the NATS server - * @throws IllegalStateException if the dispatcher was previously closed + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException; + JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws IOException, JetStreamApiException; /** * Create a subscription to the specified subject under the control of the @@ -303,13 +470,12 @@ public interface AccountStatistics { * @param queue The queue group to join. * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages - * @return The Subscription, so subscriptions may be later unsubscribed manually. - * @throws TimeoutException if communication with the NATS server timed out - * @throws InterruptedException if communication with the NATS was interrupted - * @throws IOException if there are communcation issues with the NATS server - * @throws IllegalStateException if the dispatcher was previously closed + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler) throws InterruptedException, TimeoutException, IOException; + JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException; /** * Create a subscription to the specified subject under the control of the @@ -321,11 +487,10 @@ public interface AccountStatistics { * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages * @param options The options for this subscription. - * @return The Subscription, so subscriptions may be later unsubscribed manually. - * @throws TimeoutException if communication with the NATS server timed out - * @throws InterruptedException if communication with the NATS was interrupted - * @throws IOException if there are communcation issues with the NATS server - * @throws IllegalStateException if the dispatcher was previously closed + * @return The subscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public JetStreamSubscription subscribe( String subject, String queue, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException; + JetStreamSubscription subscribe( String subject, String queue, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/JetStreamSubscription.java b/src/main/java/io/nats/client/JetStreamSubscription.java index fd750ab73..0c6f4bdbe 100644 --- a/src/main/java/io/nats/client/JetStreamSubscription.java +++ b/src/main/java/io/nats/client/JetStreamSubscription.java @@ -13,8 +13,9 @@ package io.nats.client; +import io.nats.client.impl.JetStreamApiException; + import java.io.IOException; -import java.util.concurrent.TimeoutException; /** * Subscription on a JetStream context. @@ -25,14 +26,14 @@ public interface JetStreamSubscription extends Subscription { * Polls for new messages. This should only be used when the subscription * is pull based. */ - public void poll(); + void poll(); /** * Gets information about the consumer behind this subscription. - * @throws IOException if there are communcation issues with the NATS server - * @throws TimeoutException if the NATS server does not return a response - * @throws InterruptedException if the thread is interrupted * @return consumer information + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data */ - public ConsumerInfo getConsumerInfo() throws IOException, TimeoutException, InterruptedException; + ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/NKey.java b/src/main/java/io/nats/client/NKey.java index 015bde278..23077644e 100644 --- a/src/main/java/io/nats/client/NKey.java +++ b/src/main/java/io/nats/client/NKey.java @@ -13,22 +13,6 @@ package io.nats.client; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.security.GeneralSecurityException; -import java.security.KeyPair; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.PrivateKey; -import java.security.PublicKey; -import java.security.SecureRandom; -import java.security.Signature; -import java.util.Arrays; -import java.util.Random; - import net.i2p.crypto.eddsa.EdDSAEngine; import net.i2p.crypto.eddsa.EdDSAPrivateKey; import net.i2p.crypto.eddsa.EdDSAPublicKey; @@ -37,6 +21,16 @@ import net.i2p.crypto.eddsa.spec.EdDSAPrivateKeySpec; import net.i2p.crypto.eddsa.spec.EdDSAPublicKeySpec; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.security.*; +import java.util.Arrays; + +import static io.nats.client.support.RandomUtils.PRAND; +import static io.nats.client.support.RandomUtils.SRAND; + class DecodedSeed { int prefix; byte[] bytes; @@ -155,25 +149,25 @@ public static Type fromPrefix(int prefix) { } // PrefixByteSeed is the prefix byte used for encoded NATS Seeds - private static int PREFIX_BYTE_SEED = 18 << 3; // Base32-encodes to 'S...' + private static final int PREFIX_BYTE_SEED = 18 << 3; // Base32-encodes to 'S...' // PrefixBytePrivate is the prefix byte used for encoded NATS Private keys - private static int PREFIX_BYTE_PRIVATE = 15 << 3; // Base32-encodes to 'P...' + private static final int PREFIX_BYTE_PRIVATE = 15 << 3; // Base32-encodes to 'P...' // PrefixByteServer is the prefix byte used for encoded NATS Servers - private static int PREFIX_BYTE_SERVER = 13 << 3; // Base32-encodes to 'N...' + private static final int PREFIX_BYTE_SERVER = 13 << 3; // Base32-encodes to 'N...' // PrefixByteCluster is the prefix byte used for encoded NATS Clusters - private static int PREFIX_BYTE_CLUSTER = 2 << 3; // Base32-encodes to 'C...' + private static final int PREFIX_BYTE_CLUSTER = 2 << 3; // Base32-encodes to 'C...' // PrefixByteAccount is the prefix byte used for encoded NATS Accounts - private static int PREFIX_BYTE_ACCOUNT = 0; // Base32-encodes to 'A...' + private static final int PREFIX_BYTE_ACCOUNT = 0; // Base32-encodes to 'A...' // PrefixByteUser is the prefix byte used for encoded NATS Users - private static int PREFIX_BYTE_USER = 20 << 3; // Base32-encodes to 'U...' + private static final int PREFIX_BYTE_USER = 20 << 3; // Base32-encodes to 'U...' // PrefixByteOperator is the prefix byte used for encoded NATS Operators - private static int PREFIX_BYTE_OPERATOR = 14 << 3; // Base32-encodes to 'O...' + private static final int PREFIX_BYTE_OPERATOR = 14 << 3; // Base32-encodes to 'O...' private static final int ED25519_PUBLIC_KEYSIZE = 32; private static final int ED25519_PRIVATE_KEYSIZE = 64; @@ -214,9 +208,9 @@ static int crc16(byte[] bytes) { // http://en.wikipedia.org/wiki/Base_32 private static final String BASE32_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"; - private static int[] BASE32_LOOKUP; - private static int MASK = 31; - private static int SHIFT = 5; + private static final int[] BASE32_LOOKUP; + private static final int MASK = 31; + private static final int SHIFT = 5; static { BASE32_LOOKUP = new int[256]; @@ -438,7 +432,7 @@ static DecodedSeed decodeSeed(char[] seed) { private static NKey createPair(Type type, SecureRandom random) throws IOException, NoSuchProviderException, NoSuchAlgorithmException { if (random == null) { - random = new SecureRandom(); + random = SRAND; } byte[] seed = new byte[NKey.ed25519.getCurve().getField().getb() / 8]; @@ -656,16 +650,15 @@ private NKey(Type t, char[] publicKey, char[] privateKey) { * The nkey is unusable after this operation. */ public void clear() { - Random r = new Random(); if (privateKeyAsSeed != null) { for (int i=0; i< privateKeyAsSeed.length ; i++) { - privateKeyAsSeed[i] = (char)(r.nextInt(26) + 'a'); + privateKeyAsSeed[i] = (char)(PRAND.nextInt(26) + 'a'); } Arrays.fill(privateKeyAsSeed, '\0'); } if (publicKey != null) { for (int i=0; i< publicKey.length ; i++) { - publicKey[i] = (char)(r.nextInt(26) + 'a'); + publicKey[i] = (char)(PRAND.nextInt(26) + 'a'); } Arrays.fill(publicKey, '\0'); } diff --git a/src/main/java/io/nats/client/NUID.java b/src/main/java/io/nats/client/NUID.java index 2365b9dcc..e8bef8bd0 100644 --- a/src/main/java/io/nats/client/NUID.java +++ b/src/main/java/io/nats/client/NUID.java @@ -15,10 +15,7 @@ package io.nats.client; -import java.nio.ByteBuffer; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.util.Random; +import static io.nats.client.support.RandomUtils.*; /** * A highly performant unique identifier generator. The library uses this to generate @@ -52,13 +49,9 @@ public final class NUID { private long seq; private long inc; - private static final SecureRandom srand; - private static final Random prand; private static final NUID globalNUID; static { - srand = new SecureRandom(); - prand = new Random(bytesToLong(srand.generateSeed(8))); // seed with 8 bytes (64 bits) globalNUID = new NUID(); } @@ -77,8 +70,8 @@ static NUID getInstance() { */ public NUID() { // Generate a cryto random int, 0 <= val < max to seed pseudorandom - seq = nextLong(prand, maxSeq); - inc = minInc + nextLong(prand, maxInc - minInc); + seq = nextLong(PRAND, maxSeq); + inc = minInc + nextLong(PRAND, maxInc - minInc); pre = new char[preLen]; for (int i = 0; i < preLen; i++) { pre[i] = '0'; @@ -121,8 +114,8 @@ public final synchronized String next() { // Resets the sequntial portion of the NUID void resetSequential() { - seq = nextLong(prand, maxSeq); - inc = minInc + nextLong(prand, maxInc - minInc); + seq = nextLong(PRAND, maxSeq); + inc = minInc + nextLong(PRAND, maxInc - minInc); } /* @@ -134,31 +127,13 @@ final void randomizePrefix() { byte[] cb = new byte[preLen]; // Use SecureRandom for prefix only - srand.nextBytes(cb); + SRAND.nextBytes(cb); for (int i = 0; i < preLen; i++) { pre[i] = digits[(cb[i] & 0xFF) % base]; } } - static long nextLong(Random rng, long maxValue) { - // error checking and 2^x checking removed for simplicity. - long bits; - long val; - do { - bits = (rng.nextLong() << 1) >>> 1; - val = bits % maxValue; - } while (bits - val + (maxValue - 1) < 0L); - return val; - } - - static long bytesToLong(byte[] bytes) { - ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE); - buffer.put(bytes); - buffer.flip();// need flip - return buffer.getLong(); - } - /** * @return the pre */ diff --git a/src/main/java/io/nats/client/StreamConfiguration.java b/src/main/java/io/nats/client/StreamConfiguration.java index c2a2ea4f1..46860cf27 100644 --- a/src/main/java/io/nats/client/StreamConfiguration.java +++ b/src/main/java/io/nats/client/StreamConfiguration.java @@ -17,6 +17,7 @@ import io.nats.client.impl.JsonUtils.FieldType; import java.time.Duration; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; @@ -33,7 +34,6 @@ */ public class StreamConfiguration { - /** * Stream retention policies. */ @@ -42,7 +42,7 @@ public enum RetentionPolicy { Interest("interest"), WorkQueue("workqueue"); - private String policy; + private final String policy; RetentionPolicy(String p) { policy = p; @@ -72,7 +72,7 @@ public enum DiscardPolicy { New("new"), Old("old"); - private String policy; + private final String policy; DiscardPolicy(String p) { policy = p; @@ -102,7 +102,7 @@ public enum StorageType { File("file"), Memory("memory"); - private String policy; + private final String policy; StorageType(String p) { policy = p; @@ -130,6 +130,7 @@ public static StorageType get(String value) { private RetentionPolicy retentionPolicy = RetentionPolicy.Limits; private long maxConsumers = -1; + private long maxMsgs = -1; private long maxBytes = -1; private long maxMsgSize = -1; private Duration maxAge = null; @@ -140,24 +141,46 @@ public static StorageType get(String value) { private Duration duplicateWindow = Duration.ZERO; private String template = null; - private static String nameField = "name"; - private static String subjectsField = "subjects"; - private static String retentionField = "retention"; - private static String maxConsumersField = "max_consumers"; - private static String maxBytesField = "max_bytes"; - - private static String maxAgeField = "max_age"; - private static String maxMsgSizeField = "max_msg_size"; - private static String storageTypeField = "storage"; - private static String discardPolicyField = "discard"; - private static String replicasField = "num_replicas"; - private static String noAckField = "no_ack"; - private static String templateField = "template"; - private static String duplicatesField = "duplicates"; + @Override + public String toString() { + return "StreamConfiguration{" + + "name='" + name + '\'' + + ", subjects=" + Arrays.toString(subjects) + + ", retentionPolicy=" + retentionPolicy + + ", maxConsumers=" + maxConsumers + + ", maxMsgs=" + maxMsgs + + ", maxBytes=" + maxBytes + + ", maxMsgSize=" + maxMsgSize + + ", maxAge=" + maxAge + + ", storageType=" + storageType + + ", discardPolicy=" + discardPolicy + + ", replicas=" + replicas + + ", noAck=" + noAck + + ", duplicateWindow=" + duplicateWindow + + ", template='" + template + '\'' + + '}'; + } + + private static final String nameField = "name"; + private static final String subjectsField = "subjects"; + private static final String retentionField = "retention"; + private static final String maxConsumersField = "max_consumers"; + private static final String maxMsgsField = "max_msgs"; + private static final String maxBytesField = "max_bytes"; + + private static final String maxAgeField = "max_age"; + private static final String maxMsgSizeField = "max_msg_size"; + private static final String storageTypeField = "storage"; + private static final String discardPolicyField = "discard"; + private static final String replicasField = "num_replicas"; + private static final String noAckField = "no_ack"; + private static final String templateField = "template"; + private static final String duplicatesField = "duplicate_window"; private static final Pattern nameRE = JsonUtils.buildPattern(nameField, FieldType.jsonString); private static final Pattern maxConsumersRE = JsonUtils.buildPattern(maxConsumersField, FieldType.jsonNumber); private static final Pattern retentionRE = JsonUtils.buildPattern(retentionField, FieldType.jsonString); + private static final Pattern maxMsgsRE = JsonUtils.buildPattern(maxMsgsField, FieldType.jsonNumber); private static final Pattern maxBytesRE = JsonUtils.buildPattern(maxBytesField, FieldType.jsonNumber); private static final Pattern maxAgeRE = JsonUtils.buildPattern(maxAgeField, FieldType.jsonNumber); private static final Pattern maxMsgSizeRE = JsonUtils.buildPattern(maxMsgSizeField, FieldType.jsonNumber); @@ -185,6 +208,11 @@ public static StorageType get(String value) { this.retentionPolicy = RetentionPolicy.get(m.group(1)); } + m = maxMsgsRE.matcher(json); + if (m.find()) { + this.maxMsgs = Long.parseLong(m.group(1)); + } + m = maxBytesRE.matcher(json); if (m.find()) { this.maxBytes = Long.parseLong(m.group(1)); @@ -235,32 +263,35 @@ public static StorageType get(String value) { // For the builder StreamConfiguration( - String name, - String[] subjects, - RetentionPolicy retentionPolicy, - long maxConsumers, - long maxBytes, - long maxMsgSize, - Duration maxAge, - StorageType storageType, - DiscardPolicy discardPolicy, - long replicas, - boolean noAck, - Duration duplicateWindow, - String template) { - this.name = name; - this.subjects = subjects; - this.retentionPolicy = retentionPolicy; - this.maxConsumers = maxConsumers; - this.maxBytes = maxBytes; - this.maxMsgSize = maxMsgSize; - this.maxAge = maxAge; - this.storageType = storageType; - this.discardPolicy = discardPolicy; - this.replicas = replicas; - this.noAck = noAck; - this.duplicateWindow = duplicateWindow; - this.template = template; + String name, + String[] subjects, + RetentionPolicy retentionPolicy, + long maxConsumers, + long maxMsgs, + long maxBytes, + long maxMsgSize, + Duration maxAge, + StorageType storageType, + DiscardPolicy discardPolicy, + long replicas, + boolean noAck, + Duration duplicateWindow, + String template) + { + this.name = name; + this.subjects = subjects; + this.retentionPolicy = retentionPolicy; + this.maxConsumers = maxConsumers; + this.maxMsgs = maxMsgs; + this.maxBytes = maxBytes; + this.maxMsgSize = maxMsgSize; + this.maxAge = maxAge; + this.storageType = storageType; + this.discardPolicy = discardPolicy; + this.replicas = replicas; + this.noAck = noAck; + this.duplicateWindow = duplicateWindow; + this.template = template; } /** @@ -276,6 +307,7 @@ public String toJSON() { JsonUtils.addFld(sb, subjectsField, subjects); JsonUtils.addFld(sb, retentionField, retentionPolicy.toString()); JsonUtils.addFld(sb, maxConsumersField, maxConsumers); + JsonUtils.addFld(sb, maxMsgsField, maxMsgs); JsonUtils.addFld(sb, maxBytesField, maxBytes); JsonUtils.addFld(sb, maxMsgSizeField, maxMsgSize); JsonUtils.addFld(sb, maxAgeField, maxAge); @@ -329,6 +361,14 @@ public long getMaxConsumers() { return maxConsumers; } + /** + * Gets the maximum messages for this stream configuration. + * @return the maximum number of messages for this stream. + */ + public long getMaxMsgs() { + return maxMsgs; + } + /** * Gets the maximum number of bytes for this stream configuration. * @return the maximum number of bytes for this stream. @@ -415,6 +455,7 @@ public static class Builder { private String[] subjects = null; private RetentionPolicy retentionPolicy = RetentionPolicy.Limits; private long maxConsumers = -1; + private long maxMsgs = -1; private long maxBytes = -1; private long maxMsgSize = -1; private Duration maxAge = Duration.ZERO; @@ -433,14 +474,14 @@ public static class Builder { public Builder name(String name) { this.name = name; return this; - } + } /** * Sets the subjects in the StreamConfiguration. * @param subjects the stream's subjects * @return Builder */ - public Builder subjects(String[] subjects) { + public Builder subjects(String... subjects) { if (subjects == null || subjects.length == 0) { throw new IllegalArgumentException("Subjects cannot be null or empty"); } @@ -468,6 +509,15 @@ public Builder maxConsumers(long maxConsumers) { return this; } + /** + * Sets the maximum number of consumers in the StreamConfiguration. + * @param maxMsgs the maximum number of messages + * @return Builder + */ + public Builder maxMessages(long maxMsgs) { + this.maxMsgs = maxMsgs; + return this; + } /** * Sets the maximum number of bytes in the StreamConfiguration. @@ -575,6 +625,7 @@ public StreamConfiguration build() { subjects, retentionPolicy, maxConsumers, + maxMsgs, maxBytes, maxMsgSize, maxAge, diff --git a/src/main/java/io/nats/client/StreamInfo.java b/src/main/java/io/nats/client/StreamInfo.java index 9f9824217..8e76f2a83 100644 --- a/src/main/java/io/nats/client/StreamInfo.java +++ b/src/main/java/io/nats/client/StreamInfo.java @@ -13,11 +13,12 @@ package io.nats.client; +import io.nats.client.impl.JsonUtils; +import io.nats.client.impl.JsonUtils.FieldType; + import java.time.ZonedDateTime; import java.util.regex.Matcher; import java.util.regex.Pattern; -import io.nats.client.impl.JsonUtils; -import io.nats.client.impl.JsonUtils.FieldType; /** @@ -32,7 +33,7 @@ public static class StreamState { private ZonedDateTime firstTime; private long lastSeq; private ZonedDateTime lastTime; - private long consumers; + private long consumerCount; private static final String msgsField = "messages"; private static final String bytesField = "bytes"; @@ -83,7 +84,7 @@ public StreamState(String json) { m = consumersRE.matcher(json); if (m.find()) { - this.consumers = Long.parseLong(m.group(1)); + this.consumerCount = Long.parseLong(m.group(1)); } } @@ -140,8 +141,30 @@ public ZonedDateTime getLastTime() { * @return the consumer count */ public long getConsumerCount() { - return consumers; + return consumerCount; } + + @Override + public String toString() { + return "StreamState{" + + "msgs=" + msgs + + ", bytes=" + bytes + + ", firstSeq=" + firstSeq + + ", firstTime=" + firstTime + + ", lastSeq=" + lastSeq + + ", lastTime=" + lastTime + + ", consumerCount=" + consumerCount + + '}'; + } + } + + @Override + public String toString() { + return "StreamInfo{" + + "created=" + created + + ", " + config + + ", " + state + + '}'; } private StreamConfiguration config; diff --git a/src/main/java/io/nats/client/SubscribeOptions.java b/src/main/java/io/nats/client/SubscribeOptions.java index 24fa1a749..ed277eb23 100644 --- a/src/main/java/io/nats/client/SubscribeOptions.java +++ b/src/main/java/io/nats/client/SubscribeOptions.java @@ -235,4 +235,15 @@ public SubscribeOptions build() { return so; } } + + @Override + public String toString() { + return "SubscribeOptions{" + + "stream='" + stream + '\'' + + ", consumer='" + consumer + '\'' + + ", " + consumerConfiguration + + ", autoAck=" + autoAck + + ", pull=" + pull + + '}'; + } } diff --git a/src/main/java/io/nats/client/impl/JetStreamApiException.java b/src/main/java/io/nats/client/impl/JetStreamApiException.java new file mode 100644 index 000000000..b61a63584 --- /dev/null +++ b/src/main/java/io/nats/client/impl/JetStreamApiException.java @@ -0,0 +1,50 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +/** + * JetStreamApiException is used to indicate that the server returned an error while make a request + * related to JetStream. + */ +public class JetStreamApiException extends Exception { + private final JetStreamApiResponse jetStreamApiResponse; + + /** + * Construct an exception with the response from the server. + * + * @param jetStreamApiResponse the response from the server. + */ + public JetStreamApiException(JetStreamApiResponse jetStreamApiResponse) { + super(jetStreamApiResponse.getError()); + this.jetStreamApiResponse = jetStreamApiResponse; + } + + /** + * Get the error code from the response + * + * @return the code + */ + public long getErrorCode() { + return jetStreamApiResponse.getCode(); + } + + /** + * Get the description from the response + * + * @return the description + */ + public String getErrorDescription() { + return jetStreamApiResponse.getDescription(); + } +} diff --git a/src/main/java/io/nats/client/impl/JetstreamAPIResponse.java b/src/main/java/io/nats/client/impl/JetStreamApiResponse.java similarity index 93% rename from src/main/java/io/nats/client/impl/JetstreamAPIResponse.java rename to src/main/java/io/nats/client/impl/JetStreamApiResponse.java index 4b0884eee..9b05d4ca3 100644 --- a/src/main/java/io/nats/client/impl/JetstreamAPIResponse.java +++ b/src/main/java/io/nats/client/impl/JetStreamApiResponse.java @@ -10,14 +10,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package io.nats.client.impl; +import io.nats.client.Message; +import io.nats.client.impl.JsonUtils.FieldType; + import java.util.regex.Matcher; import java.util.regex.Pattern; -import io.nats.client.impl.JsonUtils.FieldType; - -class JetstreamAPIResponse { +class JetStreamApiResponse { private static final int UnsetErrorCode = -1; @@ -34,7 +36,11 @@ class JetstreamAPIResponse { private static final Pattern codeRE = JsonUtils.buildPattern(codeField, FieldType.jsonNumber); private static final Pattern descRE = JsonUtils.buildPattern(descField, FieldType.jsonString); - JetstreamAPIResponse(byte[] rawResponse) { + JetStreamApiResponse(Message msg) { + this(msg.getData()); + } + + JetStreamApiResponse(byte[] rawResponse) { response = new String(rawResponse); diff --git a/src/main/java/io/nats/client/impl/JsonUtils.java b/src/main/java/io/nats/client/impl/JsonUtils.java index 6b04b3621..7f053f309 100644 --- a/src/main/java/io/nats/client/impl/JsonUtils.java +++ b/src/main/java/io/nats/client/impl/JsonUtils.java @@ -18,13 +18,14 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.regex.Matcher; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Pattern; /** * Internal json parsing helpers. */ -public final class JsonUtils { +public abstract class JsonUtils { private static final String OBJECT_RE = "\\{(.+?)\\}"; private static final String STRING_RE = "\\s*\"(.+?)\""; @@ -35,11 +36,13 @@ public final class JsonUtils { private static final String AFTER_FIELD_RE = "\"\\s*:\\s*"; private static final String Q = "\""; - private static final String QCOLONQ = "\": \""; - private static final String QCOLON = "\": "; + private static final String QCOLONQ = "\":\""; + private static final String QCOLON = "\":"; private static final String QCOMMA = "\","; private static final String COMMA = ","; + private JsonUtils() {} /* for Jacoco */ + public enum FieldType { jsonObject(OBJECT_RE), jsonString(STRING_RE), @@ -94,48 +97,84 @@ public static Pattern buildPattern(String fieldName, String typeRE) { } /** - * Internal method to get a JSON object. - * @param objectName - object name - * @param json - json - * @return string with object json + * Extract a JSON object string by object name. Returns empty object '{}' if not found. + * @param objectName object name + * @param json source json + * @return object json string */ public static String getJSONObject(String objectName, String json) { + int[] indexes = getBracketIndexes(objectName, json, "{", "}"); + return indexes[0] == -1 || indexes[1] == -1 ? "{}" : json.substring(indexes[0], indexes[1]+1); + } - int objStart = json.indexOf(objectName); - if (objStart < 0) { - return null; + /** + * Extract a list JSON object strings for list object name. Returns empty list '{}' if not found. + * Assumes that there are no brackets '{' or '}' in the actual data. + * @param objectName list object name + * @param json source json + * @return object json string + */ + public static List getJSONArray(String objectName, String json) { + int[] indexes = getBracketIndexes(objectName, json, "[", "]"); + List items = new ArrayList<>(); + StringBuilder item = new StringBuilder(); + int count = 0; + for (int x = indexes[0] + 1; x < indexes[1]; x++) { + char c = json.charAt(x); + if (c == '{') { + item.append(c); + count++; + } + else if (c == '}') { + item.append(c); + if (--count == 0) { + items.add(item.toString()); + item.setLength(0); + } + } + else if (count > 0) { + item.append(c); + } } - - int bracketStart = json.indexOf("{", objStart); - int bracketEnd = json.indexOf("}", bracketStart); - - if (bracketStart < 0 || bracketEnd < 0) { - return null; + return items; + } + + private static int[] getBracketIndexes(String objectName, String json, String start, String end) { + int[] result = new int[] {-1, -1}; + int objStart = json.indexOf(objectName); + if (objStart != -1) { + result[0] = json.indexOf(start, objStart); + result[1] = json.indexOf(end, result[0]); } - - return json.substring(bracketStart, bracketEnd+1); + return result; } /** - * Parses JSON string array field. - * @param fieldName - name of the field. - * @param json - JSON that contains this struct. + * Extract a list strings for list object name. Returns empty array if not found. + * Assumes that there are no brackets '{' or '}' in the actual data. + * @param objectName object name + * @param json source json * @return a string array, empty if no values are found. */ - public static String[] parseStringArray(String fieldName, String json) { - Pattern regex = JsonUtils.buildPattern(fieldName, FieldType.jsonStringArray); - Matcher m = regex.matcher(json); - if (!m.find()) { - return new String[0]; - } - String jsonArray = m.group(1); - String[] quotedStrings = jsonArray.split("\\s*,\\s*"); - String[] rv = new String[quotedStrings.length]; - for (int i = 0; i < quotedStrings.length; i++) { - // subjects cannot contain quotes, so just do a replace. - rv[i] = quotedStrings[i].replace(Q, ""); + public static String[] parseStringArray(String objectName, String json) { + // THIS CODE MAKES SOME ASSUMPTIONS THAT THE JSON IS FORMED IN A CONSISTENT MANNER + // ..."fieldName": [\n ],... + // ..."fieldName": [\n "value"\n ],... + // ..."fieldName": [\n "value",\n "value2"\n ],... + int ix = json.indexOf("\"" + objectName + "\":"); + if (ix != -1) { + ix = json.indexOf("\"", ix + objectName.length() + 3); + if (ix != -1) { + int endx = json.indexOf("]", ix); + String[] data = json.substring(ix, endx).split(","); + for (int x = 0; x < data.length; x++) { + data[x] = data[x].trim().replaceAll("\"", ""); + } + return data; + } } - return rv; + + return new String[0]; } public static StringBuilder beginJson() { @@ -250,4 +289,5 @@ public static ZonedDateTime parseDateTime(String dateTime) { Instant inst = Instant.parse(dateTime); return ZonedDateTime.ofInstant(inst, ZoneId.systemDefault()); } + } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 77f29eb7d..dc8a82fc2 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -998,9 +998,9 @@ public Message request(Message message, Duration timeout) throws InterruptedExce return _request(asNatsMessage(message), timeout); } - private Message _request(NatsMessage message, Duration timeout) throws InterruptedException { + private Message _request(NatsMessage message, Duration timeout) { Message reply = null; - Future incoming = _request(message); + CompletableFuture incoming = _request(message); try { reply = incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS); } catch (TimeoutException | ExecutionException | CancellationException e) { @@ -1899,14 +1899,14 @@ public void flushBuffer() throws IOException { } @Override - public JetStream jetStream() throws InterruptedException, TimeoutException { + public JetStream jetStream() throws IOException { return jetStream(null); } @Override - public JetStream jetStream(JetStreamOptions options) throws InterruptedException, TimeoutException { + public JetStream jetStream(JetStreamOptions options) throws IOException { if (isClosing() || isClosed()) { - throw new IllegalStateException("A jetstream context can't be estabilished during close."); + throw new IOException("A JetStream context can't be established during close."); } return new NatsJetStream(this, options); } diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index f682bcd02..8fa194bd5 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -4,7 +4,7 @@ import java.io.IOException; import java.time.Duration; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -13,25 +13,52 @@ public class NatsJetStream implements JetStream { - public static final String jSDefaultApiPrefix = "$JS.API."; + private static final String JSAPI_DEFAULT_PREFIX = "$JS.API."; - // JSApiAccountInfo is for obtaining general information about JetStream. - private static final String jSApiAccountInfo = "INFO"; + // JSAPI_ACCOUNT_INFO is for obtaining general information about JetStream. + private static final String JSAPI_ACCOUNT_INFO = "INFO"; - // JSApiStreams can lookup a stream by subject. - private static final String jSApiStreams = "STREAM.NAMES"; + // JSAPI_CONSUMER_CREATE is used to create consumers. + private static final String JSAPI_CONSUMER_CREATE = "CONSUMER.CREATE.%s"; - // JSApiConsumerCreateT is used to create consumers. - private static final String jSApiConsumerCreateT = "CONSUMER.CREATE.%s"; + // JSAPI_DURABLE_CREATE is used to create durable consumers. + private static final String JSAPI_DURABLE_CREATE = "CONSUMER.DURABLE.CREATE.%s.%s"; - // JSApiDurableCreateT is used to create durable consumers. - private static final String jSApiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"; + // JSAPI_CONSUMER_INFO is used to create consumers. + private static final String JSAPI_CONSUMER_INFO = "CONSUMER.INFO.%s.%s"; - // JSApiConsumerInfoT is used to create consumers. - private static final String jSApiConsumerInfoT = "CONSUMER.INFO.%s.%s"; + // JSAPI_CONSUMER_REQUEST_NEXT is the prefix for the request next message(s) for a consumer in worker/pull mode. + private static final String JSAPI_CONSUMER_REQUEST_NEXT = "CONSUMER.MSG.NEXT.%s.%s"; - // JSApiStreamCreate is the endpoint to create new streams. - private static final String jSApiStreamCreateT = "STREAM.CREATE.%s"; + // JSAPI_CONSUMER_DELETE is used to delete consumers. + private static final String JSAPI_CONSUMER_DELETE = "CONSUMER.DELETE.%s.%s"; + + // JSAPI_CONSUMER_LIST is used to return all detailed consumer information + private static final String JSAPI_CONSUMER_LIST = "CONSUMER.LIST.%s"; + + // JSAPI_STREAMS can lookup a stream by subject. + private static final String JSAPI_STREAMS = "STREAM.NAMES"; + + // JSAPI_STREAM_CREATE is the endpoint to create new streams. + private static final String JSAPI_STREAM_CREATE = "STREAM.CREATE.%s"; + + // JSAPI_STREAM_INFO is the endpoint to get information on a stream. + private static final String JSAPI_STREAM_INFO = "STREAM.INFO.%s"; + + // JSAPI_STREAM_UPDATE is the endpoint to update existing streams. + private static final String JSAPI_STREAM_UPDATE = "STREAM.UPDATE.%s"; + + // JSAPI_STREAM_DELETE is the endpoint to delete streams. + private static final String JSAPI_STREAM_DELETE = "STREAM.DELETE.%s"; + + // JSAPI_STREAM_PURGE is the endpoint to purge streams. + private static final String JSAPI_STREAM_PURGE = "STREAM.PURGE.%s"; + + // JSAPI_STREAM_LIST is the endpoint that will return all detailed stream information + private static final String JSAPI_STREAM_LIST = "STREAM.LIST"; + + // JSAPI_MSG_DELETE is the endpoint to remove a message. + private static final String JSAPI_MSG_DELETE = "STREAM.MSG.DELETE.%s"; private static final PublishOptions DEFAULT_PUB_OPTS = PublishOptions.builder().build(); private static final Duration defaultTimeout = Options.DEFAULT_CONNECTION_TIMEOUT; @@ -57,51 +84,61 @@ public class NatsJetStream implements JetStream { private final JetStreamOptions options; public static class AccountLimitImpl implements AccountLimits { - long memory = -1; - long storage = -1; - long streams = -1; - long consumers = 1; + long maxMemory = -1; + long maxStorage = -1; + long maxStreams = -1; + long maxConsumers = 1; - AccountLimitImpl(String json) { + public AccountLimitImpl(String json) { Matcher m = LIMITS_MEMORY_RE.matcher(json); if (m.find()) { - this.memory = Integer.parseInt(m.group(1)); + this.maxMemory = Long.parseLong(m.group(1)); } m = LIMITS_STORAGE_RE.matcher(json); if (m.find()) { - this.storage = Integer.parseInt(m.group(1)); + this.maxStorage = Long.parseLong(m.group(1)); } m = LIMIT_STREAMS_RE.matcher(json); if (m.find()) { - this.streams = Integer.parseInt(m.group(1)); + this.maxStreams = Long.parseLong(m.group(1)); } m = LIMIT_CONSUMERS_RE.matcher(json); if (m.find()) { - this.consumers = Integer.parseInt(m.group(1)); + this.maxConsumers = Long.parseLong(m.group(1)); } } @Override public long getMaxMemory() { - return memory; + return maxMemory; } @Override public long getMaxStorage() { - return storage; + return maxStorage; } @Override public long getMaxStreams() { - return streams; + return maxStreams; } @Override public long getMaxConsumers() { - return consumers; + return maxConsumers; + } + + @Override + public String toString() { + return "AccountLimitImpl{" + + "memory=" + maxMemory + + ", storage=" + maxStorage + + ", streams=" + maxStreams + + ", consumers=" + maxConsumers + + '}'; } } @@ -111,25 +148,25 @@ public static class AccountStatsImpl implements AccountStatistics { long streams = -1; long consumers = 1; - AccountStatsImpl(String json) { + public AccountStatsImpl(String json) { Matcher m = STATS_MEMORY_RE.matcher(json); if (m.find()) { - this.memory = Integer.parseInt(m.group(1)); + this.memory = Long.parseLong(m.group(1)); } m = STATS_STORAGE_RE.matcher(json); if (m.find()) { - this.storage = Integer.parseInt(m.group(1)); + this.storage = Long.parseLong(m.group(1)); } m = STATS_STREAMS_RE.matcher(json); if (m.find()) { - this.streams = Integer.parseInt(m.group(1)); + this.streams = Long.parseLong(m.group(1)); } m = STATS_CONSUMERS_RE.matcher(json); if (m.find()) { - this.consumers = Integer.parseInt(m.group(1)); + this.consumers = Long.parseLong(m.group(1)); } } @Override @@ -151,6 +188,16 @@ public long getStreams() { public long getConsumers() { return consumers; } + + @Override + public String toString() { + return "AccountStatsImpl{" + + "memory=" + memory + + ", storage=" + storage + + ", streams=" + streams + + ", consumers=" + consumers + + '}'; + } } private static boolean isJetstreamEnabled(Message msg) { @@ -158,11 +205,11 @@ private static boolean isJetstreamEnabled(Message msg) { return false; } - JetstreamAPIResponse apiResp = new JetstreamAPIResponse(msg.getData()); + JetStreamApiResponse apiResp = new JetStreamApiResponse(msg); return apiResp.getCode() != 503 && apiResp.getError() == null; } - NatsJetStream(NatsConnection connection, JetStreamOptions jsOptions) throws InterruptedException, TimeoutException { + NatsJetStream(NatsConnection connection, JetStreamOptions jsOptions) throws IOException { if (jsOptions == null) { options = JetStreamOptions.builder().build(); } else { @@ -175,87 +222,99 @@ private static boolean isJetstreamEnabled(Message msg) { // override request style. conn.getOptions().setOldRequestStyle(true); - if (direct) { - return; - } - - String subj = appendPre(jSApiAccountInfo); - Message resp = conn.request(subj, null, defaultTimeout); - if (resp == null) { - throw new TimeoutException("No response from the NATS server"); - } - if (!isJetstreamEnabled(resp)) { - throw new IllegalStateException("Jetstream is not enabled."); - } - - // check the response - new AccountStatsImpl(new String(resp.getData())); - } + if (!direct) { + Message resp = makeRequest(JSAPI_ACCOUNT_INFO, null, defaultTimeout); + if (!isJetstreamEnabled(resp)) { + throw new IllegalStateException("Jetstream is not enabled."); + } - String appendPre(String subject) { - if (prefix == null) { - return jSDefaultApiPrefix + subject; + // check the response + new AccountStatsImpl(new String(resp.getData())); } - return prefix + subject; } - private ConsumerInfo createOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws TimeoutException, InterruptedException, IOException { + private ConsumerInfo createOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException { String durable = config.getDurable(); String requestJSON = config.toJSON(streamName); String subj; if (durable == null) { - subj = String.format(jSApiConsumerCreateT, streamName); + subj = String.format(JSAPI_CONSUMER_CREATE, streamName); } else { - subj = String.format(jSApiDurableCreateT, streamName, durable); - } - - Message resp = conn.request(appendPre(subj), requestJSON.getBytes(), conn.getOptions().getConnectionTimeout()); - - if (resp == null) { - throw new TimeoutException("Consumer request to jetstream timed out."); + subj = String.format(JSAPI_DURABLE_CREATE, streamName, durable); } - JetstreamAPIResponse jsResp = new JetstreamAPIResponse(resp.getData()); - if (jsResp.hasError()) { - throw new IOException(jsResp.getError()); - } + Message resp = makeRequest(subj, requestJSON.getBytes(), conn.getOptions().getConnectionTimeout()); + return new ConsumerInfo(extractApiResponse(resp).getResponse()); + } - return new ConsumerInfo(jsResp.getResponse()); + /** + * {@inheritDoc} + */ + @Override + public StreamInfo addStream(StreamConfiguration config) throws IOException, JetStreamApiException { + return _addOrUpdate(config, JSAPI_STREAM_CREATE); } + /** + * {@inheritDoc} + */ @Override - public StreamInfo addStream(StreamConfiguration config) throws TimeoutException, InterruptedException { + public StreamInfo updateStream(StreamConfiguration config) throws IOException, JetStreamApiException { + return _addOrUpdate(config, JSAPI_STREAM_UPDATE); + } + + private StreamInfo _addOrUpdate(StreamConfiguration config, String template) throws IOException, JetStreamApiException { if (config == null) { throw new IllegalArgumentException("configuration cannot be null."); } String streamName = config.getName(); - if (streamName == null || streamName.isEmpty()) { + if (nullOrEmpty(streamName)) { throw new IllegalArgumentException("Configuration must have a valid name"); } - String subj = appendPre(String.format(jSApiStreamCreateT, streamName)); - Message resp = conn.request(subj, config.toJSON().getBytes(), defaultTimeout); - if (resp == null) { - throw new TimeoutException("No response from the NATS server"); - } - JetstreamAPIResponse apiResp = new JetstreamAPIResponse(resp.getData()); - if (apiResp.hasError()) { - throw new IllegalStateException(String.format("Could not create stream. %d : %s", - apiResp.getCode(), apiResp.getDescription())); - } + String subj = String.format(template, streamName); + Message resp = makeRequest(subj, config.toJSON().getBytes(), defaultTimeout); + return new StreamInfo(extractApiResponse(resp).getResponse()); + } - return new StreamInfo(new String(resp.getData())); + @Override + public void deleteStream(String streamName) throws IOException, JetStreamApiException { + String subj = String.format(JSAPI_STREAM_DELETE, streamName); + extractApiResponse( makeRequest(subj, null, defaultTimeout) ); } + /** + * {@inheritDoc} + */ @Override - public ConsumerInfo addConsumer(String stream, ConsumerConfiguration config) throws InterruptedException, IOException, TimeoutException { - validateStreamName(stream); + public StreamInfo streamInfo(String streamName) throws IOException, JetStreamApiException { + String subj = String.format(JSAPI_STREAM_INFO, streamName); + Message resp = makeRequest(subj, null, defaultTimeout); + return new StreamInfo(extractApiResponseJson(resp)); + } + + /** + * {@inheritDoc} + */ + @Override + public StreamInfo purgeStream(String streamName) throws IOException, JetStreamApiException { + String subj = String.format(JSAPI_STREAM_PURGE, streamName); + Message resp = makeRequest(subj, null, defaultTimeout); + return new StreamInfo(extractApiResponseJson(resp)); + } + + /** + * {@inheritDoc} + */ + @Override + public ConsumerInfo addConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException { + validateStreamName(streamName); validateNotNull(config, "config"); - return addConsumer(null, stream, config); + return addConsumer(null, streamName, config); } - private ConsumerInfo addConsumer(String subject, String stream, ConsumerConfiguration config) throws InterruptedException, IOException, TimeoutException { + private ConsumerInfo addConsumer(String subject, String stream, ConsumerConfiguration config) throws IOException, JetStreamApiException { validateStreamName(stream); validateNotNull(config, "config"); if (provided(subject)) { @@ -264,31 +323,118 @@ private ConsumerInfo addConsumer(String subject, String stream, ConsumerConfigur return createOrUpdateConsumer(stream, config); } + /** + * {@inheritDoc} + */ + @Override + public void deleteConsumer(String streamName, String consumer) throws IOException, JetStreamApiException { + String subj = String.format(JSAPI_CONSUMER_DELETE, streamName, consumer); + extractApiResponse( makeRequest(subj, null, defaultTimeout) ); + } + + /** + * {@inheritDoc} + */ + @Override + public ConsumerLister newConsumerLister(String streamName) throws IOException, JetStreamApiException { + String subj = String.format(JSAPI_CONSUMER_LIST, streamName); + Message resp = makeRequest(subj, null, defaultTimeout); + return new ConsumerLister(extractApiResponseJson(resp)); + } + static NatsMessage buildMsg(String subject, byte[] payload) { return new NatsMessage.Builder().subject(subject).data(payload).build(); } + /** + * {@inheritDoc} + */ @Override - public PublishAck publish(String subject, byte[] body) throws IOException, InterruptedException, TimeoutException { + public PublishAck publish(String subject, byte[] body) throws IOException, JetStreamApiException { return publishInternal(buildMsg(subject, body), null); } + /** + * {@inheritDoc} + */ @Override - public PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, InterruptedException, TimeoutException{ + public PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, JetStreamApiException { return publishInternal(buildMsg(subject, body), options); } + /** + * {@inheritDoc} + */ @Override - public PublishAck publish(Message message) throws IOException, InterruptedException, TimeoutException { + public PublishAck publish(Message message) throws IOException, JetStreamApiException { return publishInternal(message, null); } + /** + * {@inheritDoc} + */ @Override - public PublishAck publish(Message message, PublishOptions options) throws IOException, InterruptedException, TimeoutException{ + public PublishAck publish(Message message, PublishOptions options) throws IOException, JetStreamApiException { return publishInternal(message, options); } - private PublishAck publishInternal(Message message, PublishOptions options) throws IOException, InterruptedException, TimeoutException{ + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture publishAsync(String subject, byte[] body) { + return CompletableFuture.supplyAsync(() -> { + try { + return publish(subject, body); + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture publishAsync(String subject, byte[] body, PublishOptions options) { + return CompletableFuture.supplyAsync(() -> { + try { + return publish(subject, body, options); + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture publishAsync(Message message) { + return CompletableFuture.supplyAsync(() -> { + try { + return publish(message); + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture publishAsync(Message message, PublishOptions options) { + return CompletableFuture.supplyAsync(() -> { + try { + return publish(message, options); + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException(e); + } + }); + } + + private PublishAck publishInternal(Message message, PublishOptions options) throws IOException { validateNotNull(message, "message"); NatsMessage natsMessage = message instanceof NatsMessage ? (NatsMessage)message : new NatsMessage(message); @@ -323,10 +469,7 @@ private PublishAck publishInternal(Message message, PublishOptions options) thro } } - Message resp = conn.request(natsMessage, opts.getStreamTimeout()); - if (resp == null) { - throw new TimeoutException("timeout waiting for jetstream"); - } + Message resp = makeRequest(natsMessage, opts.getStreamTimeout()); NatsPublishAck ack = new NatsPublishAck(resp.getData()); String ackStream = ack.getStream(); @@ -346,47 +489,28 @@ private boolean isStreamSpecified(String streamName) { return streamName != null; } - ConsumerInfo getConsumerInfo(String stream, String consumer) throws IOException, TimeoutException, - InterruptedException { - String ccInfoSubj = this.appendPre(String.format(jSApiConsumerInfoT, stream, consumer)); - Message resp = conn.request(ccInfoSubj, null, defaultTimeout); - if (resp == null) { - throw new TimeoutException("Consumer request to jetstream timed out."); - } - - JetstreamAPIResponse jsResp = new JetstreamAPIResponse(resp.getData()); - if (jsResp.hasError()) { - throw new IllegalStateException(jsResp.getError()); - } - - return new ConsumerInfo(jsResp.getResponse()); + ConsumerInfo getConsumerInfo(String stream, String consumer) throws IOException, JetStreamApiException { + String ccInfoSubj = String.format(JSAPI_CONSUMER_INFO, stream, consumer); + Message resp = makeRequest(ccInfoSubj, null, defaultTimeout); + return new ConsumerInfo(extractApiResponseJson(resp)); } - private String lookupStreamBySubject(String subject) throws InterruptedException, IOException, TimeoutException { + private String lookupStreamBySubject(String subject) throws IOException, JetStreamApiException { if (subject == null) { throw new IllegalArgumentException("subject cannot be null."); } String streamRequest = String.format("{\"subject\":\"%s\"}", subject); - Message resp = conn.request(appendPre(jSApiStreams), streamRequest.getBytes(), defaultTimeout); - if (resp == null) { - throw new TimeoutException("Consumer request to jetstream timed out."); - } - - JetstreamAPIResponse jsResp = new JetstreamAPIResponse(resp.getData()); - if (jsResp.hasError()) { - throw new IOException(jsResp.getError()); - } + Message resp = makeRequest(JSAPI_STREAMS, streamRequest.getBytes(), defaultTimeout); - String[] streams = JsonUtils.parseStringArray("streams", jsResp.getResponse()); + String[] streams = JsonUtils.parseStringArray("streams", extractApiResponseJson(resp)); if (streams.length != 1) { throw new IllegalStateException("No matching streams."); } return streams[0]; } - private class AutoAckMessageHandler implements MessageHandler { - + private static class AutoAckMessageHandler implements MessageHandler { MessageHandler mh; // caller must ensure userMH is not null @@ -400,12 +524,14 @@ public void onMessage(Message msg) throws InterruptedException { mh.onMessage(msg); msg.ack(); } catch (Exception e) { - // ignore?? schedule async error? + // TODO ignore?? schedule async error? } } } - NatsJetStreamSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException{ + NatsJetStreamSubscription createSubscription(String subject, String queueName, + NatsDispatcher dispatcher, MessageHandler handler, + SubscribeOptions options) throws IOException, JetStreamApiException { // setup the configuration, use a default. SubscribeOptions o = SubscribeOptions.getInstance(options); @@ -478,14 +604,15 @@ NatsJetStreamSubscription createSubscription(String subject, String queueName, N cfg.setMaxAckPending(sub.getPendingMessageLimit()); } - try { - ConsumerInfo ci = createOrUpdateConsumer(stream, cfg); - sub.setupJetStream(this, ci.getName(), ci.getStreamName(), - deliver, o.getPullBatchSize()); - } catch (Exception e) { + ConsumerInfo ci = null; + try { + ci = createOrUpdateConsumer(stream, cfg); + } catch (JetStreamApiException e) { sub.unsubscribe(); throw e; } + sub.setupJetStream(this, ci.getName(), ci.getStreamName(), deliver, o.getPullBatchSize()); + } else { String s = direct ? o.getConsumerConfiguration().getDeliverSubject() : ccfg.getDeliverSubject(); if (s == null) { @@ -501,37 +628,52 @@ NatsJetStreamSubscription createSubscription(String subject, String queueName, N return sub; } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); return createSubscription(subject, null, null, null, SubscribeOptions.builder().build()); } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject, SubscribeOptions options) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); validateNotNull(options, "options"); return createSubscription(subject, null, null, null, options); } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject, String queue, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject, String queue, SubscribeOptions options) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); validateQueueName(queue); validateNotNull(options, "options"); return createSubscription(subject, queue, null, null, options); } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); validateNotNull(dispatcher, "dispatcher"); validateNotNull(handler, "handler"); return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, null); } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); validateNotNull(dispatcher, "dispatcher"); validateNotNull(handler, "handler"); @@ -539,8 +681,11 @@ public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, Me return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, options); } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); validateQueueName(queue); validateNotNull(dispatcher, "dispatcher"); @@ -548,8 +693,11 @@ public JetStreamSubscription subscribe(String subject, String queue, Dispatcher return createSubscription(subject, queue, (NatsDispatcher) dispatcher, handler, null); } + /** + * {@inheritDoc} + */ @Override - public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws InterruptedException, TimeoutException, IOException { + public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, SubscribeOptions options) throws IOException, JetStreamApiException { validateJsSubscribeSubject(subject); validateQueueName(queue); validateNotNull(dispatcher, "dispatcher"); @@ -557,4 +705,50 @@ public JetStreamSubscription subscribe(String subject, String queue, Dispatcher validateNotNull(options, "options"); return createSubscription(subject, queue, (NatsDispatcher) dispatcher, handler, options); } + + private Message makeRequest(String subject, byte[] bytes, Duration timeout) throws IOException { + try { + return responseRequired(conn.request(appendPre(subject), bytes, timeout)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private Message makeRequest(NatsMessage natsMessage, Duration timeout) throws IOException { + try { + return responseRequired(conn.request(natsMessage, timeout)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private Message responseRequired(Message respMessage) throws IOException { + if (respMessage == null) { + throw new IOException("Timeout or no response waiting for NATS Jetstream server"); + } + return respMessage; + } + + private String extractApiResponseJson(Message respMessage) throws JetStreamApiException { + return extractApiResponse(respMessage).getResponse(); + } + + private JetStreamApiResponse extractApiResponse(Message respMessage) throws JetStreamApiException { + JetStreamApiResponse jsApiResp = new JetStreamApiResponse(respMessage); + if (jsApiResp.hasError()) { + throw new JetStreamApiException(jsApiResp); + } + return jsApiResp; + } + + private String appendPre(String template, Object... args) { + return appendPre(String.format(template, args)); + } + + String appendPre(String subject) { + if (prefix == null) { + return JSAPI_DEFAULT_PREFIX + subject; + } + return prefix + subject; + } } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java b/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java index ae6fb12ef..2534fbf93 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java @@ -13,20 +13,19 @@ package io.nats.client.impl; -import java.io.IOException; -import java.util.concurrent.TimeoutException; - import io.nats.client.ConsumerInfo; import io.nats.client.JetStreamSubscription; +import java.io.IOException; + /** * This is a jetstream specfic subscription. */ public class NatsJetStreamSubscription extends NatsSubscription implements JetStreamSubscription { - // JSApiRequestNextT is the prefix for the request next message(s) for a + // JSAPI_REQUEST_NEXT is the prefix for the request next message(s) for a // consumer in worker/pull mode. - private static final String jSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"; + private static final String JSAPI_REQUEST_NEXT = "CONSUMER.MSG.NEXT.%s.%s"; NatsJetStream js; String consumer; @@ -47,20 +46,35 @@ void setupJetStream(NatsJetStream js, String consumer, String stream, String del this.pull = pull; } + /** + * {@inheritDoc} + */ @Override public void poll() { if (deliver == null || pull == 0) { throw new IllegalStateException("Subscription type does not support poll."); } - String subj = js.appendPre(String.format(jSApiRequestNextT, stream, consumer)); + String subj = js.appendPre(String.format(JSAPI_REQUEST_NEXT, stream, consumer)); byte[] payload = String.format("{ \"batch\":%d}", pull).getBytes(); connection.publish(subj, getSubject(), payload); } + /** + * {@inheritDoc} + */ @Override - public ConsumerInfo getConsumerInfo() throws IOException, TimeoutException, InterruptedException { + public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException { return js.getConsumerInfo(stream, consumer); } - + + @Override + public String toString() { + return "NatsJetStreamSubscription{" + + "consumer='" + consumer + '\'' + + ", stream='" + stream + '\'' + + ", deliver='" + deliver + '\'' + + ", pull=" + pull + + '}'; + } } diff --git a/src/main/java/io/nats/client/impl/NatsPublishAck.java b/src/main/java/io/nats/client/impl/NatsPublishAck.java index a617d9015..ed62ac0d8 100644 --- a/src/main/java/io/nats/client/impl/NatsPublishAck.java +++ b/src/main/java/io/nats/client/impl/NatsPublishAck.java @@ -12,14 +12,14 @@ // limitations under the License. package io.nats.client.impl; +import io.nats.client.PublishAck; +import io.nats.client.impl.JsonUtils.FieldType; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.regex.Matcher; import java.util.regex.Pattern; -import io.nats.client.PublishAck; -import io.nats.client.impl.JsonUtils.FieldType; - // Internal class to handle jetstream acknowedgements class NatsPublishAck implements PublishAck { @@ -43,8 +43,8 @@ public NatsPublishAck(byte[] response) throws IOException { String s = new String(response, StandardCharsets.UTF_8); // check for error and then parse for speed. - if (JetstreamAPIResponse.isError(s)) { - JetstreamAPIResponse resp = new JetstreamAPIResponse(response); + if (JetStreamApiResponse.isError(s)) { + JetStreamApiResponse resp = new JetStreamApiResponse(response); if (resp.hasError()) { throw new IllegalStateException(resp.getError()); } diff --git a/src/main/java/io/nats/client/impl/SSLUtils.java b/src/main/java/io/nats/client/impl/SSLUtils.java index 7291ed861..9d0837a4c 100644 --- a/src/main/java/io/nats/client/impl/SSLUtils.java +++ b/src/main/java/io/nats/client/impl/SSLUtils.java @@ -13,14 +13,14 @@ package io.nats.client.impl; -import java.security.SecureRandom; -import java.security.cert.X509Certificate; +import io.nats.client.Options; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import java.security.cert.X509Certificate; -import io.nats.client.Options; +import static io.nats.client.support.RandomUtils.SRAND; public class SSLUtils { private static TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @@ -40,7 +40,7 @@ public static SSLContext createOpenTLSContext() { try { context = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL); - context.init(null, trustAllCerts, new SecureRandom()); + context.init(null, trustAllCerts, SRAND); } catch (Exception e) { context = null; } diff --git a/src/main/java/io/nats/client/support/DebugUtil.java b/src/main/java/io/nats/client/support/DebugUtil.java new file mode 100644 index 000000000..f3cb044bd --- /dev/null +++ b/src/main/java/io/nats/client/support/DebugUtil.java @@ -0,0 +1,69 @@ +package io.nats.client.support; + +public class DebugUtil { + public static String printable(Object o) { + return printable(o.toString()); + } + + public static String printable(String s) { + String indent = ""; + boolean inPrimitiveArray = false; + boolean inObjectArray = false; + boolean lastEq = false; + boolean lastComma = false; + StringBuilder sb = new StringBuilder(); + for (int x = 0; x < s.length(); x++) { + char c = s.charAt(x); + if (c == '=') { + lastEq = true; + sb.append(": "); + } else { + if (c == '{') { + indent += " "; + sb.append(":\n").append(indent); + } else if (c == '}') { + indent = indent.substring(0, indent.length() - 2); + } else if (c == '[') { + if (lastEq) { + inPrimitiveArray = true; + indent += " "; + sb.append("\n").append(indent).append("- "); + } else if (lastComma) { + inObjectArray = true; + lastComma = false; + indent += " "; + sb.append("[\n").append(indent); + } else { + sb.append(c); + } + } else if (c == ']') { + if (inPrimitiveArray) { + inPrimitiveArray = false; + indent = indent.substring(0, indent.length() - 2); + } else if (inObjectArray) { + inObjectArray = false; + indent = indent.substring(0, indent.length() - 2); + sb.append('\n').append(indent).append(']'); + } else { + sb.append(c); + } + } else if (c == ',') { + sb.append("\n").append(indent); + if (inPrimitiveArray) { + sb.append("- "); + } else { + lastComma = true; + } + x++; + } else { + sb.append(c); + if (!Character.isWhitespace(c)) { + lastComma = false; + } + } + lastEq = false; + } + } + return sb.toString().replaceAll("'null'", "null"); + } +} diff --git a/src/main/java/io/nats/client/support/RandomUtils.java b/src/main/java/io/nats/client/support/RandomUtils.java new file mode 100644 index 000000000..1da6f8977 --- /dev/null +++ b/src/main/java/io/nats/client/support/RandomUtils.java @@ -0,0 +1,29 @@ +package io.nats.client.support; + +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.Random; + +public abstract class RandomUtils { + public static final SecureRandom SRAND = new SecureRandom(); + public static final Random PRAND = new Random(bytesToLong(SRAND.generateSeed(8))); // seed with 8 bytes (64 bits) + + public static long nextLong(Random rng, long maxValue) { + // error checking and 2^x checking removed for simplicity. + long bits; + long val; + do { + bits = (rng.nextLong() << 1) >>> 1; + val = bits % maxValue; + } while (bits - val + (maxValue - 1) < 0L); + return val; + } + + public static long bytesToLong(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE); + buffer.put(bytes); + buffer.flip();// need flip + return buffer.getLong(); + } + +} diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index d14956033..8cd39ce51 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -15,7 +15,8 @@ import java.util.regex.Pattern; -public class Validator { +public abstract class Validator { + private Validator() {} /* for Jacoco */ static final Pattern STREAM_PATTERN = Pattern.compile("^[a-zA-Z0-9-]+$"); @@ -55,15 +56,15 @@ public static String validateStreamName(String s) { } public static String validateConsumer(String s) { - if (notNullButEmpty(s)) { - throw new IllegalArgumentException("Consumer cannot be blank when provided."); + if (notNullButEmpty(s) || containsDotWildGt(s)) { + throw new IllegalArgumentException("Consumer cannot be blank when provided and cannot contain a '.', '*' or '>'."); } return s; } public static String validateDurable(String s) { - if (notNullButEmpty(s)) { - throw new IllegalArgumentException("Durable cannot be blank when provided."); + if (nullOrEmpty(s) || containsDotWildGt(s)) { + throw new IllegalArgumentException("Durable cannot be blank and cannot contain a '.', '*' or '>'"); } return s; } @@ -121,11 +122,31 @@ public static boolean notNullButEmpty(String s) { } public static boolean containsWhitespace(String s) { - for (int i = 0; i < s.length(); i++){ - if (Character.isWhitespace(s.charAt(i))) { - return true; + if (s != null) { + for (int i = 0; i < s.length(); i++) { + if (Character.isWhitespace(s.charAt(i))) { + return true; + } } } return false; } + + public static boolean containsDotWildGt(String s) { + if (s != null) { + for (int i = 0; i < s.length(); i++) { + switch (s.charAt(i)) { + case '.': + case '*': + case '>': + return true; + } + } + } + return false; + } + + public static boolean containsDot(String s) { + return s != null && s.indexOf('.') > -1; + } } diff --git a/src/test/java/io/nats/client/JetStreamApiObjectsTests.java b/src/test/java/io/nats/client/JetStreamApiObjectsTests.java new file mode 100644 index 000000000..e8ea34820 --- /dev/null +++ b/src/test/java/io/nats/client/JetStreamApiObjectsTests.java @@ -0,0 +1,160 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import io.nats.client.impl.JsonUtils; +import io.nats.client.impl.NatsJetStream; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static io.nats.client.utils.ResourceUtils.dataAsString; +import static org.junit.jupiter.api.Assertions.*; + +public class JetStreamApiObjectsTests { + + @Test + public void testOptions() { + JetStreamOptions jo = JetStreamOptions.builder().requestTimeout(Duration.ofSeconds(42)).prefix("pre").direct(true).build(); + assertEquals("pre", jo.getPrefix()); + assertEquals(Duration.ofSeconds(42), jo.getRequestTimeout()); + assertTrue(jo.isDirectMode()); + } + + @Test + public void testInvalidPrefix() { + assertThrows(IllegalArgumentException.class, () -> { JetStreamOptions.builder().prefix(">").build();}); + assertThrows(IllegalArgumentException.class, () -> { JetStreamOptions.builder().prefix("*").build();}); + } + + @Test + public void testAccountLimitImpl() { + String json = dataAsString("AccountLimitImpl.json"); + NatsJetStream.AccountLimitImpl ali = new NatsJetStream.AccountLimitImpl(json); + assertEquals(1, ali.getMaxMemory()); + assertEquals(2, ali.getMaxStorage()); + assertEquals(3, ali.getMaxStreams()); + assertEquals(4, ali.getMaxConsumers()); + } + + @Test + public void testAccountStatsImpl() { + String json = dataAsString("AccountStatsImpl.json"); + NatsJetStream.AccountStatsImpl asi = new NatsJetStream.AccountStatsImpl(json); + assertEquals(1, asi.getMemory()); + assertEquals(2, asi.getStorage()); + assertEquals(3, asi.getStreams()); + assertEquals(4, asi.getConsumers()); + } + + @Test + public void testConsumerLister() { + String json = dataAsString("ConsumerLister.json"); + ConsumerLister cl = new ConsumerLister(json); + assertEquals(2, cl.getTotal()); + assertEquals(42, cl.getOffset()); + assertEquals(256, cl.getLimit()); + assertEquals(2, cl.getConsumers().size()); + + ConsumerInfo ci = cl.getConsumers().get(0); + assertEquals("stream-1", ci.getStreamName()); + assertEquals("cname1", ci.getName()); + assertEquals(JsonUtils.parseDateTime("2021-01-20T23:41:08.579594Z"), ci.getCreationTime()); + assertEquals(5, ci.getNumAckPending()); + assertEquals(6, ci.getRedelivered()); + assertEquals(7, ci.getNumWaiting()); + assertEquals(8, ci.getNumPending()); + + ConsumerConfiguration cc = ci.getConsumerConfiguration(); + assertEquals("cname1", cc.getDurable()); + assertEquals("strm1-deliver", cc.getDeliverSubject()); + assertEquals(ConsumerConfiguration.DeliverPolicy.All, cc.getDeliverPolicy()); + assertEquals(ConsumerConfiguration.AckPolicy.Explicit, cc.getAckPolicy()); + assertEquals(Duration.ofSeconds(30), cc.getAckWait()); + assertEquals(99, cc.getMaxDeliver()); + assertEquals(ConsumerConfiguration.ReplayPolicy.Instant, cc.getReplayPolicy()); + + ConsumerInfo.SequencePair sp = ci.getDelivered(); + assertEquals(1, sp.getConsumerSequence()); + assertEquals(2, sp.getStreamSequence()); + + sp = ci.getAckFloor(); + assertEquals(3, sp.getConsumerSequence()); + assertEquals(4, sp.getStreamSequence()); + + cl = new ConsumerLister("{}"); + assertEquals(0, cl.getTotal()); + assertEquals(0, cl.getOffset()); + assertEquals(0, cl.getLimit()); + assertEquals(0, cl.getConsumers().size()); + } + + @Test + public void testStreamInfo() { + String json = dataAsString("StreamInfo.json"); + StreamInfo si = new StreamInfo(json); + assertEquals(JsonUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"), si.getCreateTime()); + + StreamConfiguration sc = si.getConfiguration(); + assertEquals("streamName", sc.getName()); + assertEquals(2, sc.getSubjects().length); + assertEquals("sub0", sc.getSubjects()[0]); + assertEquals("sub1", sc.getSubjects()[1]); + + assertEquals(StreamConfiguration.RetentionPolicy.Limits, sc.getRetentionPolicy()); + assertEquals(StreamConfiguration.DiscardPolicy.Old, sc.getDiscardPolicy()); + assertEquals(StreamConfiguration.StorageType.Memory, sc.getStorageType()); + + assertNotNull(si.getConfiguration()); + assertNotNull(si.getStreamState()); + assertEquals(1, sc.getMaxConsumers()); + assertEquals(2, sc.getMaxMsgs()); + assertEquals(3, sc.getMaxBytes()); + assertEquals(4, sc.getMaxMsgSize()); + assertEquals(5, sc.getReplicas()); + + assertEquals(Duration.ofSeconds(100), sc.getMaxAge()); + assertEquals(Duration.ofSeconds(120), sc.getDuplicateWindow()); + + StreamInfo.StreamState ss = si.getStreamState(); + assertEquals(11, ss.getMsgCount()); + assertEquals(12, ss.getByteCount()); + assertEquals(13, ss.getFirstSequence()); + assertEquals(14, ss.getLastSequence()); + assertEquals(15, ss.getConsumerCount()); + + assertEquals(JsonUtils.parseDateTime("0001-01-01T00:00:00Z"), ss.getFirstTime()); + assertEquals(JsonUtils.parseDateTime("0001-01-01T00:00:00Z"), ss.getLastTime()); + + si = new StreamInfo("{}"); + assertNull(si.getCreateTime()); + assertNotNull(si.getStreamState()); + assertNotNull(si.getConfiguration()); + } + + @Test + public void testConsumerInfo() { + ConsumerInfo ci = new ConsumerInfo("{}"); + assertNull(ci.getStreamName()); + assertNull(ci.getName()); + assertNull(ci.getCreationTime()); + assertNotNull(ci.getConsumerConfiguration()); + assertNotNull(ci.getDelivered()); + assertNotNull(ci.getAckFloor()); + assertEquals(0, ci.getNumPending()); + assertEquals(0, ci.getNumWaiting()); + assertEquals(0, ci.getNumAckPending()); + assertEquals(0, ci.getRedelivered()); + } +} diff --git a/src/test/java/io/nats/client/JetStreamManagementTests.java b/src/test/java/io/nats/client/JetStreamManagementTests.java new file mode 100644 index 000000000..6a5e987c0 --- /dev/null +++ b/src/test/java/io/nats/client/JetStreamManagementTests.java @@ -0,0 +1,173 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import io.nats.client.StreamConfiguration.DiscardPolicy; +import io.nats.client.StreamConfiguration.RetentionPolicy; +import io.nats.client.StreamConfiguration.StorageType; +import io.nats.client.impl.JetStreamApiException; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.Duration; +import java.time.LocalTime; + +import static org.junit.jupiter.api.Assertions.*; + +public class JetStreamManagementTests { + private static final String STREAM1 = "test-stream-1"; + private static final String STREAM2 = "test-stream-2"; + private static final String STRM1SUB1 = "strm1sub1"; + private static final String STRM1SUB2 = "strm1sub2"; + private static final String STRM2SUB1 = "strm2sub1"; + private static final String STRM2SUB2 = "strm2sub2"; + + @Test + public void addStream() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + LocalTime now = LocalTime.now(); + + JetStream js = nc.jetStream(); + StreamInfo si = addTestStream(js); + assertTrue(now.isBefore(si.getCreateTime().toLocalTime())); + + StreamConfiguration sc = si.getConfiguration(); + assertNotNull(sc); + assertEquals(STREAM1, sc.getName()); + assertNotNull(sc.getSubjects()); + assertEquals(2, sc.getSubjects().length); + assertEquals(STRM1SUB1, sc.getSubjects()[0]); + assertEquals(STRM1SUB2, sc.getSubjects()[1]); + assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy()); + assertEquals(-1, sc.getMaxConsumers()); + assertEquals(-1, sc.getMaxBytes()); + assertEquals(-1, sc.getMaxMsgSize()); + assertEquals(Duration.ZERO, sc.getMaxAge()); + assertEquals(StorageType.Memory, sc.getStorageType()); + assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy()); + assertEquals(1, sc.getReplicas()); + assertFalse(sc.getNoAck()); + assertEquals(Duration.ofMinutes(2), sc.getDuplicateWindow()); + assertNull(sc.getTemplate()); + + StreamInfo.StreamState state = si.getStreamState(); + assertNotNull(state); + assertEquals(0, state.getMsgCount()); + assertEquals(0, state.getByteCount()); + assertEquals(0, state.getFirstSequence()); + assertEquals(0, state.getMsgCount()); + assertEquals(0, state.getLastSequence()); + assertEquals(0, state.getConsumerCount()); + } + } + + @Test + public void updateStream() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + JetStream js = nc.jetStream(); + addTestStream(js); + + StreamConfiguration sc = StreamConfiguration.builder() + .name(STREAM1) + .storageType(StorageType.Memory) + .subjects(STRM2SUB1, STRM2SUB2) + .maxBytes(43) + .maxMsgSize(44) + .maxAge(Duration.ofDays(100)) + .discardPolicy(DiscardPolicy.New) + .noAck(true) + .duplicateWindow(Duration.ofMinutes(3)) + .build(); + StreamInfo si = js.updateStream(sc); + assertNotNull(si); + + sc = si.getConfiguration(); + assertNotNull(sc); + assertEquals(STREAM1, sc.getName()); + assertNotNull(sc.getSubjects()); + assertEquals(2, sc.getSubjects().length); + assertEquals(STRM2SUB1, sc.getSubjects()[0]); + assertEquals(STRM2SUB2, sc.getSubjects()[1]); + assertEquals(43, sc.getMaxBytes()); + assertEquals(44, sc.getMaxMsgSize()); + assertEquals(Duration.ofDays(100), sc.getMaxAge()); + assertEquals(StorageType.Memory, sc.getStorageType()); + assertEquals(DiscardPolicy.New, sc.getDiscardPolicy()); + assertEquals(1, sc.getReplicas()); + assertTrue(sc.getNoAck()); + assertEquals(Duration.ofMinutes(3), sc.getDuplicateWindow()); + assertNull(sc.getTemplate()); + } + } + + @Test + public void addOrUpdateStream_nullConfiguration_isNotValid() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + JetStream js = nc.jetStream(); + assertThrows(IllegalArgumentException.class, () -> js.updateStream(null)); + } + } + + @Test + public void updateStream_cannotUpdate_nonExistentStream() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + JetStream js = nc.jetStream(); + StreamConfiguration sc = getTestStreamConfiguration(); + assertThrows(JetStreamApiException.class, () -> js.updateStream(sc)); + } + } + + @Test + public void updateStream_cannotChangeMaxConsumers() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + JetStream js = nc.jetStream(); + addTestStream(js); + StreamConfiguration sc = getTestStreamConfigurationBuilder() + .maxConsumers(2) + .build(); + assertThrows(JetStreamApiException.class, () -> js.updateStream(sc)); + } + } + + @Test + public void testUpdateStream_cannotChangeRetentionPolicy() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + JetStream js = nc.jetStream(); + addTestStream(js); + StreamConfiguration sc = getTestStreamConfigurationBuilder() + .retentionPolicy(RetentionPolicy.Interest) + .build(); + assertThrows(JetStreamApiException.class, () -> js.updateStream(sc)); + } + } + + + private StreamInfo addTestStream(JetStream js) throws IOException, JetStreamApiException { + StreamInfo si = js.addStream(getTestStreamConfiguration()); + assertNotNull(si); + + return si; + } + + private StreamConfiguration getTestStreamConfiguration() { + return getTestStreamConfigurationBuilder().build(); + } + + private StreamConfiguration.Builder getTestStreamConfigurationBuilder() { + return StreamConfiguration.builder() + .name(STREAM1) + .storageType(StorageType.Memory) + .subjects(STRM1SUB1, STRM1SUB2); + } +} diff --git a/src/test/java/io/nats/client/JetstreamOptionsTests.java b/src/test/java/io/nats/client/JetstreamOptionsTests.java deleted file mode 100644 index 1e3af6dde..000000000 --- a/src/test/java/io/nats/client/JetstreamOptionsTests.java +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2020 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package io.nats.client; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.time.Duration; - -import org.junit.jupiter.api.Test; - -public class JetstreamOptionsTests { - - @Test - public void testOptions() { - JetStreamOptions jo = JetStreamOptions.builder().requestTimeout(Duration.ofSeconds(42)).prefix("pre").direct(true).build(); - assertEquals("pre", jo.getPrefix()); - assertEquals(Duration.ofSeconds(42), jo.getRequestTimeout()); - assertEquals(true, jo.isDirectMode()); - } - - @Test - public void testInvalidPrefix() { - assertThrows(IllegalArgumentException.class, () -> { JetStreamOptions.builder().prefix(">").build();}); - assertThrows(IllegalArgumentException.class, () -> { JetStreamOptions.builder().prefix("*").build();}); - } - - public void testDefaults() { - - } -} \ No newline at end of file diff --git a/src/test/java/io/nats/client/JetstreamTests.java b/src/test/java/io/nats/client/JetstreamTests.java index 6c5a1462f..137007ffb 100644 --- a/src/test/java/io/nats/client/JetstreamTests.java +++ b/src/test/java/io/nats/client/JetstreamTests.java @@ -18,8 +18,8 @@ import io.nats.client.StreamConfiguration.StorageType; import io.nats.client.StreamInfo.StreamState; import io.nats.client.impl.Headers; +import io.nats.client.impl.JetStreamApiException; import io.nats.client.impl.NatsMessage; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -27,24 +27,15 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import static io.nats.client.utils.TestMacros.sleep; import static org.junit.jupiter.api.Assertions.*; public class JetstreamTests { - private static StreamInfo createMemoryStream(JetStream js, String streamName, String subject) - throws TimeoutException, InterruptedException { - String[] subjects = new String[1]; - subjects[0] = subject; - - return createMemoryStream(js, streamName, subjects); - } - - private static StreamInfo createMemoryStream(JetStream js, String streamName, String[] subjects) - throws TimeoutException, InterruptedException { + private static StreamInfo createMemoryStream(JetStream js, String streamName, String... subjects) + throws IOException, JetStreamApiException { StreamConfiguration sc = StreamConfiguration.builder().name(streamName).storageType(StorageType.Memory) .subjects(subjects).build(); @@ -53,7 +44,7 @@ private static StreamInfo createMemoryStream(JetStream js, String streamName, St } @Test - public void testStreamAndConsumerCreate() throws IOException, InterruptedException, ExecutionException { + public void testStreamAndConsumerCreate() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { String[] subjects = { "foo" }; @@ -88,69 +79,52 @@ public void testStreamAndConsumerCreate() throws IOException, InterruptedExcepti } @Test - public void testJetstreamPublishDefaultOptions() throws IOException, InterruptedException, ExecutionException { + public void testJetstreamPublishDefaultOptions() throws IOException, JetStreamApiException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { - - try { - JetStream js = nc.jetStream(); - createMemoryStream(js, "foo-stream", "foo"); - - PublishAck ack = js.publish("foo", null); - assertEquals(1, ack.getSeqno()); - } catch (Exception ex) { - Assertions.fail("Exception: " + ex.getMessage()); - } finally { - nc.close(); - } + JetStream js = nc.jetStream(); + createMemoryStream(js, "foo-stream", "foo"); + PublishAck ack = js.publish("foo", null); + assertEquals(1, ack.getSeqno()); } } @Test - public void testJetstreamNotAvailable() throws IOException, InterruptedException, ExecutionException { + public void testJetstreamNotAvailable() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, false); Connection nc = Nats.connect(ts.getURI())) { - assertThrows(TimeoutException.class, () -> { - nc.jetStream(); - }); + assertThrows(IOException.class, nc::jetStream); } } @Test - public void testJetstreamPublish() throws IOException, InterruptedException, ExecutionException, TimeoutException { + public void testJetstreamPublish() throws IOException, InterruptedException, JetStreamApiException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { + JetStream js = nc.jetStream(); - try { - JetStream js = nc.jetStream(); - - // check for failure w/ no stream. - assertThrows(Exception.class, () -> js.publish("foo", "hello".getBytes())); - - createMemoryStream(js, "foo-stream", "foo"); + // check for failure w/ no stream. + assertThrows(Exception.class, () -> js.publish("foo", "hello".getBytes())); - // this should succeed - js.publish("foo", "hello".getBytes()); + createMemoryStream(js, "foo-stream", "foo"); - // Set the stream and publish. - PublishOptions popts = PublishOptions.builder().stream("foo-stream").build(); - js.publish("foo", null, popts); + // this should succeed + js.publish("foo", "hello".getBytes()); - popts.setStream("bar-stream"); - assertThrows(Exception.class, () -> js.publish("foo", "hello".getBytes(), popts)); + // Set the stream and publish. + PublishOptions pubOpts = PublishOptions.builder().stream("foo-stream").build(); + js.publish("foo", null, pubOpts); - PublishAck pa = js.publish("foo", null); - assertEquals(4, pa.getSeqno()); - assertEquals("foo-stream", pa.getStream()); - assertFalse(pa.isDuplicate()); + pubOpts.setStream("bar-stream"); + Throwable t = assertThrows(IOException.class, () -> js.publish("foo", "hello".getBytes(), pubOpts)); + assertTrue(t.getMessage().contains("Expected ack from stream bar-stream, received from: foo-stream")); - } catch (Exception ex) { - Assertions.fail(ex); - } finally { - nc.close(); - } + PublishAck pa = js.publish("foo", null); + assertEquals(4, pa.getSeqno()); + assertEquals("foo-stream", pa.getStream()); + assertFalse(pa.isDuplicate()); } } @Test - public void testJetstreamPublishOptions() throws IOException, InterruptedException,ExecutionException, TimeoutException { + public void testJetstreamPublishOptions() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { @@ -163,7 +137,7 @@ public void testJetstreamPublishOptions() throws IOException, InterruptedExcepti // check with no previous message id opts.setExpectedLastMsgId("invalid"); - assertThrows(IllegalStateException.class, ()-> { js.publish("foo", null, opts); }); + assertThrows(IllegalStateException.class, ()-> js.publish("foo", null, opts)); // this should succeed. Reset our last expected Msg ID, and set this one. opts.setExpectedLastMsgId(null); @@ -178,7 +152,7 @@ public void testJetstreamPublishOptions() throws IOException, InterruptedExcepti // test invalid last ID. opts.setMessageId(null); opts.setExpectedLastMsgId("invalid"); - assertThrows(IllegalStateException.class, ()-> { js.publish("foo", null, opts); }); + assertThrows(IllegalStateException.class, ()-> js.publish("foo", null, opts)); // We're expecting two messages. Reset the last expeccted ID. opts.setExpectedLastMsgId(null); @@ -187,16 +161,16 @@ public void testJetstreamPublishOptions() throws IOException, InterruptedExcepti // invalid last sequence. opts.setExpectedLastSeqence(42); - assertThrows(IllegalStateException.class, ()-> { js.publish("foo", null, opts); }); + assertThrows(IllegalStateException.class, ()-> js.publish("foo", null, opts)); - // check success - TODO - debug... - // opts.setExpectedStream("foo-stream"); - // opts.setExpectedLastSeqence(PublishOptions.unsetLastSequence); - // js.publish("foo", null, opts); + // check success + opts.setExpectedStream("foo-stream"); + opts.setExpectedLastSeqence(PublishOptions.unsetLastSequence); + js.publish("foo", null, opts); // check failure opts.setExpectedStream("oof"); - assertThrows(IllegalStateException.class, ()-> { js.publish("foo", null, opts); }); + assertThrows(IllegalStateException.class, ()-> js.publish("foo", null, opts)); } catch (Exception ex) { Assertions.fail(ex); @@ -208,7 +182,7 @@ public void testJetstreamPublishOptions() throws IOException, InterruptedExcepti } @Test - public void testJetstreamSubscribe() throws IOException, InterruptedException,ExecutionException, TimeoutException { + public void testJetstreamSubscribe() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { @@ -226,7 +200,7 @@ public void testJetstreamSubscribe() throws IOException, InterruptedException,Ex Subscription s = js.subscribe("foo"); Message m = s.nextMessage(Duration.ofSeconds(5)); assertNotNull(m); - assertEquals(new String("payload"), new String(m.getData())); + assertEquals("payload", new String(m.getData())); // set the stream ConsumerConfiguration c = ConsumerConfiguration.builder().build(); @@ -234,7 +208,7 @@ public void testJetstreamSubscribe() throws IOException, InterruptedException,Ex s = js.subscribe("foo", so); m = s.nextMessage(Duration.ofSeconds(5)); assertNotNull(m); - assertEquals(new String("payload"), new String(m.getData())); + assertEquals("payload", new String(m.getData())); s.unsubscribe(); // FIXME test an invalid stream - is this a bug??? @@ -246,9 +220,7 @@ public void testJetstreamSubscribe() throws IOException, InterruptedException,Ex // try using a dispatcher and an ephemeral consumer. CountDownLatch latch = new CountDownLatch(1); Dispatcher d = nc.createDispatcher(null); - JetStreamSubscription jsub = js.subscribe("foo", d, (msg) -> { - latch.countDown(); - }); + JetStreamSubscription jsub = js.subscribe("foo", d, (msg) -> latch.countDown()); assertTrue(latch.await(5, TimeUnit.SECONDS)); @@ -303,11 +275,7 @@ private static long waitForPending(Subscription sub, int batch, Duration d) { if (count == batch) { break; } - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // noop - } + sleep(50); if (System.currentTimeMillis() - start > d.toMillis()) { break; } @@ -316,7 +284,7 @@ private static long waitForPending(Subscription sub, int batch, Duration d) { } @Test - public void testJetstreamPullBasedSubscribe() throws IOException, InterruptedException,ExecutionException, TimeoutException { + public void testJetstreamPullBasedSubscribe() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { @@ -329,10 +297,7 @@ public void testJetstreamPullBasedSubscribe() throws IOException, InterruptedExc // check invalid subscription Dispatcher d = nc.createDispatcher(null); final SubscribeOptions sop = SubscribeOptions.builder().pull(10).build(); - assertThrows(IllegalStateException.class, () -> { - js.subscribe("bar", d, (msg) -> {}, sop); - }); - + assertThrows(IllegalStateException.class, () -> js.subscribe("bar", d, (msg) -> {}, sop)); int batch = 5; int toSend = 10; @@ -372,10 +337,9 @@ public void testJetstreamPullBasedSubscribe() throws IOException, InterruptedExc jsub.unsubscribe(); - assertThrows(IllegalStateException.class, () -> { + assertThrows(JetStreamApiException.class, () -> js.subscribe("baz", SubscribeOptions.builder(). - attach("test-stream", "rip").pull(batch).build()); - }); + attach("test-stream", "rip").pull(batch).build())); // send 10 more messages. for (int i = 0; i < toSend; i++) { @@ -460,7 +424,7 @@ public void negativePathwaysCoverage() throws Exception { } @Test - public void testJetstreamAttachDirectNoConsumer() throws IOException, InterruptedException,ExecutionException, TimeoutException { + public void testJetstreamAttachDirectNoConsumer() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { @@ -494,7 +458,7 @@ public void testJetstreamAttachDirectNoConsumer() throws IOException, Interrupte } @Test - public void testJetstreamAttachDirectWithConsumer() throws IOException, InterruptedException,ExecutionException, TimeoutException { + public void testJetstreamAttachDirectWithConsumer() throws IOException, InterruptedException { try (NatsTestServer ts = new NatsTestServer(false, true); Connection nc = Nats.connect(ts.getURI())) { diff --git a/src/test/java/io/nats/client/NKeyTests.java b/src/test/java/io/nats/client/NKeyTests.java index 33d040e18..953cf0fb7 100644 --- a/src/test/java/io/nats/client/NKeyTests.java +++ b/src/test/java/io/nats/client/NKeyTests.java @@ -23,7 +23,7 @@ import java.util.Base64; import java.util.List; -import static io.nats.client.utils.ResourceUtils.getFileFromResourceAsStream; +import static io.nats.client.utils.ResourceUtils.resourceAsLines; import static org.junit.jupiter.api.Assertions.*; public class NKeyTests { @@ -67,8 +67,8 @@ public void testCRC16() { } @Test - public void testBase32() throws Exception { - List inputs = getFileFromResourceAsStream("utf8-test-strings.txt"); + public void testBase32() { + List inputs = resourceAsLines("data/utf8-test-strings.txt"); for (String expected : inputs) { byte[] bytes = expected.getBytes(StandardCharsets.UTF_8); diff --git a/src/test/java/io/nats/client/SubscriberTests.java b/src/test/java/io/nats/client/SubscriberTests.java index d1554fd5d..cce1c3f53 100644 --- a/src/test/java/io/nats/client/SubscriberTests.java +++ b/src/test/java/io/nats/client/SubscriberTests.java @@ -22,7 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; -import static io.nats.client.utils.ResourceUtils.getFileFromResourceAsStream; +import static io.nats.client.utils.ResourceUtils.resourceAsLines; import static io.nats.client.utils.TestMacros.standardCloseConnection; import static io.nats.client.utils.TestMacros.standardConnectionWait; import static org.junit.jupiter.api.Assertions.*; @@ -173,7 +173,7 @@ public void testUTF8Subjects() throws IOException, TimeoutException, Interrupted standardConnectionWait(nc); // Some UTF8 from http://www.columbia.edu/~fdc/utf8/ - List subjects = getFileFromResourceAsStream("utf8-test-strings.txt"); + List subjects = resourceAsLines("data/utf8-test-strings.txt"); for (String subject : subjects) { subject = subject.replace(" ",""); // get rid of spaces diff --git a/src/test/java/io/nats/client/impl/ByteArrayBuilderTests.java b/src/test/java/io/nats/client/impl/ByteArrayBuilderTests.java index 2baf1a5f6..db8c2e6da 100644 --- a/src/test/java/io/nats/client/impl/ByteArrayBuilderTests.java +++ b/src/test/java/io/nats/client/impl/ByteArrayBuilderTests.java @@ -2,7 +2,6 @@ import org.junit.jupiter.api.Test; -import java.io.IOException; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -10,21 +9,21 @@ import java.util.List; import java.util.Random; -import static io.nats.client.utils.ResourceUtils.getFileFromResourceAsStream; +import static io.nats.client.support.RandomUtils.PRAND; +import static io.nats.client.utils.ResourceUtils.resourceAsLines; import static org.junit.jupiter.api.Assertions.assertEquals; public class ByteArrayBuilderTests { @Test - public void byte_array_builder_works() throws IOException { - Random r = new Random(); + public void byte_array_builder_works() { ByteArrayBuilder bab = new ByteArrayBuilder(); String testString = "abcdefghij"; - _test(r, bab, Collections.singletonList(testString), StandardCharsets.US_ASCII); + _test(PRAND, bab, Collections.singletonList(testString), StandardCharsets.US_ASCII); - List subjects = getFileFromResourceAsStream("utf8-test-strings.txt"); + List subjects = resourceAsLines("data/utf8-test-strings.txt"); bab = new ByteArrayBuilder(StandardCharsets.UTF_8); - _test(r, bab, subjects, StandardCharsets.UTF_8); + _test(PRAND, bab, subjects, StandardCharsets.UTF_8); } @Test diff --git a/src/test/java/io/nats/client/impl/JetstreamAPIResponseTests.java b/src/test/java/io/nats/client/impl/JetStreamApiResponseTests.java similarity index 73% rename from src/test/java/io/nats/client/impl/JetstreamAPIResponseTests.java rename to src/test/java/io/nats/client/impl/JetStreamApiResponseTests.java index 424fe89d0..b9054ab69 100644 --- a/src/test/java/io/nats/client/impl/JetstreamAPIResponseTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamApiResponseTests.java @@ -13,19 +13,17 @@ package io.nats.client.impl; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - import org.junit.jupiter.api.Test; -public class JetstreamAPIResponseTests { +import static org.junit.jupiter.api.Assertions.*; + +public class JetStreamApiResponseTests { @Test void testErrorResponses() { String msg = "Test generated error."; String json = "{\"type\" : \"thetype\",\"code\" : 1234, \"description\" : \"" + msg + "\"}"; - JetstreamAPIResponse resp = new JetstreamAPIResponse(json.getBytes()); - assertTrue(JetstreamAPIResponse.isError(json)); + JetStreamApiResponse resp = new JetStreamApiResponse(json.getBytes()); + assertTrue(JetStreamApiResponse.isError(json)); assertTrue(resp.hasError()); assertEquals(1234, resp.getCode()); assertEquals("thetype", resp.getType()); @@ -37,8 +35,8 @@ public class JetstreamAPIResponseTests { public void testSuccessResponse() { String json = "{\"whatever\":\"value\"}"; - JetstreamAPIResponse resp = new JetstreamAPIResponse(json.getBytes()); - assertFalse(JetstreamAPIResponse.isError(json)); + JetStreamApiResponse resp = new JetStreamApiResponse(json.getBytes()); + assertFalse(JetStreamApiResponse.isError(json)); assertEquals(-1, resp.getCode()); assertEquals(null, resp.getDescription()); assertFalse(resp.hasError()); diff --git a/src/test/java/io/nats/client/support/JsonUtilsTests.java b/src/test/java/io/nats/client/support/JsonUtilsTests.java new file mode 100644 index 000000000..53531c043 --- /dev/null +++ b/src/test/java/io/nats/client/support/JsonUtilsTests.java @@ -0,0 +1,81 @@ +// Copyright 2015-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.support; + +import io.nats.client.ConsumerInfo; +import io.nats.client.ConsumerLister; +import io.nats.client.StreamInfo; +import io.nats.client.impl.JsonUtils; +import io.nats.client.utils.ResourceUtils; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static io.nats.client.impl.JsonUtils.parseStringArray; +import static io.nats.client.utils.ResourceUtils.dataAsString; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public final class JsonUtilsTests { + + @Test + public void testParseStringArray() { + String[] a = parseStringArray("fieldName", "...\"fieldName\": [\n ],..."); + assertNotNull(a); + assertEquals(0, a.length); + + a = parseStringArray("fieldName", "...\"fieldName\": [\n \"value1\"\n ],..."); + assertNotNull(a); + assertEquals(1, a.length); + assertEquals("value1", a[0]); + + a = parseStringArray("fieldName", "...\"fieldName\": [\n \"value1\",\n \"value2\"\n ],..."); + assertNotNull(a); + assertEquals(2, a.length); + assertEquals("value1", a[0]); + assertEquals("value2", a[1]); + } + + @Test + public void testGetJSONArray() { + String json = ResourceUtils.resourceAsString("data/ConsumerLister.json"); + ConsumerLister cl = new ConsumerLister(json); + assertEquals(2, cl.getTotal()); + assertEquals(42, cl.getOffset()); + assertEquals(256, cl.getLimit()); + List consumers = cl.getConsumers(); + assertNotNull(consumers); + assertEquals(2, consumers.size()); + } + + @Test + public void testCoverage_JsonUtils_addFld() { + StringBuilder sb = new StringBuilder(); + assertEquals(0, sb.length()); + String[] strArray = null; + JsonUtils.addFld(sb, "na", strArray); + assertEquals(0, sb.length()); + strArray = new String[]{}; + JsonUtils.addFld(sb, "na", strArray); + assertEquals(0, sb.length()); + } + + @Test + public void testCoverage_printable() { + // doesn't really test anything, this is not production code. just for coverage + DebugUtil.printable(new ConsumerLister(dataAsString("ConsumerLister.json"))); + DebugUtil.printable(new StreamInfo(dataAsString("StreamInfo.json"))); + } +} + diff --git a/src/test/java/io/nats/client/support/ValidatorTests.java b/src/test/java/io/nats/client/support/ValidatorTests.java index 68eb640a5..a227505eb 100644 --- a/src/test/java/io/nats/client/support/ValidatorTests.java +++ b/src/test/java/io/nats/client/support/ValidatorTests.java @@ -16,13 +16,14 @@ import org.junit.jupiter.api.Test; import static io.nats.client.support.NatsConstants.EMPTY; +import static io.nats.client.support.Validator.validateNotNull; import static io.nats.client.support.Validator.validatePullBatchSize; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ValidatorTests { - private static final String PLAIN = "plain"; + private static final String PLAIN = "plain"; private static final String HAS_SPACE = "has space"; private static final String HAS_DASH = "has-dash"; private static final String HAS_DOT = "has.dot"; @@ -61,14 +62,14 @@ public void testValidateStreamName() { @Test public void testValidateConsumer() { - allowed(Validator::validateConsumer, null, PLAIN, HAS_SPACE, HAS_DASH, HAS_DOT, HAS_STAR, HAS_GT); - notAllowed(Validator::validateConsumer, EMPTY); + allowed(Validator::validateConsumer, null, PLAIN, HAS_SPACE, HAS_DASH); + notAllowed(Validator::validateConsumer, EMPTY, HAS_DOT, HAS_STAR, HAS_GT); } @Test public void testValidateDurable() { - allowed(Validator::validateDurable, null, PLAIN, HAS_SPACE, HAS_DASH, HAS_DOT, HAS_STAR, HAS_GT); - notAllowed(Validator::validateDurable, EMPTY); + allowed(Validator::validateDurable, PLAIN, HAS_SPACE, HAS_DASH); + notAllowed(Validator::validateDurable, null, EMPTY, HAS_DOT, HAS_STAR, HAS_GT); } @Test @@ -78,12 +79,20 @@ public void testValidateDeliverSubject() { } @Test - public void testvalidatePullBatchSize() { + public void testValidatePullBatchSize() { assertEquals(0, validatePullBatchSize(0)); assertEquals(1, validatePullBatchSize(1)); assertThrows(IllegalArgumentException.class, () -> validatePullBatchSize(-1)); } + @Test + public void testNotNull() { + Object o = null; + String s = null; + assertThrows(IllegalArgumentException.class, () -> validateNotNull(o, "fieldName")); + assertThrows(IllegalArgumentException.class, () -> validateNotNull(s, "fieldName")); + } + interface StringTest { String validate(String s); } private void allowed(StringTest test, String... strings) { diff --git a/src/test/java/io/nats/client/utils/ResourceUtils.java b/src/test/java/io/nats/client/utils/ResourceUtils.java index 367a12489..038edfaf4 100644 --- a/src/test/java/io/nats/client/utils/ResourceUtils.java +++ b/src/test/java/io/nats/client/utils/ResourceUtils.java @@ -1,15 +1,37 @@ package io.nats.client.utils; import java.io.File; -import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.List; public abstract class ResourceUtils { + public static List dataAsLines(String fileName) { + return resourceAsLines("data/" + fileName); + } + + public static String dataAsString(String fileName) { + return resourceAsString("data/" + fileName); + } + + public static List resourceAsLines(String fileName) { + try { + ClassLoader classLoader = ResourceUtils.class.getClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + return Files.readAllLines(file.toPath()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } - public static List getFileFromResourceAsStream(String fileName) throws IOException { - ClassLoader classLoader = ResourceUtils.class.getClassLoader(); - File file = new File(classLoader.getResource(fileName).getFile()); - return Files.readAllLines(file.toPath()); + public static String resourceAsString(String fileName) { + try { + ClassLoader classLoader = ResourceUtils.class.getClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/src/test/resources/data/AccountLimitImpl.json b/src/test/resources/data/AccountLimitImpl.json new file mode 100644 index 000000000..664de394b --- /dev/null +++ b/src/test/resources/data/AccountLimitImpl.json @@ -0,0 +1,6 @@ +{ + "max_memory": 1, + "max_storage": 2, + "max_streams": 3, + "max_consumers": 4 +} \ No newline at end of file diff --git a/src/test/resources/data/AccountStatsImpl.json b/src/test/resources/data/AccountStatsImpl.json new file mode 100644 index 000000000..25a1b026d --- /dev/null +++ b/src/test/resources/data/AccountStatsImpl.json @@ -0,0 +1,6 @@ +{ + "memory": 1, + "storage": 2, + "streams": 3, + "consumers": 4 +} \ No newline at end of file diff --git a/src/test/resources/data/ConsumerLister.json b/src/test/resources/data/ConsumerLister.json new file mode 100644 index 000000000..348e0c4ae --- /dev/null +++ b/src/test/resources/data/ConsumerLister.json @@ -0,0 +1,60 @@ +{ + "type": "io.nats.jetstream.api.v1.consumer_list_response", + "total": 2, + "offset": 42, + "limit": 256, + "consumers": [ + { + "stream_name": "stream-1", + "name": "cname1", + "created": "2021-01-20T23:41:08.579594Z", + "config": { + "durable_name": "cname1", + "deliver_subject": "strm1-deliver", + "deliver_policy": "all", + "ack_policy": "explicit", + "ack_wait": 30000000000, + "max_deliver": 99, + "replay_policy": "instant" + }, + "delivered": { + "consumer_seq": 1, + "stream_seq": 2 + }, + "ack_floor": { + "consumer_seq": 3, + "stream_seq": 4 + }, + "num_ack_pending": 5, + "num_redelivered": 6, + "num_waiting": 7, + "num_pending": 8 + }, + { + "stream_name": "stream-1", + "name": "cname2", + "created": "2021-01-20T23:26:04.6445175Z", + "config": { + "durable_name": "cname2", + "deliver_subject": "strm1-deliver", + "deliver_policy": "all", + "ack_policy": "explicit", + "ack_wait": 30000000000, + "max_deliver": -1, + "replay_policy": "instant" + }, + "delivered": { + "consumer_seq": 0, + "stream_seq": 0 + }, + "ack_floor": { + "consumer_seq": 0, + "stream_seq": 0 + }, + "num_ack_pending": 0, + "num_redelivered": 0, + "num_waiting": 0, + "num_pending": 0 + } + ] +} diff --git a/src/test/resources/data/StreamInfo.json b/src/test/resources/data/StreamInfo.json new file mode 100644 index 000000000..65b38c907 --- /dev/null +++ b/src/test/resources/data/StreamInfo.json @@ -0,0 +1,30 @@ +{ + "type": "io.nats.jetstream.api.v1.stream_create_response", + "config": { + "name": "streamName", + "subjects": [ + "sub0", + "sub1" + ], + "retention": "limits", + "max_consumers": 1, + "max_msgs": 2, + "max_bytes": 3, + "discard": "old", + "max_age": 100000000000, + "max_msg_size": 4, + "storage": "memory", + "num_replicas": 5, + "duplicate_window": 120000000000 + }, + "created": "2021-01-25T20:09:10.6225191Z", + "state": { + "messages": 11, + "bytes": 12, + "first_seq": 13, + "first_ts": "0001-01-01T00:00:00Z", + "last_seq": 14, + "last_ts": "0001-01-01T00:00:00Z", + "consumer_count": 15 + } +} diff --git a/src/test/resources/utf8-test-strings.txt b/src/test/resources/data/utf8-test-strings.txt similarity index 100% rename from src/test/resources/utf8-test-strings.txt rename to src/test/resources/data/utf8-test-strings.txt