Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] Broker extensions to allow operators of enterprise wide cluster better control and flexibility #12536

Merged
merged 18 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
ffea4f7
Broker extensions to allow cluster operators more flexibility and con…
madhavan-narayanan Oct 28, 2021
db58618
Broker extensions to allow cluster operators more flexibility and con…
madhavan-narayanan Oct 29, 2021
1c255de
To support a list of interceptors for payload processing. LinkedHashS…
madhavan-narayanan Oct 31, 2021
654394e
Fixes to address review comments
madhavan-narayanan Nov 11, 2021
5338fee
Client configuration change not needed
madhavan-narayanan Dec 7, 2021
4bce0d2
Broker extensions to allow cluster operators more flexibility and con…
madhavan-narayanan Oct 28, 2021
e8f93e0
Broker extensions to allow cluster operators more flexibility and con…
madhavan-narayanan Oct 29, 2021
fa63498
To support a list of interceptors for payload processing. LinkedHashS…
madhavan-narayanan Oct 31, 2021
0000094
Fixes to address review comments
madhavan-narayanan Nov 11, 2021
69edd06
Client configuration change not needed
madhavan-narayanan Dec 7, 2021
b3a7ec6
resolved conflicts
madhavan-narayanan Dec 13, 2021
10e6ea3
Fixes for review comments
madhavan-narayanan Dec 13, 2021
f42542c
Fixes for review comments
madhavan-narayanan Dec 13, 2021
a3071e6
Fixes for review comments
madhavan-narayanan Dec 14, 2021
5c7301d
Merge remote-tracking branch 'apache/master' into broker_extensions
codelipenghui Dec 14, 2021
74ec4cc
Fixes build failure
madhavan-narayanan Dec 14, 2021
d368c05
Fixes for review comments
madhavan-narayanan Dec 14, 2021
45f4a86
Merge remote-tracking branch 'apache/master' into broker_extensions
codelipenghui Dec 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -50,6 +51,7 @@ public class EntryCacheImpl implements EntryCache {

private final EntryCacheManager manager;
private final ManagedLedgerImpl ml;
private ManagedLedgerInterceptor interceptor;
private final RangeCache<PositionImpl, EntryImpl> entries;
private final boolean copyEntries;

Expand All @@ -58,6 +60,7 @@ public class EntryCacheImpl implements EntryCache {
public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
this.copyEntries = copyEntries;

Expand Down Expand Up @@ -215,7 +218,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);

manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
Expand Down Expand Up @@ -301,7 +304,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
final List<EntryImpl> entriesToReturn
= Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = EntryImpl.create(e);
EntryImpl entry = EntryCacheManager.create(e, interceptor);

entriesToReturn.add(entry);
totalSize += entry.getLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,9 +156,11 @@ public void clear() {

protected class EntryCacheDisabled implements EntryCache {
private final ManagedLedgerImpl ml;
private final ManagedLedgerInterceptor interceptor;

public EntryCacheDisabled(ManagedLedgerImpl ml) {
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
}

@Override
Expand Down Expand Up @@ -200,7 +204,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = EntryImpl.create(e);
EntryImpl entry = create(e, interceptor);
entries.add(entry);
totalSize += entry.getLength();
}
Expand Down Expand Up @@ -232,7 +236,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
EntryImpl returnEntry = create(ledgerEntry, interceptor);

mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
Expand Down Expand Up @@ -262,5 +266,26 @@ public static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(),ledgerEntry.getEntryId(),
ledgerEntry.getLength(),processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
}

private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
STATE_UPDATER.set(this, State.None);
this.ledgersStat = null;
this.mbean = new ManagedLedgerMBeanImpl(this);
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
}
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
Expand All @@ -319,9 +322,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
}
}

synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;

Expand Down Expand Up @@ -63,6 +64,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
@SuppressWarnings("unused")
ByteBuf data;
private int dataLength;
private ManagedLedgerInterceptor.PayloadProcessorHandle payloadProcessorHandle = null;

