diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 66ab69f28..231b56b86 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -373,7 +373,8 @@ public List fetchMessageBatch(String streamName, MessageBatchGetReq public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add)); + conn.getOptions().getExecutor().submit( + () -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add)); return q; } @@ -398,19 +399,22 @@ private boolean _requestMessageBatch(String streamName, MessageBatchGetRequest m while (true) { Message msg = sub.nextMessage(getTimeout()); + Status errorOrNonEob = null; if (msg == null) { - return false; // should not time out before eob + errorOrNonEob = Status.TIMEOUT_OR_NO_MESSAGES; } - - if (msg.isStatusMessage()) { + else if (msg.isStatusMessage()) { if (msg.getStatus().isEob()) { return true; // will send eob in finally if caller asked } + errorOrNonEob = msg.getStatus(); + } - // All non eob statuses, always send, but it is the last message to the caller + if (errorOrNonEob != null) { + // All error or non eob statuses, always send, but it is the last message to the caller sendEob = false; - handler.onMessageInfo(new MessageInfo(msg.getStatus(), streamName, true)); - return false; // since this was an error + handler.onMessageInfo(new MessageInfo(Status.TIMEOUT_OR_NO_MESSAGES, streamName, true)); + return false; // should not time out before eob } MessageInfo messageInfo = new MessageInfo(msg, streamName, true); diff --git a/src/main/java/io/nats/client/support/Status.java b/src/main/java/io/nats/client/support/Status.java index 5b661e250..ed985bc9b 100644 --- a/src/main/java/io/nats/client/support/Status.java +++ b/src/main/java/io/nats/client/support/Status.java @@ -47,6 +47,7 @@ public class Status { public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409 public static final Status EOB = new Status(EOB_CODE, EOB_TEXT); + public static final Status TIMEOUT_OR_NO_MESSAGES = new Status(NOT_FOUND_CODE, "Timeout or No Messages"); private final int code; private final String message;