Skip to content

Commit

Permalink
reader
Browse files Browse the repository at this point in the history
  • Loading branch information
KannarFr committed Mar 5, 2023
1 parent fec4578 commit dd777a2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,43 @@ public interface Reader<T> extends Closeable {
*/
CompletableFuture<Message<T>> readNextAsync();

/**
* Read the previous message in the topic.
*
* <p>This method will block until a message is available.
*
* @return the next message
* @throws PulsarClientException
*/
Message<T> readPrevious() throws PulsarClientException;

/**
* Read the previous message in the topic waiting for a maximum time.
*
* <p>Returns null if no message is received before the timeout.
*
* @return the next message(Could be null if none received in time)
* @throws PulsarClientException
*/
Message<T> readPrevious(int timeout, TimeUnit unit) throws PulsarClientException;

/**
* Read asynchronously the next message in the topic.
*
* <p>{@code readPreviousAsync()} should be called subsequently once returned {@code CompletableFuture} gets
* complete with received message. Else it creates <i> backlog of receive requests </i> in the application.
*
* <p>The returned future can be cancelled before completion by calling {@code .cancel(false)}
* ({@link CompletableFuture#cancel(boolean)}) to remove it from the the backlog of receive requests. Another
* choice for ensuring a proper clean up of the returned future is to use the CompletableFuture.orTimeout method
* which is available on JDK9+. That would remove it from the backlog of receive requests if receiving exceeds
* the timeout.
*
* @return a future that will yield a message (when it's available) or {@link PulsarClientException} if the reader
* is already closed.
*/
CompletableFuture<Message<T>> readPreviousAsync();

/**
* Asynchronously close the reader and stop the broker to push more messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ public Message<T> readNext() throws PulsarClientException {
return msg;
}

@Override
public Message<T> readPrevious() throws PulsarClientException {
Message<T> msg = consumer.receivePrevious();

// Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects,
// it will specify the subscription position anyway
consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
getConsumer().getSubscription(), msg.getMessageId(), ex);
return null;
});
return msg;
}

@Override
public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
Message<T> msg = consumer.receive(timeout, unit);
Expand Down

0 comments on commit dd777a2

Please sign in to comment.