Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Headers scott #353

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
11ac86b
still figuring out how to test it, but the implementation seems easy …
Aug 21, 2020
d0717a3
added test to #336
Aug 21, 2020
94f1603
added test for #334
Aug 21, 2020
d243ff2
merge
Aug 21, 2020
1efdac0
bumped the version to 2.8.0. I think the header is a bigger change. I…
Aug 21, 2020
9cd9dce
early work on #335
Aug 24, 2020
b4f13ff
work on headers
Aug 24, 2020
f896761
refactoring NatsConnectionReader to make it more testable
Aug 24, 2020
7af0589
Merge branch 'master' into headers
RichardHightower Aug 24, 2020
a79c6ac
new reader test w/o timing issues and using mocks
Aug 27, 2020
3d13eed
added new reader unit test
Aug 27, 2020
b716921
got unit test working for HMSG, able to parse message and headers and…
Aug 27, 2020
8e9ebe5
added builder for creating messages
Sep 2, 2020
325a5fc
added new methods to connection. Added Message Builder and updated so…
Sep 2, 2020
9b32740
added tests for message creation of PUB and HPUB
Sep 2, 2020
9f0a161
added non JDK 1.8 feature
Sep 2, 2020
d1aeec7
added more tests, moved LDM out of tests with race condition
Sep 2, 2020
78770b6
removed debug prints
Sep 2, 2020
40187ec
adding tests with drain so tests work
Sep 3, 2020
965e08c
headers-scott (1) initial check in
Nov 1, 2020
cf3e031
headers-scott (2) initial check in
Nov 1, 2020
31e378c
headers-scott (3) addings tests
Nov 2, 2020
419e1d1
headers-scott (4) more addings tests
Nov 2, 2020
7ab7c36
headers-scott (5) simplified keep it simple
Nov 2, 2020
61c8fa7
headers-scott (5) NatsMessage ProtocolBuilder
Nov 3, 2020
45df86d
headers-scott (6) wrapping Message next in functions instead of direc…
Nov 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 167 additions & 84 deletions src/main/java/io/nats/client/Connection.java

Large diffs are not rendered by default.

35 changes: 29 additions & 6 deletions src/main/java/io/nats/client/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.nats.client;

import io.nats.client.impl.Headers;

/**
* The NATS library uses a Message object to encapsulate incoming messages. Applications
* publish and send requests with raw strings and byte[] but incoming messages can have a few
Expand All @@ -22,35 +24,56 @@
* and is safe to manipulate.
*/
public interface Message {
byte[] EMPTY_BODY = new byte[0];

/**
* @return the subject that this message was sent to
*/
public String getSubject();
String getSubject();

/**
* @return the subject the application is expected to send a reply message on
*/
public String getReplyTo();
String getReplyTo();

/**
* @return the headers from the message
*/
Headers getHeaders();

/**
* @return if is utf8Mode
*/
boolean isUtf8mode();

/**
* @return the data from the message
*/
public byte[] getData();
byte[] getData();

/**
* @return the Subscription associated with this message, may be owned by a Dispatcher
*/
public Subscription getSubscription();
Subscription getSubscription();

/**
* @return the id associated with the subscription, used by the connection when processing an incoming
* message from the server
*/
public String getSID();
String getSID();

/**
* @return the connection which can be used for publishing, will be null if the subscription is null
*/
public Connection getConnection();
Connection getConnection();

/**
* @return the protocol bytes
*/
byte[] getProtocolBytes();

/**
* @return the message size in bytes
*/
long getSizeInBytes();
}
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/Nats.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

package io.nats.client;

import java.io.IOException;

import io.nats.client.impl.NatsImpl;

import java.io.IOException;

/**
* The Nats class is the entry point into the NATS client for Java. This class
* is used to create a connection to the NATS server. Connecting is a
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/nats/client/impl/ByteBufferUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.nats.client.impl;

import java.nio.ByteBuffer;

public class ByteBufferUtil {

public static ByteBuffer enlargeBuffer(ByteBuffer buffer, int atLeast) {
int current = buffer.capacity();
int newSize = Math.max(current * 2, atLeast);
ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
buffer.flip();
newBuffer.put(buffer);
return newBuffer;
}
}
188 changes: 188 additions & 0 deletions src/main/java/io/nats/client/impl/Headers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2015-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.impl;

import java.util.*;

public class Headers {
private static final String KEY_CANNOT_BE_EMPTY_OR_NULL = "Header key cannot be null.";
private static final String VALUES_CANNOT_BE_EMPTY_OR_NULL = "Header values cannot be empty or null.";

private final Map<String, Set<String>> headerMap = new HashMap<>();

/**
* If the key is present add the values to the set of values for the key.
* If the key is not present, sets the specified values for the key.
* Duplicate values are ignored. Null and empty values are not allowed
*
* @param key the key
* @param values the values
* @return {@code true} if this object did not already contain values for the key
* @throws IllegalArgumentException if the key is null or empty
* -or- any value is null or empty.
*/
public boolean add(String key, String... values) {
return add(key, Arrays.asList(values));
}

