Skip to content

Commit a895a59

Browse files
committed
[PIP-150][improve][broker] Support read the message of startMessageId position on the broker side
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 1d174db commit a895a59

File tree

16 files changed

+508
-65
lines changed

16 files changed

+508
-65
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

+58-1
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,32 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<Strin
249249
Map<String, String> cursorProperties)
250250
throws InterruptedException, ManagedLedgerException;
251251

252+
/**
253+
* Open a ManagedCursor in this ManagedLedger.
254+
* <p>
255+
* If the cursors doesn't exist, a new one will be created and its position will be at the end of the ManagedLedger.
256+
*
257+
* @param name
258+
* the name associated with the ManagedCursor
259+
* @param initialPosition
260+
* the cursor will be set at latest position or not when first created
261+
* default is <b>true</b>
262+
* @param properties
263+
* user defined properties that will be attached to the first position of the cursor, if the open
264+
* operation will trigger the creation of the cursor.
265+
* @param cursorProperties
266+
* the properties for the Cursor
267+
* @param inclusive
268+
* whether to start read from the specified position
269+
* @return the ManagedCursor
270+
* @throws ManagedLedgerException
271+
*/
272+
default ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
273+
Map<String, String> cursorProperties, boolean inclusive)
274+
throws InterruptedException, ManagedLedgerException {
275+
return openCursor(name, initialPosition, properties, cursorProperties);
276+
}
277+
252278
/**
253279
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
254280
* exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
@@ -269,6 +295,12 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<Strin
269295
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition,
270296
boolean isReadCompacted) throws ManagedLedgerException;
271297

298+
default ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName,
299+
InitialPosition initialPosition, boolean isReadCompacted,
300+
boolean inclusive) throws ManagedLedgerException {
301+
return newNonDurableCursor(startPosition, subscriptionName, initialPosition, isReadCompacted);
302+
}
303+
272304
/**
273305
* Delete a ManagedCursor asynchronously.
274306
*
@@ -348,7 +380,32 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
348380
* opaque context
349381
*/
350382
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
351-
Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx);
383+
Map<String, String> cursorProperties, OpenCursorCallback callback,
384+
Object ctx);
385+
386+
/**
387+
* Open a ManagedCursor asynchronously.
388+
*
389+
* @see #openCursor(String)
390+
* @param name
391+
* the name associated with the ManagedCursor
392+
* @param initialPosition
393+
* the cursor will be set at lastest position or not when first created
394+
* default is <b>true</b>
395+
* @param cursorProperties
396+
* the properties for the Cursor
397+
* @param inclusive
398+
* whether to read from the specified position
399+
* @param callback
400+
* callback object
401+
* @param ctx
402+
* opaque context
403+
*/
404+
default void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
405+
Map<String, String> cursorProperties, boolean inclusive, OpenCursorCallback callback,
406+
Object ctx) {
407+
asyncOpenCursor(name, initialPosition, properties, cursorProperties, callback, ctx);
408+
}
352409

