From 196e6df036169d90827ab175812c667b1aedf8e8 Mon Sep 17 00:00:00 2001 From: penghui Date: Wed, 3 Jun 2020 14:24:17 +0800 Subject: [PATCH] Listen all commands. --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../events/BrokerEventListenerUtils.java | 14 ++--- .../broker/events/BrokerEventListeners.java | 60 ++++--------------- ...afeBrokerEventListenerWithClassLoader.java | 46 +++----------- .../pulsar/broker/service/BrokerService.java | 4 -- .../pulsar/broker/service/ServerCnx.java | 1 + .../common}/events/BrokerEventListener.java | 49 ++++++++------- .../pulsar/common/protocol/PulsarDecoder.java | 5 +- 8 files changed, 60 insertions(+), 121 deletions(-) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker => pulsar-common/src/main/java/org/apache/pulsar/common}/events/BrokerEventListener.java (56%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 35b6a189d712c0..667a2cb328242f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -423,7 +423,7 @@ public void start() throws PulsarServerException { this.defaultOffloader = createManagedLedgerOffloader( OffloadPolicies.create(this.getConfiguration().getProperties())); - + this.brokerService.getEventListeners().initialize(); brokerService.start(); this.webService = new WebService(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListenerUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListenerUtils.java index c2e21786c3b6db..965b802b6c15ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListenerUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListenerUtils.java @@ -21,6 +21,7 @@ import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.events.BrokerEventListener; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -41,7 +42,7 @@ @Slf4j public class BrokerEventListenerUtils { - static final String BROKER_LISTENER_DEFINITION_FILE = "broker_listener.yml"; + final String BROKER_LISTENER_DEFINITION_FILE = "broker_listener.yml"; /** * Retrieve the broker listener definition from the provided handler nar package. @@ -50,13 +51,13 @@ public class BrokerEventListenerUtils { * @return the broker listener definition * @throws IOException when fail to load the broker listener or get the definition */ - public static BrokerEventListenerDefinition getBrokerListenerDefinition(String narPath, String narExtractionDirectory) throws IOException { + public BrokerEventListenerDefinition getBrokerListenerDefinition(String narPath, String narExtractionDirectory) throws IOException { try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) { return getBrokerListenerDefinition(ncl); } } - private static BrokerEventListenerDefinition getBrokerListenerDefinition(NarClassLoader ncl) throws IOException { + private BrokerEventListenerDefinition getBrokerListenerDefinition(NarClassLoader ncl) throws IOException { String configStr = ncl.getServiceDefinition(BROKER_LISTENER_DEFINITION_FILE); return ObjectMapperFactory.getThreadLocalYaml().readValue( @@ -71,7 +72,7 @@ private static BrokerEventListenerDefinition getBrokerListenerDefinition(NarClas * @return a collection of broker listeners * @throws IOException when fail to load the available broker listeners from the provided directory. */ - public static BrokerEventListenerDefinitions searchForListeners(String listenersDirectory, String narExtractionDirectory) throws IOException { + public BrokerEventListenerDefinitions searchForListeners(String listenersDirectory, String narExtractionDirectory) throws IOException { Path path = Paths.get(listenersDirectory).toAbsolutePath(); log.info("Searching for broker listeners in {}", path); @@ -112,9 +113,8 @@ public static BrokerEventListenerDefinitions searchForListeners(String listeners * Load the broker listeners according to the listener definition. * * @param metadata the broker listeners definition. - * @return */ - static SafeBrokerEventListenerWithClassLoader load(BrokerEventListenerMetadata metadata, String narExtractionDirectory) throws IOException { + SafeBrokerEventListenerWithClassLoader load(BrokerEventListenerMetadata metadata, String narExtractionDirectory) throws IOException { NarClassLoader ncl = NarClassLoader.getFromArchive( metadata.getArchivePath().toAbsolutePath().toFile(), Collections.emptySet(), @@ -141,7 +141,7 @@ static SafeBrokerEventListenerWithClassLoader load(BrokerEventListenerMetadata m } } - private static void rethrowIOException(Throwable cause) + private void rethrowIOException(Throwable cause) throws IOException { if (cause instanceof IOException) { throw (IOException) cause; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListeners.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListeners.java index 517df670a6f219..73d22f4faa255f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListeners.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListeners.java @@ -21,7 +21,9 @@ import com.google.common.collect.ImmutableMap; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; +import org.apache.pulsar.common.events.BrokerEventListener; +import org.apache.pulsar.common.protocol.PulsarDecoder; import java.io.IOException; import java.util.Map; @@ -47,33 +49,8 @@ public BrokerEventListenersDisabled() { } @Override - public void onNewProducer(PulsarApi.CommandProducer command) { - //No-op - } - - @Override - public void onSubscribe(PulsarApi.CommandSubscribe command) { - //No-op - } - - @Override - public void onUnsubscribe(PulsarApi.CommandUnsubscribe command) { - //No-op - } - - @Override - public void onCloseProducer(PulsarApi.CommandCloseProducer command) { - //No-op - } - - @Override - public void onCloseConsumer(PulsarApi.CommandCloseConsumer command) { - //No-op - } - - @Override - public void close() { - //No-op + public void onCommand(BaseCommand command, PulsarDecoder decoder) { + // No-op } } @@ -119,32 +96,19 @@ public static BrokerEventListeners load(ServiceConfiguration conf) throws IOExce } @Override - public void onNewProducer(PulsarApi.CommandProducer command) { - listeners.values().forEach(listener -> listener.onNewProducer(command)); - } - - @Override - public void onSubscribe(PulsarApi.CommandSubscribe command) { - listeners.values().forEach(listener -> listener.onSubscribe(command)); - } - - @Override - public void onUnsubscribe(PulsarApi.CommandUnsubscribe command) { - listeners.values().forEach(listener -> listener.onUnsubscribe(command)); - } - - @Override - public void onCloseProducer(PulsarApi.CommandCloseProducer command) { - listeners.values().forEach(listener -> listener.onCloseProducer(command)); + public void onCommand(BaseCommand command, PulsarDecoder decoder) { + listeners.forEach((k, v) -> v.onCommand(command, decoder)); } @Override - public void onCloseConsumer(PulsarApi.CommandCloseConsumer command) { - listeners.values().forEach(listener -> listener.onCloseConsumer(command)); + public void initialize() throws Exception { + for (SafeBrokerEventListenerWithClassLoader v : listeners.values()) { + v.initialize(); + } } @Override public void close() { - listeners.values().forEach(listener -> listener.close()); + listeners.values().forEach(SafeBrokerEventListenerWithClassLoader::close); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/SafeBrokerEventListenerWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/SafeBrokerEventListenerWithClassLoader.java index 128c5d3333337e..1d59618d44fefc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/SafeBrokerEventListenerWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/SafeBrokerEventListenerWithClassLoader.java @@ -21,8 +21,10 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; +import org.apache.pulsar.common.events.BrokerEventListener; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.protocol.PulsarDecoder; import java.io.IOException; @@ -37,50 +39,18 @@ public class SafeBrokerEventListenerWithClassLoader implements BrokerEventListen private final BrokerEventListener interceptor; private final NarClassLoader classLoader; - - @Override - public void onNewProducer(PulsarApi.CommandProducer command) { - try { - this.interceptor.onNewProducer(command); - } catch (Throwable e) { - log.error("Fail to execute on new producer on broker listener", e); - } - } - - @Override - public void onSubscribe(PulsarApi.CommandSubscribe command) { - try { - this.interceptor.onSubscribe(command); - } catch (Throwable e) { - log.error("Fail to execute on subscribe on broker listener", e); - } - } - @Override - public void onUnsubscribe(PulsarApi.CommandUnsubscribe command) { + public void onCommand(BaseCommand command, PulsarDecoder decoder) { try { - this.interceptor.onUnsubscribe(command); + this.interceptor.onCommand(command, decoder); } catch (Throwable e) { - log.error("Fail to execute on unsubscribe on broker listener", e); + log.error("Fail to execute on command on broker listener", e); } } @Override - public void onCloseProducer(PulsarApi.CommandCloseProducer command) { - try { - this.interceptor.onCloseProducer(command); - } catch (Throwable e) { - log.error("Fail to execute on close producer on broker listener", e); - } - } - - @Override - public void onCloseConsumer(PulsarApi.CommandCloseConsumer command) { - try { - this.interceptor.onCloseConsumer(command); - } catch (Throwable e) { - log.error("Fail to execute on close consumer on broker listener", e); - } + public void initialize() throws Exception { + this.interceptor.initialize(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 68e6f0a4720eba..5a33b31942a378 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2260,8 +2260,4 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin private boolean isSystemTopic(String topic) { return SystemTopicClient.isSystemTopic(TopicName.get(topic)); } - - public BrokerEventListeners getEventsListeners() { - return eventListeners; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0416594adc90a1..ccd610e94958bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -181,6 +181,7 @@ public ServerCnx(PulsarService pulsar) { this.service = pulsar.getBrokerService(); this.schemaService = pulsar.getSchemaRegistryService(); this.state = State.Start; + this.brokerEventListener = service.getEventListeners(); // This maps are not heavily contended since most accesses are within the cnx thread this.producers = new ConcurrentLongHashMap<>(8, 1); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListener.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/BrokerEventListener.java similarity index 56% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListener.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/events/BrokerEventListener.java index 5ab6324e7c4aa9..06c1df963b0d3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/events/BrokerEventListener.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/BrokerEventListener.java @@ -16,14 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.events; +package org.apache.pulsar.common.events; import com.google.common.annotations.Beta; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; +import org.apache.pulsar.common.protocol.PulsarDecoder; /** * A plugin interface that allows you to listen (and possibly mutate) the @@ -39,29 +36,37 @@ public interface BrokerEventListener extends AutoCloseable { /** - * This is called when the pulsar client send a new producer request. - */ - void onNewProducer(CommandProducer command); + * Called by the broker while new command incoming. - /** - * This is called when the pulsar client send a subscribe request. */ - void onSubscribe(CommandSubscribe command); + void onCommand(BaseCommand command, PulsarDecoder decoder); /** - * This is called when the pulsar client send a unsubscribe request. + * Initialize the broker event listener. + * + * @throws Exception when fail to initialize the broker event listener. */ - void onUnsubscribe(CommandUnsubscribe command); + void initialize() throws Exception; - /** - * This is called when the pulsar client close a producer or the client disconnected. - */ - void onCloseProducer(CommandCloseProducer command); + BrokerEventListener DISABLED = new BrokerEventListenerDisabled(); - /** - * This is called when the pulsar client close a consumer or the client disconnected. - */ - void onCloseConsumer(CommandCloseConsumer command); + class BrokerEventListenerDisabled implements BrokerEventListener { + + @Override + public void onCommand(BaseCommand command, PulsarDecoder decoder) { + //No-op + } + + @Override + public void initialize() throws Exception { + //No-op + } + + @Override + public void close() { + //No-op + } + } /** * Close this broker event listener. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index cfba5bbd5c73cd..c56b6038c59dbf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -74,6 +74,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess; import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; +import org.apache.pulsar.common.events.BrokerEventListener; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +84,8 @@ */ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { + protected BrokerEventListener brokerEventListener = BrokerEventListener.DISABLED; + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // Get a buffer that contains the full frame @@ -107,7 +110,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } messageReceived(); - + brokerEventListener.onCommand(cmd, this); switch (cmd.getType()) { case PARTITIONED_METADATA: checkArgument(cmd.hasPartitionMetadata());