PIP-39 introduces system topics and topic-level policies. Currently, the topic policies service (TopicPoliciesService
) has only one implementation (SystemTopicBasedTopicPoliciesService
) that depends on system topics. Therefore, the following configurations are required (though they are enabled by default):
systemTopicEnabled=true
topicLevelPoliciesEnabled=true
However, using system topics to manage topic policies may not always be the best choice. Users might need an alternative approach to manage topic policies.
The TopicPoliciesService
interface is poorly designed for third-party implementations due to the following reasons:
-
Methods that Should Not Be Exposed:
addOwnedNamespaceBundleAsync
andremoveOwnedNamespaceBundleAsync
are used internally inSystemTopicBasedTopicPoliciesService
.getTopicPoliciesBypassCacheAsync
is used only in tests to replay the__change_events
topic and construct the topic policies map.
-
Confusing and Inconsistent
getTopicPolicies
Methods:- There are two overrides of
getTopicPolicies
:TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException;
- The second method is equivalent to
getTopicPolicies(topicName, false)
. - These methods are asynchronous and start an asynchronous policies initialization, then try to get the policies from the cache. If the initialization hasn't started, they throw
TopicPoliciesCacheNotInitException
.
- There are two overrides of
These methods are hard to use and are primarily used in tests. The getTopicPoliciesAsyncWithRetry
method uses a user-provided executor and backoff policy to call getTopicPolicies
until TopicPoliciesCacheNotInitException
is not thrown:
default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName,
final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) {
The getTopicPolicies
methods are confusing for users who want to implement their own topic policies service. They need to look deeply into Pulsar's source code to understand these details.
PR #21231 adds two asynchronous overrides that are more user-friendly:
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);
Now there are five asynchronous get
methods. Unlike getTopicPolicies
, getTopicPoliciesAsync(topic)
is not equivalent to getTopicPoliciesAsync(topic, false)
. Instead:
getTopicPoliciesAsync(topic)
tries getting local policies first, then global policies if absent.getTopicPoliciesAsync(topic, true)
tries getting global policies.getTopicPoliciesAsync(topic, false)
tries getting local policies.
Since PR #12517, topic policies support global policies across clusters. Therefore, there are local and global policies.
Currently:
getTopicPoliciesAsync(TopicName)
is used inBrokerService#getTopicPoliciesBypassSystemTopic
for initializing topic policies ofPersistentTopic
objects.getTopicPoliciesAsyncWithRetry
is used inAdminResource#getTopicPoliciesAsyncWithRetry
for all topic policies admin APIs.- Other methods are used only in tests.
There is also a sixth method, getTopicPoliciesIfExists
, which tries to get local topic policies from the cache:
TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
However, this method is called just because there was no getTopicPoliciesAsync
methods before and getTopicPolicies
is hard to use. For example, here is an example code snippet in PersistentTopicsBase#internalUpdatePartitionedTopicAsync
:
TopicPolicies topicPolicies =
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) {
replicationClusters = topicPolicies.getReplicationClustersSet();
}
With the new getTopicPoliciesAsync
methods, this code can be replaced with:
pulsarService.getTopicPoliciesService().getTopicPoliciesAsync(topicName, GetType.LOCAL_ONLY)
.thenAccept(topicPolicies -> {
if (topicPolicies.isPresent() && topicPolicies.get().getReplicationClusters() != null) {
replicationClusters = topicPolicies.get().getReplicationClustersSet();
}
});
Make TopicPoliciesService
pluggable so users can customize the topic policies service via another backend metadata store.
Redesign a clear and simple TopicPoliciesService
interface for users to customize.
Add a topicPoliciesServiceClassName
configuration to specify the topic policies service class name. If the class name is not the default SystemTopicBasedTopicPoliciesService
, systemTopicEnabled
will not be required unless the implementation requires it.
-
Add a unified method to get topic policies:
enum GetType { DEFAULT, // try getting the local topic policies, if not present, then get the global policies GLOBAL_ONLY, // only get the global policies LOCAL_ONLY, // only get the local policies } CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type);
getTopicPoliciesAsyncWithRetry
will be replaced bygetTopicPoliciesAsync(topicName, LOCAL_ONLY)
orgetTopicPoliciesAsync(topicName, GLOBAL_ONLY)
. The other two originalgetTopicPoliciesAsync
methods andgetTopicPoliciesIfExists
will be replaced bygetTopicPoliciesAsync(topicName, DEFAULT)
. -
Move
addOwnedNamespaceBundleAsync
andremoveOwnedNamespaceBundleAsync
to private methods ofSystemTopicBasedTopicPoliciesService
. -
Add a
TestUtils
class in tests to includegetTopicPolicies
andgetTopicPoliciesBypassCacheAsync
methods. -
Remove the generic parameter from
TopicPolicyListener
as the value type should always beTopicPolicies
. Mark this listener interface asStable
. -
Add a
PulsarService
parameter to thestart
method so that the implementation can have a constructor with an empty parameter list and get thePulsarService
instance from thestart
method. -
Add a
boolean
return value toregisterListener
sincePersistentTopic#initTopicPolicy
checks if the topic policies are enabled. The return value will indicate if theTopicPoliciesService
instance istopicPoliciesServiceClassName.DISABLED
.
Since the topic policies service is now decoupled from system topics, remove all isSystemTopicAndTopicLevelPoliciesEnabled()
calls.
Here is the refactored TopicPoliciesService
interface:
/**
* Delete policies for a topic asynchronously.
*
* @param topicName topic name
*/
CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName);
/**
* Update policies for a topic asynchronously.
*
* @param topicName topic name
* @param policies policies for the topic name
*/
CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies);
/**
* It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}.
*/
enum GetType {
DEFAULT, // try getting the local topic policies, if not present, then get the global policies
GLOBAL_ONLY, // only get the global policies
LOCAL_ONLY, // only get the local policies
}
/**
* Retrieve the topic policies.
*/
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type);
/**
* Start the topic policy service.
*/
default void start(PulsarService pulsar) {
}
/**
* Close the resources if necessary.
*/
default void close() throws Exception {
}
/**
* Registers a listener for topic policies updates.
*
* <p>
* The listener will receive the latest topic policies when they are updated. If the policies are removed, the
* listener will receive a null value. Note that not every update is guaranteed to trigger the listener. For
* instance, if the policies change from A -> B -> null -> C in quick succession, only the final state (C) is
* guaranteed to be received by the listener.
* In summary, the listener is guaranteed to receive only the latest value.
* </p>
*
* @return true if the listener is registered successfully
*/
boolean registerListener(TopicName topicName, TopicPolicyListener listener);
/**
* Unregister the topic policies listener.
*/
void unregisterListener(TopicName topicName, TopicPolicyListener listener);
@InterfaceStability.Stable
public interface TopicPolicyListener {
void onUpdate(TopicPolicies data);
}
Add a new configuration topicPoliciesServiceClassName
.
If downstream applications need to call APIs from TopicPoliciesService
, they should modify the code to use the new API.
The current interface is poorly designed because it has only one implementation. Keeping these methods will burden developers who want to develop a customized interface. They need to understand where these confusing methods are called and handle them carefully.
- Mailing List discussion thread: https://lists.apache.org/thread/gf6h4n5n1z4n8v6bxdthct1n07onfdxt
- Mailing List voting thread: https://lists.apache.org/thread/potjbkb4w8brcwscgdwzlxnowgdf11gd