353410
/**
354411
* Get a list of all the cursors reading from this ManagedLedger.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+26-7
Original file line numberDiff line numberDiff line change
@@ -1674,16 +1674,35 @@ boolean hasMoreEntries(PositionImpl position) {
16741674
return false;
16751675
}
16761676

1677-
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
1678-
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1679-
markDeletePosition = lastPositionCounter.getLeft();
1680-
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
1681-
persistentMarkDeletePosition = null;
1682-
inProgressMarkDeletePersistPosition = null;
1677+
protected long countMessagesConsumed(PositionImpl position) {
1678+
if (position == null) {
1679+
return 0;
1680+
}
1681+
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
1682+
long initialBacklog = position.compareTo(lastEntryAndCounter.getLeft()) < 0
1683+
? ledger.getNumberOfEntries(Range.closed(position, lastEntryAndCounter.getLeft())) : 0;
1684+
return lastEntryAndCounter.getRight() - initialBacklog;
1685+
}
16831686

1687+
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter, boolean inclusive) {
1688+
if (inclusive) {
1689+
// when entry id is invalid, we need to get a valid position.
1690+
if (lastPositionCounter.getLeft().getEntryId() < 0) {
1691+
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1692+
} else {
1693+
readPosition = lastPositionCounter.getLeft();
1694+
}
1695+
markDeletePosition = PositionImpl.get(readPosition.getLedgerId(), readPosition.getEntryId() - 1);
1696+
} else {
1697+
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1698+
markDeletePosition = lastPositionCounter.getLeft();
1699+
}
16841700
// Initialize the counter such that the difference between the messages written on the ML and the
16851701
// messagesConsumed is 0, to ensure the initial backlog count is 0.
1686-
messagesConsumedCounter = lastPositionCounter.getRight();
1702+
messagesConsumedCounter = countMessagesConsumed(readPosition);
1703+
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
1704+
persistentMarkDeletePosition = null;
1705+
inProgressMarkDeletePersistPosition = null;
16871706
}
16881707

16891708
/**

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+30-7
Original file line numberDiff line numberDiff line change
@@ -859,14 +859,21 @@ public ManagedCursor openCursor(String cursorName, InitialPosition initialPositi
859859
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
860860
Map<String, String> cursorProperties)
861861
throws InterruptedException, ManagedLedgerException {
862+
return openCursor(cursorName, initialPosition, properties, cursorProperties, false);
863+
}
864+
865+
@Override
866+
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
867+
Map<String, String> cursorProperties, boolean inclusive)
868+
throws InterruptedException, ManagedLedgerException {
862869
final CountDownLatch counter = new CountDownLatch(1);
863870
class Result {
864871
ManagedCursor cursor = null;
865872
ManagedLedgerException exception = null;
866873
}
867874
final Result result = new Result();
868875

869-
asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() {
876+
asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, inclusive, new OpenCursorCallback() {
870877
@Override
871878
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
872879
result.cursor = cursor;
@@ -907,8 +914,16 @@ public void asyncOpenCursor(final String cursorName, final InitialPosition initi
907914

908915
@Override
909916
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
910-
Map<String, Long> properties, Map<String, String> cursorProperties,
917+
Map<String, Long> properties, Map<String, String> cursorProperties,
911918
final OpenCursorCallback callback, final Object ctx) {
919+
this.asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, false
920+
callback, ctx);
921+
}
922+
923+
@Override
924+
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
925+
Map<String, Long> properties, Map<String, String> cursorProperties,
926+
boolean inclusive, final OpenCursorCallback callback, final Object ctx) {
912927
try {
913928
checkManagedLedgerIsOpen();
914929
checkFenced();
@@ -920,9 +935,9 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
920935
if (uninitializedCursors.containsKey(cursorName)) {
921936
uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx))
922937
.exceptionally(ex -> {
923-
callback.openCursorFailed((ManagedLedgerException) ex, ctx);
924-
return null;
925-
});
938+
callback.openCursorFailed((ManagedLedgerException) ex, ctx);
939+
return null;
940+
});
926941
return;
927942
}
928943
ManagedCursor cachedCursor = cursors.get(cursorName);
@@ -949,7 +964,7 @@ public void operationComplete() {
949964
cursor.setActive();
950965
// Update the ack position (ignoring entries that were written while the cursor was being created)
951966
cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()
952-
: getFirstPositionAndCounter());
967+
: getFirstPositionAndCounter(), inclusive);
953968

954969
synchronized (ManagedLedgerImpl.this) {
955970
cursors.add(cursor);
@@ -1066,6 +1081,14 @@ public ManagedCursor newNonDurableCursor(Position startPosition, String subscrip
10661081
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
10671082
InitialPosition initialPosition, boolean isReadCompacted)
10681083
throws ManagedLedgerException {
1084+
return newNonDurableCursor(startCursorPosition, cursorName, initialPosition, isReadCompacted, false);
1085+
}
1086+
1087+
@Override
1088+
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
1089+
InitialPosition initialPosition,
1090+
boolean isReadCompacted, boolean inclusive)
1091+
throws ManagedLedgerException {
10691092
Objects.requireNonNull(cursorName, "cursor name can't be null");
10701093
checkManagedLedgerIsOpen();
10711094
checkFenced();
@@ -1079,7 +1102,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
10791102
}
10801103

10811104
NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
1082-
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted);
1105+
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted, inclusive);
10831106
cursor.setActive();
10841107

10851108
log.info("[{}] Opened new cursor: {}", name, cursor);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import com.google.common.base.MoreObjects;
22-
import com.google.common.collect.Range;
2322
import java.util.Map;
2423
import org.apache.bookkeeper.client.BookKeeper;
2524
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
2625
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
2726
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
2827
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
29-
import org.apache.commons.lang3.tuple.Pair;
3028
import org.apache.pulsar.common.api.proto.CommandSubscribe;
3129
import org.slf4j.Logger;
3230
import org.slf4j.LoggerFactory;
@@ -37,7 +35,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
3735

3836
NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
3937
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition,
40-
boolean isReadCompacted) {
38+
boolean isReadCompacted, boolean inclusive) {
4139
super(bookkeeper, config, ledger, cursorName);
4240
this.readCompacted = isReadCompacted;
4341

@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
4846
// Start from last entry
4947
switch (initialPosition) {
5048
case Latest:
51-
initializeCursorPosition(ledger.getLastPositionAndCounter());
49+
initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);
5250
break;
5351
case Earliest:
54-
initializeCursorPosition(ledger.getFirstPositionAndCounter());
52+
initializeCursorPosition(ledger.getFirstPositionAndCounter(), inclusive);
5553
break;
5654
}
5755
} else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
5856
// Start from invalid ledger to read from first available entry
59-
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
57+
recoverCursor(ledger.getFirstPosition(), inclusive);
6058
} else {
6159
// Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
6260
// read-position
63-
recoverCursor(startCursorPosition);
61+
recoverCursor(startCursorPosition, inclusive);
6462
}
6563
STATE_UPDATER.set(this, State.Open);
6664
log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(),
6765
readPosition, markDeletePosition);
6866
}
6967

70-
private void recoverCursor(PositionImpl mdPosition) {
71-
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
72-
this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
73-
markDeletePosition = mdPosition;
68+
private void recoverCursor(PositionImpl mdPosition, boolean inclusive) {
69+
if (inclusive) {
70+
// when entry id is invalid, we need to get a valid position.
71+
if (mdPosition.getEntryId() < 0) {
72+
readPosition = ledger.getNextValidPosition(mdPosition);
73+
} else {
74+
readPosition = mdPosition;
75+
}
76+
markDeletePosition = PositionImpl.get(readPosition.getLedgerId(), readPosition.getEntryId() - 1);
77+
} else {
78+
readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
79+
markDeletePosition = mdPosition;
80+
}
7481

7582
// Initialize the counter such that the difference between the messages written on the ML and the
7683
// messagesConsumed is equal to the current backlog (negated).
7784
if (null != this.readPosition) {
78-
long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
79-
? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
80-
messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
85+
messagesConsumedCounter = countMessagesConsumed(readPosition);
8186
} else {
8287
log.warn("Recovered a non-durable cursor from position {} but didn't find a valid read position {}",
83-
mdPosition, readPosition);
88+
mdPosition, readPosition);
8489
}
8590
}
8691

0 commit comments

Comments
 (0)