From e800736319a6c084960386a8ddcb71c687608472 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 17 Oct 2023 22:16:47 -0400 Subject: [PATCH 1/4] Direct Message Subject Header May Contain Multiple Subjects --- .../java/io/nats/client/api/MessageInfo.java | 2 +- .../java/io/nats/client/impl/Headers.java | 11 + .../client/impl/JetStreamManagementTests.java | 273 ++++++++++-------- .../nats/client/impl/JetStreamTestBase.java | 2 +- 4 files changed, 171 insertions(+), 117 deletions(-) diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index c89c6fdff..4d2c143ef 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -65,7 +65,7 @@ public MessageInfo(Message msg, String streamName, boolean direct) { if (direct) { this.headers = msg.getHeaders(); - this.subject = headers.getFirst(NATS_SUBJECT); + this.subject = headers.getLast(NATS_SUBJECT); this.data = msg.getData(); seq = Long.parseLong(headers.getFirst(NATS_SEQUENCE)); time = DateTimeUtils.parseDateTime(headers.getFirst(NATS_TIMESTAMP)); diff --git a/src/main/java/io/nats/client/impl/Headers.java b/src/main/java/io/nats/client/impl/Headers.java index d5f7f0e95..2c1f01436 100644 --- a/src/main/java/io/nats/client/impl/Headers.java +++ b/src/main/java/io/nats/client/impl/Headers.java @@ -299,6 +299,17 @@ public String getFirst(String key) { return values == null ? null : values.get(0); } + /** + * Returns the last value for the specific (case sensitive) key. + * Will be {@code null} if the key is not found. + * + * @return the last value for the case sensitive key. + */ + public String getLast(String key) { + List values = valuesMap.get(key); + return values == null ? null : values.get(values.size() - 1); + } + /** * Returns a {@link List} view of the values for the specific (case insensitive) key. * Will be {@code null} if the key is not found. diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 4343b7487..c457e72be 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -101,14 +101,14 @@ public void testStreamCreate() throws Exception { @Test public void testStreamMetadata() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); JetStreamManagement jsm = nc.jetStreamManagement(); StreamConfiguration sc = StreamConfiguration.builder() - .name(STREAM) + .name(stream()) .storageType(StorageType.Memory) - .subjects(subject(0), subject(1)) + .subjects(subject()) .metadata(metaData) .build(); @@ -127,13 +127,14 @@ public void testStreamMetadata() throws Exception { @Test public void testStreamCreateWithNoSubject() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { long now = ZonedDateTime.now().toEpochSecond(); JetStreamManagement jsm = nc.jetStreamManagement(); + String stream = stream(); StreamConfiguration sc = StreamConfiguration.builder() - .name(STREAM) + .name(stream) .storageType(StorageType.Memory) .build(); @@ -141,10 +142,10 @@ public void testStreamCreateWithNoSubject() throws Exception { assertTrue(now <= si.getCreateTime().toEpochSecond()); sc = si.getConfiguration(); - assertEquals(STREAM, sc.getName()); + assertEquals(stream, sc.getName()); assertEquals(1, sc.getSubjects().size()); - assertEquals(STREAM, sc.getSubjects().get(0)); + assertEquals(stream, sc.getSubjects().get(0)); assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy()); assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy()); @@ -593,27 +594,27 @@ private void assertStreamNameList(List list, int... ids) { @Test public void testDeleteStream() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); JetStreamApiException jsapiEx = - assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(STREAM)); + assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(stream())); assertEquals(10059, jsapiEx.getApiErrorCode()); - createDefaultTestStream(jsm); - assertNotNull(jsm.getStreamInfo(STREAM)); - assertTrue(jsm.deleteStream(STREAM)); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + assertNotNull(jsm.getStreamInfo(tsc.stream)); + assertTrue(jsm.deleteStream(tsc.stream)); - jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(STREAM)); + jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(tsc.stream)); assertEquals(10059, jsapiEx.getApiErrorCode()); - jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(STREAM)); + jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(tsc.stream)); assertEquals(10059, jsapiEx.getApiErrorCode()); }); } @Test public void testPurgeStreamAndOptions() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { // invalid to have both keep and seq assertThrows(IllegalArgumentException.class, () -> PurgeOptions.builder().keep(1).sequence(1).build()); @@ -621,48 +622,49 @@ public void testPurgeStreamAndOptions() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); // error to purge a stream that does not exist - assertThrows(JetStreamApiException.class, () -> jsm.purgeStream(STREAM)); + assertThrows(JetStreamApiException.class, () -> jsm.purgeStream(stream())); - createMemoryStream(jsm, STREAM, subject(1), subject(2)); + TestingStreamContainer tsc = new TestingStreamContainer(nc, 2); + createMemoryStream(jsm, tsc.stream, tsc.subject(0), tsc.subject(1)); - StreamInfo si = jsm.getStreamInfo(STREAM); + StreamInfo si = jsm.getStreamInfo(tsc.stream); assertEquals(0, si.getStreamState().getMsgCount()); - jsPublish(nc, subject(1), 10); - si = jsm.getStreamInfo(STREAM); + jsPublish(nc, tsc.subject(0), 10); + si = jsm.getStreamInfo(tsc.stream); assertEquals(10, si.getStreamState().getMsgCount()); PurgeOptions options = PurgeOptions.builder().keep(7).build(); - PurgeResponse pr = jsm.purgeStream(STREAM, options); + PurgeResponse pr = jsm.purgeStream(tsc.stream, options); assertTrue(pr.isSuccess()); assertEquals(3, pr.getPurged()); options = PurgeOptions.builder().sequence(9).build(); - pr = jsm.purgeStream(STREAM, options); + pr = jsm.purgeStream(tsc.stream, options); assertTrue(pr.isSuccess()); assertEquals(5, pr.getPurged()); - si = jsm.getStreamInfo(STREAM); + si = jsm.getStreamInfo(tsc.stream); assertEquals(2, si.getStreamState().getMsgCount()); - pr = jsm.purgeStream(STREAM); + pr = jsm.purgeStream(tsc.stream); assertTrue(pr.isSuccess()); assertEquals(2, pr.getPurged()); - si = jsm.getStreamInfo(STREAM); + si = jsm.getStreamInfo(tsc.stream); assertEquals(0, si.getStreamState().getMsgCount()); - jsPublish(nc, subject(1), 10); - jsPublish(nc, subject(2), 10); - si = jsm.getStreamInfo(STREAM); + jsPublish(nc, tsc.subject(0), 10); + jsPublish(nc, tsc.subject(1), 10); + si = jsm.getStreamInfo(tsc.stream); assertEquals(20, si.getStreamState().getMsgCount()); - jsm.purgeStream(STREAM, PurgeOptions.subject(subject(1))); - si = jsm.getStreamInfo(STREAM); + jsm.purgeStream(tsc.stream, PurgeOptions.subject(tsc.subject(0))); + si = jsm.getStreamInfo(tsc.stream); assertEquals(10, si.getStreamState().getMsgCount()); - options = PurgeOptions.builder().subject(subject(1)).sequence(1).build(); - assertEquals(subject(1), options.getSubject()); + options = PurgeOptions.builder().subject(tsc.subject(0)).sequence(1).build(); + assertEquals(tsc.subject(0), options.getSubject()); assertEquals(1, options.getSequence()); - options = PurgeOptions.builder().subject(subject(1)).keep(2).build(); + options = PurgeOptions.builder().subject(tsc.subject(0)).keep(2).build(); assertEquals(2, options.getKeep()); }); } @@ -858,17 +860,17 @@ private void assertValidAddOrUpdate(JetStreamManagement jsm, ConsumerConfigurati @Test public void testConsumerMetadata() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); JetStreamManagement jsm = nc.jetStreamManagement(); - createDefaultTestStream(jsm); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(DURABLE) + .durable(tsc.name()) .metadata(metaData) .build(); - ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc); + ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { assertEquals(1, ci.getConsumerConfiguration().getMetadata().size()); assertEquals("meta-bar", ci.getConsumerConfiguration().getMetadata().get("meta-foo")); @@ -924,18 +926,18 @@ public void testCreateConsumersWithFilters() throws Exception { @Test public void testGetConsumerInfo() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - createDefaultTestStream(jsm); - assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(STREAM, DURABLE)); - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); - ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc); - assertEquals(STREAM, ci.getStreamName()); - assertEquals(DURABLE, ci.getName()); - ci = jsm.getConsumerInfo(STREAM, DURABLE); - assertEquals(STREAM, ci.getStreamName()); - assertEquals(DURABLE, ci.getName()); - assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(STREAM, durable(999))); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.name())); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build(); + ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); + assertEquals(tsc.stream, ci.getStreamName()); + assertEquals(tsc.name(), ci.getName()); + ci = jsm.getConsumerInfo(tsc.stream, tsc.name()); + assertEquals(tsc.stream, ci.getStreamName()); + assertEquals(tsc.name(), ci.getName()); + assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, durable(999))); if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { assertNotNull(ci.getTimestamp()); } @@ -947,17 +949,17 @@ public void testGetConsumerInfo() throws Exception { @Test public void testGetConsumers() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - createMemoryStream(jsm, STREAM, subject(0), subject(1)); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); - addConsumers(jsm, STREAM, 600, "A", null); // getConsumers pages at 256 + addConsumers(jsm, tsc.stream, 600, "A", null); // getConsumers pages at 256 - List list = jsm.getConsumers(STREAM); + List list = jsm.getConsumers(tsc.stream); assertEquals(600, list.size()); - addConsumers(jsm, STREAM, 500, "B", null); // getConsumerNames pages at 1024 - List names = jsm.getConsumerNames(STREAM); + addConsumers(jsm, tsc.stream, 500, "B", null); // getConsumerNames pages at 1024 + List names = jsm.getConsumerNames(tsc.stream); assertEquals(1100, names.size()); }); } @@ -1018,7 +1020,7 @@ public void testDeleteMessage() throws Exception { assertNull(mi.getData()); assertEquals(2, mi.getSeq()); assertTrue(mi.getTime().toEpochSecond() >= beforeCreated.toEpochSecond()); - assertTrue(mi.getHeaders() == null || mi.getHeaders().size() == 0); + assertTrue(mi.getHeaders() == null || mi.getHeaders().isEmpty()); assertTrue(jsm.deleteMessage(STREAM, 1, false)); // added coverage for use of erase (no_erase) flag. assertThrows(JetStreamApiException.class, () -> jsm.deleteMessage(STREAM, 1)); @@ -1031,7 +1033,6 @@ public void testDeleteMessage() throws Exception { @Test public void testAuthCreateUpdateStream() throws Exception { - try (NatsTestServer ts = new NatsTestServer("src/test/resources/js_authorization.conf", false)) { Options optionsSrc = new Options.Builder().server(ts.getURI()) .userInfo("serviceup".toCharArray(), "uppass".toCharArray()).build(); @@ -1058,20 +1059,21 @@ public void testAuthCreateUpdateStream() throws Exception { @Test public void testSealed() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - StreamInfo si = createMemoryStream(jsm, STREAM, SUBJECT); - assertFalse(si.getConfiguration().getSealed()); + + TestingStreamContainer tsc = new TestingStreamContainer(nc); + assertFalse(tsc.si.getConfiguration().getSealed()); JetStream js = nc.jetStream(); - js.publish(SUBJECT, "data1".getBytes()); + js.publish(tsc.subject(), "data1".getBytes()); - StreamConfiguration sc = new StreamConfiguration.Builder(si.getConfiguration()) + StreamConfiguration sc = new StreamConfiguration.Builder(tsc.si.getConfiguration()) .seal().build(); - si = jsm.updateStream(sc); + StreamInfo si = jsm.updateStream(sc); assertTrue(si.getConfiguration().getSealed()); - assertThrows(JetStreamApiException.class, () -> js.publish(SUBJECT, "data2".getBytes())); + assertThrows(JetStreamApiException.class, () -> js.publish(tsc.subject(), "data2".getBytes())); }); } @@ -1086,53 +1088,50 @@ public void testStorageTypeCoverage() { @Test public void testConsumerReplica() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - createMemoryStream(jsm, STREAM, subject(0), subject(1)); + TestingStreamContainer tsc = new TestingStreamContainer(nc); final ConsumerConfiguration cc0 = ConsumerConfiguration.builder() - .durable(durable(0)) + .durable(tsc.name()) .build(); - ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc0); + ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc0); // server returns 0 when value is not set assertEquals(0, ci.getConsumerConfiguration().getNumReplicas()); final ConsumerConfiguration cc1 = ConsumerConfiguration.builder() - .durable(durable(0)) + .durable(tsc.name()) .numReplicas(1) .build(); - ci = jsm.addOrUpdateConsumer(STREAM, cc1); + ci = jsm.addOrUpdateConsumer(tsc.stream, cc1); assertEquals(1, ci.getConsumerConfiguration().getNumReplicas()); }); } @Test public void testGetMessage() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { if (nc.getServerInfo().isNewerVersionThan("2.8.4")) { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - StreamConfiguration sc = StreamConfiguration.builder() - .name(STREAM) - .storageType(StorageType.Memory) - .subjects(subject(1), subject(2)) - .build(); - StreamInfo si = jsm.addStream(sc); + TestingStreamContainer tsc = new TestingStreamContainer(nc, 2); + assertFalse(tsc.si.getConfiguration().getAllowDirect()); ZonedDateTime beforeCreated = ZonedDateTime.now(); - js.publish(buildTestGetMessage(1, 1)); - js.publish(buildTestGetMessage(2, 2)); - js.publish(buildTestGetMessage(1, 3)); - js.publish(buildTestGetMessage(2, 4)); - js.publish(buildTestGetMessage(1, 5)); - js.publish(buildTestGetMessage(2, 6)); + js.publish(buildTestGetMessage(tsc, 0, 1)); + js.publish(buildTestGetMessage(tsc, 1, 2)); + js.publish(buildTestGetMessage(tsc, 0, 3)); + js.publish(buildTestGetMessage(tsc, 1, 4)); + js.publish(buildTestGetMessage(tsc, 0, 5)); + js.publish(buildTestGetMessage(tsc, 1, 6)); - validateGetMessage(jsm, si, false, beforeCreated); + validateGetMessage(jsm, tsc, beforeCreated); - sc = StreamConfiguration.builder(si.getConfiguration()).allowDirect(true).build(); - si = jsm.updateStream(sc); - validateGetMessage(jsm, si, true, beforeCreated); + StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); + StreamInfo si = jsm.updateStream(sc); + assertTrue(si.getConfiguration().getAllowDirect()); + validateGetMessage(jsm, tsc, beforeCreated); // error case stream doesn't exist assertThrows(JetStreamApiException.class, () -> jsm.getMessage(stream(999), 1)); @@ -1140,54 +1139,53 @@ public void testGetMessage() throws Exception { }); } - private static NatsMessage buildTestGetMessage(int s, int q) { + private static NatsMessage buildTestGetMessage(TestingStreamContainer tsc, int s, int q) { String data = "s" + s + "-q" + q; return NatsMessage.builder() - .subject(subject(s)) + .subject(tsc.subject(s)) .data("d-" + data) .headers(new Headers().put("h", "h-" + data)) .build(); } - private void validateGetMessage(JetStreamManagement jsm, StreamInfo si, boolean allowDirect, ZonedDateTime beforeCreated) throws IOException, JetStreamApiException { - assertEquals(allowDirect, si.getConfiguration().getAllowDirect()); + private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer tsc, ZonedDateTime beforeCreated) throws IOException, JetStreamApiException { - assertMessageInfo(1, 1, jsm.getMessage(STREAM, 1), beforeCreated); - assertMessageInfo(1, 5, jsm.getLastMessage(STREAM, subject(1)), beforeCreated); - assertMessageInfo(2, 6, jsm.getLastMessage(STREAM, subject(2)), beforeCreated); + assertMessageInfo(tsc, 0, 1, jsm.getMessage(tsc.stream, 1), beforeCreated); + assertMessageInfo(tsc, 0, 5, jsm.getLastMessage(tsc.stream, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 6, jsm.getLastMessage(tsc.stream, tsc.subject(1)), beforeCreated); - assertMessageInfo(1, 1, jsm.getNextMessage(STREAM, -1, subject(1)), beforeCreated); - assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, -1, subject(2)), beforeCreated); - assertMessageInfo(1, 1, jsm.getNextMessage(STREAM, 0, subject(1)), beforeCreated); - assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, 0, subject(2)), beforeCreated); - assertMessageInfo(1, 1, jsm.getFirstMessage(STREAM, subject(1)), beforeCreated); - assertMessageInfo(2, 2, jsm.getFirstMessage(STREAM, subject(2)), beforeCreated); + assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, -1, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, -1, tsc.subject(1)), beforeCreated); + assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 0, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated); + assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated); - assertMessageInfo(1, 1, jsm.getNextMessage(STREAM, 1, subject(1)), beforeCreated); - assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, 1, subject(2)), beforeCreated); + assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated); - assertMessageInfo(1, 3, jsm.getNextMessage(STREAM, 2, subject(1)), beforeCreated); - assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, 2, subject(2)), beforeCreated); + assertMessageInfo(tsc, 0, 3, jsm.getNextMessage(tsc.stream, 2, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 2, tsc.subject(1)), beforeCreated); - assertMessageInfo(1, 5, jsm.getNextMessage(STREAM, 5, subject(1)), beforeCreated); - assertMessageInfo(2, 6, jsm.getNextMessage(STREAM, 5, subject(2)), beforeCreated); + assertMessageInfo(tsc, 0, 5, jsm.getNextMessage(tsc.stream, 5, tsc.subject(0)), beforeCreated); + assertMessageInfo(tsc, 1, 6, jsm.getNextMessage(tsc.stream, 5, tsc.subject(1)), beforeCreated); - assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(STREAM, -1))); - assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(STREAM, 0))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(STREAM, 9))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(STREAM, "not-a-subject"))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(STREAM, "not-a-subject"))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(STREAM, 9, subject(1)))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(STREAM, 1, "not-a-subject"))); + assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1))); + assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject"))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject"))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 9, tsc.subject(0)))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 1, "not-a-subject"))); } private void assertStatus(int apiErrorCode, JetStreamApiException jsae) { assertEquals(apiErrorCode, jsae.getApiErrorCode()); } - private void assertMessageInfo(int subj, long seq, MessageInfo mi, ZonedDateTime beforeCreated) { - assertEquals(STREAM, mi.getStream()); - assertEquals(subject(subj), mi.getSubject()); + private void assertMessageInfo(TestingStreamContainer tsc, int subj, long seq, MessageInfo mi, ZonedDateTime beforeCreated) { + assertEquals(tsc.stream, mi.getStream()); + assertEquals(tsc.subject(subj), mi.getSubject()); assertEquals(seq, mi.getSeq()); assertNotNull(mi.getTime()); assertTrue(mi.getTime().toEpochSecond() >= beforeCreated.toEpochSecond()); @@ -1240,4 +1238,49 @@ private void validateMessageGetRequest( assertEquals(lastBySubject != null, mgr.isLastBySubject()); assertEquals(nextBySubject != null, mgr.isNextBySubject()); } + + @Test + public void testDirectMessageRepublishedSubject() throws Exception { + jsServer.run(nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + String streamBucketName = "sb-" + variant(null); + String subject = subject(); + String streamSubject = subject + ".>"; + String publishSubject1 = subject + ".one"; + String publishSubject2 = subject + ".two"; + String publishSubject3 = subject + ".three"; + String republishDest = "$KV." + streamBucketName + ".>"; + + StreamConfiguration sc = StreamConfiguration.builder() + .name(streamBucketName) + .storageType(StorageType.Memory) + .subjects(streamSubject) + .republish(Republish.builder().source(">").destination(republishDest).build()) + .build(); + jsm.addStream(sc); + + KeyValueConfiguration kvc = KeyValueConfiguration.builder().name(streamBucketName).build(); + nc.keyValueManagement().create(kvc); + KeyValue kv = nc.keyValue(streamBucketName); + + nc.publish(publishSubject1, "uno".getBytes()); + nc.jetStream().publish(publishSubject2, "dos".getBytes()); + kv.put(publishSubject3, "tres"); + + KeyValueEntry kve1 = kv.get(publishSubject1); + assertEquals(streamBucketName, kve1.getBucket()); + assertEquals(publishSubject1, kve1.getKey()); + assertEquals("uno", kve1.getValueAsString()); + + KeyValueEntry kve2 = kv.get(publishSubject2); + assertEquals(streamBucketName, kve2.getBucket()); + assertEquals(publishSubject2, kve2.getKey()); + assertEquals("dos", kve2.getValueAsString()); + + KeyValueEntry kve3 = kv.get(publishSubject3); + assertEquals(streamBucketName, kve3.getBucket()); + assertEquals(publishSubject3, kve3.getKey()); + assertEquals("tres", kve3.getValueAsString()); + }); + } } diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index ee126b1a3..1fb2eddcc 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -110,10 +110,10 @@ public NatsMessage getTestMessage(String replyTo, String sid) { public static class TestingStreamContainer { private String defaultSubjectVariant; private final String defaultNameVariant = TestBase.name(); - public final StreamInfo si; public final String stream = stream(); private final Map subjects = new HashMap<>(); private final Map names = new HashMap<>(); + public final StreamInfo si; public TestingStreamContainer(Connection nc) throws JetStreamApiException, IOException { this(nc.jetStreamManagement(), (String[])null); From 994082411229371c24825e646af05f61ca00b4c3 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 18 Oct 2023 10:42:51 -0400 Subject: [PATCH 2/4] Direct Message Subject Header May Contain Multiple Subjects --- .../client/impl/JetStreamManagementTests.java | 268 +++++++++--------- 1 file changed, 135 insertions(+), 133 deletions(-) diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index c457e72be..067ee6a8f 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -101,14 +101,14 @@ public void testStreamCreate() throws Exception { @Test public void testStreamMetadata() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); JetStreamManagement jsm = nc.jetStreamManagement(); StreamConfiguration sc = StreamConfiguration.builder() - .name(stream()) + .name(STREAM) .storageType(StorageType.Memory) - .subjects(subject()) + .subjects(subject(0), subject(1)) .metadata(metaData) .build(); @@ -127,25 +127,24 @@ public void testStreamMetadata() throws Exception { @Test public void testStreamCreateWithNoSubject() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { long now = ZonedDateTime.now().toEpochSecond(); JetStreamManagement jsm = nc.jetStreamManagement(); - String stream = stream(); StreamConfiguration sc = StreamConfiguration.builder() - .name(stream) - .storageType(StorageType.Memory) - .build(); + .name(STREAM) + .storageType(StorageType.Memory) + .build(); StreamInfo si = jsm.addStream(sc); assertTrue(now <= si.getCreateTime().toEpochSecond()); sc = si.getConfiguration(); - assertEquals(stream, sc.getName()); + assertEquals(STREAM, sc.getName()); assertEquals(1, sc.getSubjects().size()); - assertEquals(stream, sc.getSubjects().get(0)); + assertEquals(STREAM, sc.getSubjects().get(0)); assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy()); assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy()); @@ -273,8 +272,8 @@ public void testAddUpdateStreamInvalids() throws Exception { // cannot change MaxConsumers StreamConfiguration scMaxCon = getTestStreamConfigurationBuilder() - .maxConsumers(2) - .build(); + .maxConsumers(2) + .build(); assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scMaxCon)); StreamConfiguration scReten = getTestStreamConfigurationBuilder() @@ -307,9 +306,9 @@ private static StreamConfiguration getTestStreamConfiguration() { private static StreamConfiguration.Builder getTestStreamConfigurationBuilder() { return StreamConfiguration.builder() - .name(STREAM) - .storageType(StorageType.Memory) - .subjects(subject(0), subject(1)); + .name(STREAM) + .storageType(StorageType.Memory) + .subjects(subject(0), subject(1)); } @Test @@ -594,27 +593,27 @@ private void assertStreamNameList(List list, int... ids) { @Test public void testDeleteStream() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); JetStreamApiException jsapiEx = - assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(stream())); + assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(STREAM)); assertEquals(10059, jsapiEx.getApiErrorCode()); - TestingStreamContainer tsc = new TestingStreamContainer(jsm); - assertNotNull(jsm.getStreamInfo(tsc.stream)); - assertTrue(jsm.deleteStream(tsc.stream)); + createDefaultTestStream(jsm); + assertNotNull(jsm.getStreamInfo(STREAM)); + assertTrue(jsm.deleteStream(STREAM)); - jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(tsc.stream)); + jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(STREAM)); assertEquals(10059, jsapiEx.getApiErrorCode()); - jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(tsc.stream)); + jsapiEx = assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(STREAM)); assertEquals(10059, jsapiEx.getApiErrorCode()); }); } @Test public void testPurgeStreamAndOptions() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { // invalid to have both keep and seq assertThrows(IllegalArgumentException.class, () -> PurgeOptions.builder().keep(1).sequence(1).build()); @@ -622,49 +621,48 @@ public void testPurgeStreamAndOptions() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); // error to purge a stream that does not exist - assertThrows(JetStreamApiException.class, () -> jsm.purgeStream(stream())); + assertThrows(JetStreamApiException.class, () -> jsm.purgeStream(STREAM)); - TestingStreamContainer tsc = new TestingStreamContainer(nc, 2); - createMemoryStream(jsm, tsc.stream, tsc.subject(0), tsc.subject(1)); + createMemoryStream(jsm, STREAM, subject(1), subject(2)); - StreamInfo si = jsm.getStreamInfo(tsc.stream); + StreamInfo si = jsm.getStreamInfo(STREAM); assertEquals(0, si.getStreamState().getMsgCount()); - jsPublish(nc, tsc.subject(0), 10); - si = jsm.getStreamInfo(tsc.stream); + jsPublish(nc, subject(1), 10); + si = jsm.getStreamInfo(STREAM); assertEquals(10, si.getStreamState().getMsgCount()); PurgeOptions options = PurgeOptions.builder().keep(7).build(); - PurgeResponse pr = jsm.purgeStream(tsc.stream, options); + PurgeResponse pr = jsm.purgeStream(STREAM, options); assertTrue(pr.isSuccess()); assertEquals(3, pr.getPurged()); options = PurgeOptions.builder().sequence(9).build(); - pr = jsm.purgeStream(tsc.stream, options); + pr = jsm.purgeStream(STREAM, options); assertTrue(pr.isSuccess()); assertEquals(5, pr.getPurged()); - si = jsm.getStreamInfo(tsc.stream); + si = jsm.getStreamInfo(STREAM); assertEquals(2, si.getStreamState().getMsgCount()); - pr = jsm.purgeStream(tsc.stream); + pr = jsm.purgeStream(STREAM); assertTrue(pr.isSuccess()); assertEquals(2, pr.getPurged()); - si = jsm.getStreamInfo(tsc.stream); + si = jsm.getStreamInfo(STREAM); assertEquals(0, si.getStreamState().getMsgCount()); - jsPublish(nc, tsc.subject(0), 10); - jsPublish(nc, tsc.subject(1), 10); - si = jsm.getStreamInfo(tsc.stream); + jsPublish(nc, subject(1), 10); + jsPublish(nc, subject(2), 10); + si = jsm.getStreamInfo(STREAM); assertEquals(20, si.getStreamState().getMsgCount()); - jsm.purgeStream(tsc.stream, PurgeOptions.subject(tsc.subject(0))); - si = jsm.getStreamInfo(tsc.stream); + jsm.purgeStream(STREAM, PurgeOptions.subject(subject(1))); + si = jsm.getStreamInfo(STREAM); assertEquals(10, si.getStreamState().getMsgCount()); - options = PurgeOptions.builder().subject(tsc.subject(0)).sequence(1).build(); - assertEquals(tsc.subject(0), options.getSubject()); + options = PurgeOptions.builder().subject(subject(1)).sequence(1).build(); + assertEquals(subject(1), options.getSubject()); assertEquals(1, options.getSequence()); - options = PurgeOptions.builder().subject(tsc.subject(0)).keep(2).build(); + options = PurgeOptions.builder().subject(subject(1)).keep(2).build(); assertEquals(2, options.getKeep()); }); } @@ -683,7 +681,7 @@ public void testAddDeleteConsumer() throws Exception { final ConsumerConfiguration cc = ConsumerConfiguration.builder().build(); IllegalArgumentException iae = - assertThrows(IllegalArgumentException.class, () -> jsm.addOrUpdateConsumer(null, cc)); + assertThrows(IllegalArgumentException.class, () -> jsm.addOrUpdateConsumer(null, cc)); assertTrue(iae.getMessage().contains("Stream cannot be null or empty")); iae = assertThrows(IllegalArgumentException.class, () -> jsm.addOrUpdateConsumer(STREAM, null)); assertTrue(iae.getMessage().contains("Config cannot be null")); @@ -828,12 +826,12 @@ private ConsumerConfiguration prepForUpdateTest(JetStreamManagement jsm) throws catch (Exception e) { /* ignore */ } ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(durable(1)) - .ackPolicy(AckPolicy.Explicit) - .deliverSubject(deliver(1)) - .maxDeliver(3) - .filterSubject(SUBJECT_GT) - .build(); + .durable(durable(1)) + .ackPolicy(AckPolicy.Explicit) + .deliverSubject(deliver(1)) + .maxDeliver(3) + .filterSubject(SUBJECT_GT) + .build(); assertValidAddOrUpdate(jsm, cc); return cc; } @@ -860,17 +858,17 @@ private void assertValidAddOrUpdate(JetStreamManagement jsm, ConsumerConfigurati @Test public void testConsumerMetadata() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); JetStreamManagement jsm = nc.jetStreamManagement(); - TestingStreamContainer tsc = new TestingStreamContainer(jsm); + createDefaultTestStream(jsm); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(tsc.name()) + .durable(DURABLE) .metadata(metaData) .build(); - ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); + ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc); if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { assertEquals(1, ci.getConsumerConfiguration().getMetadata().size()); assertEquals("meta-bar", ci.getConsumerConfiguration().getMetadata().get("meta-foo")); @@ -926,18 +924,18 @@ public void testCreateConsumersWithFilters() throws Exception { @Test public void testGetConsumerInfo() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - TestingStreamContainer tsc = new TestingStreamContainer(jsm); - assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.name())); - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build(); - ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); - assertEquals(tsc.stream, ci.getStreamName()); - assertEquals(tsc.name(), ci.getName()); - ci = jsm.getConsumerInfo(tsc.stream, tsc.name()); - assertEquals(tsc.stream, ci.getStreamName()); - assertEquals(tsc.name(), ci.getName()); - assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, durable(999))); + createDefaultTestStream(jsm); + assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(STREAM, DURABLE)); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); + ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc); + assertEquals(STREAM, ci.getStreamName()); + assertEquals(DURABLE, ci.getName()); + ci = jsm.getConsumerInfo(STREAM, DURABLE); + assertEquals(STREAM, ci.getStreamName()); + assertEquals(DURABLE, ci.getName()); + assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(STREAM, durable(999))); if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { assertNotNull(ci.getTimestamp()); } @@ -949,17 +947,17 @@ public void testGetConsumerInfo() throws Exception { @Test public void testGetConsumers() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - TestingStreamContainer tsc = new TestingStreamContainer(jsm); + createMemoryStream(jsm, STREAM, subject(0), subject(1)); - addConsumers(jsm, tsc.stream, 600, "A", null); // getConsumers pages at 256 + addConsumers(jsm, STREAM, 600, "A", null); // getConsumers pages at 256 - List list = jsm.getConsumers(tsc.stream); + List list = jsm.getConsumers(STREAM); assertEquals(600, list.size()); - addConsumers(jsm, tsc.stream, 500, "B", null); // getConsumerNames pages at 1024 - List names = jsm.getConsumerNames(tsc.stream); + addConsumers(jsm, STREAM, 500, "B", null); // getConsumerNames pages at 1024 + List names = jsm.getConsumerNames(STREAM); assertEquals(1100, names.size()); }); } @@ -968,9 +966,9 @@ private void addConsumers(JetStreamManagement jsm, String stream, int count, Str for (int x = 1; x <= count; x++) { String dur = durable(durableVary, x); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(dur) - .filterSubject(filterSubject) - .build(); + .durable(dur) + .filterSubject(filterSubject) + .build(); ConsumerInfo ci = jsm.addOrUpdateConsumer(stream, cc); assertEquals(dur, ci.getName()); assertEquals(dur, ci.getConsumerConfiguration().getDurable()); @@ -1020,7 +1018,7 @@ public void testDeleteMessage() throws Exception { assertNull(mi.getData()); assertEquals(2, mi.getSeq()); assertTrue(mi.getTime().toEpochSecond() >= beforeCreated.toEpochSecond()); - assertTrue(mi.getHeaders() == null || mi.getHeaders().isEmpty()); + assertTrue(mi.getHeaders() == null || mi.getHeaders().size() == 0); assertTrue(jsm.deleteMessage(STREAM, 1, false)); // added coverage for use of erase (no_erase) flag. assertThrows(JetStreamApiException.class, () -> jsm.deleteMessage(STREAM, 1)); @@ -1033,6 +1031,7 @@ public void testDeleteMessage() throws Exception { @Test public void testAuthCreateUpdateStream() throws Exception { + try (NatsTestServer ts = new NatsTestServer("src/test/resources/js_authorization.conf", false)) { Options optionsSrc = new Options.Builder().server(ts.getURI()) .userInfo("serviceup".toCharArray(), "uppass".toCharArray()).build(); @@ -1059,21 +1058,20 @@ public void testAuthCreateUpdateStream() throws Exception { @Test public void testSealed() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - - TestingStreamContainer tsc = new TestingStreamContainer(nc); - assertFalse(tsc.si.getConfiguration().getSealed()); + StreamInfo si = createMemoryStream(jsm, STREAM, SUBJECT); + assertFalse(si.getConfiguration().getSealed()); JetStream js = nc.jetStream(); - js.publish(tsc.subject(), "data1".getBytes()); + js.publish(SUBJECT, "data1".getBytes()); - StreamConfiguration sc = new StreamConfiguration.Builder(tsc.si.getConfiguration()) + StreamConfiguration sc = new StreamConfiguration.Builder(si.getConfiguration()) .seal().build(); - StreamInfo si = jsm.updateStream(sc); + si = jsm.updateStream(sc); assertTrue(si.getConfiguration().getSealed()); - assertThrows(JetStreamApiException.class, () -> js.publish(tsc.subject(), "data2".getBytes())); + assertThrows(JetStreamApiException.class, () -> js.publish(SUBJECT, "data2".getBytes())); }); } @@ -1088,50 +1086,53 @@ public void testStorageTypeCoverage() { @Test public void testConsumerReplica() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - TestingStreamContainer tsc = new TestingStreamContainer(nc); + createMemoryStream(jsm, STREAM, subject(0), subject(1)); final ConsumerConfiguration cc0 = ConsumerConfiguration.builder() - .durable(tsc.name()) - .build(); - ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc0); + .durable(durable(0)) + .build(); + ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc0); // server returns 0 when value is not set assertEquals(0, ci.getConsumerConfiguration().getNumReplicas()); final ConsumerConfiguration cc1 = ConsumerConfiguration.builder() - .durable(tsc.name()) - .numReplicas(1) - .build(); - ci = jsm.addOrUpdateConsumer(tsc.stream, cc1); + .durable(durable(0)) + .numReplicas(1) + .build(); + ci = jsm.addOrUpdateConsumer(STREAM, cc1); assertEquals(1, ci.getConsumerConfiguration().getNumReplicas()); }); } @Test public void testGetMessage() throws Exception { - jsServer.run(nc -> { + runInJsServer(nc -> { if (nc.getServerInfo().isNewerVersionThan("2.8.4")) { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - TestingStreamContainer tsc = new TestingStreamContainer(nc, 2); - assertFalse(tsc.si.getConfiguration().getAllowDirect()); + StreamConfiguration sc = StreamConfiguration.builder() + .name(STREAM) + .storageType(StorageType.Memory) + .subjects(subject(1), subject(2)) + .build(); + StreamInfo si = jsm.addStream(sc); ZonedDateTime beforeCreated = ZonedDateTime.now(); - js.publish(buildTestGetMessage(tsc, 0, 1)); - js.publish(buildTestGetMessage(tsc, 1, 2)); - js.publish(buildTestGetMessage(tsc, 0, 3)); - js.publish(buildTestGetMessage(tsc, 1, 4)); - js.publish(buildTestGetMessage(tsc, 0, 5)); - js.publish(buildTestGetMessage(tsc, 1, 6)); + js.publish(buildTestGetMessage(1, 1)); + js.publish(buildTestGetMessage(2, 2)); + js.publish(buildTestGetMessage(1, 3)); + js.publish(buildTestGetMessage(2, 4)); + js.publish(buildTestGetMessage(1, 5)); + js.publish(buildTestGetMessage(2, 6)); - validateGetMessage(jsm, tsc, beforeCreated); + validateGetMessage(jsm, si, false, beforeCreated); - StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); - StreamInfo si = jsm.updateStream(sc); - assertTrue(si.getConfiguration().getAllowDirect()); - validateGetMessage(jsm, tsc, beforeCreated); + sc = StreamConfiguration.builder(si.getConfiguration()).allowDirect(true).build(); + si = jsm.updateStream(sc); + validateGetMessage(jsm, si, true, beforeCreated); // error case stream doesn't exist assertThrows(JetStreamApiException.class, () -> jsm.getMessage(stream(999), 1)); @@ -1139,53 +1140,54 @@ public void testGetMessage() throws Exception { }); } - private static NatsMessage buildTestGetMessage(TestingStreamContainer tsc, int s, int q) { + private static NatsMessage buildTestGetMessage(int s, int q) { String data = "s" + s + "-q" + q; return NatsMessage.builder() - .subject(tsc.subject(s)) + .subject(subject(s)) .data("d-" + data) .headers(new Headers().put("h", "h-" + data)) .build(); } - private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer tsc, ZonedDateTime beforeCreated) throws IOException, JetStreamApiException { + private void validateGetMessage(JetStreamManagement jsm, StreamInfo si, boolean allowDirect, ZonedDateTime beforeCreated) throws IOException, JetStreamApiException { + assertEquals(allowDirect, si.getConfiguration().getAllowDirect()); - assertMessageInfo(tsc, 0, 1, jsm.getMessage(tsc.stream, 1), beforeCreated); - assertMessageInfo(tsc, 0, 5, jsm.getLastMessage(tsc.stream, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 6, jsm.getLastMessage(tsc.stream, tsc.subject(1)), beforeCreated); + assertMessageInfo(1, 1, jsm.getMessage(STREAM, 1), beforeCreated); + assertMessageInfo(1, 5, jsm.getLastMessage(STREAM, subject(1)), beforeCreated); + assertMessageInfo(2, 6, jsm.getLastMessage(STREAM, subject(2)), beforeCreated); - assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, -1, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, -1, tsc.subject(1)), beforeCreated); - assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 0, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated); - assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated); + assertMessageInfo(1, 1, jsm.getNextMessage(STREAM, -1, subject(1)), beforeCreated); + assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, -1, subject(2)), beforeCreated); + assertMessageInfo(1, 1, jsm.getNextMessage(STREAM, 0, subject(1)), beforeCreated); + assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, 0, subject(2)), beforeCreated); + assertMessageInfo(1, 1, jsm.getFirstMessage(STREAM, subject(1)), beforeCreated); + assertMessageInfo(2, 2, jsm.getFirstMessage(STREAM, subject(2)), beforeCreated); - assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated); + assertMessageInfo(1, 1, jsm.getNextMessage(STREAM, 1, subject(1)), beforeCreated); + assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, 1, subject(2)), beforeCreated); - assertMessageInfo(tsc, 0, 3, jsm.getNextMessage(tsc.stream, 2, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 2, tsc.subject(1)), beforeCreated); + assertMessageInfo(1, 3, jsm.getNextMessage(STREAM, 2, subject(1)), beforeCreated); + assertMessageInfo(2, 2, jsm.getNextMessage(STREAM, 2, subject(2)), beforeCreated); - assertMessageInfo(tsc, 0, 5, jsm.getNextMessage(tsc.stream, 5, tsc.subject(0)), beforeCreated); - assertMessageInfo(tsc, 1, 6, jsm.getNextMessage(tsc.stream, 5, tsc.subject(1)), beforeCreated); + assertMessageInfo(1, 5, jsm.getNextMessage(STREAM, 5, subject(1)), beforeCreated); + assertMessageInfo(2, 6, jsm.getNextMessage(STREAM, 5, subject(2)), beforeCreated); - assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1))); - assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject"))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject"))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 9, tsc.subject(0)))); - assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 1, "not-a-subject"))); + assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(STREAM, -1))); + assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(STREAM, 0))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(STREAM, 9))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(STREAM, "not-a-subject"))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(STREAM, "not-a-subject"))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(STREAM, 9, subject(1)))); + assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(STREAM, 1, "not-a-subject"))); } private void assertStatus(int apiErrorCode, JetStreamApiException jsae) { assertEquals(apiErrorCode, jsae.getApiErrorCode()); } - private void assertMessageInfo(TestingStreamContainer tsc, int subj, long seq, MessageInfo mi, ZonedDateTime beforeCreated) { - assertEquals(tsc.stream, mi.getStream()); - assertEquals(tsc.subject(subj), mi.getSubject()); + private void assertMessageInfo(int subj, long seq, MessageInfo mi, ZonedDateTime beforeCreated) { + assertEquals(STREAM, mi.getStream()); + assertEquals(subject(subj), mi.getSubject()); assertEquals(seq, mi.getSeq()); assertNotNull(mi.getTime()); assertTrue(mi.getTime().toEpochSecond() >= beforeCreated.toEpochSecond()); From af27339caf862ce4d8bf4616025c25a4357ffb2c Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 18 Oct 2023 10:44:42 -0400 Subject: [PATCH 3/4] Direct Message Subject Header May Contain Multiple Subjects --- src/test/java/io/nats/client/impl/JetStreamTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index 1fb2eddcc..ee126b1a3 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -110,10 +110,10 @@ public NatsMessage getTestMessage(String replyTo, String sid) { public static class TestingStreamContainer { private String defaultSubjectVariant; private final String defaultNameVariant = TestBase.name(); + public final StreamInfo si; public final String stream = stream(); private final Map subjects = new HashMap<>(); private final Map names = new HashMap<>(); - public final StreamInfo si; public TestingStreamContainer(Connection nc) throws JetStreamApiException, IOException { this(nc.jetStreamManagement(), (String[])null); From 533fb97fa80b27910e1bebc20a0be19edb771773 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 18 Oct 2023 10:46:14 -0400 Subject: [PATCH 4/4] Direct Message Subject Header May Contain Multiple Subjects --- src/test/java/io/nats/client/impl/HeadersTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/nats/client/impl/HeadersTests.java b/src/test/java/io/nats/client/impl/HeadersTests.java index 44d7cfc1c..3afb985e8 100644 --- a/src/test/java/io/nats/client/impl/HeadersTests.java +++ b/src/test/java/io/nats/client/impl/HeadersTests.java @@ -357,15 +357,19 @@ public void remove_collection_work() { } @Test - public void getFirsts() { + public void testGetFirstGetLast() { Headers headers = new Headers(); assertNull(headers.getFirst(KEY1)); + assertNull(headers.getLast(KEY1)); headers.add(KEY1, VAL1); assertEquals(VAL1, headers.getFirst(KEY1)); + assertEquals(VAL1, headers.getLast(KEY1)); headers.add(KEY1, VAL2); assertEquals(VAL1, headers.getFirst(KEY1)); + assertEquals(VAL2, headers.getLast(KEY1)); headers.put(KEY1, VAL3); assertEquals(VAL3, headers.getFirst(KEY1)); + assertEquals(VAL3, headers.getLast(KEY1)); } private void remove(