Delayed message delivery is an important feature which allows a producer to specify that a message should be delivered/consumed at a later time. Currently the broker will save a delayed message without any check. The message's deliverAt
time is checked when the broker dispatches messages to the Consumer. If a message has a deliverAt
time, then it is added to the DelayedDeliveryTracker
and will be delivered later when eligible.
Delayed message delivery is only available for persistent topics, and shared/key-shared subscription types.
Currently there is no max delay limit so a producer can specify any delay when publishing a message.
This poses a few challenges:
- Producer may miscalculate/misconfigure a very large delay (ex. 1,000 day instead of 100 day delay)
- Pulsar administrators may want to limit the max allowed delay since unacked messages (ex. messages with a large delay) will be stored forever (unless TTL is configured)
- The configured delay may be greater than the configured TTL which means the delayed message may be deleted before the
deliverAt
time (before the consumer can process it)
The purpose of this PIP is to introduce an optional configuration to limit the max allowed delay for delayed delivery.
- Add broker configuration to limit the max allowed delay for delayed delivery
- Configurable at broker/topic/namespace-level
We will add a configuration maxDeliveryDelayInMillis
and if configured, the broker will check incoming delayed messages to see if the message's deliverAt
time exceeds the configured limit. If it exceeds the limit, the broker will send an error back to the Producer.
A new maxDeliveryDelayInMillis
config will be added to the broker which is initially defaulted to 0 (disabled). The default (disabled) behavior will match the current delayed delivery behavior (no limit on delivery delay).
# broker.conf
delayedDeliveryMaxDeliveryDelayInMillis=0
This field will also be added to the existing DelayedDeliveryPolicies
interface to support topic & namespace-level configuration:
public interface DelayedDeliveryPolicies {
long getMaxDeliveryDelayInMillis();
}
The max delivery delay check will occur in the broker's Producer
class inside of checkAndStartPublish
(same place as other checks such as isEncryptionEnabled
).
We will give a ServerError.NotAllowedError
error if all of the following are true:
- Sending to a persistent topic
- Topic has
delayedDeliveryEnabled=true
MessageMetadata
deliver_at_time
has been specified- Topic has
>0
value formaxDeliveryDelayInMillis
deliver_at_time - publish_time
>maxDeliveryDelayInMillis
// In org.apache.pulsar.broker.service.Producer#checkAndStartPublish
if (topic.isPersistent()) {
PersistentTopic pTopic = (PersistentTopic) topic;
if (pTopic.isDelayedDeliveryEnabled()) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
if (msgMetadata.hasDeliverAtTime()) {
long maxDeliveryDelayInMillis = pTopic.getMaxDeliveryDelayInMillis();
if (maxDeliveryDelayInMillis > 0
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMillis) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
String.format("Exceeds max allowed delivery delay of %s milliseconds", maxDeliveryDelayInMillis));
cnx.completedSendOperation(false, headersAndPayload.readableBytes());
});
return false;
}
}
}
}
The proposal does not involve any client changes, however it is important to note that setting a max delivery delay may impact the Consumer
since the Consumer
uses delayed delivery for retrying to the retry/dlq topic (ex. reconsumeLater
API). So the max Consumer
retry delay will be the same as the configured maxDeliveryDelayInMillis
(if enabled).
A problem will occur if max delivery delay is configured but a Consumer
uses a larger custom retry delay. In this scenario, the Consumer
will actually get stuck redelivering the message as the publish to the retry topic will fail. For this scenario, a larger retry delay should be configured specifically for the Consumer's retry topic (or no delay limit should be used for retry topics).
A more elegant solution would require a protocol change (see Alternatives
section below).
The optional maxDeliveryDelayInMillis
field will be added to the admin REST APIs for configuring topic/namespace policies:
POST /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery
And the corresponding GET
APIs will show maxDeliveryDelayInMillis
in the response:
GET /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery
Broker will have a new config in broker.conf
:
# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which exceeds this max delay, then
# it will return an error to the producer.
# The default value is 0 which means there is no limit on the max delivery delay.
delayedDeliveryMaxDeliveryDelayInMillis=0
Both CmdTopics
and CmdNamespaces
will be updated to include this additional optional configuration.
Reverting to a previous version will simply get rid of this config/limitation which is the previous behavior.
We will default the value to 0/disabled (no limitation), so this is a backwards compatible change and will not cause any functional change when upgrading to this feature/version. This feature will only be applied once the config is changed.
If configured, the maxDeliveryDelayInMillis
limitation will affect:
- Producers who configure a longer max delivery delay (PIP-26: 2.4.0+)
- Consumers who configure a longer retry delay when using retry topic (PIP-58: 2.6.0+)
An alternative is to add the limit check to the client-side which requires a protocol change so that client Producer
/Consumer
will receive the delayed delivery configurations from the broker. The client Producer
can then throw an exception if the caller provides a delay greater than the configured limit. The client Consumer
can more elegantly handle when the retry publish delay is greater than the configured limit as it can default to using the limit instead of being stuck waiting for the limit to be increased.
This would still require the broker-side check as someone may be using a custom client. The main benefit is being able to elegantly handle the Consumer
retry topic scenario.
If we were to make this protocol change, then it might make sense to also have the Producer
check the delayedDeliveryEnabled
config. If delayed delivery is disabled and the Producer
tries to send a delayed message, then an exception is thrown to the caller (current behavior is the broker will just deliver the message instantly and no error is provided to the Producer
so it can be misleading).
We would also need to add the client-side checks to other supported client libraries.
Since the scope of this alternative would be quite expansive, we may want to pursue this in a follow-up PIP instead of trying to address it all at once.
- Mailing List discussion thread: https://lists.apache.org/thread/285nm08842or324rxc2zy83wxgqxtcjp
- Mailing List voting thread: https://lists.apache.org/thread/gkqrfrxx74j0dmrogg3now29v1of9zm9