From dd777a2aeef0e8b85417c3b9293602f18662a0fe Mon Sep 17 00:00:00 2001 From: "Alexandre DUVAL - @kannarfr" Date: Sun, 5 Mar 2023 21:14:21 +0100 Subject: [PATCH] reader --- .../org/apache/pulsar/client/api/Reader.java | 37 +++++++++++++++++++ .../apache/pulsar/client/impl/ReaderImpl.java | 14 +++++++ 2 files changed, 51 insertions(+) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java index 419a759f118ba..00ecc8503d119 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java @@ -74,6 +74,43 @@ public interface Reader extends Closeable { */ CompletableFuture> readNextAsync(); + /** + * Read the previous message in the topic. + * + *

This method will block until a message is available. + * + * @return the next message + * @throws PulsarClientException + */ + Message readPrevious() throws PulsarClientException; + + /** + * Read the previous message in the topic waiting for a maximum time. + * + *

Returns null if no message is received before the timeout. + * + * @return the next message(Could be null if none received in time) + * @throws PulsarClientException + */ + Message readPrevious(int timeout, TimeUnit unit) throws PulsarClientException; + + /** + * Read asynchronously the next message in the topic. + * + *

{@code readPreviousAsync()} should be called subsequently once returned {@code CompletableFuture} gets + * complete with received message. Else it creates backlog of receive requests in the application. + * + *

The returned future can be cancelled before completion by calling {@code .cancel(false)} + * ({@link CompletableFuture#cancel(boolean)}) to remove it from the the backlog of receive requests. Another + * choice for ensuring a proper clean up of the returned future is to use the CompletableFuture.orTimeout method + * which is available on JDK9+. That would remove it from the backlog of receive requests if receiving exceeds + * the timeout. + * + * @return a future that will yield a message (when it's available) or {@link PulsarClientException} if the reader + * is already closed. + */ + CompletableFuture> readPreviousAsync(); + /** * Asynchronously close the reader and stop the broker to push more messages. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 099098fcfabf4..874566f578bd6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -167,6 +167,20 @@ public Message readNext() throws PulsarClientException { return msg; } + @Override + public Message readPrevious() throws PulsarClientException { + Message msg = consumer.receivePrevious(); + + // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, + // it will specify the subscription position anyway + consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> { + log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(), + getConsumer().getSubscription(), msg.getMessageId(), ex); + return null; + }); + return msg; + } + @Override public Message readNext(int timeout, TimeUnit unit) throws PulsarClientException { Message msg = consumer.receive(timeout, unit);