private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
Expand Down Expand Up @@ -105,6 +107,7 @@ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, B
op.entryId = -1;
op.startTime = System.nanoTime();
op.state = State.OPEN;
op.payloadProcessorHandle = null;
ml.mbean.addAddEntrySample(op.dataLength);
return op;
}
Expand All @@ -125,6 +128,12 @@ public void initiate() {
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
if (ml.getManagedLedgerInterceptor() != null) {
payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer);
if (payloadProcessorHandle != null) {
duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
}
}
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
} else {
log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state);
Expand All @@ -137,6 +146,9 @@ public void failed(ManagedLedgerException e) {
ReferenceCountUtil.release(data);
cb.addFailed(e, ctx);
ml.mbean.recordAddEntryError();
if (payloadProcessorHandle != null) {
payloadProcessorHandle.release();
}
}
}

Expand Down Expand Up @@ -177,6 +189,9 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
// Called in executor hashed on managed ledger name, once the add operation is complete
@Override
public void safeRun() {
if (payloadProcessorHandle != null) {
payloadProcessorHandle.release();
}
// Remove this entry from the head of the pending queue
OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
if (firstInQueue == null) {
Expand Down Expand Up @@ -308,6 +323,10 @@ public int getNumberOfMessages() {
return numberOfMessages;
}

public Object getCtx() {
return ctx;
}

public void setNumberOfMessages(int numberOfMessages) {
this.numberOfMessages = numberOfMessages;
}
Expand Down Expand Up @@ -343,6 +362,7 @@ public void recycle() {
entryId = -1;
startTime = -1;
lastInitTime = -1;
payloadProcessorHandle = null;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.intercept;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
Expand Down Expand Up @@ -58,4 +59,38 @@ public interface ManagedLedgerInterceptor {
* @param propertiesMap map of properties.
*/
void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);

/**
* A reference handle to the payload processor
*/
interface PayloadProcessorHandle {
/**
* To obtain the processed data
* @return processed data
*/
ByteBuf getProcessedPayload();

/**
* To release resources used in processor, if any
*/
void release();
}
/**
* Intercept after entry is read from ledger, before it gets cached.
* @param dataReadFromLedger data from ledger
* @return handle to the processor
*/
default PayloadProcessorHandle processPayloadBeforeEntryCache(ByteBuf dataReadFromLedger){
return null;
}

/**
* Intercept before payload gets written to ledger
* @param ledgerWriteOp OpAddEntry used to trigger ledger write.
* @param dataToBeStoredInLedger data to be stored in ledger
* @return handle to the processor
*/
default PayloadProcessorHandle processPayloadBeforeLedgerWrite(OpAddEntry ledgerWriteOp, ByteBuf dataToBeStoredInLedger){
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -1089,6 +1090,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean disableBrokerInterceptors = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of interceptors for payload processing.")
private Set<String> brokerEntryPayloadProcessors = new LinkedHashSet<>();

@FieldContext(
doc = "There are two policies to apply when broker metadata session expires: session expired happens, \"shutdown\" or \"reconnect\". \n\n"
+ " With \"shutdown\", the broker will be restarted.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@
*/
package org.apache.pulsar.broker.intercept;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand Down Expand Up @@ -58,6 +64,65 @@ default void beforeSendMessage(Subscription subscription,
MessageMetadata msgMetadata) {
}

/**
* Called by the broker when a new connection is created.
*/
default void onConnectionCreated(ServerCnx cnx){
}

/**
* Called by the broker when a new connection is created.
*/
default void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
}

/**
* Intercept after a consumer is created.
*
* @param cnx client Connection
* @param consumer Consumer object
* @param metadata A map of metdata
*/
default void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
}

/**
* Intercept after a message is produced.
*
* @param cnx client Connection
* @param producer Producer object
* @param publishContext Publish Context
*/
default void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please provide more context about why we should add rateIn as a param for this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently used in our logs to indicate the overall publish rate for the topic seen by the broker when this message was stored in the ledger. Used to troubleshoot latency issues and to support customer queries. Is there anything wrong in exposing this parameter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I think it's better to get the rateIn from the producer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. It was an oversight. I used rateIn as it was readily computed and available in MessagePublishContext::run. I will fix it now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix done

long entryId, Topic.PublishContext publishContext) {
}

/**
* Intercept after a message is dispatched to consumer.
*
* @param cnx client Connection
* @param consumer Consumer object
* @param ledgerId Ledger ID
* @param entryId Entry ID
* @param headersAndPayload Data
*/
default void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
}

/**
* Intercept after a message ack is processed.
*
* @param cnx client Connection
* @param ackCmd Command object
*/
default void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
}

/**
* Called by the broker while new command incoming.
*/
Expand Down
Loading