Skip to content

Commit

Permalink
[doc-complete] #16203 Added doc to fix issue with missing client inst…
Browse files Browse the repository at this point in the history
…ructions for batch index acknowledgment (#16204)

* #16203 Added doc to fix issue with missing client instructions for batch index acknowledgment

* Fix Case

Co-authored-by: momo-jun <60642177+momo-jun@users.noreply.github.com>

* Fix case

Co-authored-by: momo-jun <60642177+momo-jun@users.noreply.github.com>

* Fix case

Co-authored-by: momo-jun <60642177+momo-jun@users.noreply.github.com>

* Fix case

Co-authored-by: momo-jun <60642177+momo-jun@users.noreply.github.com>

Co-authored-by: Devin Bost <dbost@overstock.com>
Co-authored-by: Dave Fisher <dave@davefisher.tech>
Co-authored-by: momo-jun <60642177+momo-jun@users.noreply.github.com>
  • Loading branch information
4 people authored Jul 8, 2022
1 parent 86ab67f commit f519e22
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions site2/docs/concepts-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import TabItem from '@theme/TabItem';
````


Pulsar is built on the [publish-subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) pattern (often abbreviated to pub-sub). In this pattern, [producers](#producers) publish messages to [topics](#topics); [consumers](#consumers) [subscribe](#subscription-types) to those topics, process incoming messages, and send [acknowledgements](#acknowledgement) to the broker when processing is finished.
Pulsar is built on the [publish-subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) pattern (often abbreviated to pub-sub). In this pattern, [producers](#producers) publish messages to [topics](#topics); [consumers](#consumers) [subscribe](#subscription-types) to those topics, process incoming messages, and send [acknowledgments](#acknowledgment) to the broker when processing is finished.

When a subscription is created, Pulsar [retains](concepts-architecture-overview.md#persistent-storage) all messages, even if the consumer is disconnected. The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully.

Expand Down Expand Up @@ -65,7 +65,7 @@ Producers send messages to brokers synchronously (sync) or asynchronously (async

| Mode | Description |
|:-----------|-----------|
| Sync send | The producer waits for an acknowledgement from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure. |
| Sync send | The producer waits for an acknowledgment from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure. |
| Async send | The producer puts a message in a blocking queue and returns immediately. The client library sends the message to the broker in the background. If the queue is full (you can [configure](reference-configuration.md#broker) the maximum size), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer. |

### Access mode
Expand Down Expand Up @@ -104,12 +104,27 @@ When batching is enabled, the producer accumulates and sends a batch of messages

In Pulsar, batches are tracked and stored as single units rather than as individual messages. Consumers unbundle a batch into individual messages. However, scheduled messages (configured through the `deliverAt` or the `deliverAfter` parameter) are always sent as individual messages even when batching is enabled.

In general, a batch is acknowledged when all of its messages are acknowledged by a consumer. It means that when **not all** batch messages are acknowledged, then unexpected failures, negative acknowledgements, or acknowledgement timeouts can result in a redelivery of all messages in this batch.
In general, a batch is acknowledged when all of its messages are acknowledged by a consumer. It means that when **not all** batch messages are acknowledged, then unexpected failures, negative acknowledgments, or acknowledgment timeouts can result in a redelivery of all messages in this batch.

To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar introduces batch index acknowledgement since Pulsar 2.6.0. When batch index acknowledgement is enabled, the consumer filters out the batch index that has been acknowledged and sends the batch index acknowledgement request to the broker. The broker maintains the batch index acknowledgement status and tracks the acknowledgement status of each batch index to avoid dispatching acknowledged messages to the consumer. The batch is deleted when all indices of the messages in it are acknowledged.
To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar introduces batch index acknowledgment since Pulsar 2.6.0. When batch index acknowledgment is enabled, the consumer filters out the batch index that has been acknowledged and sends the batch index acknowledgment request to the broker. The broker maintains the batch index acknowledgment status and tracks the acknowledgment status of each batch index to avoid dispatching acknowledged messages to the consumer. The batch is deleted when all indices of the messages in it are acknowledged.

By default, batch index acknowledgement is disabled (`acknowledgmentAtBatchIndexLevelEnabled=false`). You can enable batch index acknowledgement by setting the `acknowledgmentAtBatchIndexLevelEnabled` parameter to `true` at the broker side. Enabling batch index acknowledgement results in more memory overheads.
By default, batch index acknowledgment is disabled (`acknowledgmentAtBatchIndexLevelEnabled=false`). You can enable batch index acknowledgment by setting the `acknowledgmentAtBatchIndexLevelEnabled` parameter to `true` at the broker side. Enabling batch index acknowledgment results in more memory overheads.

Batch index acknowledgment must also be enabled in the consumer by calling `.enableBatchIndexAcknowledgment(true);`

For example:

```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.newConsumer(Schema.BYTES)
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();
```

### Chunking
Message chunking enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages at the consumer side.

Expand Down Expand Up @@ -174,16 +189,16 @@ Messages are received from [brokers](reference-terminology.md#broker) either syn

Client libraries provide listener implementation for consumers. For example, the [Java client](client-libraries-java.md) provides a {@inject: javadoc:MesssageListener:/client/org/apache/pulsar/client/api/MessageListener} interface. In this interface, the `received` method is called whenever a new message is received.

### Acknowledgement
### Acknowledgment

The consumer sends an acknowledgement request to the broker after it consumes a message successfully. Then, this consumed message will be permanently stored, and be deleted only after all the subscriptions have acknowledged it. If you want to store the messages that have been acknowledged by a consumer, you need to configure the [message retention policy](concepts-messaging.md#message-retention-and-expiry).
The consumer sends an acknowledgment request to the broker after it consumes a message successfully. Then, this consumed message will be permanently stored, and be deleted only after all the subscriptions have acknowledged it. If you want to store the messages that have been acknowledged by a consumer, you need to configure the [message retention policy](concepts-messaging.md#message-retention-and-expiry).

For batch messages, you can enable batch index acknowledgement to avoid dispatching acknowledged messages to the consumer. For details about batch index acknowledgement, see [batching](#batching).
For batch messages, you can enable batch index acknowledgment to avoid dispatching acknowledged messages to the consumer. For details about batch index acknowledgment, see [batching](#batching).

Messages can be acknowledged in one of the following two ways:

- Being acknowledged individually. With individual acknowledgement, the consumer acknowledges each message and sends an acknowledgement request to the broker.
- Being acknowledged cumulatively. With cumulative acknowledgement, the consumer **only** acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.
- Being acknowledged individually. With individual acknowledgment, the consumer acknowledges each message and sends an acknowledgment request to the broker.
- Being acknowledged cumulatively. With cumulative acknowledgment, the consumer **only** acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.

If you want to acknowledge messages individually, you can use the following API.

Expand All @@ -203,13 +218,13 @@ consumer.acknowledgeCumulative(msg);

:::note

Cumulative acknowledgement cannot be used in [Shared subscription type](#subscription-types), because Shared subscription type involves multiple consumers which have access to the same subscription. In Shared subscription type, messages are acknowledged individually.
Cumulative acknowledgment cannot be used in [Shared subscription type](#subscription-types), because Shared subscription type involves multiple consumers which have access to the same subscription. In Shared subscription type, messages are acknowledged individually.

:::

### Negative acknowledgement
### Negative acknowledgment

The [negative acknowledgement](#negative-acknowledgement) mechanism allows you to send a notification to the broker indicating the consumer did not process a message. When a consumer fails to consume a message and needs to re-consume it, the consumer sends a negative acknowledgement (nack) to the broker, triggering the broker to redeliver this message to the consumer.
The [negative acknowledgment](#negative-acknowledgment) mechanism allows you to send a notification to the broker indicating the consumer did not process a message. When a consumer fails to consume a message and needs to re-consume it, the consumer sends a negative acknowledgment (nack) to the broker, triggering the broker to redeliver this message to the consumer.

Messages are negatively acknowledged individually or cumulatively, depending on the consumption subscription type.

Expand All @@ -234,7 +249,7 @@ Consumer<byte[]> consumer = pulsarClient.newConsumer()

Message<byte[]> message = consumer.receive();

// call the API to send negative acknowledgement
// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);

message = consumer.receive();
Expand Down Expand Up @@ -279,11 +294,11 @@ If batching is enabled, all messages in one batch are redelivered to the consume

:::

### Acknowledgement timeout
### Acknowledgment timeout

The acknowledgement timeout mechanism allows you to set a time range during which the client tracks the unacknowledged messages. After this acknowledgement timeout (`ackTimeout`) period, the client sends `redeliver unacknowledged messages` request to the broker, thus the broker resends the unacknowledged messages to the consumer.
The acknowledgment timeout mechanism allows you to set a time range during which the client tracks the unacknowledged messages. After this acknowledgment timeout (`ackTimeout`) period, the client sends `redeliver unacknowledged messages` request to the broker, thus the broker resends the unacknowledged messages to the consumer.

You can configure the acknowledgement timeout mechanism to redeliver the message if it is not acknowledged after `ackTimeout` or to execute a timer task to check the acknowledgement timeout messages during every `ackTimeoutTickTime` period.
You can configure the acknowledgment timeout mechanism to redeliver the message if it is not acknowledged after `ackTimeout` or to execute a timer task to check the acknowledgment timeout messages during every `ackTimeoutTickTime` period.

You can also use the redelivery backoff mechanism, redeliver messages with different delays by setting the number
of times the messages is retried.
Expand Down Expand Up @@ -317,11 +332,11 @@ Redelivery count | Redelivery delay
:::note

- If batching is enabled, all messages in one batch are redelivered to the consumer.
- Compared with acknowledgement timeout, negative acknowledgement is preferred. First, it is difficult to set a timeout value. Second, a broker resends messages when the message processing time exceeds the acknowledgement timeout, but these messages might not need to be re-consumed.
- Compared with acknowledgment timeout, negative acknowledgment is preferred. First, it is difficult to set a timeout value. Second, a broker resends messages when the message processing time exceeds the acknowledgment timeout, but these messages might not need to be re-consumed.

:::

Use the following API to enable acknowledgement timeout.
Use the following API to enable acknowledgment timeout.

```java

Expand Down Expand Up @@ -498,7 +513,7 @@ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)

```

Dead letter topic serves message redelivery, which is triggered by [acknowledgement timeout](#acknowledgement-timeout) or [negative acknowledgement](#negative-acknowledgement) or [retry letter topic](#retry-letter-topic).
Dead letter topic serves message redelivery, which is triggered by [acknowledgment timeout](#acknowledgment-timeout) or [negative acknowledgment](#negative-acknowledgment) or [retry letter topic](#retry-letter-topic).
:::note

* Currently, dead letter topic is enabled in Shared and Key_Shared subscription types.
Expand Down Expand Up @@ -876,7 +891,7 @@ There are diverse system topics depending on namespaces. The following table out
| host/port | `heartbeat` | Persistent | 1 | Heartbeat detection |
| User-defined-ns | [`__change_events`](concepts-multi-tenancy.md#namespace-change-events-and-topic-level-policies) | Persistent | Default 4 | Topic events |
| User-defined-ns | `__transaction_buffer_snapshot` | Persistent | One per namespace | Transaction buffer snapshots |
| User-defined-ns | `${topicName}__transaction_pending_ack` | Persistent | One per every topic subscription acknowledged with transactions | Acknowledgements with transactions |
| User-defined-ns | `${topicName}__transaction_pending_ack` | Persistent | One per every topic subscription acknowledged with transactions | Acknowledgments with transactions |

:::note

Expand All @@ -903,8 +918,8 @@ Apache Pulsar avoids these and other message delivery failures using at-least-on

To utilize message redelivery, you need to enable this mechanism before the broker can resend the unacknowledged messages in Apache Pulsar client. You can activate the message redelivery mechanism in Apache Pulsar using three methods.

- [Negative Acknowledgment](#negative-acknowledgement)
- [Acknowledgement Timeout](#acknowledgement-timeout)
- [Negative Acknowledgment](#negative-acknowledgment)
- [Acknowledgment Timeout](#acknowledgment-timeout)
- [Retry letter topic](#retry-letter-topic)


Expand Down

0 comments on commit f519e22

Please sign in to comment.