Skip to content

Commit

Permalink
Listen all commands.
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Jun 3, 2020
1 parent e848d25 commit 196e6df
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public void start() throws PulsarServerException {

this.defaultOffloader = createManagedLedgerOffloader(
OffloadPolicies.create(this.getConfiguration().getProperties()));

this.brokerService.getEventListeners().initialize();
brokerService.start();

this.webService = new WebService(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.events.BrokerEventListener;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;

Expand All @@ -41,7 +42,7 @@
@Slf4j
public class BrokerEventListenerUtils {

static final String BROKER_LISTENER_DEFINITION_FILE = "broker_listener.yml";
final String BROKER_LISTENER_DEFINITION_FILE = "broker_listener.yml";

/**
* Retrieve the broker listener definition from the provided handler nar package.
Expand All @@ -50,13 +51,13 @@ public class BrokerEventListenerUtils {
* @return the broker listener definition
* @throws IOException when fail to load the broker listener or get the definition
*/
public static BrokerEventListenerDefinition getBrokerListenerDefinition(String narPath, String narExtractionDirectory) throws IOException {
public BrokerEventListenerDefinition getBrokerListenerDefinition(String narPath, String narExtractionDirectory) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
return getBrokerListenerDefinition(ncl);
}
}

private static BrokerEventListenerDefinition getBrokerListenerDefinition(NarClassLoader ncl) throws IOException {
private BrokerEventListenerDefinition getBrokerListenerDefinition(NarClassLoader ncl) throws IOException {
String configStr = ncl.getServiceDefinition(BROKER_LISTENER_DEFINITION_FILE);

return ObjectMapperFactory.getThreadLocalYaml().readValue(
Expand All @@ -71,7 +72,7 @@ private static BrokerEventListenerDefinition getBrokerListenerDefinition(NarClas
* @return a collection of broker listeners
* @throws IOException when fail to load the available broker listeners from the provided directory.
*/
public static BrokerEventListenerDefinitions searchForListeners(String listenersDirectory, String narExtractionDirectory) throws IOException {
public BrokerEventListenerDefinitions searchForListeners(String listenersDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(listenersDirectory).toAbsolutePath();
log.info("Searching for broker listeners in {}", path);

Expand Down Expand Up @@ -112,9 +113,8 @@ public static BrokerEventListenerDefinitions searchForListeners(String listeners
* Load the broker listeners according to the listener definition.
*
* @param metadata the broker listeners definition.
* @return
*/
static SafeBrokerEventListenerWithClassLoader load(BrokerEventListenerMetadata metadata, String narExtractionDirectory) throws IOException {
SafeBrokerEventListenerWithClassLoader load(BrokerEventListenerMetadata metadata, String narExtractionDirectory) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(
metadata.getArchivePath().toAbsolutePath().toFile(),
Collections.emptySet(),
Expand All @@ -141,7 +141,7 @@ static SafeBrokerEventListenerWithClassLoader load(BrokerEventListenerMetadata m
}
}

private static void rethrowIOException(Throwable cause)
private void rethrowIOException(Throwable cause)
throws IOException {
if (cause instanceof IOException) {
throw (IOException) cause;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.events.BrokerEventListener;
import org.apache.pulsar.common.protocol.PulsarDecoder;

import java.io.IOException;
import java.util.Map;
Expand All @@ -47,33 +49,8 @@ public BrokerEventListenersDisabled() {
}

@Override
public void onNewProducer(PulsarApi.CommandProducer command) {
//No-op
}

@Override
public void onSubscribe(PulsarApi.CommandSubscribe command) {
//No-op
}

@Override
public void onUnsubscribe(PulsarApi.CommandUnsubscribe command) {
//No-op
}

@Override
public void onCloseProducer(PulsarApi.CommandCloseProducer command) {
//No-op
}

@Override
public void onCloseConsumer(PulsarApi.CommandCloseConsumer command) {
//No-op
}

@Override
public void close() {
//No-op
public void onCommand(BaseCommand command, PulsarDecoder decoder) {
// No-op
}
}

Expand Down Expand Up @@ -119,32 +96,19 @@ public static BrokerEventListeners load(ServiceConfiguration conf) throws IOExce
}

@Override
public void onNewProducer(PulsarApi.CommandProducer command) {
listeners.values().forEach(listener -> listener.onNewProducer(command));
}

@Override
public void onSubscribe(PulsarApi.CommandSubscribe command) {
listeners.values().forEach(listener -> listener.onSubscribe(command));
}

@Override
public void onUnsubscribe(PulsarApi.CommandUnsubscribe command) {
listeners.values().forEach(listener -> listener.onUnsubscribe(command));
}

@Override
public void onCloseProducer(PulsarApi.CommandCloseProducer command) {
listeners.values().forEach(listener -> listener.onCloseProducer(command));
public void onCommand(BaseCommand command, PulsarDecoder decoder) {
listeners.forEach((k, v) -> v.onCommand(command, decoder));
}

@Override
public void onCloseConsumer(PulsarApi.CommandCloseConsumer command) {
listeners.values().forEach(listener -> listener.onCloseConsumer(command));
public void initialize() throws Exception {
for (SafeBrokerEventListenerWithClassLoader v : listeners.values()) {
v.initialize();
}
}

@Override
public void close() {
listeners.values().forEach(listener -> listener.close());
listeners.values().forEach(SafeBrokerEventListenerWithClassLoader::close);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.events.BrokerEventListener;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.PulsarDecoder;

import java.io.IOException;

Expand All @@ -37,50 +39,18 @@ public class SafeBrokerEventListenerWithClassLoader implements BrokerEventListen
private final BrokerEventListener interceptor;
private final NarClassLoader classLoader;


@Override
public void onNewProducer(PulsarApi.CommandProducer command) {
try {
this.interceptor.onNewProducer(command);
} catch (Throwable e) {
log.error("Fail to execute on new producer on broker listener", e);
}
}

@Override
public void onSubscribe(PulsarApi.CommandSubscribe command) {
try {
this.interceptor.onSubscribe(command);
} catch (Throwable e) {
log.error("Fail to execute on subscribe on broker listener", e);
}
}

@Override
public void onUnsubscribe(PulsarApi.CommandUnsubscribe command) {
public void onCommand(BaseCommand command, PulsarDecoder decoder) {
try {
this.interceptor.onUnsubscribe(command);
this.interceptor.onCommand(command, decoder);
} catch (Throwable e) {
log.error("Fail to execute on unsubscribe on broker listener", e);
log.error("Fail to execute on command on broker listener", e);
}
}

@Override
public void onCloseProducer(PulsarApi.CommandCloseProducer command) {
try {
this.interceptor.onCloseProducer(command);
} catch (Throwable e) {
log.error("Fail to execute on close producer on broker listener", e);
}
}

@Override
public void onCloseConsumer(PulsarApi.CommandCloseConsumer command) {
try {
this.interceptor.onCloseConsumer(command);
} catch (Throwable e) {
log.error("Fail to execute on close consumer on broker listener", e);
}
public void initialize() throws Exception {
this.interceptor.initialize();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2260,8 +2260,4 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin
private boolean isSystemTopic(String topic) {
return SystemTopicClient.isSystemTopic(TopicName.get(topic));
}

public BrokerEventListeners getEventsListeners() {
return eventListeners;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public ServerCnx(PulsarService pulsar) {
this.service = pulsar.getBrokerService();
this.schemaService = pulsar.getSchemaRegistryService();
this.state = State.Start;
this.brokerEventListener = service.getEventListeners();

// This maps are not heavily contended since most accesses are within the cnx thread
this.producers = new ConcurrentLongHashMap<>(8, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.events;
package org.apache.pulsar.common.events;

import com.google.common.annotations.Beta;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.protocol.PulsarDecoder;

/**
* A plugin interface that allows you to listen (and possibly mutate) the
Expand All @@ -39,29 +36,37 @@
public interface BrokerEventListener extends AutoCloseable {

/**
* This is called when the pulsar client send a new producer request.
*/
void onNewProducer(CommandProducer command);
* Called by the broker while new command incoming.
/**
* This is called when the pulsar client send a subscribe request.
*/
void onSubscribe(CommandSubscribe command);
void onCommand(BaseCommand command, PulsarDecoder decoder);

/**
* This is called when the pulsar client send a unsubscribe request.
* Initialize the broker event listener.
*
* @throws Exception when fail to initialize the broker event listener.
*/
void onUnsubscribe(CommandUnsubscribe command);
void initialize() throws Exception;

/**
* This is called when the pulsar client close a producer or the client disconnected.
*/
void onCloseProducer(CommandCloseProducer command);
BrokerEventListener DISABLED = new BrokerEventListenerDisabled();

/**
* This is called when the pulsar client close a consumer or the client disconnected.
*/
void onCloseConsumer(CommandCloseConsumer command);
class BrokerEventListenerDisabled implements BrokerEventListener {

@Override
public void onCommand(BaseCommand command, PulsarDecoder decoder) {
//No-op
}

@Override
public void initialize() throws Exception {
//No-op
}

@Override
public void close() {
//No-op
}
}

/**
* Close this broker event listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import org.apache.pulsar.common.events.BrokerEventListener;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -83,6 +84,8 @@
*/
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {

protected BrokerEventListener brokerEventListener = BrokerEventListener.DISABLED;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Get a buffer that contains the full frame
Expand All @@ -107,7 +110,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

messageReceived();

brokerEventListener.onCommand(cmd, this);
switch (cmd.getType()) {
case PARTITIONED_METADATA:
checkArgument(cmd.hasPartitionMetadata());
Expand Down

0 comments on commit 196e6df

Please sign in to comment.