Skip to content

Commit 038c574

Browse files
[improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#5)
Co-authored-by: liudezhi <liudezhi2098@163.com>
1 parent 0e6e72e commit 038c574

File tree

6 files changed

+571
-4
lines changed

6 files changed

+571
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+65-2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@
114114
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
115115
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
116116
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
117+
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
118+
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
117119
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
118120
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
119121
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -287,6 +289,8 @@ public class BrokerService implements Closeable {
287289
private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
288290
private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
289291

292+
private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher();
293+
290294
public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
291295
this.pulsar = pulsar;
292296
this.preciseTopicPublishRateLimitingEnable =
@@ -401,6 +405,16 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
401405
this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore());
402406
}
403407

408+
public void addTopicEventListener(TopicEventsListener... listeners) {
409+
topicEventsDispatcher.addTopicEventListener(listeners);
410+
getTopics().keys().forEach(topic ->
411+
TopicEventsDispatcher.notify(listeners, topic, TopicEvent.LOAD, EventStage.SUCCESS, null));
412+
}
413+
414+
public void removeTopicEventListener(TopicEventsListener... listeners) {
415+
topicEventsDispatcher.removeTopicEventListener(listeners);
416+
}
417+
404418
// This call is used for starting additional protocol handlers
405419
public void startProtocolHandlers(
406420
Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) {
@@ -1010,17 +1024,37 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
10101024
});
10111025
} else {
10121026
return topics.computeIfAbsent(topic, (name) -> {
1027+
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
10131028
if (topicName.isPartitioned()) {
10141029
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
10151030
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
10161031
if (topicName.getPartitionIndex() < metadata.partitions) {
1017-
return createNonPersistentTopic(name);
1032+
topicEventsDispatcher
1033+
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);
1034+
1035+
CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
1036+
1037+
CompletableFuture<Optional<Topic>> eventFuture = topicEventsDispatcher
1038+
.notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE);
1039+
topicEventsDispatcher
1040+
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
1041+
return res;
10181042
}
1043+
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
10191044
return CompletableFuture.completedFuture(Optional.empty());
10201045
});
10211046
} else if (createIfMissing) {
1022-
return createNonPersistentTopic(name);
1047+
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);
1048+
1049+
CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
1050+
1051+
CompletableFuture<Optional<Topic>> eventFuture = topicEventsDispatcher
1052+
.notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE);
1053+
topicEventsDispatcher
1054+
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
1055+
return res;
10231056
} else {
1057+
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
10241058
return CompletableFuture.completedFuture(Optional.empty());
10251059
}
10261060
});
@@ -1066,6 +1100,13 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
10661100
}
10671101

10681102
public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) {
1103+
topicEventsDispatcher.notify(topic, TopicEvent.DELETE, EventStage.BEFORE);
1104+
CompletableFuture<Void> result = deleteTopicInternal(topic, forceDelete, deleteSchema);
1105+
topicEventsDispatcher.notifyOnCompletion(result, topic, TopicEvent.DELETE);
1106+
return result;
1107+
}
1108+
1109+
public CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceDelete, boolean deleteSchema) {
10691110
Optional<Topic> optTopic = getTopicReference(topic);
10701111
if (optTopic.isPresent()) {
10711112
Topic t = optTopic.get();
@@ -1526,6 +1567,24 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
15261567
managedLedgerConfig.setCreateIfMissing(createIfMissing);
15271568
managedLedgerConfig.setProperties(properties);
15281569

1570+
topicEventsDispatcher.notify(topic, TopicEvent.LOAD, EventStage.BEFORE);
1571+
// load can fail with topicFuture completed non-exceptionally
1572+
// work around this
1573+
final CompletableFuture<Void> loadFuture = new CompletableFuture<>();
1574+
topicFuture.whenComplete((res, ex) -> {
1575+
if (ex == null) {
1576+
loadFuture.complete(null);
1577+
} else {
1578+
loadFuture.completeExceptionally(ex);
1579+
}
1580+
});
1581+
1582+
if (createIfMissing) {
1583+
topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE);
1584+
topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE);
1585+
}
1586+
topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, TopicEvent.LOAD);
1587+
15291588
// Once we have the configuration, we can proceed with the async open operation
15301589
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig,
15311590
new OpenLedgerCallback() {
@@ -1594,6 +1653,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
15941653
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
15951654
if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) {
15961655
// We were just trying to load a topic and the topic doesn't exist
1656+
loadFuture.completeExceptionally(exception);
15971657
topicFuture.complete(Optional.empty());
15981658
} else {
15991659
log.warn("Failed to create topic {}", topic, exception);
@@ -2085,6 +2145,8 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
20852145
String bundleName = namespaceBundle.toString();
20862146
String namespaceName = TopicName.get(topic).getNamespaceObject().toString();
20872147

2148+
topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.BEFORE);
2149+
20882150
synchronized (multiLayerTopicsMap) {
20892151
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = multiLayerTopicsMap
20902152
.get(namespaceName);
@@ -2119,6 +2181,7 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
21192181
if (compactor != null) {
21202182
compactor.getStats().removeTopic(topic);
21212183
}
2184+
topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS);
21222185
}
21232186

