diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5c6f1cdc9414..ce130d167ff8 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -402,51 +402,53 @@ public void processOutstandingBatches() { batchCallback = nextBatch.doneCallback; } } - - final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); - final AckHandler ackHandler = outstandingMessage.ackHandler(); - final SettableApiFuture response = SettableApiFuture.create(); - final AckReplyConsumer consumer = - new AckReplyConsumer() { - @Override - public void ack() { - response.set(AckReply.ACK); - } - - @Override - public void nack() { - response.set(AckReply.NACK); - } - }; - ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); - executor.execute( - new Runnable() { - @Override - public void run() { - try { - if (ackHandler - .totalExpiration - .plusSeconds(messageDeadlineSeconds.get()) - .isBefore(now())) { - // Message expired while waiting. We don't extend these messages anymore, - // so it was probably sent to someone else. Don't work on it. - // Don't nack it either, because we'd be nacking someone else's message. - ackHandler.forget(); - return; - } - - receiver.receiveMessage(message, consumer); - } catch (Exception e) { - response.setException(e); - } - } - }); + processOutstandingMessage( + outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler); if (batchDone) { batchCallback.run(); } } } + private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { + final SettableApiFuture response = SettableApiFuture.create(); + final AckReplyConsumer consumer = + new AckReplyConsumer() { + @Override + public void ack() { + response.set(AckReply.ACK); + } + + @Override + public void nack() { + response.set(AckReply.NACK); + } + }; + ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); + executor.execute( + new Runnable() { + @Override + public void run() { + try { + if (ackHandler + .totalExpiration + .plusSeconds(messageDeadlineSeconds.get()) + .isBefore(now())) { + // Message expired while waiting. We don't extend these messages anymore, + // so it was probably sent to someone else. Don't work on it. + // Don't nack it either, because we'd be nacking someone else's message. + ackHandler.forget(); + return; + } + + receiver.receiveMessage(message, consumer); + } catch (Exception e) { + response.setException(e); + } + } + }); + } + /** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */ @InternalApi int computeDeadlineSeconds() {