/**
* If the key is present add the values to the set of values for the key.
* If the key is not present, sets the specified values for the key.
* Duplicate values are ignored. Null and empty values are not allowed.
*
* @param key the key
* @param values the values
* @return {@code true} if this object did not already contain the key or the
* values for the key changed.
* @throws IllegalArgumentException if the key is null or empty
* -or- if then input collection is null
* -or- if any item in the collection is null or empty.
*/
public boolean add(String key, Collection<String> values) {
Set<String> validatedSet = validateKeyAndValues(key, values);

Set<String> currentSet = headerMap.get(key);
if (currentSet == null) {
headerMap.put(key, validatedSet);
return true;
}

return currentSet.addAll(validatedSet);
}

private Set<String> validateKeyAndValues(String key, Collection<String> values) {
keyCannotBeNull(key);
valuesCannotBeEmptyOrNull(values);
Set<String> validatedSet = new HashSet<>();
for (String v : values) {
valueCannotBeEmptyOrNull(v);
validatedSet.add(v);
}
return validatedSet;
}

/**
* Associates the specified values with the key. If the key was already present
* any existing values are removed and replaced with the new set.
* Duplicate values are ignored. Null and empty values are not allowed
*
* @param key the key
* @param values the values
* @return {@code true} if this object did not already contain values for the key
* @throws IllegalArgumentException if the key is null or empty
* -or- any value is null or empty.
*/
public boolean put(String key, String... values) {
return put(key, Arrays.asList(values));
}

/**
* Associates the specified values with the key. If the key was already present
* any existing values are removed and replaced with the new set.
* Duplicate values are ignored. Null and empty values are not allowed
*
* @param key the key
* @param values the values
* @return {@code true} if this object did not already contain values for the key
* @throws IllegalArgumentException if the key is null or empty
* -or- if then input collection is null
* -or- if any item in the collection is null or empty.
*/
public boolean put(String key, Collection<String> values) {
Set<String> validatedSet = validateKeyAndValues(key, values);
return headerMap.put(key, validatedSet) == null;
}

/**
* Removes each key and its values if the key was present
*
* @param keys the key or keys to remove
* @return {@code true} if any key was present
*/
public boolean remove(String... keys) {
return remove(Arrays.asList(keys));
}

/**
* Removes each key and its values if the key was present
*
* @param keys the key or keys to remove
* @return {@code true} if any key was present
*/
public boolean remove(Collection<String> keys) {
boolean changed = false;
for (String key : keys) {
if (headerMap.remove(key) != null) {
changed = true;
}
}
return changed;
}

public int size() {
return headerMap.size();
}

public boolean isEmpty() {
return headerMap.isEmpty();
}

public void clear() {
headerMap.clear();
}

public boolean containsKey(String key) {
return headerMap.containsKey(key);
}

public Set<String> values(String key) {
Set<String> set = headerMap.get(key);
return set == null ? null : Collections.unmodifiableSet(set);
}

public Set<String> keySet() {
return headerMap.keySet();
}

private void keyCannotBeNull(String key) {
if (key == null || key.length() == 0) {
throw new IllegalArgumentException(KEY_CANNOT_BE_EMPTY_OR_NULL);
}
}

private void valueCannotBeEmptyOrNull(String val) {
if (val == null || val.length() == 0) {
throw new IllegalArgumentException(VALUES_CANNOT_BE_EMPTY_OR_NULL);
}
}

private void valuesCannotBeEmptyOrNull(Collection<String> vals) {
if (vals == null || vals.size() == 0) {
throw new IllegalArgumentException(VALUES_CANNOT_BE_EMPTY_OR_NULL);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Headers headers = (Headers) o;
return Objects.equals(headerMap, headers.headerMap);
}

@Override
public int hashCode() {
return Objects.hash(headerMap);
}
}
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class MessageQueue {
this.length = new AtomicLong(0);

// The poisonPill is used to stop poll and accumulate when the queue is stopped
this.poisonPill = new NatsMessage("_poison", null, NatsConnection.EMPTY_BODY, false);
this.poisonPill = new NatsMessage.PublishBuilder().subject("_poison").build();

this.filterLock = new ReentrantLock();

Expand Down Expand Up @@ -240,9 +240,9 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
if (maxSize<0 || (size + s) < maxSize) { // keep going
size += s;
count++;
cursor.next = this.queue.poll();
cursor = cursor.next;

cursor.setNext(this.queue.poll());
cursor = cursor.getNext();

if (count == maxMessages) {
break;
Expand Down
Loading