Skip to content

Commit

Permalink
Tuning the loop
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 26, 2024
1 parent c5cacc7 commit bd579ba
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 68 deletions.
37 changes: 30 additions & 7 deletions src/main/java/io/nats/RequestMany/RequestMany.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public List<RequestManyMessage> fetch(String subject, byte[] payload) {
public List<RequestManyMessage> fetch(String subject, Headers headers, byte[] payload) {
List<RequestManyMessage> results = new ArrayList<>();
gather(subject, headers, payload, rmm -> {
results.add(rmm);
if (!rmm.isNormalEndOfData()) {
results.add(rmm);
}
return true;
});
return results;
Expand All @@ -116,7 +118,7 @@ public void gather(String subject, byte[] payload, RequestManyHandler handler) {
}

public void gather(String subject, Headers headers, byte[] payload, RequestManyHandler handler) {
RequestManyMessage eod = RequestManyMessage.EOD;
RequestManyMessage eod = RequestManyMessage.NORMAL_EOD; // the default end of data will be a normal end of data (vs status or exception)

Subscription sub = null;
try {
Expand All @@ -125,31 +127,52 @@ public void gather(String subject, Headers headers, byte[] payload, RequestManyH
conn.publish(subject, replyTo, headers, payload);

long resultsLeft = maxResponses;
long start = System.nanoTime();
long timeLeftNanos = totalWaitTimeNanos;
long timeoutNanos = totalWaitTimeNanos; // first time we wait the whole timeout

long start = System.nanoTime();
while (timeLeftNanos > 0) {
Message msg = sub.nextMessage(Duration.ofNanos(timeoutNanos));

// we calculate this here so it does not consider any of our or the handler's processing time.
timeLeftNanos = totalWaitTimeNanos - (System.nanoTime() - start);

if (msg == null) {
return;
return; // timeout indicates we are done. Uses the default eod
}
if (msg.isStatusMessage()) {
eod = new RequestManyMessage(msg);
return;
return; // status is terminal. Uses the status eod
}
if (!handler.gather(new RequestManyMessage(msg)) || --resultsLeft < 1) {
if (!handler.gather(new RequestManyMessage(msg))) {
eod = null; // they already know it's the end, the prevents them from getting an eod at all
return;
}
if (--resultsLeft < 1) {
return; // We got the count, we are done. Uses the default eod
}

timeoutNanos = Math.min(timeLeftNanos, maxStallNanos); // subsequent times we wait the shortest of the time left vs the max stall
}

// if it fell through, the last operation went over time. Fine, just use the default eod
}
catch (RuntimeException r) {
eod = new RequestManyMessage(r);
throw r;
}
catch (InterruptedException e) {
eod = new RequestManyMessage(e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
finally {
handler.gather(eod);
try {
if (eod != null) {
handler.gather(eod);
}
}
catch (Exception ignore) {}
try {
//noinspection DataFlowIssue
sub.unsubscribe();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/RequestMany/RequestManyMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.nats.client.impl.StatusMessage;

public class RequestManyMessage {
public static final RequestManyMessage EOD = new RequestManyMessage((Exception)null);
public static final RequestManyMessage NORMAL_EOD = new RequestManyMessage((Exception)null);

private final Message message;
private final Exception exception;
Expand Down
133 changes: 74 additions & 59 deletions src/test/java/io/nats/RequestMany/RequestManyTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,83 @@

public class RequestManyTests extends TestBase {

enum Last{ Normal, Status, Ex }
enum Last{ Normal, Status, Ex, None }

private void assertMessages(int regularMessages, Last last, List<RequestManyMessage> list) {
assertEquals(regularMessages + 1, list.size());
for (int x = 0; x < regularMessages; x++) {
assertTrue(list.get(x).isRegularMessage());
}
RequestManyMessage lastRmm = list.get(regularMessages);
switch (last) {
case Normal: assertTrue(lastRmm.isNormalEndOfData()); break;
case Status: assertTrue(lastRmm.isStatusMessage()); break;
case Ex: assertTrue(lastRmm.isException()); break;
if (last == Last.None) {
assertEquals(regularMessages, list.size());
}
else {
assertEquals(regularMessages + 1, list.size());
RequestManyMessage lastRmm = list.get(regularMessages);
switch (last) {
case Normal: assertTrue(lastRmm.isNormalEndOfData()); break;
case Status: assertTrue(lastRmm.isStatusMessage()); break;
case Ex: assertTrue(lastRmm.isException()); break;
}
}
}

@Test
public void testNoRespondersGather() throws Exception {
runInServer(nc -> {
String subject = subject();
RequestMany rm = maxResponseRequest(nc);
RequestMany rm = maxResponsesRequest(nc);
TestRequestManyHandler handler = new TestRequestManyHandler();
rm.gather(subject, null, handler);
assertTrue(handler.eodReceived.await(3, TimeUnit.SECONDS));
assertMessages(0, Last.Status, handler.list);
});
}

@Test
public void testNoRespondersFetch() throws Exception {
runInServer(nc -> {
String subject = subject();
RequestMany rm = maxResponsesRequest(nc);
TestRequestManyHandler handler = new TestRequestManyHandler();
rm.gather(subject, null, handler);
assertTrue(handler.eodReceived.await(3, TimeUnit.SECONDS));
assertMessages(0, Last.Status, handler.list);
});
}

private static RequestMany maxResponseRequest(Connection nc) {
private static RequestMany maxResponsesRequest(Connection nc) {
return RequestMany.builder(nc).maxResponses(3).build();
}

@Test
public void testMaxResponseFetch() throws Exception {
public void testMaxResponsesGather() throws Exception {
runInServer(nc -> {
try (Replier replier = new Replier(nc, 5)) {
RequestMany rm = maxResponsesRequest(nc);
TestRequestManyHandler handler = new TestRequestManyHandler();
rm.gather(replier.subject, null, handler);
assertTrue(handler.eodReceived.await(3, TimeUnit.SECONDS));
assertMessages(3, Last.Normal, handler.list);
}
});
}

@Test
public void testMaxResponsesFetch() throws Exception {
runInServer(nc -> {
try (Replier replier = new Replier(nc, 5)) {
RequestMany rm = maxResponseRequest(nc);
RequestMany rm = maxResponsesRequest(nc);
List<RequestManyMessage> list = rm.fetch(replier.subject, null);
assertMessages(3, Last.Normal, list);
assertMessages(3, Last.None, list);
}
});
}

@Test
public void testMaxResponseIterate() throws Exception {
public void testMaxResponsesIterate() throws Exception {
runInServer(nc -> {
try (Replier replier = new Replier(nc, 5)) {
RequestMany rm = maxResponseRequest(nc);
RequestMany rm = maxResponsesRequest(nc);
LinkedBlockingQueue<RequestManyMessage> it = rm.iterate(replier.subject, null);
List<RequestManyMessage> list = new ArrayList<>();
RequestManyMessage m = it.poll(DEFAULT_TOTAL_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Expand All @@ -80,19 +110,6 @@ public void testMaxResponseIterate() throws Exception {
});
}

@Test
public void testMaxResponseGather() throws Exception {
runInServer(nc -> {
try (Replier replier = new Replier(nc, 5)) {
RequestMany rm = maxResponseRequest(nc);
TestRequestManyHandler handler = new TestRequestManyHandler();
rm.gather(replier.subject, null, handler);
assertTrue(handler.eodReceived.await(3, TimeUnit.SECONDS));
assertMessages(3, Last.Normal, handler.list);
}
});
}

private static RequestMany maxWaitTimeRequest(Connection nc) {
return RequestMany.builder(nc).build();
}
Expand All @@ -102,30 +119,46 @@ private static RequestMany maxWaitTimeRequest(Connection nc, long totalWaitTime)
}

@Test
public void testMaxWaitTimeFetchDefault() throws Exception {
public void testMaxWaitTimeGather() throws Exception {
runInServer(nc -> {
_testMaxWaitTimeFetch(nc, DEFAULT_TOTAL_WAIT_TIME_MS);
try (Replier replier = new Replier(nc, 1, 1200, 1)) {
RequestMany rm = maxWaitTimeRequest(nc);

TestRequestManyHandler handler = new TestRequestManyHandler();
long start = System.currentTimeMillis();
rm.gather(replier.subject, null, handler);
assertTrue(handler.eodReceived.await(DEFAULT_TOTAL_WAIT_TIME_MS * 3 / 2, TimeUnit.MILLISECONDS));
long elapsed = System.currentTimeMillis() - start;

assertTrue(elapsed > DEFAULT_TOTAL_WAIT_TIME_MS && elapsed < (DEFAULT_TOTAL_WAIT_TIME_MS * 2));
assertMessages(1, Last.Normal, handler.list);
}
});
}

@Test
public void testMaxWaitTimeFetchDefault() throws Exception {
_testMaxWaitTimeFetch(DEFAULT_TOTAL_WAIT_TIME_MS);
}

@Test
public void testMaxWaitTimeFetchCustom() throws Exception {
runInServer(nc -> {
_testMaxWaitTimeFetch(nc, 500);
});
_testMaxWaitTimeFetch(500);
}

private void _testMaxWaitTimeFetch(Connection nc, long wait) throws Exception {
try (Replier replier = new Replier(nc, 1, wait + 200, 1)) {
RequestMany rm = maxWaitTimeRequest(nc, wait);
private void _testMaxWaitTimeFetch(long wait) throws Exception {
runInServer(nc -> {
try (Replier replier = new Replier(nc, 1, wait + 200, 1)) {
RequestMany rm = maxWaitTimeRequest(nc, wait);

long start = System.currentTimeMillis();
List<RequestManyMessage> list = rm.fetch(replier.subject, null);
long elapsed = System.currentTimeMillis() - start;
long start = System.currentTimeMillis();
List<RequestManyMessage> list = rm.fetch(replier.subject, null);
long elapsed = System.currentTimeMillis() - start;

assertTrue(elapsed > wait);
assertMessages(1, Last.Normal, list);
}
assertTrue(elapsed > wait);
assertMessages(1, Last.None, list);
}
});
}

@Test
Expand All @@ -147,24 +180,6 @@ public void testMaxWaitTimeIterate() throws Exception {
});
}

@Test
public void testMaxWaitTimeGather() throws Exception {
runInServer(nc -> {
try (Replier replier = new Replier(nc, 1, 1200, 1)) {
RequestMany rm = maxWaitTimeRequest(nc);

TestRequestManyHandler handler = new TestRequestManyHandler();
long start = System.currentTimeMillis();
rm.gather(replier.subject, null, handler);
assertTrue(handler.eodReceived.await(DEFAULT_TOTAL_WAIT_TIME_MS * 3 / 2, TimeUnit.MILLISECONDS));
long elapsed = System.currentTimeMillis() - start;

assertTrue(elapsed > DEFAULT_TOTAL_WAIT_TIME_MS && elapsed < (DEFAULT_TOTAL_WAIT_TIME_MS * 2));
assertMessages(1, Last.Normal, handler.list);
}
});
}

// ----------------------------------------------------------------------------------------------------
// Support Classes
// ----------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/nats/compatibility/Kind.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static Kind instance(String text) {
return os;
}
}
System.err.println("Unknown kind: " + text);
System.err.println("Unknown endKind: " + text);
System.exit(-7);
return null;
}
Expand Down

0 comments on commit bd579ba

Please sign in to comment.