From 5ac94239bb0c426b500d2e9cfee3b61e936970f1 Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Fri, 30 Dec 2022 14:30:54 -0500 Subject: [PATCH 1/7] message size calculation improvements --- build.gradle | 3 +- dependencies.md | 2 +- .../io/nats/client/impl/IncomingMessage.java | 35 +++ .../client/impl/IncomingMessageFactory.java | 85 ++++++ .../io/nats/client/impl/NatsConnection.java | 1 - .../client/impl/NatsConnectionReader.java | 19 +- .../client/impl/NatsJetStreamMessage.java | 3 +- .../java/io/nats/client/impl/NatsMessage.java | 268 ++++-------------- .../io/nats/client/impl/ProtocolMessage.java | 41 +++ .../io/nats/client/impl/StatusMessage.java | 42 +++ .../java/io/nats/client/NatsTestServer.java | 5 + .../io/nats/client/impl/HeadersTests.java | 2 +- .../nats/client/impl/JetStreamTestBase.java | 4 +- .../nats/client/impl/MessageManagerTests.java | 8 +- .../MessageProtocolCreationBenchmark.java | 2 +- .../client/impl/MessageQueueBenchmark.java | 2 +- .../nats/client/impl/MessageQueueTests.java | 1 - .../io/nats/client/impl/NatsMessageTests.java | 32 +-- 18 files changed, 296 insertions(+), 259 deletions(-) create mode 100644 src/main/java/io/nats/client/impl/IncomingMessage.java create mode 100644 src/main/java/io/nats/client/impl/IncomingMessageFactory.java create mode 100644 src/main/java/io/nats/client/impl/ProtocolMessage.java create mode 100644 src/main/java/io/nats/client/impl/StatusMessage.java diff --git a/build.gradle b/build.gradle index 363053111..29bd6053f 100644 --- a/build.gradle +++ b/build.gradle @@ -35,12 +35,13 @@ java { repositories { mavenCentral() maven { url "https://oss.sonatype.org/content/repositories/releases/" } + mavenLocal() } dependencies { implementation 'net.i2p.crypto:eddsa:0.3.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' - testImplementation 'io.nats:jnats-server-runner:1.0.9' + testImplementation 'io.nats:jnats-server-runner:1.0.13' } sourceSets { diff --git a/dependencies.md b/dependencies.md index cb18ddb00..a6cbec30b 100644 --- a/dependencies.md +++ b/dependencies.md @@ -12,7 +12,7 @@ This file lists the dependencies used in this repository. | Dependency | License | |-------------------------------------------------|-----------------------------------------| -| io.nats:jnats-server-runner:1.0.9 | Apache 2.0 License | +| io.nats:jnats-server-runner:1.0.13 | Apache 2.0 License | | org.apiguardian:apiguardian-api:1.1.0 | Apache 2.0 License | | org.junit.jupiter:junit-jupiter:5.9.0 | Eclipse Public License v2.0 | | org.junit:junit-bom:5.9.0 | Eclipse Public License v2.0 | diff --git a/src/main/java/io/nats/client/impl/IncomingMessage.java b/src/main/java/io/nats/client/impl/IncomingMessage.java new file mode 100644 index 000000000..7b15dcd73 --- /dev/null +++ b/src/main/java/io/nats/client/impl/IncomingMessage.java @@ -0,0 +1,35 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.ByteArrayBuilder; + +public class IncomingMessage extends NatsMessage { + IncomingMessage() {} + + @Override + byte[] getProtocolBytes() { + throw new IllegalStateException("getProtocolBytes not supported for this type of message."); + } + + @Override + ByteArrayBuilder getProtocolBab() { + throw new IllegalStateException("getProtocolBab not supported for this type of message."); + } + + @Override + int getControlLineLength() { + throw new IllegalStateException("getControlLineLength not supported for this type of message."); + } +} diff --git a/src/main/java/io/nats/client/impl/IncomingMessageFactory.java b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java new file mode 100644 index 000000000..5d961551f --- /dev/null +++ b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java @@ -0,0 +1,85 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.IncomingHeadersProcessor; +import io.nats.client.support.Status; + +import static io.nats.client.support.NatsJetStreamConstants.JS_ACK_SUBJECT_PREFIX; + +// ---------------------------------------------------------------------------------------------------- +// Incoming Message Factory - internal use only +// ---------------------------------------------------------------------------------------------------- +class IncomingMessageFactory { + private final String sid; + private final String subject; + private final String replyTo; + private final int protocolLineLength; + private final boolean utf8mode; + + private byte[] data; + private Headers headers; + private Status status; + private int hdrLen = 0; + private int dataLen = 0; + private int totLen = 0; + + // Create an incoming message for a subscriber + // Doesn't check control line size, since the server sent us the message + IncomingMessageFactory(String sid, String subject, String replyTo, int protocolLength, boolean utf8mode) { + this.sid = sid; + this.subject = subject; + this.replyTo = replyTo; + this.protocolLineLength = protocolLength; + this.utf8mode = utf8mode; + // headers and data are set later and sizes are calculated during those setters + } + + void setHeaders(IncomingHeadersProcessor ihp) { + headers = ihp.getHeaders(); + status = ihp.getStatus(); + hdrLen = ihp.getSerializedLength(); + totLen = hdrLen + dataLen; + } + + void setData(byte[] data) { + this.data = data; + dataLen = data == null ? 0 : data.length; + totLen = hdrLen + dataLen; + } + + NatsMessage getMessage() { + NatsMessage message; + if (status != null) { + message = new StatusMessage(status); + } + else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) { + message = new NatsJetStreamMessage(); + } + else { + message = new IncomingMessage(); + } + message.initData(data); + message.sid = this.sid; + message.subject = this.subject; + message.replyTo = this.replyTo; + message.protocolLineLength = this.protocolLineLength; + message.headers = this.headers; + message.utf8mode = this.utf8mode; + message.hdrLen = this.hdrLen; + message.dataLen = this.dataLen; + message.sizeInBytes = protocolLineLength + hdrLen + dataLen + 4; // Two CRLFs + return message; + } +} diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 03195a427..1491ddb6d 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -16,7 +16,6 @@ import io.nats.client.*; import io.nats.client.ConnectionListener.Events; import io.nats.client.api.ServerInfo; -import io.nats.client.impl.NatsMessage.ProtocolMessage; import io.nats.client.support.ByteArrayBuilder; import io.nats.client.support.NatsRequestCompletableFuture; import io.nats.client.support.Validator; diff --git a/src/main/java/io/nats/client/impl/NatsConnectionReader.java b/src/main/java/io/nats/client/impl/NatsConnectionReader.java index 971a680df..d3d151343 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionReader.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionReader.java @@ -13,7 +13,6 @@ package io.nats.client.impl; -import io.nats.client.impl.NatsMessage.InternalMessageFactory; import io.nats.client.support.IncomingHeadersProcessor; import java.io.IOException; @@ -46,21 +45,21 @@ enum Mode { private boolean gotCR; private String op; - private char[] opArray; + private final char[] opArray; private int opPos; - private char[] msgLineChars; + private final char[] msgLineChars; private int msgLinePosition; private Mode mode; - private InternalMessageFactory incoming; + private IncomingMessageFactory incoming; private byte[] msgHeaders; private byte[] msgData; private int msgHeadersPosition; private int msgDataPosition; - private byte[] buffer; + private final byte[] buffer; private int bufferPosition; private Future<Boolean> stopped; @@ -418,7 +417,7 @@ static String opFor(char[] chars, int length) { } } - private static int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}; + private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}; public static int parseLength(String s) throws NumberFormatException { int length = s.length(); @@ -433,7 +432,7 @@ public static int parseLength(String s) throws NumberFormatException { int d = (c - '0'); if (d>9) { - throw new NumberFormatException("Invalid char in message length \'" + c + "\'"); + throw new NumberFormatException("Invalid char in message length '" + c + "'"); } retVal += d * TENS[length - i - 1]; @@ -476,7 +475,7 @@ void parseProtocolMessage() throws IOException { int incomingLength = parseLength(lengthChars); - this.incoming = new InternalMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode); + this.incoming = new IncomingMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode); this.mode = Mode.GATHER_DATA; this.msgData = new byte[incomingLength]; this.msgDataPosition = 0; @@ -518,7 +517,7 @@ void parseProtocolMessage() throws IOException { throw new IllegalStateException("Bad HMSG control line, missing required fields"); } - this.incoming = new InternalMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode); + this.incoming = new IncomingMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode); this.msgHeaders = new byte[hdrLen]; this.msgData = new byte[totLen - hdrLen]; this.mode = Mode.GATHER_HEADERS; @@ -532,7 +531,7 @@ void parseProtocolMessage() throws IOException { this.mode = Mode.GATHER_OP; break; case OP_ERR: - String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("\'", ""); + String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", ""); this.connection.processError(errorText); this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java b/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java index 54e9aac71..a4bcc436f 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java @@ -14,7 +14,6 @@ package io.nats.client.impl; import io.nats.client.Connection; -import io.nats.client.impl.NatsMessage.InternalMessage; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -23,7 +22,7 @@ import static io.nats.client.support.NatsConstants.NANOS_PER_MILLI; import static io.nats.client.support.Validator.validateDurationRequired; -class NatsJetStreamMessage extends InternalMessage { +class NatsJetStreamMessage extends IncomingMessage { private NatsJetStreamMetaData jsMetaData = null; diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index 71a35f348..b82c19aea 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -17,7 +17,6 @@ import io.nats.client.Message; import io.nats.client.Subscription; import io.nats.client.support.ByteArrayBuilder; -import io.nats.client.support.IncomingHeadersProcessor; import io.nats.client.support.Status; import java.nio.charset.Charset; @@ -26,7 +25,6 @@ import java.util.concurrent.TimeoutException; import static io.nats.client.support.NatsConstants.*; -import static io.nats.client.support.NatsJetStreamConstants.JS_ACK_SUBJECT_PREFIX; import static io.nats.client.support.Validator.validateReplyTo; import static io.nats.client.support.Validator.validateSubject; import static java.nio.charset.StandardCharsets.US_ASCII; @@ -45,6 +43,7 @@ public class NatsMessage implements Message { // incoming specific : subject, replyTo, data and these fields protected String sid; protected int protocolLineLength; + protected int controlLineLength; // protocol specific : just this field protected ByteArrayBuilder protocolBab; @@ -53,9 +52,6 @@ public class NatsMessage implements Message { protected int sizeInBytes = -1; protected int hdrLen = 0; protected int dataLen = 0; - protected int totLen = 0; - - protected boolean dirty = false; protected NatsSubscription subscription; @@ -66,32 +62,33 @@ public class NatsMessage implements Message { // ---------------------------------------------------------------------------------------------------- // Constructors - Prefer to use Builder // ---------------------------------------------------------------------------------------------------- - private NatsMessage() { - this.data = EMPTY_BODY; + protected NatsMessage() { + initData(EMPTY_BODY); } - private NatsMessage(byte[] data) { - this.data = data == null ? EMPTY_BODY : data; + protected NatsMessage(byte[] data) { + initData(data); } @Deprecated // Plans are to remove allowing utf8-mode public NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) { - this(subject, replyTo, null, data, utf8mode); + this(subject, replyTo, null, data); + this.utf8mode = utf8mode; } public NatsMessage(String subject, String replyTo, byte[] data) { - this(subject, replyTo, null, data, false); + this(subject, replyTo, null, data); } public NatsMessage(Message message) { - this(message.getSubject(), - message.getReplyTo(), - message.getHeaders(), - message.getData(), - message.isUtf8mode()); + initData(message.getData()); + this.subject = message.getSubject(); + this.replyTo = message.getReplyTo(); + this.headers = message.getHeaders(); + this.utf8mode = message.isUtf8mode(); + finishConstruct(); } - @Deprecated // Plans are to remove allowing utf8-mode public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, boolean utf8mode) { this(subject, replyTo, headers, data); @@ -99,86 +96,67 @@ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, } public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) { - this(data); + initData(data); this.subject = validateSubject(subject, true); this.replyTo = validateReplyTo(replyTo, false); this.headers = headers; this.utf8mode = false; + finishConstruct(); + } - dirty = true; + protected void initData(byte[] data) { + this.data = data == null ? EMPTY_BODY : data; + dataLen = this.data.length; } - // ---------------------------------------------------------------------------------------------------- - // Only for implementors. The user created message is the only current one that calculates. - // ---------------------------------------------------------------------------------------------------- - protected boolean calculateIfDirty() { - if (dirty || (hasHeaders() && headers.isDirty())) { - int replyToLen = replyTo == null ? 0 : replyTo.length(); - dataLen = data.length; + protected void finishConstruct() { + int replyToLen = replyTo == null ? 0 : replyTo.length(); - if (headers != null && !headers.isEmpty()) { - hdrLen = headers.serializedLength(); - } - else { - hdrLen = 0; - } - totLen = hdrLen + dataLen; + if (headers != null && !headers.isEmpty()) { + hdrLen = headers.serializedLength(); + } + else { + hdrLen = 0; + } + int hdrAndDataLen = hdrLen + dataLen; - // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases - // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (hdrLen + dataLen) - ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + totLen); + // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases + // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (hdrLen + dataLen) + ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + hdrAndDataLen); - // protocol come first - if (hdrLen > 0) { - bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN); - } - else { - bab.append(PUB_SP_BYTES, 0, PUB_SP_BYTES_LEN); - } + // protocol come first + if (hdrLen > 0) { + bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN); + } + else { + bab.append(PUB_SP_BYTES, 0, PUB_SP_BYTES_LEN); + } - // next comes the subject - bab.append(subject.getBytes(UTF_8)).append(SP); + // next comes the subject + bab.append(subject.getBytes(UTF_8)).append(SP); - // reply to if it's there - if (replyToLen > 0) { - bab.append(replyTo.getBytes(UTF_8)).append(SP); - } + // reply to if it's there + if (replyToLen > 0) { + bab.append(replyTo.getBytes(UTF_8)).append(SP); + } - // header length if there are headers - if (hdrLen > 0) { - bab.append(Integer.toString(hdrLen).getBytes(US_ASCII)).append(SP); - } + // header length if there are headers + if (hdrLen > 0) { + bab.append(Integer.toString(hdrLen).getBytes(US_ASCII)).append(SP); + } - // payload length - bab.append(Integer.toString(totLen).getBytes(US_ASCII)); + // payload length + bab.append(Integer.toString(hdrAndDataLen).getBytes(US_ASCII)); - protocolBab = bab; - dirty = false; - return true; - } - return false; + protocolBab = bab; + controlLineLength = protocolBab.length() + 2; // One CRLF + sizeInBytes = protocolBab.length() + hdrAndDataLen + 4; // Two CRLFs } // ---------------------------------------------------------------------------------------------------- // Client and Message Internal Methods // ---------------------------------------------------------------------------------------------------- long getSizeInBytes() { - if (calculateIfDirty() || sizeInBytes == -1) { - sizeInBytes = protocolLineLength; - if (protocolBab != null) { - sizeInBytes += protocolBab.length(); - } - sizeInBytes += 2; // CRLF - if (!isProtocol()) { - if (hdrLen > 0) { - sizeInBytes += hdrLen; - } - if (dataLen > 0) { - sizeInBytes += dataLen; - } - sizeInBytes += 2; // CRLF - } - } return sizeInBytes; } @@ -187,18 +165,15 @@ boolean isProtocol() { } byte[] getProtocolBytes() { - calculateIfDirty(); return protocolBab.toByteArray(); } ByteArrayBuilder getProtocolBab() { - calculateIfDirty(); return protocolBab; } int getControlLineLength() { - calculateIfDirty(); - return (protocolBab != null) ? protocolBab.length() + 2 : -1; + return controlLineLength; } Headers getOrCreateHeaders() { @@ -405,7 +380,6 @@ public String toString() { } String toDetailString() { - calculateIfDirty(); return "NatsMessage:" + "\n subject='" + subject + '\'' + "\n replyTo='" + replyToString() + '\'' + @@ -418,7 +392,6 @@ String toDetailString() { "\n sizeInBytes=" + sizeInBytes + "\n hdrLen=" + hdrLen + "\n dataLen=" + dataLen + - "\n totLen=" + totLen + "\n subscription=" + subscription + "\n next=" + nextToString(); @@ -563,135 +536,4 @@ public NatsMessage build() { return new NatsMessage(subject, replyTo, headers, data, utf8mode); } } - - // ---------------------------------------------------------------------------------------------------- - // Incoming Message Factory - internal use only - // ---------------------------------------------------------------------------------------------------- - static class InternalMessageFactory { - private final String sid; - private final String subject; - private final String replyTo; - private final int protocolLineLength; - private final boolean utf8mode; - - private byte[] data; - private Headers headers; - private Status status; - private int hdrLen = 0; - private int dataLen = 0; - private int totLen = 0; - - // Create an incoming message for a subscriber - // Doesn't check control line size, since the server sent us the message - InternalMessageFactory(String sid, String subject, String replyTo, int protocolLength, boolean utf8mode) { - this.sid = sid; - this.subject = subject; - this.replyTo = replyTo; - this.protocolLineLength = protocolLength; - this.utf8mode = utf8mode; - // headers and data are set later and sizes are calculated during those setters - } - - void setHeaders(IncomingHeadersProcessor ihp) { - headers = ihp.getHeaders(); - status = ihp.getStatus(); - hdrLen = ihp.getSerializedLength(); - totLen = hdrLen + dataLen; - } - - void setData(byte[] data) { - this.data = data; - dataLen = data == null ? 0 : data.length; - totLen = hdrLen + dataLen; - } - - NatsMessage getMessage() { - NatsMessage message = null; - if (status != null) { - message = new StatusMessage(status); - } - else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) { - message = new NatsJetStreamMessage(); - } - if (message == null) { - message = new InternalMessage(); - } - message.sid = this.sid; - message.subject = this.subject; - message.replyTo = this.replyTo; - message.protocolLineLength = this.protocolLineLength; - message.headers = this.headers; - message.data = this.data == null ? EMPTY_BODY : this.data; - message.utf8mode = this.utf8mode; - message.hdrLen = this.hdrLen; - message.dataLen = this.dataLen; - message.totLen = this.totLen; - - return message; - } - } - - static class InternalMessage extends NatsMessage { - @Override - protected boolean calculateIfDirty() { - return false; - } - } - - private static final ByteArrayBuilder EMPTY_BAB = new ByteArrayBuilder(); - - static class ProtocolMessage extends InternalMessage { - ProtocolMessage(byte[] protocol) { - this.protocolBab = protocol == null ? EMPTY_BAB : new ByteArrayBuilder(protocol); - } - - ProtocolMessage(ByteArrayBuilder babProtocol) { - protocolBab = babProtocol; - } - - ProtocolMessage(String asciiProtocol) { - protocolBab = new ByteArrayBuilder().append(asciiProtocol); - } - - @Override - byte[] getProtocolBytes() { - return protocolBab.toByteArray(); - } - - @Override - ByteArrayBuilder getProtocolBab() { - return protocolBab; - } - - @Override - boolean isProtocol() { - return true; - } - } - - static class StatusMessage extends InternalMessage { - private final Status status; - - public StatusMessage(Status status) { - this.status = status; - } - - @Override - public boolean isStatusMessage() { - return true; - } - - @Override - public Status getStatus() { - return status; - } - - @Override - public String toString() { - return "StatusMessage{" + - "code=" + status.getCode() + - ", message='" + status.getMessage() + '\'' + - '}'; - } - } } diff --git a/src/main/java/io/nats/client/impl/ProtocolMessage.java b/src/main/java/io/nats/client/impl/ProtocolMessage.java new file mode 100644 index 000000000..7225e2874 --- /dev/null +++ b/src/main/java/io/nats/client/impl/ProtocolMessage.java @@ -0,0 +1,41 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.ByteArrayBuilder; + +// ---------------------------------------------------------------------------------------------------- +// Protocol message is a special version of a NatsMessage +// ---------------------------------------------------------------------------------------------------- +class ProtocolMessage extends NatsMessage { + private static final ByteArrayBuilder EMPTY_BAB = new ByteArrayBuilder(); + + ProtocolMessage(ByteArrayBuilder babProtocol) { + protocolBab = babProtocol; + sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF + } + + ProtocolMessage(byte[] protocol) { + this(protocol == null ? EMPTY_BAB : new ByteArrayBuilder(protocol)); + } + + ProtocolMessage(String asciiProtocol) { + this(new ByteArrayBuilder().append(asciiProtocol)); + } + + @Override + boolean isProtocol() { + return true; + } +} diff --git a/src/main/java/io/nats/client/impl/StatusMessage.java b/src/main/java/io/nats/client/impl/StatusMessage.java new file mode 100644 index 000000000..5cd313ef9 --- /dev/null +++ b/src/main/java/io/nats/client/impl/StatusMessage.java @@ -0,0 +1,42 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.Status; + +public class StatusMessage extends IncomingMessage { + private final Status status; + + StatusMessage(Status status) { + this.status = status; + } + + @Override + public boolean isStatusMessage() { + return true; + } + + @Override + public Status getStatus() { + return status; + } + + @Override + public String toString() { + return "StatusMessage{" + + "code=" + status.getCode() + + ", message='" + status.getMessage() + '\'' + + '}'; + } +} diff --git a/src/test/java/io/nats/client/NatsTestServer.java b/src/test/java/io/nats/client/NatsTestServer.java index f5233bc5f..dfe208122 100644 --- a/src/test/java/io/nats/client/NatsTestServer.java +++ b/src/test/java/io/nats/client/NatsTestServer.java @@ -17,8 +17,13 @@ import nats.io.NatsServerRunner; import java.io.IOException; +import java.util.logging.Level; public class NatsTestServer extends NatsServerRunner { + static { + NatsServerRunner.setLoggingLevel(Level.SEVERE); + } + public NatsTestServer() throws IOException { super(); } diff --git a/src/test/java/io/nats/client/impl/HeadersTests.java b/src/test/java/io/nats/client/impl/HeadersTests.java index 267618db6..1c447c0b5 100644 --- a/src/test/java/io/nats/client/impl/HeadersTests.java +++ b/src/test/java/io/nats/client/impl/HeadersTests.java @@ -561,7 +561,7 @@ private IncomingHeadersProcessor assertValidStatus(IncomingHeadersProcessor ihp, if (msg != null) { assertEquals(msg, status.getMessage()); } - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory("sid", "sub", "rt", 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory("sid", "sub", "rt", 0, false); imf.setHeaders(ihp); assertTrue(imf.getMessage().isStatusMessage()); return ihp; diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index ffd0c1032..fd50d6f4f 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -67,11 +67,11 @@ public NatsMessage getTestJsMessage(long seq, String sid) { } public NatsMessage getTestMessage(String replyTo) { - return new NatsMessage.InternalMessageFactory(mockSid(), "subj", replyTo, 0, false).getMessage(); + return new IncomingMessageFactory(mockSid(), "subj", replyTo, 0, false).getMessage(); } public NatsMessage getTestMessage(String replyTo, String sid) { - return new NatsMessage.InternalMessageFactory(sid, "subj", replyTo, 0, false).getMessage(); + return new IncomingMessageFactory(sid, "subj", replyTo, 0, false).getMessage(); } static class NoopMessageManager extends MessageManager {} diff --git a/src/test/java/io/nats/client/impl/MessageManagerTests.java b/src/test/java/io/nats/client/impl/MessageManagerTests.java index 2bfcaa4a0..1a92bbea8 100644 --- a/src/test/java/io/nats/client/impl/MessageManagerTests.java +++ b/src/test/java/io/nats/client/impl/MessageManagerTests.java @@ -366,7 +366,7 @@ private PushMessageManager getManager(Connection conn, SubscribeOptions so, Nats } private NatsMessage getFlowControl(int replyToId, String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(sid, "subj", getFcSubject(replyToId), 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(sid, "subj", getFcSubject(replyToId), 0, false); imf.setHeaders(new IncomingHeadersProcessor(("NATS/1.0 " + FLOW_OR_HEARTBEAT_STATUS_CODE + " " + FLOW_CONTROL_TEXT + "\r\n").getBytes())); return imf.getMessage(); } @@ -376,14 +376,14 @@ private String getFcSubject(int id) { } private NatsMessage getFcHeartbeat(int replyToId, String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(sid, "subj", null, 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(sid, "subj", null, 0, false); String s = "NATS/1.0 " + FLOW_OR_HEARTBEAT_STATUS_CODE + " " + HEARTBEAT_TEXT + "\r\n" + CONSUMER_STALLED_HDR + ":" + getFcSubject(replyToId) + "\r\n\r\n"; imf.setHeaders(new IncomingHeadersProcessor(s.getBytes())); return imf.getMessage(); } private NatsMessage getHeartbeat(String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(mockSid(), "subj", null, 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(mockSid(), "subj", null, 0, false); String s = "NATS/1.0 " + FLOW_OR_HEARTBEAT_STATUS_CODE + " " + HEARTBEAT_TEXT + "\r\n"; imf.setHeaders(new IncomingHeadersProcessor(s.getBytes())); return imf.getMessage(); @@ -402,7 +402,7 @@ private NatsMessage getUnkStatus(String sid) { } private NatsMessage getStatus(int code, String message, String sid) { - NatsMessage.InternalMessageFactory imf = new NatsMessage.InternalMessageFactory(sid, "subj", null, 0, false); + IncomingMessageFactory imf = new IncomingMessageFactory(sid, "subj", null, 0, false); imf.setHeaders(new IncomingHeadersProcessor(("NATS/1.0 " + code + " " + message + "\r\n").getBytes())); return imf.getMessage(); } diff --git a/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java b/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java index a757c263d..0b7ac6cc3 100644 --- a/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java +++ b/src/test/java/io/nats/client/impl/MessageProtocolCreationBenchmark.java @@ -54,7 +54,7 @@ public static void main(String args[]) throws InterruptedException { start = System.nanoTime(); for (int j = 0; j < msgCount; j++) { - new NatsMessage.ProtocolMessage(EMPTY_BODY); + new ProtocolMessage(EMPTY_BODY); } end = System.nanoTime(); diff --git a/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java b/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java index 8e7cd190d..16ada6c7d 100644 --- a/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java +++ b/src/test/java/io/nats/client/impl/MessageQueueBenchmark.java @@ -29,7 +29,7 @@ public static void main(String args[]) throws InterruptedException { MessageQueue warm = new MessageQueue(false); for (int j = 0; j < msgCount; j++) { - msgs[j] = new NatsMessage.ProtocolMessage(warmBytes); + msgs[j] = new ProtocolMessage(warmBytes); warm.push(msgs[j]); } diff --git a/src/test/java/io/nats/client/impl/MessageQueueTests.java b/src/test/java/io/nats/client/impl/MessageQueueTests.java index 854bc8530..d00f3ac19 100644 --- a/src/test/java/io/nats/client/impl/MessageQueueTests.java +++ b/src/test/java/io/nats/client/impl/MessageQueueTests.java @@ -13,7 +13,6 @@ package io.nats.client.impl; -import io.nats.client.impl.NatsMessage.ProtocolMessage; import org.junit.jupiter.api.Test; import java.io.UnsupportedEncodingException; diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index 99034a095..ee7083805 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -30,7 +30,7 @@ public class NatsMessageTests { @Test public void testSizeOnProtocolMessage() { - NatsMessage msg = new NatsMessage.ProtocolMessage("PING"); + NatsMessage msg = new ProtocolMessage("PING"); assertEquals(msg.getProtocolBytes().length + 2, msg.getSizeInBytes(), "Size is set, with CRLF"); assertEquals("PING".getBytes(StandardCharsets.UTF_8).length + 2, msg.getSizeInBytes(), "Size is correct"); assertTrue(msg.toString().endsWith("PING")); // toString COVERAGE @@ -245,37 +245,27 @@ public void miscCoverage() { assertNotNull(m.getHeaders()); m.headers = null; // we can do this because we have package access - m.dirty = true; // for later tests, also is true b/c we nerfed the headers assertFalse(m.hasHeaders()); assertNull(m.getHeaders()); assertNotNull(m.toString()); // COVERAGE assertNotNull(m.getOrCreateHeaders()); - NatsMessage.ProtocolMessage pm = new NatsMessage.ProtocolMessage((byte[])null); + ProtocolMessage pm = new ProtocolMessage((byte[])null); assertNotNull(pm.protocolBab); assertEquals(0, pm.protocolBab.length()); - NatsMessage.InternalMessage scm = new NatsMessage.InternalMessage() {}; + IncomingMessage scm = new IncomingMessage() {}; assertNull(scm.protocolBab); - assertEquals(-1, scm.getControlLineLength()); + assertThrows(IllegalStateException.class, scm::getProtocolBytes); + assertThrows(IllegalStateException.class, scm::getProtocolBab); + assertThrows(IllegalStateException.class, scm::getControlLineLength); // coverage coverage coverage + //noinspection deprecation NatsMessage nmCov = new NatsMessage("sub", "reply", null, true); assertTrue(nmCov.isUtf8mode()); - nmCov.dirty = false; - nmCov.calculateIfDirty(); - - nmCov.dirty = false; - nmCov.headers = new Headers().add("foo", "bar"); - nmCov.calculateIfDirty(); - - nmCov.dirty = false; - nmCov.headers = new Headers().add("foo", "bar"); - nmCov.headers.getSerialized(); - nmCov.calculateIfDirty(); - - assertTrue(nmCov.toDetailString().contains("HPUB sub reply 21 21")); + assertTrue(nmCov.toDetailString().contains("PUB sub reply 0")); assertTrue(nmCov.toDetailString().contains("next=No")); nmCov.protocolBab = null; @@ -300,8 +290,8 @@ public void constructorWithMessage() { public void testFactoryProducesStatusMessage() { IncomingHeadersProcessor incomingHeadersProcessor = new IncomingHeadersProcessor("NATS/1.0 503 No Responders\r\n".getBytes()); - NatsMessage.InternalMessageFactory factory = - new NatsMessage.InternalMessageFactory("sid", "subj", "replyTo", 0, false); + IncomingMessageFactory factory = + new IncomingMessageFactory("sid", "subj", "replyTo", 0, false); factory.setHeaders(incomingHeadersProcessor); factory.setData(null); // coverage @@ -310,7 +300,7 @@ public void testFactoryProducesStatusMessage() { assertNotNull(m.getStatus()); assertEquals(503, m.getStatus().getCode()); assertNotNull(m.getStatus().toString()); - NatsMessage.StatusMessage sm = (NatsMessage.StatusMessage)m; + StatusMessage sm = (StatusMessage)m; assertNotNull(sm.toString()); } From c2dcf90540d3f3f5beddebe6785f471f786aab66 Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Sat, 31 Dec 2022 09:46:24 -0500 Subject: [PATCH 2/7] fine tuning, removing unnecessary vars, fixing flapper test --- .../io/nats/client/impl/IncomingMessage.java | 11 ++- .../client/impl/IncomingMessageFactory.java | 20 ++---- .../client/impl/NatsJetStreamMessage.java | 4 +- .../java/io/nats/client/impl/NatsMessage.java | 68 ++++++++----------- .../io/nats/client/impl/ProtocolMessage.java | 2 +- .../nats/client/impl/ErrorListenerTests.java | 4 +- .../client/impl/JetStreamGeneralTests.java | 2 +- .../io/nats/client/impl/NatsMessageTests.java | 1 - 8 files changed, 46 insertions(+), 66 deletions(-) diff --git a/src/main/java/io/nats/client/impl/IncomingMessage.java b/src/main/java/io/nats/client/impl/IncomingMessage.java index 7b15dcd73..a108f0af2 100644 --- a/src/main/java/io/nats/client/impl/IncomingMessage.java +++ b/src/main/java/io/nats/client/impl/IncomingMessage.java @@ -13,19 +13,16 @@ package io.nats.client.impl; -import io.nats.client.support.ByteArrayBuilder; - public class IncomingMessage extends NatsMessage { IncomingMessage() {} - @Override - byte[] getProtocolBytes() { - throw new IllegalStateException("getProtocolBytes not supported for this type of message."); + IncomingMessage(byte[] data) { + super(data); } @Override - ByteArrayBuilder getProtocolBab() { - throw new IllegalStateException("getProtocolBab not supported for this type of message."); + byte[] getProtocolBytes() { + throw new IllegalStateException("getProtocolBytes not supported for this type of message."); } @Override diff --git a/src/main/java/io/nats/client/impl/IncomingMessageFactory.java b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java index 5d961551f..dbe275474 100644 --- a/src/main/java/io/nats/client/impl/IncomingMessageFactory.java +++ b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java @@ -31,9 +31,7 @@ class IncomingMessageFactory { private byte[] data; private Headers headers; private Status status; - private int hdrLen = 0; - private int dataLen = 0; - private int totLen = 0; + private int headerLen = 0; // Create an incoming message for a subscriber // Doesn't check control line size, since the server sent us the message @@ -49,14 +47,11 @@ class IncomingMessageFactory { void setHeaders(IncomingHeadersProcessor ihp) { headers = ihp.getHeaders(); status = ihp.getStatus(); - hdrLen = ihp.getSerializedLength(); - totLen = hdrLen + dataLen; + headerLen = ihp.getSerializedLength(); } void setData(byte[] data) { this.data = data; - dataLen = data == null ? 0 : data.length; - totLen = hdrLen + dataLen; } NatsMessage getMessage() { @@ -65,21 +60,18 @@ NatsMessage getMessage() { message = new StatusMessage(status); } else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) { - message = new NatsJetStreamMessage(); + message = new NatsJetStreamMessage(data); } else { - message = new IncomingMessage(); + message = new IncomingMessage(data); } - message.initData(data); message.sid = this.sid; message.subject = this.subject; message.replyTo = this.replyTo; - message.protocolLineLength = this.protocolLineLength; message.headers = this.headers; message.utf8mode = this.utf8mode; - message.hdrLen = this.hdrLen; - message.dataLen = this.dataLen; - message.sizeInBytes = protocolLineLength + hdrLen + dataLen + 4; // Two CRLFs + message.headerLen = this.headerLen; + message.sizeInBytes = protocolLineLength + message.headerLen + message.dataLen + 4; // Two CRLFs return message; } } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java b/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java index a4bcc436f..a57a312fa 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamMessage.java @@ -26,7 +26,9 @@ class NatsJetStreamMessage extends IncomingMessage { private NatsJetStreamMetaData jsMetaData = null; - NatsJetStreamMessage() {} + NatsJetStreamMessage(byte[] data) { + super(data); + } /** * {@inheritDoc} diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index b82c19aea..dc9c69a62 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -42,7 +42,6 @@ public class NatsMessage implements Message { // incoming specific : subject, replyTo, data and these fields protected String sid; - protected int protocolLineLength; protected int controlLineLength; // protocol specific : just this field @@ -50,8 +49,8 @@ public class NatsMessage implements Message { // housekeeping protected int sizeInBytes = -1; - protected int hdrLen = 0; - protected int dataLen = 0; + protected int headerLen = 0; + protected int dataLen; protected NatsSubscription subscription; @@ -63,11 +62,12 @@ public class NatsMessage implements Message { // Constructors - Prefer to use Builder // ---------------------------------------------------------------------------------------------------- protected NatsMessage() { - initData(EMPTY_BODY); + this((byte[])null); } protected NatsMessage(byte[] data) { - initData(data); + this.data = data == null ? EMPTY_BODY : data; + dataLen = this.data.length; } @Deprecated // Plans are to remove allowing utf8-mode @@ -76,27 +76,18 @@ public NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode this.utf8mode = utf8mode; } - public NatsMessage(String subject, String replyTo, byte[] data) { - this(subject, replyTo, null, data); - } - - public NatsMessage(Message message) { - initData(message.getData()); - this.subject = message.getSubject(); - this.replyTo = message.getReplyTo(); - this.headers = message.getHeaders(); - this.utf8mode = message.isUtf8mode(); - finishConstruct(); - } - @Deprecated // Plans are to remove allowing utf8-mode public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, boolean utf8mode) { this(subject, replyTo, headers, data); this.utf8mode = utf8mode; } + public NatsMessage(String subject, String replyTo, byte[] data) { + this(subject, replyTo, null, data); + } + public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) { - initData(data); + this(data); this.subject = validateSubject(subject, true); this.replyTo = validateReplyTo(replyTo, false); this.headers = headers; @@ -104,28 +95,32 @@ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) finishConstruct(); } - protected void initData(byte[] data) { - this.data = data == null ? EMPTY_BODY : data; - dataLen = this.data.length; + public NatsMessage(Message message) { + this(message.getData()); + this.subject = message.getSubject(); + this.replyTo = message.getReplyTo(); + this.headers = message.getHeaders(); + this.utf8mode = message.isUtf8mode(); + finishConstruct(); } protected void finishConstruct() { int replyToLen = replyTo == null ? 0 : replyTo.length(); if (headers != null && !headers.isEmpty()) { - hdrLen = headers.serializedLength(); + headerLen = headers.serializedLength(); } else { - hdrLen = 0; + headerLen = 0; } - int hdrAndDataLen = hdrLen + dataLen; + int headerAndDataLen = headerLen + dataLen; // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases - // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (hdrLen + dataLen) - ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + hdrAndDataLen); + // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (headerLen + dataLen) + ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + headerAndDataLen); // protocol come first - if (hdrLen > 0) { + if (headerLen > 0) { bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN); } else { @@ -141,16 +136,16 @@ protected void finishConstruct() { } // header length if there are headers - if (hdrLen > 0) { - bab.append(Integer.toString(hdrLen).getBytes(US_ASCII)).append(SP); + if (headerLen > 0) { + bab.append(Integer.toString(headerLen).getBytes(US_ASCII)).append(SP); } // payload length - bab.append(Integer.toString(hdrAndDataLen).getBytes(US_ASCII)); + bab.append(Integer.toString(headerAndDataLen).getBytes(US_ASCII)); protocolBab = bab; - controlLineLength = protocolBab.length() + 2; // One CRLF - sizeInBytes = protocolBab.length() + hdrAndDataLen + 4; // Two CRLFs + controlLineLength = protocolBab.length() + 2; // One CRLF. This is just how controlLineLength is defined. + sizeInBytes = controlLineLength + headerAndDataLen + 2; // The 2nd CRLFs } // ---------------------------------------------------------------------------------------------------- @@ -168,10 +163,6 @@ byte[] getProtocolBytes() { return protocolBab.toByteArray(); } - ByteArrayBuilder getProtocolBab() { - return protocolBab; - } - int getControlLineLength() { return controlLineLength; } @@ -387,10 +378,9 @@ String toDetailString() { "\n utf8mode=" + utf8mode + "\n headers=" + headersToString() + "\n sid='" + sid + '\'' + - "\n protocolLineLength=" + protocolLineLength + "\n protocolBytes=" + protocolBytesToString() + "\n sizeInBytes=" + sizeInBytes + - "\n hdrLen=" + hdrLen + + "\n headerLen=" + headerLen + "\n dataLen=" + dataLen + "\n subscription=" + subscription + "\n next=" + nextToString(); diff --git a/src/main/java/io/nats/client/impl/ProtocolMessage.java b/src/main/java/io/nats/client/impl/ProtocolMessage.java index 7225e2874..8569e35a9 100644 --- a/src/main/java/io/nats/client/impl/ProtocolMessage.java +++ b/src/main/java/io/nats/client/impl/ProtocolMessage.java @@ -23,7 +23,7 @@ class ProtocolMessage extends NatsMessage { ProtocolMessage(ByteArrayBuilder babProtocol) { protocolBab = babProtocol; - sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF + sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data } ProtocolMessage(byte[] protocol) { diff --git a/src/test/java/io/nats/client/impl/ErrorListenerTests.java b/src/test/java/io/nats/client/impl/ErrorListenerTests.java index dc2594a90..2862e1595 100644 --- a/src/test/java/io/nats/client/impl/ErrorListenerTests.java +++ b/src/test/java/io/nats/client/impl/ErrorListenerTests.java @@ -26,8 +26,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import static io.nats.client.utils.TestBase.standardCloseConnection; -import static io.nats.client.utils.TestBase.standardConnection; +import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; public class ErrorListenerTests { @@ -130,6 +129,7 @@ public void testErrorOnNoAuth() throws Exception { String[] customArgs = {"--user", "stephen", "--pass", "password"}; TestHandler handler = new TestHandler(); try (NatsTestServer ts = new NatsTestServer(customArgs, false)) { + sleep(100); // give the server time to get ready, otherwise sometimes this test flaps // See config file for user/pass Options options = new Options.Builder(). server(ts.getURI()) diff --git a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java index 6a4252ebb..6bcddb51b 100644 --- a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java @@ -888,7 +888,7 @@ public void testInternalLookupConsumerInfoCoverage() throws Exception { @Test public void testGetJetStreamValidatedConnectionCoverage() { - NatsJetStreamMessage njsm = new NatsJetStreamMessage(); + NatsJetStreamMessage njsm = new NatsJetStreamMessage(null); IllegalStateException ise = assertThrows(IllegalStateException.class, njsm::getJetStreamValidatedConnection); assertTrue(ise.getMessage().contains("subscription")); diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index ee7083805..975546fa1 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -257,7 +257,6 @@ public void miscCoverage() { IncomingMessage scm = new IncomingMessage() {}; assertNull(scm.protocolBab); assertThrows(IllegalStateException.class, scm::getProtocolBytes); - assertThrows(IllegalStateException.class, scm::getProtocolBab); assertThrows(IllegalStateException.class, scm::getControlLineLength); // coverage coverage coverage From 7f770dcca2a287a46ec0e2df672f25a6ac52de1f Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Sat, 31 Dec 2022 10:46:10 -0500 Subject: [PATCH 3/7] fine tuning, removing unnecessary vars, fixing flapper test --- .../client/impl/IncomingMessageFactory.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/nats/client/impl/IncomingMessageFactory.java b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java index dbe275474..58146aa03 100644 --- a/src/main/java/io/nats/client/impl/IncomingMessageFactory.java +++ b/src/main/java/io/nats/client/impl/IncomingMessageFactory.java @@ -31,7 +31,7 @@ class IncomingMessageFactory { private byte[] data; private Headers headers; private Status status; - private int headerLen = 0; + private int headerLen; // Create an incoming message for a subscriber // Doesn't check control line size, since the server sent us the message @@ -41,7 +41,6 @@ class IncomingMessageFactory { this.replyTo = replyTo; this.protocolLineLength = protocolLength; this.utf8mode = utf8mode; - // headers and data are set later and sizes are calculated during those setters } void setHeaders(IncomingHeadersProcessor ihp) { @@ -65,13 +64,13 @@ else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) { else { message = new IncomingMessage(data); } - message.sid = this.sid; - message.subject = this.subject; - message.replyTo = this.replyTo; - message.headers = this.headers; - message.utf8mode = this.utf8mode; - message.headerLen = this.headerLen; - message.sizeInBytes = protocolLineLength + message.headerLen + message.dataLen + 4; // Two CRLFs + message.sid = sid; + message.subject = subject; + message.replyTo = replyTo; + message.headers = headers; + message.headerLen = headerLen; + message.utf8mode = utf8mode; + message.sizeInBytes = protocolLineLength + headerLen + message.dataLen + 4; // Two CRLFs return message; } } From c894b664b33f72b4ec00c12e3307ebfa66373309 Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Wed, 4 Jan 2023 17:48:20 -0500 Subject: [PATCH 4/7] update dependencies --- build.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 29bd6053f..f6ae05556 100644 --- a/build.gradle +++ b/build.gradle @@ -35,13 +35,12 @@ java { repositories { mavenCentral() maven { url "https://oss.sonatype.org/content/repositories/releases/" } - mavenLocal() } dependencies { implementation 'net.i2p.crypto:eddsa:0.3.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' - testImplementation 'io.nats:jnats-server-runner:1.0.13' + testImplementation 'io.nats:jnats-server-runner:1.0.14' } sourceSets { From 5076599f08459b29919ea6ee99404a8f4f79fb3b Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Wed, 4 Jan 2023 17:56:00 -0500 Subject: [PATCH 5/7] keeping multiple prs in sync --- dependencies.md | 2 +- src/test/java/io/nats/client/NatsTestServer.java | 16 ++++++++++++---- .../io/nats/client/impl/TLSConnectTests.java | 2 +- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/dependencies.md b/dependencies.md index a6cbec30b..df3e6dce8 100644 --- a/dependencies.md +++ b/dependencies.md @@ -12,7 +12,7 @@ This file lists the dependencies used in this repository. | Dependency | License | |-------------------------------------------------|-----------------------------------------| -| io.nats:jnats-server-runner:1.0.13 | Apache 2.0 License | +| io.nats:jnats-server-runner:1.0.14 | Apache 2.0 License | | org.apiguardian:apiguardian-api:1.1.0 | Apache 2.0 License | | org.junit.jupiter:junit-jupiter:5.9.0 | Eclipse Public License v2.0 | | org.junit:junit-bom:5.9.0 | Eclipse Public License v2.0 | diff --git a/src/test/java/io/nats/client/NatsTestServer.java b/src/test/java/io/nats/client/NatsTestServer.java index dfe208122..e151846ac 100644 --- a/src/test/java/io/nats/client/NatsTestServer.java +++ b/src/test/java/io/nats/client/NatsTestServer.java @@ -1,4 +1,4 @@ -// Copyright 2015-2020 The NATS Authors +// Copyright 2015-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at: @@ -76,7 +76,15 @@ public static int nextPort() throws IOException { return NatsRunnerUtils.nextPort(); } - public static String getURIForPort(int port) { - return NatsRunnerUtils.getURIForPort(port); + public String getLocalhostUri(String schema) { + return NatsRunnerUtils.getLocalhostUri(schema, getPort()); } -} \ No newline at end of file + + public static String getNatsLocalhostUri(int port) { + return NatsRunnerUtils.getNatsLocalhostUri(port); + } + + public static String getLocalhostUri(String schema, int port) { + return NatsRunnerUtils.getLocalhostUri(schema, port); + } +} diff --git a/src/test/java/io/nats/client/impl/TLSConnectTests.java b/src/test/java/io/nats/client/impl/TLSConnectTests.java index d968b77da..211b4ebf9 100644 --- a/src/test/java/io/nats/client/impl/TLSConnectTests.java +++ b/src/test/java/io/nats/client/impl/TLSConnectTests.java @@ -214,7 +214,7 @@ public void testTLSOnReconnect() throws InterruptedException, Exception { SSLContext ctx = TestSSLUtils.createTestSSLContext(); Options options = new Options.Builder(). server(ts.getURI()). - server(NatsTestServer.getURIForPort(newPort)). + server(NatsTestServer.getNatsLocalhostUri(newPort)). maxReconnects(-1). sslContext(ctx). connectionListener(handler). From cd96f3c6a67fede6e11e2def935251ee76e0143a Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Thu, 5 Jan 2023 08:38:11 -0500 Subject: [PATCH 6/7] partially resuable Protocol Message saves constant allocation and size calculation --- .../java/io/nats/client/impl/NatsConnection.java | 14 +++++++++++--- .../java/io/nats/client/impl/ProtocolMessage.java | 9 +++++---- .../java/io/nats/client/impl/NatsMessageTests.java | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 1491ddb6d..55881af78 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -1285,9 +1285,9 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) { pongQueue.add(pongFuture); if (treatAsInternal) { - queueInternalOutgoing(new ProtocolMessage(OP_PING_BYTES)); + queueInternalOutgoing(new ProtocolMessage(PING_PROTO)); } else { - queueOutgoing(new ProtocolMessage(OP_PING_BYTES)); + queueOutgoing(new ProtocolMessage(PING_PROTO)); } this.needPing.set(true); @@ -1295,8 +1295,16 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) { return pongFuture; } + // This is a minor speed / memory enhancement. + // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state + // But it is safe to share the data bytes and the size since those fields are just being read + // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size + // reducing allocation of data for something that is often created and used + static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES); + static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES); + void sendPong() { - queueInternalOutgoing(new ProtocolMessage(OP_PONG_BYTES)); + queueInternalOutgoing(new ProtocolMessage(PONG_PROTO)); } // Called by the reader diff --git a/src/main/java/io/nats/client/impl/ProtocolMessage.java b/src/main/java/io/nats/client/impl/ProtocolMessage.java index 8569e35a9..5bad98a5d 100644 --- a/src/main/java/io/nats/client/impl/ProtocolMessage.java +++ b/src/main/java/io/nats/client/impl/ProtocolMessage.java @@ -30,12 +30,13 @@ class ProtocolMessage extends NatsMessage { this(protocol == null ? EMPTY_BAB : new ByteArrayBuilder(protocol)); } - ProtocolMessage(String asciiProtocol) { - this(new ByteArrayBuilder().append(asciiProtocol)); - } - @Override boolean isProtocol() { return true; } + + ProtocolMessage(ProtocolMessage pm) { + protocolBab = pm.protocolBab; + sizeInBytes = pm.sizeInBytes; + } } diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index 975546fa1..afd29bb3e 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -30,7 +30,7 @@ public class NatsMessageTests { @Test public void testSizeOnProtocolMessage() { - NatsMessage msg = new ProtocolMessage("PING"); + NatsMessage msg = new ProtocolMessage("PING".getBytes()); assertEquals(msg.getProtocolBytes().length + 2, msg.getSizeInBytes(), "Size is set, with CRLF"); assertEquals("PING".getBytes(StandardCharsets.UTF_8).length + 2, msg.getSizeInBytes(), "Size is correct"); assertTrue(msg.toString().endsWith("PING")); // toString COVERAGE From a66212e21c794eb8ebd9d474fe0e48e09f776a3b Mon Sep 17 00:00:00 2001 From: scottf <scottfauerbach@gmail.com> Date: Thu, 5 Jan 2023 09:11:22 -0500 Subject: [PATCH 7/7] partially resuable Protocol Message saves constant allocation and size calculation aroc --- src/main/java/io/nats/client/impl/NatsConnection.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 55881af78..96d120b34 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -1300,8 +1300,9 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) { // But it is safe to share the data bytes and the size since those fields are just being read // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size // reducing allocation of data for something that is often created and used - static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES); - static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES); + // These static instances are the once that are used for copying, sendPing and sendPong + private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES); + private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES); void sendPong() { queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));