A topic in Pulsar is the place where messages are written to. They are consumed by subscriptions. A topic can have many subscriptions, and it is those that maintains the state of message acknowledgment, per subscription - which messages were acknowledged and which were not.
A subscription backlog is the set of unacknowledged messages in that subscription. A subscription backlog size is the sum of the size of the unacknowledged messages (in bytes)..
Since a topic can have many subscriptions, and each has its own backlog, how does one define a backlog for a topic? A topic backlog is defined as the backlog of the subscription which has the oldest unacknowledged message. Since acknowledged messages can be interleaved with unacknowledged messages, calculating the exact size of that subscription backlog can be expensive as it requires I/O operations to read the messages from the ledgers. For that reason, the topic backlog size is actually defined to be the estimated backlog size of that subscription. It does so by summarizing the size of all the ledgers, starting from the current active one (the one being written to), up to the ledger which contains the oldest unacknowledged message for that subscription (There is actually a faster way to calculate it, but this was the definition chosen for this estimation in Pulsar).
A topic backlog age is the age of the oldest unacknowledged message (same subscription as defined for topic backlog size). If that message was written 30 minutes ago, its age is 30 minutes, and so is the topic backlog age.
Pulsar has a feature called backlog quota. It allows a user to define a quota - in effect, a limit - which limits the topic backlog. There are two types of quotas:
- Size based: The limit is for the topic backlog size (as we defined above).
- Time based: The limit is for the topic backlog age (as we defined above).
Once a topic backlog exceeds either one of those limits, an action is taken to hold the backlog to that limit:
- The producer write is placed on hold for a certain amount of time before failing.
- The producer write is failed
- The subscriptions oldest unacknowledged messages will be acknowledged in-order until both the topic backlog size or age will fall inside the limit (quota). The process is called backlog eviction (happens every interval).
The quotas can be defined as a default value for any topic, by using the following broker configuration keys:
backlogQuotaDefaultLimitBytes
and backlogQuotaDefaultLimitSecond
.
The quota can also be specified directly for all topics in a given namespace using the namespace policy, or a specific topic using a topic policy.
The user today can calculate quota used for size based limit, since there are two metrics exposed today on
a topic level: pulsar_storage_backlog_quota_limit
and pulsar_storage_backlog_size
.
You can just divide the two to get a percentage and know how close the topic backlog to its size limit.
For the time-based limit, the only metric exposed today is the quota itself - pulsar_storage_backlog_quota_limit_time
The broker has a method called BrokerService.monitorBacklogQuota()
. It is scheduled to run every x seconds,
as defined by the configuration backlogQuotaCheckIntervalInSeconds
.
This method loops over all persistent topics, and for each topic is checks whether the topic backlog exceeded
either one of those topics.
As mentioned before, checking backlog size is a memory-only calculation, since
each topic has the list of ledgers stored in-memory, including the size of each ledger. Same goes for the subscriptions,
they are all stored in memory, and the ManagedCursor
keeps track of the subscription with the oldest unacknowledged
message, thus retrieveing it is O(1). Checking backlog based on time is costly if configuration key
preciseTimeBasedBacklogQuotaCheck
was set to true. In that case, it needs to read the oldest message to obtain
its public timestamp, which is expensive in terms of I/O. If it was set to false, it's in-memory access only, since
it uses the age of the ledger instead of the message, and the ledgers metadata is kept in memory.
For each topic which has exceeded its quota, if the policy chosen is eviction, then the process it performed synchronously. This process consumes I/O, as it needs read messages (using skip) to know where to stop acknowledging messages.
Users which have defined backlog quota based on time, have no means today to monitor the backlog quota usage, time-wise, to know whether the topic backlog is close to its time limit or even passed it.
If it has passed it, the user has no means to know if it happened, when and how many times.
- Allow the user to know the backlog quota usage for time-based quota, per topic
- Allow the user to know how many times backlog eviction happened, and for which backlog quota type
None
We'll use the existing backlog monitoring process running in intervals. For each topic, the subscription with the oldest unacknowledged message is retrieved, to calculate the topic backlog age. At that point, we will cache the following for the oldest unacknowledged message:
- Subscription name
- Message position
- Message publish timestamp
That cache will allow us to add a metric exposing the topic backlog age - pulsar_storage_backlog_age_seconds
,
which will be both consistent (same ones used for deciding on backlog eviction) and cheap to retrieve
(no additional I/O involved).
Coupled with the existing pulsar_storage_backlog_quota_limit_time
metric, the user can use both to divide and
get the usage of the quota (both are in seconds units).
We will add the subscription name containing the oldest unacknowledged message to the Admin API
topic stats endpoints ({tenant}/{namespace}/{topic}/stats
and {tenant}/{namespace}/{topic}/partitioned-stats
),
allowing the user a complete workflow: alert using metrics when topic backlog is about to be exceeded, then
query topic stats for that topic to retrieve the subscription name which contains the oldest message.
For completeness, we will also add the backlog quota limits, both age and size, and the age of oldest
unacknowledged message.
We will add a metric allowing the user to know how many times the usage exceeded the quota, both for time or size -
pulsar_storage_backlog_quota_exceeded_evictions_total
, where the quota_type
label will be either time
or
size
. Monitoring that counter over time will allow the user to know when a topic backlog exceeded its quota,
and if backlog eviction was chosen as action, then it happened, and how many times.
Some users may want the backlog quota check to happen more frequently, and as a consequence, the backlog age
metric more frequently updated. They can modify backlogQuotaCheckIntervalInSeconds
configuration key, but without
knowing how long this check takes, it will be hard for them. Hence, we will add the metric
pulsar_storage_backlog_quota_check_duration_seconds
which will be of histogram type.
Adding the following to the response of topic stats, of both {tenant}/{namespace}/{topic}/stats
and {tenant}/{namespace}/{topic}/partitioned-stats
:
backlogQuotaLimitSize
- the size in bytes of the topic backlog quotabacklogQuotaLimitTime
- the topic backlog age quota, in seconds.oldestBacklogMessageAgeSeconds
- the age of the oldest unacknowledged (i.e. backlog) message, measured by the time elapsed from its published time, in seconds. This value is recorded every backlog quota check interval, hence it represents the value seen in the last check.oldestBacklogMessageSubscriptionName
- the name of the subscription containing the oldest unacknowledged message. This value is recorded every backlog quota check interval, hence it represents the value seen in the last check.
Name | Description | Attributes | Units |
---|---|---|---|
pulsar_storage_backlog_age_seconds |
Gauge. The age of the oldest unacknowledged message (backlog) | cluster, namespace, topic | seconds |
pulsar_storage_backlog_quota_exceeded_evictions_total |
Counter. The number of times a backlog was evicted since it has exceeded its quota | cluster, namespace, topic, quota_type = (time | size) | |
pulsar_storage_backlog_quota_check_duration_seconds |
Histogram. The duration of the backlog quota check process. | cluster | seconds |
pulsar_broker_storage_backlog_quota_exceeded_evictions_total |
Counter. The number of times a backlog was evicted since it has exceeded its quota, in broker level | cluster, quota_type = (time | size) |
- Since
pulsar_storage_backlog_age_seconds
can not be aggregated, with proper meaning, to a namespace-level, it will not be included as a metric when configuration keyexposeTopicLevelMetricsInPrometheus
is set to false. pulsar_storage_backlog_quota_exceeded_evictions_total
will be included as a metric also in namespace aggregation.
One alternative is to separate the backlog quota check into 2 separate processes, running in their own frequency:
- Check backlog quota exceeded for all persistent topics. The result will be marked in memory. If precise time backlog quota was configured then this will the I/O cost as described before.
- Evict messages for those topics marked.
This may enable more frequent updates to the backlog age metric making it more fresh, but the cost associated with it might be high, since it might result in more frequent I/O calls, especially with many topics. Another disadvantage is that it makes the backlog check and eviction more complex.
- Mailing List discussion thread: https://lists.apache.org/thread/xv33xjjzc3t2n06ynz2gmcd4s06ckrqh
- Mailing List voting thread: https://lists.apache.org/thread/x2ypnft3x5jdyyxbwgvzxgcw20o44vps