Skip to content

Commit

Permalink
ManagedCursor: manually serialise PositionInfo (apache#270)
Browse files Browse the repository at this point in the history
* ManagedCursor: manually serialise PositionInfo
* Add tests and save last serialized side to prevent reallocations

(cherry picked from commit 8a365d0)
  • Loading branch information
eolivelli authored and dlg99 committed May 28, 2024
1 parent 062865e commit 132828b
Show file tree
Hide file tree
Showing 4 changed files with 1,679 additions and 43 deletions.
1 change: 1 addition & 0 deletions buildtools/src/main/resources/pulsar/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<suppress checks=".*" files=".+[\\/]generated[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-test-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+PositionInfoUtils.java"/>

<!-- suppress most all checks expect below-->
<suppress checks="^(?!.*(UnusedImports|IllegalImport)).*$" files=".*[\\/]src[\\/]test[\\/].*"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ protected LightMLDataFormats.PositionInfo initialValue() {
private final BookKeeper.DigestType digestType;

protected volatile PositionImpl markDeletePosition;
private int lastSerializedSize;

// this position is have persistent mark delete position
protected volatile PositionImpl persistentMarkDeletePosition;
Expand Down Expand Up @@ -250,7 +251,7 @@ protected LightMLDataFormats.PositionInfo initialValue() {
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

class MarkDeleteEntry {
static class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
final Object ctx;
Expand Down Expand Up @@ -679,6 +680,52 @@ private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOExcep
return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class);
}

private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byte[] data) {
mbean.addReadCursorLedgerSize(data.length);

try {
data = decompressDataIfNeeded(data, lh);
} catch (Throwable e) {
callback.operationFailed(new ManagedLedgerException(e));
return;
}

PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
// Rewind to oldest entry available
positionInfo = PositionInfo
.newBuilder()
.setLedgerId(-1)
.setEntryId(-1)
.build();
}

Map<String, Long> recoveredProperties = Collections.emptyMap();
if (positionInfo.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
LongProperty property = positionInfo.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}

PositionImpl position = new PositionImpl(positionInfo);
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
if (getConfig().isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
log.info("[{}] [{}] Recovering individual deleted messages. Number of ranges: {}",
ledger.getName(), name, individualDeletedMessagesList.size());
Expand Down Expand Up @@ -3145,34 +3192,28 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
}
}

private void addIndividualDeletedMessageRanges(LightMLDataFormats.PositionInfo lpi) {
private void scanIndividualDeletedMessageRanges(
PositionInfoUtils.IndividuallyDeletedMessagesRangeConsumer consumer) {
final int maxUnackedRangesToPersist = getConfig().getMaxUnackedRangesToPersist();
AtomicInteger acksSerializedSize = new AtomicInteger(0);
AtomicInteger rangeCount = new AtomicInteger(0);

lock.readLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
this.individualDeletedMessagesSerializedSize = 0;
return;
}

AtomicInteger acksSerializedSize = new AtomicInteger(0);
AtomicInteger rangeCount = new AtomicInteger(0);

individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
LightMLDataFormats.MessageRange messageRange = lpi.addIndividualDeletedMessage();
messageRange.setLowerEndpoint()
.setLedgerId(lowerKey)
.setEntryId(lowerValue);
messageRange.setUpperEndpoint()
.setLedgerId(upperKey)
.setEntryId(upperValue);

acksSerializedSize.addAndGet(messageRange.getSerializedSize());

return rangeCount.incrementAndGet() <= getConfig().getMaxUnackedRangesToPersist();
acksSerializedSize.addAndGet(16 * 4);
consumer.acceptRange(lowerKey, lowerValue, upperKey, upperValue);
return rangeCount.incrementAndGet() <= maxUnackedRangesToPersist;
});

this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
log.info("[{}] [{}] buildIndividualDeletedMessageRanges, numRanges {} "
log.info("[{}] [{}] scanIndividualDeletedMessageRanges, numRanges {} "
+ "individualDeletedMessagesSerializedSize {} rangeListSize {} "
+ "maxUnackedRangesToPersist {}",
ledger.getName(), name, individualDeletedMessages.size(),
Expand All @@ -3197,9 +3238,6 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
Expand All @@ -3215,27 +3253,23 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
}
}

private void addAllBatchedEntryDeletionIndexInfo(LightMLDataFormats.PositionInfo lpi) {
private void buildBatchEntryDeletionIndexInfoList(
PositionInfoUtils.BatchedEntryDeletionIndexInfoConsumer consumer) {
if (!getConfig().isDeletionAtBatchIndexLevelEnabled()) {
return;
}
int maxBatchDeletedIndexToPersist = getConfig().getMaxBatchDeletedIndexToPersist();
lock.readLock().lock();
try {
if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
return;
}
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
int count = 0;
while (iterator.hasNext() && count < getConfig().getMaxBatchDeletedIndexToPersist()) {
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && count < maxBatchDeletedIndexToPersist) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();

LightMLDataFormats.BatchedEntryDeletionIndexInfo batchInfo = lpi.addBatchedEntryDeletionIndexInfo();
batchInfo.setPosition()
.setLedgerId(entry.getKey().getLedgerId())
.setEntryId(entry.getKey().getEntryId());

long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
batchInfo.addDeleteSet(l);
}
consumer.acceptRange(entry.getKey().getLedgerId(), entry.getKey().getEntryId(), array);
count++;
}
} finally {
Expand All @@ -3254,23 +3288,17 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
long now = System.nanoTime();
PositionImpl position = mdEntry.newPosition;

LightMLDataFormats.PositionInfo pi = piThreadLocal.get();
pi.clear();

pi.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId());
addIndividualDeletedMessageRanges(pi);
addAllBatchedEntryDeletionIndexInfo(pi);
addAllProperties(pi, mdEntry.properties);

if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
position);
}

requireNonNull(lh);
ByteBuf rawData = toByteBuf(pi);
ByteBuf rawData = PositionInfoUtils.serializePositionInfo(mdEntry, position,
this::scanIndividualDeletedMessageRanges, this::buildBatchEntryDeletionIndexInfoList,
lastSerializedSize);
long endSer = System.nanoTime();
this.lastSerializedSize = rawData.readableBytes();

// rawData is released by compressDataIfNeeded if needed
ByteBuf data = compressDataIfNeeded(rawData, lh);
Expand Down Expand Up @@ -3341,6 +3369,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}
}


private void writeToBookKeeperLastChunk(LedgerHandle lh,
MarkDeleteEntry mdEntry,
VoidCallback callback,
Expand Down Expand Up @@ -3977,4 +4006,5 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro
}
return newNonDurableCursor;
}

}
Loading

0 comments on commit 132828b

Please sign in to comment.