Skip to content

Commit

Permalink
[pulsar-broker] Broker extensions to allow operators of enterprise wi…
Browse files Browse the repository at this point in the history
…de cluster better control and flexibility (#12536)

### Motivation

Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this

### Modifications

- Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing
- Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations
- Enhanced PulsarAdmin to give operators a control in managing super-users

### Verifying this change

- [x ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
  - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *

(cherry picked from commit 03bbc8e)
  • Loading branch information
madhavan-narayanan authored and congbobo184 committed Nov 30, 2022
1 parent ef3fa9e commit c7124bb
Show file tree
Hide file tree
Showing 20 changed files with 702 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -50,6 +51,7 @@ public class EntryCacheImpl implements EntryCache {

private final EntryCacheManager manager;
private final ManagedLedgerImpl ml;
private ManagedLedgerInterceptor interceptor;
private final RangeCache<PositionImpl, EntryImpl> entries;
private final boolean copyEntries;

Expand All @@ -58,6 +60,7 @@ public class EntryCacheImpl implements EntryCache {
public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
this.copyEntries = copyEntries;

Expand Down Expand Up @@ -215,7 +218,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);

manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
Expand Down Expand Up @@ -301,7 +304,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
final List<EntryImpl> entriesToReturn
= Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = EntryImpl.create(e);
EntryImpl entry = EntryCacheManager.create(e, interceptor);

entriesToReturn.add(entry);
totalSize += entry.getLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,9 +156,11 @@ public void clear() {

protected class EntryCacheDisabled implements EntryCache {
private final ManagedLedgerImpl ml;
private final ManagedLedgerInterceptor interceptor;

public EntryCacheDisabled(ManagedLedgerImpl ml) {
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
}

@Override
Expand Down Expand Up @@ -200,7 +204,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = EntryImpl.create(e);
EntryImpl entry = create(e, interceptor);
entries.add(entry);
totalSize += entry.getLength();
}
Expand Down Expand Up @@ -232,7 +236,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
EntryImpl returnEntry = create(ledgerEntry, interceptor);

mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
Expand Down Expand Up @@ -262,5 +266,26 @@ public static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(),ledgerEntry.getEntryId(),
ledgerEntry.getLength(),processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
}

private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
STATE_UPDATER.set(this, State.None);
this.ledgersStat = null;
this.mbean = new ManagedLedgerMBeanImpl(this);
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
}
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
Expand All @@ -320,9 +323,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
}
}

synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;

Expand Down Expand Up @@ -63,6 +64,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
@SuppressWarnings("unused")
ByteBuf data;
private int dataLength;
private ManagedLedgerInterceptor.PayloadProcessorHandle payloadProcessorHandle = null;

private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
Expand Down Expand Up @@ -105,6 +107,7 @@ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, B
op.entryId = -1;
op.startTime = System.nanoTime();
op.state = State.OPEN;
op.payloadProcessorHandle = null;
ml.mbean.addAddEntrySample(op.dataLength);
return op;
}
Expand All @@ -125,6 +128,12 @@ public void initiate() {
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
if (ml.getManagedLedgerInterceptor() != null) {
payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer);
if (payloadProcessorHandle != null) {
duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
}
}
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
} else {
log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state);
Expand All @@ -137,6 +146,9 @@ public void failed(ManagedLedgerException e) {
ReferenceCountUtil.release(data);
cb.addFailed(e, ctx);
ml.mbean.recordAddEntryError();
if (payloadProcessorHandle != null) {
payloadProcessorHandle.release();
}
}
}

Expand Down Expand Up @@ -177,6 +189,9 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
// Called in executor hashed on managed ledger name, once the add operation is complete
@Override
public void safeRun() {
if (payloadProcessorHandle != null) {
payloadProcessorHandle.release();
}
// Remove this entry from the head of the pending queue
OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
if (firstInQueue == null) {
Expand Down Expand Up @@ -308,6 +323,10 @@ public int getNumberOfMessages() {
return numberOfMessages;
}

public Object getCtx() {
return ctx;
}

public void setNumberOfMessages(int numberOfMessages) {
this.numberOfMessages = numberOfMessages;
}
Expand Down Expand Up @@ -343,6 +362,7 @@ public void recycle() {
entryId = -1;
startTime = -1;
lastInitTime = -1;
payloadProcessorHandle = null;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.intercept;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
Expand Down Expand Up @@ -58,4 +59,38 @@ public interface ManagedLedgerInterceptor {
* @param propertiesMap map of properties.
*/
void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);

/**
* A reference handle to the payload processor
*/
interface PayloadProcessorHandle {
/**
* To obtain the processed data
* @return processed data
*/
ByteBuf getProcessedPayload();

/**
* To release resources used in processor, if any
*/
void release();
}
/**
* Intercept after entry is read from ledger, before it gets cached.
* @param dataReadFromLedger data from ledger
* @return handle to the processor
*/
default PayloadProcessorHandle processPayloadBeforeEntryCache(ByteBuf dataReadFromLedger){
return null;
}

/**
* Intercept before payload gets written to ledger
* @param ledgerWriteOp OpAddEntry used to trigger ledger write.
* @param dataToBeStoredInLedger data to be stored in ledger
* @return handle to the processor
*/
default PayloadProcessorHandle processPayloadBeforeLedgerWrite(OpAddEntry ledgerWriteOp, ByteBuf dataToBeStoredInLedger){
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -1056,6 +1057,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean disableBrokerInterceptors = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of interceptors for payload processing.")
private Set<String> brokerEntryPayloadProcessors = new LinkedHashSet<>();

@FieldContext(
doc = "There are two policies to apply when broker metadata session expires: session expired happens, \"shutdown\" or \"reconnect\". \n\n"
+ " With \"shutdown\", the broker will be restarted.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@
*/
package org.apache.pulsar.broker.intercept;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand Down Expand Up @@ -58,6 +64,65 @@ default void beforeSendMessage(Subscription subscription,
MessageMetadata msgMetadata) {
}

/**
* Called by the broker when a new connection is created.
*/
default void onConnectionCreated(ServerCnx cnx){
}

/**
* Called by the broker when a new connection is created.
*/
default void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
}

/**
* Intercept after a consumer is created.
*
* @param cnx client Connection
* @param consumer Consumer object
* @param metadata A map of metdata
*/
default void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
}

/**
* Intercept after a message is produced.
*
* @param cnx client Connection
* @param producer Producer object
* @param publishContext Publish Context
*/
default void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
}

/**
* Intercept after a message is dispatched to consumer.
*
* @param cnx client Connection
* @param consumer Consumer object
* @param ledgerId Ledger ID
* @param entryId Entry ID
* @param headersAndPayload Data
*/
default void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
}

/**
* Intercept after a message ack is processed.
*
* @param cnx client Connection
* @param ackCmd Command object
*/
default void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
}

/**
* Called by the broker while new command incoming.
*/
Expand Down
Loading

0 comments on commit c7124bb

Please sign in to comment.