Skip to content

Commit

Permalink
Message Headers Improvement (#1054)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Dec 18, 2023
1 parent 7c75122 commit 4d7b99d
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 99 deletions.
14 changes: 7 additions & 7 deletions src/main/java/io/nats/client/api/MessageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,21 @@ public MessageInfo(Message msg, String streamName, boolean direct) {
this.direct = direct;

if (direct) {
this.headers = msg.getHeaders();
this.subject = headers.getLast(NATS_SUBJECT);
Headers msgHeaders = msg.getHeaders();
this.subject = msgHeaders.getLast(NATS_SUBJECT);
this.data = msg.getData();
seq = Long.parseLong(headers.getFirst(NATS_SEQUENCE));
time = DateTimeUtils.parseDateTime(headers.getFirst(NATS_TIMESTAMP));
stream = headers.getFirst(NATS_STREAM);
String temp = headers.getFirst(NATS_LAST_SEQUENCE);
seq = Long.parseLong(msgHeaders.getFirst(NATS_SEQUENCE));
time = DateTimeUtils.parseDateTime(msgHeaders.getFirst(NATS_TIMESTAMP));
stream = msgHeaders.getFirst(NATS_STREAM);
String temp = msgHeaders.getFirst(NATS_LAST_SEQUENCE);
if (temp == null) {
lastSeq = -1;
}
else {
lastSeq = JsonUtils.safeParseLong(temp, -1);
}
// these are control headers, not real headers so don't give them to the user.
headers.remove(NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE);
headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
}
else if (hasError()) {
subject = null;
Expand Down
92 changes: 63 additions & 29 deletions src/main/java/io/nats/client/impl/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* An object that represents a map of keys to a list of values. It does not accept
* null or invalid keys. It ignores null values, accepts empty string as a value
* and rejects invalid values.
*
* !!!
* THIS CLASS IS NOT THREAD SAFE
*/
public class Headers {
Expand All @@ -36,21 +36,47 @@ public class Headers {

private final Map<String, List<String>> valuesMap;
private final Map<String, Integer> lengthMap;
private final boolean readOnly;
private byte[] serialized;
private int dataLength;

public Headers() {
valuesMap = new HashMap<>();
lengthMap = new HashMap<>();
this(null, false, null);
}

public Headers(Headers headers) {
this();
this(headers, false, null);
}

public Headers(Headers headers, boolean readOnly) {
this(headers, readOnly, null);
}

public Headers(Headers headers, boolean readOnly, String[] keysNotToCopy) {
Map<String, List<String>> tempValuesMap = new HashMap<>();
Map<String, Integer> tempLengthMap = new HashMap<>();
if (headers != null) {
valuesMap.putAll(headers.valuesMap);
lengthMap.putAll(headers.lengthMap);
tempValuesMap.putAll(headers.valuesMap);
tempLengthMap.putAll(headers.lengthMap);
dataLength = headers.dataLength;
serialized = null;
if (keysNotToCopy != null) {
for (String key : keysNotToCopy) {
if (key != null) {
if (tempValuesMap.remove(key) != null) {
dataLength -= tempLengthMap.remove(key);
}
}
}
}
}
this.readOnly = readOnly;
if (readOnly) {
valuesMap = Collections.unmodifiableMap(tempValuesMap);
lengthMap = Collections.unmodifiableMap(tempLengthMap);
}
else {
valuesMap = tempValuesMap;
lengthMap = tempLengthMap;
}
}

Expand Down Expand Up @@ -155,7 +181,7 @@ public Headers put(Map<String, List<String>> map) {

// the put delegate that all puts call
private void _put(String key, Collection<String> values) {
if (key == null || key.length() == 0) {
if (key == null || key.isEmpty()) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
if (values != null) {
Expand Down Expand Up @@ -203,7 +229,7 @@ private void _remove(String key) {
}

/**
* Returns the number of keys (case sensitive) in the header.
* Returns the number of keys (case-sensitive) in the header.
*
* @return the number of keys
*/
Expand All @@ -221,7 +247,7 @@ public boolean isEmpty() {
}

/**
* Removes all of the keys The object map will be empty after this call returns.
* Removes all the keys The object map will be empty after this call returns.
*/
public void clear() {
valuesMap.clear();
Expand All @@ -231,20 +257,20 @@ public void clear() {
}

/**
* Returns <tt>true</tt> if key (case sensitive) is present (has values)
* Returns <tt>true</tt> if key (case-sensitive) is present (has values)
*
* @param key key whose presence is to be tested
* @return <tt>true</tt> if the key (case sensitive) is present (has values)
* @return <tt>true</tt> if the key (case-sensitive) is present (has values)
*/
public boolean containsKey(String key) {
return valuesMap.containsKey(key);
}

/**
* Returns <tt>true</tt> if key (case insensitive) is present (has values)
* Returns <tt>true</tt> if key (case-insensitive) is present (has values)
*
* @param key exact key whose presence is to be tested
* @return <tt>true</tt> if the key (case insensitive) is present (has values)
* @return <tt>true</tt> if the key (case-insensitive) is present (has values)
*/
public boolean containsKeyIgnoreCase(String key) {
for (String k : valuesMap.keySet()) {
Expand All @@ -256,7 +282,7 @@ public boolean containsKeyIgnoreCase(String key) {
}

/**
* Returns a {@link Set} view of the keys (case sensitive) contained in the object.
* Returns a {@link Set} view of the keys (case-sensitive) contained in the object.
*
* @return a read-only set the keys contained in this map
*/
Expand All @@ -265,7 +291,7 @@ public Set<String> keySet() {
}

/**
* Returns a {@link Set} view of the keys (case insensitive) contained in the object.
* Returns a {@link Set} view of the keys (case-insensitive) contained in the object.
*
* @return a read-only set of keys (in lowercase) contained in this map
*/
Expand All @@ -278,43 +304,43 @@ public Set<String> keySetIgnoreCase() {
}

/**
* Returns a {@link List} view of the values for the specific (case sensitive) key.
* Returns a {@link List} view of the values for the specific (case-sensitive) key.
* Will be {@code null} if the key is not found.
*
* @return a read-only list of the values for the case sensitive key.
* @return a read-only list of the values for the case-sensitive key.
*/
public List<String> get(String key) {
List<String> values = valuesMap.get(key);
return values == null ? null : Collections.unmodifiableList(values);
}

/**
* Returns the first value for the specific (case sensitive) key.
* Returns the first value for the specific (case-sensitive) key.
* Will be {@code null} if the key is not found.
*
* @return the first value for the case sensitive key.
* @return the first value for the case-sensitive key.
*/
public String getFirst(String key) {
List<String> values = valuesMap.get(key);
return values == null ? null : values.get(0);
}

/**
* Returns the last value for the specific (case sensitive) key.
* Returns the last value for the specific (case-sensitive) key.
* Will be {@code null} if the key is not found.
*
* @return the last value for the case sensitive key.
* @return the last value for the case-sensitive key.
*/
public String getLast(String key) {
List<String> values = valuesMap.get(key);
return values == null ? null : values.get(values.size() - 1);
}

/**
* Returns a {@link List} view of the values for the specific (case insensitive) key.
* Returns a {@link List} view of the values for the specific (case-insensitive) key.
* Will be {@code null} if the key is not found.
*
* @return a read-only list of the values for the case insensitive key.
* @return a read-only list of the values for the case-insensitive key.
*/
public List<String> getIgnoreCase(String key) {
List<String> values = new ArrayList<>();
Expand All @@ -323,11 +349,11 @@ public List<String> getIgnoreCase(String key) {
values.addAll(valuesMap.get(k));
}
}
return values.size() == 0 ? null : Collections.unmodifiableList(values);
return values.isEmpty() ? null : Collections.unmodifiableList(values);
}

/**
* Performs the given action for each header entry (case sensitive keys) until all entries
* Performs the given action for each header entry (case-sensitive keys) until all entries
* have been processed or the action throws an exception.
* Any attempt to modify the values will throw an exception.
*
Expand All @@ -341,7 +367,7 @@ public void forEach(BiConsumer<String, List<String>> action) {
}

/**
* Returns a {@link Set} read only view of the mappings contained in the header (case sensitive keys).
* Returns a {@link Set} read only view of the mappings contained in the header (case-sensitive keys).
* The set is not modifiable and any attempt to modify will throw an exception.
*
* @return a set view of the mappings contained in this map
Expand Down Expand Up @@ -447,7 +473,7 @@ public int serializeToArray(int destPosition, byte[] dest) {
*/
private void checkKey(String key) {
// key cannot be null or empty and contain only printable characters except colon
if (key == null || key.length() == 0) {
if (key == null || key.isEmpty()) {
throw new IllegalArgumentException(KEY_CANNOT_BE_EMPTY_OR_NULL);
}

Expand Down Expand Up @@ -500,10 +526,18 @@ private class Checker {
}

boolean hasValues() {
return list.size() > 0;
return !list.isEmpty();
}
}

/**
* Whether the entire Headers is read only
* @return the read only state
*/
public boolean isReadOnly() {
return readOnly;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/nats/client/impl/NatsMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data)
this(data);
this.subject = validateSubject(subject, true);
this.replyTo = validateReplyTo(replyTo, false);
this.headers = headers;
this.headers = readOnlyOf(headers);
this.utf8mode = false;
finishConstruct();
}
Expand All @@ -99,11 +99,18 @@ public NatsMessage(Message message) {
this(message.getData());
this.subject = message.getSubject();
this.replyTo = message.getReplyTo();
this.headers = message.getHeaders();
this.headers = readOnlyOf(message.getHeaders());
this.utf8mode = message.isUtf8mode();
finishConstruct();
}

private static Headers readOnlyOf(Headers headers) {
if (headers == null || headers.isReadOnly()) {
return headers;
}
return new Headers(headers, true, null);
}

protected void finishConstruct() {
int replyToLen = replyTo == null ? 0 : replyTo.length();

Expand Down Expand Up @@ -167,13 +174,6 @@ int getControlLineLength() {
return controlLineLength;
}

Headers getOrCreateHeaders() {
if (headers == null) {
headers = new Headers();
}
return headers;
}

void setSubscription(NatsSubscription sub) {
subscription = sub;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public interface NatsJetStreamConstants {
String NATS_TIMESTAMP = "Nats-Time-Stamp";
String NATS_SUBJECT = "Nats-Subject";
String NATS_LAST_SEQUENCE = "Nats-Last-Sequence";
String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE};

String NATS_PENDING_MESSAGES = "Nats-Pending-Messages";
String NATS_PENDING_BYTES = "Nats-Pending-Bytes";
Expand Down
Loading

0 comments on commit 4d7b99d

Please sign in to comment.