Skip to content

Commit

Permalink
tuned request message batch
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 25, 2024
1 parent 6a405ba commit 0318b12
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetReq
public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
final LinkedBlockingQueue<MessageInfo> q = new LinkedBlockingQueue<>();
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add));
conn.getOptions().getExecutor().submit(
() -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add));
return q;
}

Expand All @@ -398,19 +399,22 @@ private boolean _requestMessageBatch(String streamName, MessageBatchGetRequest m

while (true) {
Message msg = sub.nextMessage(getTimeout());
Status errorOrNonEob = null;
if (msg == null) {
return false; // should not time out before eob
errorOrNonEob = Status.TIMEOUT_OR_NO_MESSAGES;
}

if (msg.isStatusMessage()) {
else if (msg.isStatusMessage()) {
if (msg.getStatus().isEob()) {
return true; // will send eob in finally if caller asked
}
errorOrNonEob = msg.getStatus();
}

// All non eob statuses, always send, but it is the last message to the caller
if (errorOrNonEob != null) {
// All error or non eob statuses, always send, but it is the last message to the caller
sendEob = false;
handler.onMessageInfo(new MessageInfo(msg.getStatus(), streamName, true));
return false; // since this was an error
handler.onMessageInfo(new MessageInfo(Status.TIMEOUT_OR_NO_MESSAGES, streamName, true));
return false; // should not time out before eob
}

MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class Status {
public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409

public static final Status EOB = new Status(EOB_CODE, EOB_TEXT);
public static final Status TIMEOUT_OR_NO_MESSAGES = new Status(NOT_FOUND_CODE, "Timeout or No Messages");

private final int code;
private final String message;
Expand Down

0 comments on commit 0318b12

Please sign in to comment.