Skip to content

Commit

Permalink
393-js-publish-async plus management
Browse files Browse the repository at this point in the history
* 393-js-publish-async

* RandomUtils-tests-misc

* management

* management more (1)

* documentation and cleanup

* documentation and cleanup (2)

* consumer/durable name validation

* consumer info and stream info tests

* added javadoc
  • Loading branch information
scottf authored Jan 26, 2021
1 parent df4a044 commit ce1c050
Show file tree
Hide file tree
Showing 43 changed files with 2,013 additions and 652 deletions.
29 changes: 22 additions & 7 deletions src/examples/java/io/nats/examples/ExampleUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@

import io.nats.client.*;
import io.nats.client.StreamConfiguration.StorageType;
import io.nats.client.impl.JetStreamApiException;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

public class ExampleUtils {
public static Options createExampleOptions(String server, boolean allowReconnect) throws Exception {
Expand Down Expand Up @@ -58,15 +59,20 @@ public void slowConsumerDetected(Connection conn, Consumer consumer) {
}

public static void createTestStream(JetStream js, String streamName, String subject)
throws TimeoutException, InterruptedException {
throws IOException, JetStreamApiException {
createTestStream(js, streamName, subject, StorageType.File);
}

public static void createTestStream(JetStream js, String streamName, String subject, StorageType storageType)
throws IOException, JetStreamApiException {

// Create a stream, here will use a file storage type, and one subject,
// the passed subject.
StreamConfiguration sc = StreamConfiguration.builder().
name(streamName).
storageType(StorageType.File).
subjects(new String[] { subject }).
build();
StreamConfiguration sc = StreamConfiguration.builder()
.name(streamName)
.storageType(storageType)
.subjects(subject)
.build();

// Add or use an existing stream.
StreamInfo si = js.addStream(sc);
Expand Down Expand Up @@ -104,6 +110,15 @@ public static ExampleArgs readReplyArgs(String[] args, String usageString) {
return readSubscribeArgs(args, usageString);
}

public static ExampleArgs readConsumerArgs(String[] args, String usageString) {
ExampleArgs ea = new ExampleArgs(args, false, usageString);
if (ea.msgCount < 1 || ea.stream == null || ea.consumer == null) {
System.out.println("Stream name and consumer name are required to attach.\nSubject and message count are required.\n");
usage(usageString);
}
return ea;
}

private static void usage(String usageString) {
System.out.println(usageString);
System.exit(-1);
Expand Down
132 changes: 132 additions & 0 deletions src/examples/java/io/nats/examples/NatsJsManage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples;

import io.nats.client.*;

import static io.nats.client.support.DebugUtil.printable;

public class NatsJsManage {

static final String usageString = "\nUsage: java NatsJsManage [-s server]\n"
+ "\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
+ "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
+ "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
+ "\nUse the URL for user/pass/token authentication.\n";

private static final String STREAM1 = "example-stream-1";
private static final String STREAM2 = "example-stream-2";
private static final String STRM1SUB1 = "strm1sub1";
private static final String STRM1SUB2 = "strm1sub2";
private static final String STRM2SUB1 = "strm2sub1";
private static final String STRM2SUB2 = "strm2sub2";

public static void main(String[] args) {
ExampleArgs exArgs = new ExampleArgs(args, false, usageString);

try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, false))) {
// Create a jetstream context. This hangs off the original connection
// allowing us to produce data to streams and consume data from
// jetstream consumers.
JetStream js = nc.jetStream();

action("Configure And Add Stream 1");
StreamConfiguration sc1 = StreamConfiguration.builder()
.name(STREAM1)
.storageType(StreamConfiguration.StorageType.Memory)
.subjects(STRM1SUB1, STRM1SUB2)
.build();
js.addStream(sc1);

StreamInfo si = js.streamInfo(STREAM1);
printObject(si);

action("Configure And Add Stream 2");
StreamConfiguration sc2 = StreamConfiguration.builder()
.name(STREAM2)
.storageType(StreamConfiguration.StorageType.Memory)
.subjects(STRM2SUB1, STRM2SUB2)
.build();
js.addStream(sc2);

si = js.streamInfo(STREAM2);
printObject(si);

action("Delete Stream 2");
js.deleteStream(STREAM2);

action("Delete Non-Existent Stream");
try {
js.deleteStream(STREAM2);
}
catch (IllegalStateException ise) {
System.out.println(ise.getMessage());
}

action("Update Non-Existent Stream ");
try {
StreamConfiguration non = StreamConfiguration.builder()
.name(STREAM2)
.storageType(StreamConfiguration.StorageType.Memory)
.subjects(STRM2SUB1, STRM2SUB2)
.build();
js.updateStream(non);
}
catch (IllegalStateException ise) {
System.out.println(ise.getMessage());
}

action("Configure And Add Consumer");
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.deliverSubject("strm1-deliver")
.durable("GQJ3IvWo")
.build();
ConsumerInfo ci = js.addConsumer(STREAM1, cc);
printObject(ci);
si = js.streamInfo(STREAM1);
printObject(si);

action("Make And Use Subscription");
SubscribeOptions so = SubscribeOptions.builder()
.pullDirect(STREAM1, "GQJ3IvWo", 10)
// .configuration(STREAM1, cc)
.build();
printObject(so);

si = js.streamInfo(STREAM1);
printObject(si);

JetStreamSubscription sub = js.subscribe(STRM1SUB1, so);
printObject(sub);

action("List Consumers");
ConsumerLister lister = js.newConsumerLister(STREAM1);
printObject(lister);
}
catch (Exception exp) {
action("EXCEPTION!!!");
exp.printStackTrace();
}
}

private static void action(String label) {
System.out.println("================================================================================");
System.out.println(label);
System.out.println("--------------------------------------------------------------------------------");
}

public static void printObject(Object o) {
System.out.println(printable(o.toString()) + "\n");
}
}
6 changes: 1 addition & 5 deletions src/examples/java/io/nats/examples/NatsJsSubAttach.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ public class NatsJsSubAttach {
+ "\nUse the URL for user/pass/token authentication.\n";

public static void main(String[] args) {
ExampleArgs exArgs = ExampleUtils.readSubscribeArgs(args, usageString);
ExampleArgs exArgs = ExampleUtils.readConsumerArgs(args, usageString);

if (exArgs.stream == null || exArgs.consumer == null) {
System.out.println("-stream <name> and -consumer <name> is required to attach.");
System.exit(1);
}
System.out.printf("\nTrying to connect to %s, and listen to %s for %d messages.\n\n", exArgs.server,
exArgs.subject, exArgs.msgCount);

Expand Down
8 changes: 2 additions & 6 deletions src/examples/java/io/nats/examples/NatsJsSubAttachDirect.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ public class NatsJsSubAttachDirect {
+ "\nUse the URL for user/pass/token authentication.\n";

public static void main(String[] args) {
ExampleArgs exArgs = ExampleUtils.readSubscribeArgs(args, usageString);
ExampleArgs exArgs = ExampleUtils.readConsumerArgs(args, usageString);

if (exArgs.stream == null || exArgs.consumer == null) {
System.out.println("-stream <name> and -consumer <name> is required to attach.");
System.exit(1);
}
System.out.printf("\nTrying to connect to %s, and listen to %s for %d messages.\n\n", exArgs.server,
exArgs.subject, exArgs.msgCount);

Expand All @@ -60,7 +56,7 @@ public static void main(String[] args) {
for(int i=1;i<=exArgs.msgCount;i++) {
Message msg = sub.nextMessage(Duration.ofHours(1));

System.out.printf("\nMessage Received:\n");
System.out.println("\nMessage Received:");

if (msg.hasHeaders()) {
System.out.println(" Headers:");
Expand Down
7 changes: 4 additions & 3 deletions src/examples/java/io/nats/examples/benchmark/Sample.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

package io.nats.examples.benchmark;

import static io.nats.examples.benchmark.Utils.humanBytes;

import io.nats.client.Statistics;

import java.text.DecimalFormat;

import static io.nats.examples.benchmark.Utils.humanBytes;

public class Sample {
int jobMsgCnt;
long msgCnt;
Expand Down Expand Up @@ -91,7 +92,7 @@ public double seconds() {
}

/**
* {@inheritDoc}.
* {@inheritDoc}
*/
@Override
public String toString() {
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/io/nats/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,18 +399,17 @@ enum Status {
* Gets a context for publishing and subscribing to subjects backed by Jetstream streams
* and consumers.
* @return a JetStream instance.
* @throws TimeoutException timed out verifying jetstream
* @throws InterruptedException the operation was interrupted
* @throws IOException various IO exception such as timeout or interruption
*/
JetStream jetStream() throws InterruptedException, TimeoutException;
JetStream jetStream() throws IOException;

/**
* Gets a context for publishing and subscribing to subjects backed by Jetstream streams
* and consumers.
* @param options JetStream options.
* @return a JetStream instance.
* @throws TimeoutException timed out verifying jetstream
* @throws InterruptedException the operation was interrupted
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
*/
JetStream jetStream(JetStreamOptions options) throws InterruptedException, TimeoutException;
JetStream jetStream(JetStreamOptions options) throws IOException;
}
20 changes: 20 additions & 0 deletions src/main/java/io/nats/client/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -649,4 +649,24 @@ public void setMaxAckPending(long maxAckPending) {
public long getMaxWaiting() {
return maxWaiting;
}

@Override
public String toString() {
return "ConsumerConfiguration{" +
"durable='" + durable + '\'' +
", deliverPolicy=" + deliverPolicy +
", deliverSubject='" + deliverSubject + '\'' +
", startSeq=" + startSeq +
", startTime=" + startTime +
", ackPolicy=" + ackPolicy +
", ackWait=" + ackWait +
", maxDeliver=" + maxDeliver +
", filterSubject='" + filterSubject + '\'' +
", replayPolicy=" + replayPolicy +
", sampleFrequency='" + sampleFrequency + '\'' +
", rateLimit=" + rateLimit +
", maxWaiting=" + maxWaiting +
", maxAckPending=" + maxAckPending +
'}';
}
}
Loading

0 comments on commit ce1c050

Please sign in to comment.