21242187
public int getNumberOfNamespaceBundles() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.Objects;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
/**
29+
* Utility class to dispatch topic events.
30+
*/
31+
@Slf4j
32+
public class TopicEventsDispatcher {
33+
private final List<TopicEventsListener> topicEventListeners = new CopyOnWriteArrayList<>();
34+
35+
/**
36+
* Adds listeners, ignores null listeners.
37+
* @param listeners
38+
*/
39+
public void addTopicEventListener(TopicEventsListener... listeners) {
40+
Objects.requireNonNull(listeners);
41+
Arrays.stream(listeners)
42+
.filter(x -> x != null)
43+
.forEach(topicEventListeners::add);
44+
}
45+
46+
/**
47+
* Removes listeners.
48+
* @param listeners
49+
*/
50+
public void removeTopicEventListener(TopicEventsListener... listeners) {
51+
Objects.requireNonNull(listeners);
52+
Arrays.stream(listeners)
53+
.filter(x -> x != null)
54+
.forEach(topicEventListeners::remove);
55+
}
56+
57+
/**
58+
* Dispatches notification to all currently added listeners.
59+
* @param topic
60+
* @param event
61+
* @param stage
62+
*/
63+
public void notify(String topic,
64+
TopicEventsListener.TopicEvent event,
65+
TopicEventsListener.EventStage stage) {
66+
notify(topic, event, stage, null);
67+
}
68+
69+
/**
70+
* Dispatches notification to all currently added listeners.
71+
* @param topic
72+
* @param event
73+
* @param stage
74+
* @param t
75+
*/
76+
public void notify(String topic,
77+
TopicEventsListener.TopicEvent event,
78+
TopicEventsListener.EventStage stage,
79+
Throwable t) {
80+
topicEventListeners
81+
.forEach(listener -> notify(listener, topic, event, stage, t));
82+
}
83+
84+
/**
85+
* Dispatches SUCCESS/FAILURE notification to all currently added listeners on completion of the future.
86+
* @param future
87+
* @param topic
88+
* @param event
89+
* @param <T>
90+
* @return future of a new completion stage
91+
*/
92+
public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> future,
93+
String topic,
94+
TopicEventsListener.TopicEvent event) {
95+
return future.whenComplete((r, ex) -> notify(topic,
96+
event,
97+
ex == null ? TopicEventsListener.EventStage.SUCCESS : TopicEventsListener.EventStage.FAILURE,
98+
ex));
99+
}
100+
101+
/**
102+
* Dispatches notification to specified listeners.
103+
* @param listeners
104+
* @param topic
105+
* @param event
106+
* @param stage
107+
* @param t
108+
*/
109+
public static void notify(TopicEventsListener[] listeners,
110+
String topic,
111+
TopicEventsListener.TopicEvent event,
112+
TopicEventsListener.EventStage stage,
113+
Throwable t) {
114+
Objects.requireNonNull(listeners);
115+
for (TopicEventsListener listener: listeners) {
116+
notify(listener, topic, event, stage, t);
117+
}
118+
}
119+
120+
private static void notify(TopicEventsListener listener,
121+
String topic,
122+
TopicEventsListener.TopicEvent event,
123+
TopicEventsListener.EventStage stage,
124+
Throwable t) {
125+
if (listener == null) {
126+
return;
127+
}
128+
129+
try {
130+
listener.handleEvent(topic, event, stage, t);
131+
} catch (Throwable ex) {
132+
log.error("TopicEventsListener {} exception while handling {}_{} for topic {}",
133+
listener, event, stage, topic, ex);
134+
}
135+
}
136+
137+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import org.apache.pulsar.common.classification.InterfaceAudience;
22+
import org.apache.pulsar.common.classification.InterfaceStability;
23+
24+
/**
25+
* Listener for the Topic events.
26+
*/
27+
@InterfaceStability.Evolving
28+
@InterfaceAudience.LimitedPrivate
29+
public interface TopicEventsListener {
30+
31+
/**
32+
* Types of events currently supported.
33+
* create/load/unload/delete
34+
*/
35+
enum TopicEvent {
36+
// create events included into load events
37+
CREATE,
38+
LOAD,
39+
UNLOAD,
40+
DELETE,
41+
}
42+
43+
/**
44+
* Stages of events currently supported.
45+
* before starting the event/successful completion/failed completion
46+
*/
47+
enum EventStage {
48+
BEFORE,
49+
SUCCESS,
50+
FAILURE
51+
}
52+
53+
/**
54+
* Handle topic event.
55+
* Choice of the thread / maintenance of the thread pool is up to the event handlers.
56+
* @param topicName - name of the topic
57+
* @param event - TopicEvent
58+
* @param stage - EventStage
59+
* @param t - exception in case of FAILURE, if present/known
60+
*/
61+
void handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t);
62+
}

0 commit comments

Comments
 (0)