---
id: concepts-messaging
title: Messaging
sidebar_label: "Messaging"
description: Get a comprehensive understanding of essential messaging concepts within Pulsar, including topics, namespaces, subscriptions, and more.
---
````mdx-code-block
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
````
Pulsar is built on the [publish-subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) pattern (often abbreviated to pub-sub). In this pattern, [producers](concepts-clients.md#producer) publish messages to [topics](#topics); [consumers](concepts-clients.md#consumer) [subscribe](#subscriptions) to those topics, process incoming messages, and send [acknowledgments](#acknowledgment) to the broker when processing is finished.

When a subscription is created, Pulsar [retains](concepts-architecture-overview.md#persistent-storage) all messages, even if the consumer is disconnected. The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully.
If the consumption of a message fails and you want this message to be consumed again, you can enable the [message redelivery mechanism](#message-redelivery) to request the broker to resend this message.
## Messages
Messages are the basic "unit" of Pulsar. They're what producers publish to topics and what consumers then consume from topics. The following table lists the components of messages.
| Component | Description |
|:---------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Value / data payload | The data carried by the message. All Pulsar messages contain raw bytes, although message data can also conform to data [schemas](schema-get-started.md). |
| Key | The key (string type) of the message. It acts as a short name for the message key or partition key. Messages are optionally tagged with keys, which is useful for features like [topic compaction](concepts-topic-compaction.md). |
| Properties | An optional key/value map of user-defined properties. |
| Producer name | The name of the producer who produces the message. If you do not specify a producer name, the default name is used. |
| Topic name | The name of the topic that the message is published to. |
| Schema version | The version number of the schema that the message is produced with. |
| Sequence ID | Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID of a message is initially assigned by its producer, indicating its order in that sequence, and can also be customized.<br />Sequence ID can be used for message deduplication. If `brokerDeduplicationEnabled` is set to `true`, the sequence ID of each message is unique within a producer of a topic (non-partitioned) or a partition. |
| Message ID | The message ID of a message is assigned by bookies as soon as the message is persistently stored. Message ID indicates a message's specific position in a ledger and is unique within a Pulsar cluster. |
| Publish time | The timestamp of when the message is published. The timestamp is automatically applied by the producer. |
| Event time | An optional timestamp attached to a message by the application. For example, an application can attach the timestamp of when the message was processed. If the event time is not set, the value is `0`. |
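As an illustration of these components, here is a minimal sketch of publishing a message with a key, a user-defined property, and an event time through the Java client's typed message builder. The topic, key, and property names are placeholders, and `client` is assumed to be an existing `PulsarClient`.
```java
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .create();

producer.newMessage()
        .key("order-12345")                       // message key / partition key
        .property("source", "checkout-service")   // user-defined property
        .eventTime(System.currentTimeMillis())    // application-defined event time
        .value("raw payload".getBytes())          // data payload as raw bytes
        .send();
```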
The default max size of a message is 5 MB. You can configure the max size of a message with the following configuration options.
- In the `broker.conf` file.
```bash
# The max size of a message (in bytes).
maxMessageSize=5242880
```
- In the `bookkeeper.conf` file.
```bash
# The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
nettyMaxFrameSizeBytes=5253120
```
> For more information on Pulsar messages, see Pulsar [binary protocol](developing-binary-protocol.md).
### Acknowledgment
A message acknowledgment is sent by a consumer to the broker after the consumer consumes a message successfully. The consumed message is then permanently stored, and it is deleted only after all subscriptions have acknowledged it. An acknowledgment (ack) is Pulsar's way of knowing that a message can be deleted from the system. If you want to retain messages that have already been acknowledged by a consumer, you need to configure the [message retention policy](concepts-messaging.md#message-retention-and-expiry).
For batch messages, you can enable batch index acknowledgment to avoid dispatching acknowledged messages to the consumer. For details about batch index acknowledgment, see [batching](#batching).
Messages can be acknowledged in one of the following two ways:
- Being acknowledged individually
With individual acknowledgment, the consumer acknowledges each message and sends an acknowledgment request to the broker.
- Being acknowledged cumulatively
With cumulative acknowledgment, the consumer **only** acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.
If you want to acknowledge messages individually, you can use the following API.
```java
consumer.acknowledge(msg);
```
If you want to acknowledge messages cumulatively, you can use the following API.
```java
consumer.acknowledgeCumulative(msg);
```
:::note
Cumulative acknowledgment cannot be used with the [Shared or Key_Shared subscription types](#subscription-types), because these types involve multiple consumers that have access to the same subscription. With the Shared and Key_Shared subscription types, messages must be acknowledged individually.
:::
### Negative acknowledgment
The [negative acknowledgment](#negative-acknowledgment) mechanism allows you to send a notification to the broker indicating the consumer did not process a message. When a consumer fails to consume a message and needs to re-consume it, the consumer sends a negative acknowledgment (nack) to the broker, triggering the broker to redeliver this message to the consumer.
Messages are negatively acknowledged individually or cumulatively, depending on the consumption subscription type.
In Exclusive and Failover subscription types, consumers only negatively acknowledge the last message they receive.
In Shared and Key_Shared subscription types, consumers can negatively acknowledge messages individually.
Be aware that negative acknowledgments on ordered subscription types, such as Exclusive, Failover and Key_Shared, might cause failed messages to be delivered to consumers out of their original order.
If you are going to use negative acknowledgment on a message, make sure it is negatively acknowledged before the acknowledgment timeout.
Use the following API to negatively acknowledge message consumption.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
.subscribe();
Message<byte[]> message = consumer.receive();
// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);
message = consumer.receive();
consumer.acknowledge(message);
```
To redeliver messages with progressively longer delays, you can use the **redelivery backoff mechanism**, which computes the delay from the number of times a message has been redelivered.
Use the following API to enable `Negative Redelivery Backoff`.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.multiplier(2)
.build())
.subscribe();
```
The message redelivery behavior should be as follows.
| Redelivery count | Redelivery delay |
|:-----------------|:-----------------|
| 1 | 1 second |
| 2 | 2 seconds |
| 3 | 4 seconds |
| 4 | 8 seconds |
| 5 | 16 seconds |
| 6 | 32 seconds |
| 7 | 60 seconds |
| 8 | 60 seconds |
:::note
If batching is enabled, all messages in one batch are redelivered to the consumer.
:::
### Acknowledgment timeout
:::note
By default, the acknowledgment timeout is disabled, which means that messages delivered to a consumer are not redelivered unless the consumer crashes.
:::
The acknowledgment timeout mechanism allows you to set a time range during which the client tracks unacknowledged messages. After this acknowledgment timeout (`ackTimeout`) period, the client sends a `redeliver unacknowledged messages` request to the broker, and the broker resends the unacknowledged messages to the consumer.
You can configure the acknowledgment timeout mechanism to redeliver the message if it is not acknowledged after `ackTimeout` or to execute a timer task to check the acknowledgment timeout messages during every `ackTimeoutTickTime` period.
You can also use the redelivery backoff mechanism to redeliver messages with delays that grow with the number of times a message has been retried.
If you want to use redelivery backoff, you can use the following API.
```java
consumer.ackTimeout(10, TimeUnit.SECONDS)
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.multiplier(2)
.build());
```
The message redelivery behavior should be as follows.
| Redelivery count | Redelivery delay |
|:-----------------|:-----------------|
| 1 | 10 + 1 seconds |
| 2 | 10 + 2 seconds |
| 3 | 10 + 4 seconds |
| 4 | 10 + 8 seconds |
| 5 | 10 + 16 seconds |
| 6 | 10 + 32 seconds |
| 7 | 10 + 60 seconds |
| 8 | 10 + 60 seconds |
:::note
- If batching is enabled, all messages in one batch are redelivered to the consumer.
- Compared with acknowledgment timeout, negative acknowledgment is preferred. First, it is difficult to set a timeout value. Second, a broker resends messages when the message processing time exceeds the acknowledgment timeout, but these messages might not need to be re-consumed.
:::
Use the following API to enable acknowledgment timeout.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.ackTimeout(2, TimeUnit.SECONDS) // the default value is 0
.ackTimeoutTickTime(1, TimeUnit.SECONDS)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> message = consumer.receive();
// wait at least 2 seconds
message = consumer.receive();
consumer.acknowledge(message);
```
### Retry letter topic
Retry letter topic allows you to store the messages that failed to be consumed and retry consuming them later. With this method, you can customize the interval at which the messages are redelivered. Consumers on the original topic are automatically subscribed to the retry letter topic as well. Once the maximum number of retries has been reached, the unconsumed messages are moved to a [dead letter topic](#dead-letter-topic) for manual processing. The functionality of a retry letter topic is implemented by consumers.
The diagram below illustrates the concept of the retry letter topic.

The intention of using retry letter topic is different from using [delayed message delivery](#delayed-message-delivery), even though both are aiming to consume a message later. Retry letter topic serves failure handling through message redelivery to ensure critical data is not lost, while delayed message delivery is intended to deliver a message with a specified time delay.
By default, automatic retry is disabled. You can set `enableRetry` to `true` to enable automatic retry on the consumer.
Use the following API to consume messages from a retry letter topic. When the value of `maxRedeliverCount` is reached, the unconsumed messages are moved to a dead letter topic.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
```
The default retry letter topic uses this format:
```text
<topicname>-<subscriptionname>-RETRY
```
:::note
- For Pulsar 2.6.x and 2.7.x, the default retry letter topic uses the format of `<subscriptionname>-RETRY`. If you upgrade from 2.6.x or 2.7.x to 2.8.x or later, you need to delete historical retry letter topics and retry letter partitioned topics. Otherwise, Pulsar continues to use the original topics, which are formatted as `<subscriptionname>-RETRY`.
- It is not recommended to use `<subscriptionname>-RETRY`, because if multiple topics under the same namespace have a subscription with the same name, the retry letter topic names for those topics are identical, and the topics end up consuming each other's retry messages.
:::
Use the Java client to specify the name of the retry letter topic.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("my-retry-letter-topic-name")
.build())
.subscribe();
```
The messages in the retry letter topic contain some special properties that are automatically created by the client.
| Special property | Description |
|:--------------------|:-----------------------------------------------------------|
| `REAL_TOPIC` | The real topic name. |
| `ORIGIN_MESSAGE_ID` | The origin message ID. It is crucial for message tracking. |
| `RECONSUMETIMES` | The number of retries to consume messages. |
| `DELAY_TIME` | Message retry interval in milliseconds. |
**Example**
```conf
REAL_TOPIC = persistent://public/default/my-topic
ORIGIN_MESSAGE_ID = 1:0:-1:0
RECONSUMETIMES = 6
DELAY_TIME = 3000
```
Use the following API to store the messages in a retrial queue.
```java
consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);
```
Use the following API to add custom properties to a message passed to the `reconsumeLater` function. On the next consumption attempt, the custom properties can be retrieved with `message#getProperty`.
```java
Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom-key-1", "custom-value-1");
customProperties.put("custom-key-2", "custom-value-2");
consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);
```
:::note
* Currently, the retry letter topic is enabled only for the Shared subscription type.
* Compared with negative acknowledgment, the retry letter topic is more suitable for messages that require a large number of retries with a configurable retry interval, because messages in the retry letter topic are persisted to BookKeeper, while messages that need to be retried due to negative acknowledgment are cached on the client side.
:::
### Dead letter topic
Dead letter topic allows you to continue message consumption even when some messages are not consumed successfully. The messages that have failed to be consumed are stored in a specific topic, which is called the dead letter topic. The functionality of a dead letter topic is implemented by consumers. You can decide how to handle the messages in the dead letter topic.
Enable dead letter topic in a Java client using the default dead letter topic.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
```
The default dead letter topic uses this format:
```
<topicname>-<subscriptionname>-DLQ
```
The dead letter topic's producer name uses this format:
```
<topicname>-<subscriptionname>-<consumername>-<randomstring>-DLQ
```
:::note
- For Pulsar 2.6.x and 2.7.x, the default dead letter topic uses the format of `<subscriptionname>-DLQ`. If you upgrade from 2.6.x or 2.7.x to 2.8.x or later, you need to delete historical dead letter topics and dead letter partitioned topics. Otherwise, Pulsar continues to use the original topics, which are formatted as `<subscriptionname>-DLQ`.
- It is not recommended to use `<subscriptionname>-DLQ`, because if multiple topics under the same namespace have a subscription with the same name, the dead letter topic names for those topics are identical, and the topics end up consuming each other's dead-lettered messages.
- From Pulsar 2.3.x to 2.10.x, the Java client's dead letter policy sets a 30-second acknowledgment timeout when there is no user-defined acknowledgment timeout. This default timeout policy has been removed since 3.0.x.
:::
Use the Java client to specify the name of the dead letter topic.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name")
.build())
.subscribe();
```
By default, there is no subscription during DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages. To automatically create an initial subscription for the DLQ, you can specify the `initialSubscriptionName` parameter. If this parameter is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer will fail to be created.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name")
.initialSubscriptionName("init-sub")
.build())
.subscribe();
```
Dead letter topic serves message redelivery, which is triggered by [acknowledgment timeout](#acknowledgment-timeout) or [negative acknowledgment](#negative-acknowledgment) or [retry letter topic](#retry-letter-topic).
:::note
Currently, dead letter topic is enabled in Shared and Key_Shared subscription types.
:::
### Compression
Message compression can reduce message size by paying some CPU overhead. The Pulsar client supports the following compression types:
* [LZ4](https://github.com/lz4/lz4)
* [ZLIB](https://zlib.net/)
* [ZSTD](https://facebook.github.io/zstd/)
* [SNAPPY](https://google.github.io/snappy/)
Compression types are stored in the message metadata, so consumers can adopt different compression types automatically, as needed.
The sample code below shows how to enable compression type for a producer:
```java
client.newProducer()
.topic("topic-name")
.compressionType(CompressionType.LZ4)
.create();
```
### Batching
When batching is enabled, the producer accumulates and sends a batch of messages in a single request. The batch size is defined by the maximum number of messages and the maximum publish latency. Therefore, the backlog size represents the total number of batches instead of the total number of messages.

In Pulsar, batches are tracked and stored as single units rather than as individual messages. Consumers unbundle a batch into individual messages. However, scheduled messages (configured through the `deliverAt` or the `deliverAfter` parameter) are always sent as individual messages even when batching is enabled.
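For reference, here is a minimal sketch of configuring batching on a producer with the Java client. The limits shown (100 messages, 10 ms) are illustrative only, and `client` is assumed to be an existing `PulsarClient`.
```java
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .enableBatching(true)                                // batching is enabled by default in the Java client
        .batchingMaxMessages(100)                            // send the batch once 100 messages are buffered...
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)  // ...or after 10 ms, whichever comes first
        .create();
```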
In general, a batch is acknowledged when all of its messages are acknowledged by a consumer. This means that if **not all** messages in a batch are acknowledged, unexpected failures, negative acknowledgments, or acknowledgment timeouts can result in a redelivery of all messages in the batch.
To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar has supported batch index acknowledgment since 2.6.0. When batch index acknowledgment is enabled, the consumer filters out the batch indexes that have been acknowledged and sends a batch index acknowledgment request to the broker. The broker maintains the batch index acknowledgment status and tracks the acknowledgment status of each batch index to avoid dispatching acknowledged messages to the consumer. The batch is deleted when all indexes of the messages in it are acknowledged.
By default, batch index acknowledgment is disabled (`acknowledgmentAtBatchIndexLevelEnabled=false`). You can enable batch index acknowledgment by setting the `acknowledgmentAtBatchIndexLevelEnabled` parameter to `true` on the broker side. Enabling batch index acknowledgment results in additional memory overhead.
Batch index acknowledgment must also be enabled in the consumer by calling `.enableBatchIndexAcknowledgment(true);`
For example:
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();
```
:::note
When using the synchronous `send` method for producing messages, the batch will be sent immediately even if it is not full. This helps reduce message sending latency and prevents blocking of the caller's thread. When producing messages in a single thread, you should use the asynchronous `sendAsync` method to send messages in batches.
:::
### Chunking
Message chunking enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages at the consumer side.
With message chunking enabled, when the size of a message exceeds the allowed maximum payload size (the `maxMessageSize` parameter of broker), the workflow of messaging is as follows:
1. The producer splits the original message into chunked messages and publishes them with chunked metadata to the broker separately and in order.
2. The broker stores the chunked messages in one managed ledger in the same way as that of ordinary messages, and it uses the `chunkedMessageRate` parameter to record chunked message rate on the topic.
3. The consumer buffers the chunked messages and aggregates them into the receiver queue when it receives all the chunks of a message.
4. The client consumes the aggregated message from the receiver queue.
:::note
- Chunking is only available for persistent topics.
- Chunking cannot be enabled simultaneously with batching. Before enabling chunking, you need to disable batching.
:::
#### Handle consecutive chunked messages with one ordered consumer
The following figure shows a topic with one producer that publishes a large message payload in chunked messages along with regular non-chunked messages. The producer publishes message M1 in three chunks labeled M1-C1, M1-C2 and M1-C3. The broker stores all the three chunked messages in the [managed ledger](concepts-architecture-overview.md#managed-ledgers) and dispatches them to the ordered (exclusive/failover) consumer in the same order. The consumer buffers all the chunked messages in memory until it receives all the chunked messages, aggregates them into one message and then hands over the original message M1 to the client.

#### Handle interwoven chunked messages with one ordered consumer
When multiple producers publish chunked messages into a single topic, the broker stores all the chunked messages coming from different producers in the same [managed ledger](concepts-architecture-overview.md#managed-ledgers). The chunked messages in the managed ledger can be interwoven with each other. As shown below, Producer 1 publishes message M1 in three chunks M1-C1, M1-C2 and M1-C3. Producer 2 publishes message M2 in three chunks M2-C1, M2-C2 and M2-C3. All chunked messages of the specific message are still in order but might not be consecutive in the managed ledger.

:::note
In this case, interwoven chunked messages may bring some memory pressure to the consumer because the consumer keeps a separate buffer for each large message to aggregate all its chunks in one message. You can limit the maximum number of chunked messages a consumer maintains concurrently by configuring the `maxPendingChunkedMessage` parameter. When the threshold is reached, the consumer drops pending messages by silently acknowledging them or asking the broker to redeliver them later, optimizing memory utilization.
:::
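A minimal sketch of bounding this memory usage on the consumer side, assuming the Java client's chunking-related builder options (`maxPendingChunkedMessage`, `autoAckOldestChunkedMessageOnQueueFull`, and `expireTimeOfIncompleteChunkedMessage`); the limits shown are illustrative:
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("my-topic")
        .subscriptionName("chunked-sub")
        .maxPendingChunkedMessage(10)                               // keep at most 10 partially received large messages
        .autoAckOldestChunkedMessageOnQueueFull(true)               // silently ack the oldest pending chunks when the limit is hit
        .expireTimeOfIncompleteChunkedMessage(1, TimeUnit.MINUTES)  // discard incomplete chunks after 1 minute
        .subscribe();
```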
#### Enable Message Chunking
**Prerequisite:** Disable batching by setting the `enableBatching` parameter to `false`.
The message chunking feature is OFF by default.
To enable message chunking, set the `chunkingEnabled` parameter to `true` when creating a producer.
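For example, a minimal producer sketch, assuming the Java client's `enableChunking` and `enableBatching` builder methods:
```java
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic("my-topic")
        .enableBatching(false)  // chunking requires batching to be disabled
        .enableChunking(true)   // split messages larger than maxMessageSize into chunks
        .create();
```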
:::note
If the consumer fails to receive all chunks of a message within a specified period, it expires incomplete chunks. The default value is 1 minute. For more information about the `expireTimeOfIncompleteChunkedMessage` parameter, refer to [org.apache.pulsar.client.api](/api/client/).
:::
## Topics
A Pulsar topic is a unit of storage that organizes messages into a stream. As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from producers to consumers. Topic names are URLs that have a well-defined structure:
```http
{persistent|non-persistent}://tenant/namespace/topic
```
| Topic name component | Description |
|:--------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `persistent` / `non-persistent` | This identifies the type of topic. Pulsar supports two kinds of topics: [persistent](concepts-architecture-overview.md#persistent-storage) and [non-persistent](#non-persistent-topics). The default is persistent, so if you do not specify a type, the topic is persistent. With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks. |
| `tenant` | The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters. |
| `namespace` | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Most topic configuration is performed at the [namespace](#namespaces) level. Each tenant has one or more namespaces. |
| `topic` | The final part of the name. Topic names have no special meaning in a Pulsar instance. |
:::note
You do not need to explicitly create topics in Pulsar. If a client attempts to write or receive messages to/from a topic that does not yet exist, Pulsar creates that topic under the namespace provided in the [topic name](#topics) automatically.
If no tenant or namespace is specified when a client creates a topic, the topic is created in the default tenant and namespace. You can also create a topic in a specified tenant and namespace, such as `persistent://my-tenant/my-namespace/my-topic`. `persistent://my-tenant/my-namespace/my-topic` means the `my-topic` topic is created in the `my-namespace` namespace of the `my-tenant` tenant.
:::
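For instance, a producer created on a fully qualified topic name might look like the following sketch, where the tenant, namespace, and topic names are placeholders and `client` is an existing `PulsarClient`:
```java
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://my-tenant/my-namespace/my-topic")  // tenant, namespace, and topic are explicit
        .create();
```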
## Namespaces
A Pulsar namespace is a logical grouping of topics as well as a logical nomenclature within a tenant. A tenant creates namespaces via the [admin API](admin-api-namespaces.md#create-namespaces). For instance, a tenant with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For example, `my-tenant/app1` is a namespace for the application `app1` of the tenant `my-tenant`. You can create any number of [topics](#topics) under a namespace.
## Subscriptions
A Pulsar subscription is a named configuration rule that determines how messages are delivered to consumers. It is a lease on a topic established by a group of consumers. There are four subscription types in Pulsar:
- [exclusive](#exclusive)
- [shared](#shared)
- [failover](#failover)
- [key_shared](#key_shared)
These types are illustrated in the figure below.

:::tip
**Pub-Sub or Queuing**
In Pulsar, you can use different subscriptions flexibly.
* If you want to achieve traditional "fan-out pub-sub messaging" among consumers, specify a unique subscription name for each consumer. It is an exclusive subscription type.
* If you want to achieve "message queuing" among consumers, share the same subscription name among multiple consumers (shared, failover, key_shared).
* If you want to achieve both effects simultaneously, combine exclusive subscription types with other subscription types for consumers.
:::
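As a sketch of the queuing pattern described above, multiple consumers can share one subscription name with the Shared type. The topic and subscription names are placeholders, and `client` is an existing `PulsarClient`.
```java
// Two consumers sharing the same subscription name form a work queue:
// each message is delivered to only one of them.
Consumer<byte[]> worker1 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-work-queue")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();

Consumer<byte[]> worker2 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-work-queue")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();
```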
### Subscription types
When a subscription has no consumers, its subscription type is undefined. The type of a subscription is defined when a consumer connects to it, and the type can be changed by restarting all consumers with a different configuration.
#### Exclusive
The exclusive type is a subscription type that only allows a single consumer to attach to the subscription. If multiple consumers subscribe to a topic using the same subscription, an error occurs. Note that if the topic is partitioned, all partitions will be consumed by the single consumer allowed to be connected to the subscription.
In the diagram below, only **Consumer A** is allowed to consume messages.
:::tip
Exclusive is the default subscription type.
:::

#### Failover
The failover type is a subscription type in which multiple consumers can attach to the same subscription.
A master consumer is picked for a non-partitioned topic or each partition of a partitioned topic and receives messages.
When the master consumer disconnects, all (non-acknowledged and subsequent) messages are delivered to the next consumer in line.
:::note
In some cases, a partition may have an older active consumer processing messages while a newly switched-over active consumer starts receiving new messages. This may lead to message duplication or out-of-order delivery.
:::
##### Failover | Partitioned topics
For partitioned topics, the broker sorts consumers by priority and lexicographical order of consumer name.
The broker tries to evenly assign partitions to consumers with the highest priority.
A consumer is selected by running a modulo operation: `partition index % number of consumers`.
- If the number of partitions in a partitioned topic is **less** than the number of consumers:
For example, in the diagram below, this partitioned topic has 2 partitions and there are 4 consumers.
Each partition has 1 active consumer and 3 stand-by consumers.
- For P0, Consumer A is the master consumer, while Consumer B, Consumer C, and Consumer D would be the next consumer in line to receive messages if consumer A is disconnected.
- For P1, Consumer B is the master consumer, while Consumer A, Consumer C, and Consumer D would be the next consumer in line to receive messages if consumer B is disconnected.
- Moreover, if Consumer A and consumer B are disconnected, then
- for P0: Consumer C is the active consumer and Consumer D is the stand-by consumer.
- for P1: Consumer D is the active consumer and Consumer C is the stand-by consumer.

- If the number of partitions in a partitioned topic is **greater** than the number of consumers:
For example, in the diagram below, this partitioned topic has 9 partitions and 3 consumers.
- P0, P3, and P6 are assigned to Consumer A. Consumer A is their active consumer. Consumer B and Consumer C are their stand-by consumers.
- P1, P4, and P7 are assigned to Consumer B. Consumer B is their active consumer. Consumer A and Consumer C are their stand-by consumers.
- P2, P5, and P8 are assigned to Consumer C. Consumer C is their active consumer. Consumer A and Consumer B are their stand-by consumers.

##### Failover | Non-partitioned topics
- If there is one non-partitioned topic, the broker picks consumers in the order they subscribe to it.
For example, in the diagram below, there is 1 non-partitioned topic and 2 consumers.
The topic has 1 active consumer and 1 stand-by consumer.
Consumer A is the master consumer, while consumer B would be the next consumer in line to receive messages if consumer A is disconnected.

- If there are multiple non-partitioned topics, a consumer is selected based on **consumer name hash** and **topic name hash**. The client uses the same consumer name to subscribe to all the topics.
For example, in the diagram below, there are 4 non-partitioned topics and 2 consumers.
- The non-partitioned topic 1 and non-partitioned topic 4 are assigned to consumer B. Consumer A is their stand-by consumer.
- The non-partitioned topic 2 and non-partitioned topic 3 are assigned to consumer A. Consumer B is their stand-by consumer.

#### Shared
The shared subscription type in Pulsar allows multiple consumers to attach to the same subscription. Messages are delivered in a round-robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged will be rescheduled for sending to the remaining consumers.
In the diagram below, **Consumer A**, **Consumer B** and **Consumer C** are all able to subscribe to the topic.
:::note
Shared subscriptions do not guarantee message ordering or support cumulative acknowledgment.
:::

#### Key_Shared
The Key_Shared subscription type in Pulsar allows multiple consumers to attach to the same subscription. Unlike the Shared type, messages in the Key_Shared type are distributed across consumers with the guarantee that messages with the same key or the same ordering key are delivered to only one consumer. No matter how many times a message is redelivered, it is delivered to the same consumer.

:::note
If a newly switched-over active consumer takes over, it starts reading messages from the position at which the old, now inactive consumer last acknowledged messages.
For example, if P0 is assigned to Consumer A. Consumer A is the active consumer and Consumer B is the stand-by consumer.
- If Consumer A gets disconnected without reading any messages from P0, when Consumer C is added and becomes the new active consumer, then Consumer C will start reading messages directly from P0.
- If Consumer A gets disconnected after reading messages (0,1,2,3) from P0, when Consumer C is added and becomes the active consumer, then Consumer C will start reading messages (4,5,6,7) from P0.
:::
There are three types of mapping algorithms dictating how to select a consumer for a given message key (or ordering key):
- Auto-split Hash Range
- Auto-split Consistent Hashing
- Sticky
The steps for all mapping algorithms are:
1. The message key (or ordering key) is passed to a hash function (e.g., Murmur3 32-bit), yielding a 32-bit integer hash.
2. That hash number is fed to the algorithm to select a consumer from the existing connected consumers.
```
                   +---------------+                          +-----------+
Message Key -----> | Hash Function | ----- hash (32-bit) ---> | Algorithm | ----> Consumer
                   +---------------+                          +-----------+
```
When a new consumer is connected, and thus added to the list of connected consumers, the algorithm re-adjusts the mapping such that some keys currently mapped to existing consumers will be mapped to the newly added consumer. When a consumer is disconnected, and thus removed from the list of connected consumers, keys mapped to it will be mapped to other consumers. The sections below explain, for each algorithm, how a consumer is selected given the message hash, and how the mapping is adjusted when a new consumer connects or an existing consumer disconnects.
##### Auto-split Hash Range
Auto-split Hash Range assumes each consumer is mapped to a single region in a range of numbers between 0 and 2^16 (65,536). All mapped regions cover the entire range, and no regions overlap. A consumer is selected for a given key by running a modulo operation on the message hash by the range size (65,536). The number received (0 <= i < 65,536) is contained within a single region, and the consumer mapped to that region is the one selected.
Example:
Suppose we have 4 consumers (C1, C2, C3 and C4), then:
```
0              16,384            32,768            49,152            65,536
|------- C3 ------|------- C2 ------|------- C1 ------|------- C4 ------|
```
Given a message key `Order-3459134`, its hash would be `murmur32("Order-3459134") = 3112179635`, and its index in the range would be `3112179635 mod 65536 = 6067`. That index is contained within region `[0, 16384)` thus consumer C3 will be mapped to this message key.
When a new consumer is connected, the largest region is chosen and split in half: the lower half is mapped to the newly added consumer, and the upper half is mapped to the consumer that owned the region. Here is how it looks going from 1 to 4 consumers:
```
C1 connected:
|---------------------------------- C1 ---------------------------------|
C2 connected:
|--------------- C2 ----------------|---------------- C1 ---------------|
C3 connected:
|------- C3 ------|------- C2 ------|---------------- C1 ---------------|
C4 connected:
|------- C3 ------|------- C2 ------|------- C4 ------|------- C1 ------|
```
When a consumer is disconnected, its region is merged into the region on its right. Examples:
C4 is disconnected:
```
|------- C3 ------|------- C2 ------|---------------- C1 ---------------|
```
C1 is disconnected:
```
|------- C3 ------|-------------------------- C2 -----------------------|
```
The advantage of this algorithm is that it affects only a single existing consumer when a consumer is added or removed, at the expense of regions not being evenly sized. This means some consumers get more keys than others. The next algorithm makes the opposite trade-off.
##### Auto-split Consistent Hashing
Auto-split Consistent Hashing assumes each consumer is mapped onto a hash ring. The ring is a range of numbers from 0 to 65,535 that wraps around: when you traverse the range and reach 65,535, the next number is 0. It is as if you took a line starting at 0 and ending at 65,535 and bent it into a circle so that the end glues to the start:
```
       65,535 --++-- 0
                ||
          , - ~ ~ ~ - ,
      , '             ' ,
    ,                     ,
   ,                       ,
  ,                         ,
  ,                         ,
  ,                         ,
   ,                       ,
    ,                     ,
      ,                 , '
        ' - , _ _ _ , '
```
When adding a consumer, we mark 100 points on that circle and associate them with the newly added consumer. For each number between 1 and 100, we concatenate that number to the consumer name and run the hash function on it to get the location of the point on the circle that will be marked. For example, if the consumer name is "orders-aggregator-pod-2345-consumer", we would mark 100 points on that circle:
```
murmur32("orders-aggregator-pod-2345-consumer␀0␀1") = 1003084738 % 65535 = 6028
murmur32("orders-aggregator-pod-2345-consumer␀0␀2") = 373317202 % 65535 = 29842
...
murmur32("orders-aggregator-pod-2345-consumer␀0␀100") = 320276078 % 65535 = 6533
```
Since the hash function has the uniform distribution attribute, those points would be uniformly distributed across the circle in random order.
```
               C1-33
          , - ~ ~ ~ - ,      C1-3
      , '             ' ,
    ,                     ,
   ,                       ,   C1-45
  ,                         ,
  ,                         ,
  ,                         ,
   ,                       ,   C1-23
    ,                     ,
      ,                 , '
        ' - , _ _ _ , '      ...
```
A consumer is selected for a given message key by placing its hash on the circle and then continuing clockwise until a marking point is reached. The point might have more than one consumer on it (the hash function might have collisions). In the case of a collision, the first added consumer handles the hash range; when it leaves, the next consumer among the colliding consumers for that hash ring point takes over.
When a consumer is added, we add 100 marking points to the circle as explained before. Due to the uniform distribution of the hash function, those 100 points act as if the new consumer takes a small slice of keys out of each existing consumer. This maintains the even distribution, at the cost of impacting all existing consumers. [This video](https://www.youtube.com/watch?v=zaRkONvyGr8) explains the concept of consistent hashing quite well (the only difference is that in Pulsar's case we use K points instead of K hash functions, as noted in the comments).
##### Sticky
Sticky assumes each consumer is mapped to multiple regions in a range of numbers between 0 and 2^16 (65,536), and there is no overlap between regions. A consumer is selected by running a modulo operation on the message hash by the range size (65,536); the number received (0 <= i < 65,536) is contained within a single region, and the consumer mapped to that region is the one selected.
In this algorithm you have full control. Every newly added consumer specifies the ranges it wishes to be mapped to through the consumer API: when the consumer object is constructed, you can specify the list of ranges. It is your responsibility to make sure there are no overlaps and that the entire range is covered by regions.
Example:
Suppose we have 2 consumers (C1 and C2) each specified their ranges, then:
```
C1 = [0, 16384), [32768, 49152)
C2 = [16384, 32768), [49152, 65536)
0              16,384            32,768            49,152            65,536
|------- C1 ------|------- C2 ------|------- C1 ------|------- C2 ------|
```
Given a message key `Order-3459134`, its hash would be `murmur32("Order-3459134") = 3112179635`, and its index in the range would be `3112179635 mod 65536 = 6067`. That index is contained within `[0, 16384)`, thus consumer C1 will be mapped to this message key.
If a newly connected consumer does not supply its ranges, or its ranges overlap with an existing consumer's ranges, it is disconnected, removed from the consumer list, and reverted as if it had never tried to connect.
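To illustrate, the C1 ranges above could be declared with the Java client's `KeySharedPolicy.stickyHashRange()` and `Range` APIs, a sketch in which the range boundaries are inclusive, so `[0, 16384)` becomes `Range.of(0, 16383)`:
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("my-topic")
        .subscriptionName("key-shared-sticky-sub")
        .subscriptionType(SubscriptionType.Key_Shared)
        .keySharedPolicy(KeySharedPolicy.stickyHashRange()
                .ranges(Range.of(0, 16383), Range.of(32768, 49151)))  // C1's regions
        .subscribe();
```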
##### How to use mapping algorithms?
To use a mapping algorithm mentioned above, you can specify the Key_Shared Mode when building the consumer:
- `AUTO_SPLIT` - Auto-split Hash Range
- `STICKY` - Sticky
Consistent Hashing will be used instead of Hash Range for Auto-split if the broker configuration `subscriptionKeySharedUseConsistentHashing` is enabled.
##### Preserving order of message delivery by key
In Pulsar 4.0.0, Key_Shared Subscription has been improved to preserve the order of message delivery with the same key when using the `AUTO_SPLIT` mode. The message delivery will no longer be blocked completely when new consumers join or leave.
For Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time. This ensures that the order of message delivery by key is preserved.
When new consumers join or leave, the consumer handling a message key can change when the default `AUTO_SPLIT` mode is used, but only after all unacknowledged messages for a particular key are acknowledged or the original consumer disconnects.
:::note
The Key_Shared subscription doesn't prevent using any methods in the consumer API. For example, the application might call `negativeAcknowledge` or the `redeliverUnacknowledgedMessages` method. When messages are scheduled for delivery due to these methods, they will get redelivered as soon as possible. There's no ordering guarantee in these cases, however the guarantee of delivering a message key to a single consumer at a time will continue to be preserved.
:::
##### Troubleshooting issues when message delivery is blocked for a key in Key_Shared subscriptions `AUTO_SPLIT` mode
Pulsar 4.0.0 added consumer-level topic stats to observe unacknowledged messages that block message delivery for a key in Key_Shared subscriptions using the `AUTO_SPLIT` mode.
- `drainingHashesCount` - the current number of hashes in the draining state for this consumer
- `drainingHashesClearedTotal` - the total number of hashes cleared from the draining state since the consumer connected
- `drainingHashesUnackedMessages` - the total number of unacknowledged messages for all draining hashes for this consumer
- `drainingHashes` - draining hashes information for this consumer
- `hash` - the sticky key hash which is draining
- `unackMsgs` - the number of unacknowledged messages for this hash
- `blockedAttempts` - the number of times the hash has blocked an attempted delivery of a message
Instead of tracking individual blocked keys, the `drainingHashes` field tracks the hashes that are in the draining state and being blocked by unacknowledged messages. The reason to track the hashes instead of the keys is to avoid the overhead of tracking individual keys so that the broker can scale better when there are a large number of keys. The hash space has been reduced to 2^16 (65,536) in Pulsar 4.0.0, down from 2^32 in previous Pulsar versions.
It's possible to calculate the hash for a key by using the Murmur3 32-bit hash function. The pseudo code to calculate the hash for a key is:
```
hash = murmur32("key") % 65536 + 1
```
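As a rough Java sketch of this calculation using Guava's Murmur3 32-bit implementation (an assumption; the broker's internal hash may use a different seed or sign handling, so the result should be treated as approximate):
```java
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class KeyHash {
    // Approximates the sticky-key hash: Murmur3 32-bit, masked to a non-negative value,
    // reduced to the 2^16 hash space, and shifted to start at 1.
    static int keyHash(String key) {
        int murmur = Hashing.murmur3_32_fixed()
                .hashString(key, StandardCharsets.UTF_8)
                .asInt() & Integer.MAX_VALUE;
        return murmur % 65536 + 1;
    }

    public static void main(String[] args) {
        System.out.println(keyHash("Order-3459134"));
    }
}
```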
In addition, the consumer-level topic stats contains the following fields:
- `keyHashRangeArrays` - the consumer's hash range assignments in a list of lists where each item contains the start and end as elements.
- example `[ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ]`
The `keyHashRangeArrays` field replaces the `keyHashRanges` field available in earlier Pulsar versions. The format of the field is different.
Example of both fields where the difference is visible:
```json
{
"keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
"keyHashRanges" : [ "[2960, 5968]", "[22258, 43033]", "[49261, 54464]", "[55155, 61273]" ],
}
```
The field `keyHashRanges` contained the information as a list of string values, which isn't very usable for most use cases since it would need to be parsed before it can be used.
Example of the consumer stats part of the topic stats for a subscription:
```json
{
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 1560,
"msgOutCounter" : 30,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "c1",
"availablePermits" : 70,
"unackedMessages" : 30,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"drainingHashesCount" : 5,
"drainingHashesClearedTotal" : 0,
"drainingHashesUnackedMessages" : 10,
"drainingHashes" : [ {
"hash" : 2862,
"unackMsgs" : 2,
"blockedAttempts" : 5
}, {
"hash" : 11707,
"unackMsgs" : 2,
"blockedAttempts" : 9
}, {
"hash" : 15786,
"unackMsgs" : 2,
"blockedAttempts" : 6
}, {
"hash" : 43539,
"unackMsgs" : 2,
"blockedAttempts" : 6
}, {
"hash" : 45436,
"unackMsgs" : 2,
"blockedAttempts" : 9
} ],
"address" : "/127.0.0.1:55829",
"connectedSince" : "2024-10-21T05:39:39.077284+03:00",
"clientVersion" : "Pulsar-Java-v4.0.0",
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 1728527979411,
"lastConsumedFlowTimestamp" : 1728527979106,
"keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
"metadata" : { },
"lastAckedTime" : "1970-01-01T02:00:00+02:00",
"lastConsumedTime" : "2024-10-21T05:39:39.411+03:00"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "c2",
"availablePermits" : 1000,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"drainingHashesCount" : 0,
"drainingHashesClearedTotal" : 0,
"drainingHashesUnackedMessages" : 0,
"drainingHashes" : [ ],
"address" : "/127.0.0.1:55829",
"connectedSince" : "2024-10-21T05:39:39.294216+03:00",
"clientVersion" : "Pulsar-Java-v4.0.0",
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1728527979297,
"keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ],
"metadata" : { },
"lastAckedTime" : "1970-01-01T02:00:00+02:00",
"lastConsumedTime" : "1970-01-01T02:00:00+02:00"
} ]
}
```
Relevant information for consumer c1:
```json
{
"drainingHashesCount" : 5,
"drainingHashesClearedTotal" : 0,
"drainingHashesUnackedMessages" : 10,
"drainingHashes" : [ {
"hash" : 2862,
"unackMsgs" : 2,
"blockedAttempts" : 5
}, {
"hash" : 11707,
"unackMsgs" : 2,
"blockedAttempts" : 9
}, {
"hash" : 15786,
"unackMsgs" : 2,
"blockedAttempts" : 6
}, {
"hash" : 43539,
"unackMsgs" : 2,
"blockedAttempts" : 6
}, {
"hash" : 45436,
"unackMsgs" : 2,
"blockedAttempts" : 9
} ],
}
```
Relevant information in this case about consumer c2:
```json
{
"keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ],
}
```
In Pulsar 4.0.0, the Key_Shared implementation only blocks hashes that are necessary. For each hash, there is a way to obtain detailed information to determine why the delivery is blocked. The major difference from the previous `readPositionWhenJoining` solution is that it is now possible to automate and build CLI and web user interface tools to assist users, making it very easy to troubleshoot issues when message delivery is blocked by unacknowledged messages in Key_Shared `AUTO_SPLIT` subscriptions.
A future improvement will be to add a REST API for retrieving the unacknowledged message ID information of the unacknowledged message for a hash. Using this information, it will be possible to find out the details of the message that is blocking a particular hash from being delivered to a consumer. The REST API could also have additional features, such as searching by key or calculating the hash for a given key.
##### Batching for Key_Shared Subscriptions
:::note
When the consumers are using the Key_Shared subscription type, you need to **disable batching** or **use key-based batching** for the producers.
:::
There are two reasons why the key-based batching is necessary for the Key_Shared subscription type:
1. The broker dispatches messages according to the keys of the messages, but the default batching approach might fail to pack messages with the same key into the same batch.
2. Since it is the consumers, rather than the broker, that split a batch into individual messages, the key of the first message in a batch is considered the key of all messages in that batch, which can lead to context errors.
Key-based batching aims to resolve the above issues. This batching method ensures that producers pack messages with the same key into the same batch. Messages without a key are packed into one batch, and this batch has no key. When the broker dispatches messages from this batch, it uses `NON_KEY` as the key. In addition, each consumer is associated with **only one** key and should receive **only one message batch** for the connected key. By default, you can limit batching by configuring the number of messages that producers are allowed to send.
Below are examples of enabling the key-based batching under the Key_Shared subscription type, with `client` being the Pulsar client that you created.
````mdx-code-block
<Tabs groupId="lang-choice"