Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

message size calculation improvements #820

Merged
merged 8 commits into from
Jan 9, 2023
Merged
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ repositories {
dependencies {
implementation 'net.i2p.crypto:eddsa:0.3.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
testImplementation 'io.nats:jnats-server-runner:1.0.9'
testImplementation 'io.nats:jnats-server-runner:1.0.14'
}

sourceSets {
Expand Down
2 changes: 1 addition & 1 deletion dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This file lists the dependencies used in this repository.

| Dependency | License |
|-------------------------------------------------|-----------------------------------------|
| io.nats:jnats-server-runner:1.0.9 | Apache 2.0 License |
| io.nats:jnats-server-runner:1.0.14 | Apache 2.0 License |
| org.apiguardian:apiguardian-api:1.1.0 | Apache 2.0 License |
| org.junit.jupiter:junit-jupiter:5.9.0 | Eclipse Public License v2.0 |
| org.junit:junit-bom:5.9.0 | Eclipse Public License v2.0 |
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/io/nats/client/impl/IncomingMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2015-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.impl;

public class IncomingMessage extends NatsMessage {
IncomingMessage() {}

IncomingMessage(byte[] data) {
super(data);
}

@Override
byte[] getProtocolBytes() {
throw new IllegalStateException("getProtocolBytes not supported for this type of message.");
}

@Override
int getControlLineLength() {
throw new IllegalStateException("getControlLineLength not supported for this type of message.");
}
}
76 changes: 76 additions & 0 deletions src/main/java/io/nats/client/impl/IncomingMessageFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2015-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.impl;

import io.nats.client.support.IncomingHeadersProcessor;
import io.nats.client.support.Status;

import static io.nats.client.support.NatsJetStreamConstants.JS_ACK_SUBJECT_PREFIX;

// ----------------------------------------------------------------------------------------------------
// Incoming Message Factory - internal use only
// ----------------------------------------------------------------------------------------------------
class IncomingMessageFactory {
private final String sid;
private final String subject;
private final String replyTo;
private final int protocolLineLength;
private final boolean utf8mode;

private byte[] data;
private Headers headers;
private Status status;
private int headerLen;

// Create an incoming message for a subscriber
// Doesn't check control line size, since the server sent us the message
IncomingMessageFactory(String sid, String subject, String replyTo, int protocolLength, boolean utf8mode) {
this.sid = sid;
this.subject = subject;
this.replyTo = replyTo;
this.protocolLineLength = protocolLength;
this.utf8mode = utf8mode;
}

void setHeaders(IncomingHeadersProcessor ihp) {
headers = ihp.getHeaders();
status = ihp.getStatus();
headerLen = ihp.getSerializedLength();
}

void setData(byte[] data) {
this.data = data;
}

NatsMessage getMessage() {
NatsMessage message;
if (status != null) {
message = new StatusMessage(status);
}
else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) {
message = new NatsJetStreamMessage(data);
}
else {
message = new IncomingMessage(data);
}
message.sid = sid;
message.subject = subject;
message.replyTo = replyTo;
message.headers = headers;
message.headerLen = headerLen;
message.utf8mode = utf8mode;
message.sizeInBytes = protocolLineLength + headerLen + message.dataLen + 4; // Two CRLFs
return message;
}
}
16 changes: 12 additions & 4 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.nats.client.*;
import io.nats.client.ConnectionListener.Events;
import io.nats.client.api.ServerInfo;
import io.nats.client.impl.NatsMessage.ProtocolMessage;
import io.nats.client.support.ByteArrayBuilder;
import io.nats.client.support.NatsRequestCompletableFuture;
import io.nats.client.support.Validator;
Expand Down Expand Up @@ -1286,18 +1285,27 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
pongQueue.add(pongFuture);

if (treatAsInternal) {
queueInternalOutgoing(new ProtocolMessage(OP_PING_BYTES));
queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
} else {
queueOutgoing(new ProtocolMessage(OP_PING_BYTES));
queueOutgoing(new ProtocolMessage(PING_PROTO));
}

this.needPing.set(true);
this.statistics.incrementPingCount();
return pongFuture;
}

// This is a minor speed / memory enhancement.
// We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state
// But it is safe to share the data bytes and the size since those fields are just being read
// This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
// reducing allocation of data for something that is often created and used
// These static instances are the once that are used for copying, sendPing and sendPong
private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);

void sendPong() {
queueInternalOutgoing(new ProtocolMessage(OP_PONG_BYTES));
queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
}

// Called by the reader
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/io/nats/client/impl/NatsConnectionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package io.nats.client.impl;

import io.nats.client.impl.NatsMessage.InternalMessageFactory;
import io.nats.client.support.IncomingHeadersProcessor;

import java.io.IOException;
Expand Down Expand Up @@ -46,21 +45,21 @@ enum Mode {
private boolean gotCR;

private String op;
private char[] opArray;
private final char[] opArray;
private int opPos;

private char[] msgLineChars;
private final char[] msgLineChars;
private int msgLinePosition;

private Mode mode;

private InternalMessageFactory incoming;
private IncomingMessageFactory incoming;
private byte[] msgHeaders;
private byte[] msgData;
private int msgHeadersPosition;
private int msgDataPosition;

private byte[] buffer;
private final byte[] buffer;
private int bufferPosition;

private Future<Boolean> stopped;
Expand Down Expand Up @@ -418,7 +417,7 @@ static String opFor(char[] chars, int length) {
}
}

private static int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};
private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};

public static int parseLength(String s) throws NumberFormatException {
int length = s.length();
Expand All @@ -433,7 +432,7 @@ public static int parseLength(String s) throws NumberFormatException {
int d = (c - '0');

if (d>9) {
throw new NumberFormatException("Invalid char in message length \'" + c + "\'");
throw new NumberFormatException("Invalid char in message length '" + c + "'");
}

retVal += d * TENS[length - i - 1];
Expand Down Expand Up @@ -476,7 +475,7 @@ void parseProtocolMessage() throws IOException {

int incomingLength = parseLength(lengthChars);

this.incoming = new InternalMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode);
this.incoming = new IncomingMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode);
this.mode = Mode.GATHER_DATA;
this.msgData = new byte[incomingLength];
this.msgDataPosition = 0;
Expand Down Expand Up @@ -518,7 +517,7 @@ void parseProtocolMessage() throws IOException {
throw new IllegalStateException("Bad HMSG control line, missing required fields");
}

this.incoming = new InternalMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode);
this.incoming = new IncomingMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode);
this.msgHeaders = new byte[hdrLen];
this.msgData = new byte[totLen - hdrLen];
this.mode = Mode.GATHER_HEADERS;
Expand All @@ -532,7 +531,7 @@ void parseProtocolMessage() throws IOException {
this.mode = Mode.GATHER_OP;
break;
case OP_ERR:
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("\'", "");
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
this.connection.processError(errorText);
this.op = UNKNOWN_OP;
this.mode = Mode.GATHER_OP;
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/nats/client/impl/NatsJetStreamMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.nats.client.impl;

import io.nats.client.Connection;
import io.nats.client.impl.NatsMessage.InternalMessage;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
Expand All @@ -23,11 +22,13 @@
import static io.nats.client.support.NatsConstants.NANOS_PER_MILLI;
import static io.nats.client.support.Validator.validateDurationRequired;

class NatsJetStreamMessage extends InternalMessage {
class NatsJetStreamMessage extends IncomingMessage {

private NatsJetStreamMetaData jsMetaData = null;

NatsJetStreamMessage() {}
NatsJetStreamMessage(byte[] data) {
super(data);
}

/**
* {@inheritDoc}
Expand Down
Loading