Skip to content

Commit 216b7a7

Browse files
committed
[PIP-150][improve][broker] Support read the message of startMessageId position on the broker side
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent a742eb5 commit 216b7a7

File tree

13 files changed

+499
-93
lines changed

13 files changed

+499
-93
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In
246246
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
247247
throws InterruptedException, ManagedLedgerException;
248248

249+
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
250+
boolean inclusive)
251+
throws InterruptedException, ManagedLedgerException;
252+
249253
/**
250254
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
251255
* exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
@@ -262,9 +266,12 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<Strin
262266
* @return the new NonDurableCursor
263267
*/
264268
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
265-
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
266-
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition,
269+
ManagedCursor newNonDurableCursor(Position startPosition, String cursorName) throws ManagedLedgerException;
270+
ManagedCursor newNonDurableCursor(Position startPosition, String cursorName, InitialPosition initialPosition,
267271
boolean isReadCompacted) throws ManagedLedgerException;
272+
ManagedCursor newNonDurableCursor(Position startPosition, String cursorName, InitialPosition initialPosition,
273+
boolean isReadCompacted, boolean inclusive)
274+
throws ManagedLedgerException;
268275

269276
/**
270277
* Delete a ManagedCursor asynchronously.
@@ -344,6 +351,8 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
344351
*/
345352
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
346353
OpenCursorCallback callback, Object ctx);
354+
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
355+
boolean inclusive, OpenCursorCallback callback, Object ctx);
347356

348357
/**
349358
* Get a list of all the cursors reading from this ManagedLedger.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+27-7
Original file line numberDiff line numberDiff line change
@@ -1610,16 +1610,36 @@ boolean hasMoreEntries(PositionImpl position) {
16101610
return false;
16111611
}
16121612

1613-
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
1614-
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1615-
markDeletePosition = lastPositionCounter.getLeft();
1616-
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
1617-
persistentMarkDeletePosition = null;
1618-
inProgressMarkDeletePersistPosition = null;
1613+
protected long countMessagesConsumed(PositionImpl position) {
1614+
if (position == null) {
1615+
return 0;
1616+
}
1617+
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
1618+
long initialBacklog = position.compareTo(lastEntryAndCounter.getLeft()) < 0
1619+
? ledger.getNumberOfEntries(Range.closed(position, lastEntryAndCounter.getLeft())) : 0;
1620+
return lastEntryAndCounter.getRight() - initialBacklog;
1621+
}
1622+
16191623

1624+
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter, boolean inclusive) {
1625+
if (inclusive) {
1626+
// when entry id is invalid, we need to get a valid position.
1627+
if (lastPositionCounter.getLeft().getEntryId() < 0) {
1628+
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1629+
} else {
1630+
readPosition = lastPositionCounter.getLeft();
1631+
}
1632+
markDeletePosition = PositionImpl.get(readPosition.getLedgerId(), readPosition.getEntryId() - 1);
1633+
} else {
1634+
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1635+
markDeletePosition = lastPositionCounter.getLeft();
1636+
}
16201637
// Initialize the counter such that the difference between the messages written on the ML and the
16211638
// messagesConsumed is 0, to ensure the initial backlog count is 0.
1622-
messagesConsumedCounter = lastPositionCounter.getRight();
1639+
messagesConsumedCounter = countMessagesConsumed(readPosition);
1640+
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
1641+
persistentMarkDeletePosition = null;
1642+
inProgressMarkDeletePersistPosition = null;
16231643
}
16241644

16251645
/**

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+44-4
Original file line numberDiff line numberDiff line change
@@ -851,14 +851,21 @@ public ManagedCursor openCursor(String cursorName, InitialPosition initialPositi
851851
@Override
852852
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
853853
throws InterruptedException, ManagedLedgerException {
854+
return openCursor(cursorName, initialPosition, properties, false);
855+
}
856+
857+
@Override
858+
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
859+
boolean inclusive)
860+
throws InterruptedException, ManagedLedgerException {
854861
final CountDownLatch counter = new CountDownLatch(1);
855862
class Result {
856863
ManagedCursor cursor = null;
857864
ManagedLedgerException exception = null;
858865
}
859866
final Result result = new Result();
860867

861-
asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
868+
asyncOpenCursor(cursorName, initialPosition, properties, inclusive, new OpenCursorCallback() {
862869
@Override
863870
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
864871
result.cursor = cursor;
@@ -898,7 +905,14 @@ public void asyncOpenCursor(final String cursorName, final InitialPosition initi
898905

899906
@Override
900907
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
901-
Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
908+
Map<String, Long> properties, final OpenCursorCallback callback,
909+
final Object ctx) {
910+
this.asyncOpenCursor(cursorName, initialPosition, properties, false, callback, ctx);
911+
}
912+
913+
@Override
914+
public void asyncOpenCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
915+
boolean inclusive, OpenCursorCallback callback, Object ctx) {
902916
try {
903917
checkManagedLedgerIsOpen();
904918
checkFenced();
@@ -939,7 +953,7 @@ public void operationComplete() {
939953
cursor.setActive();
940954
// Update the ack position (ignoring entries that were written while the cursor was being created)
941955
cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()
942-
: getFirstPositionAndCounter());
956+
: getFirstPositionAndCounter(), inclusive);
943957

944958
synchronized (ManagedLedgerImpl.this) {
945959
cursors.add(cursor);
@@ -1056,6 +1070,14 @@ public ManagedCursor newNonDurableCursor(Position startPosition, String subscrip
10561070
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
10571071
InitialPosition initialPosition, boolean isReadCompacted)
10581072
throws ManagedLedgerException {
1073+
return newNonDurableCursor(startCursorPosition, cursorName, initialPosition, isReadCompacted, false);
1074+
}
1075+
1076+
@Override
1077+
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
1078+
InitialPosition initialPosition,
1079+
boolean readCompacted, boolean inclusive)
1080+
throws ManagedLedgerException {
10591081
Objects.requireNonNull(cursorName, "cursor name can't be null");
10601082
checkManagedLedgerIsOpen();
10611083
checkFenced();
@@ -1069,7 +1091,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
10691091
}
10701092

10711093
NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
1072-
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted);
1094+
(PositionImpl) startCursorPosition, initialPosition, readCompacted, inclusive);
10731095
cursor.setActive();
10741096

10751097
log.info("[{}] Opened new cursor: {}", name, cursor);
@@ -3486,6 +3508,24 @@ Pair<PositionImpl, Long> getFirstPositionAndCounter() {
34863508
return Pair.of(pos, count);
34873509
}
34883510

3511+
/**
3512+
* Get the first position written in the managed ledger, alongside with the associated counter.
3513+
*/
3514+
Pair<PositionImpl, Long> getPositionAndCounter() {
3515+
PositionImpl pos;
3516+
long count;
3517+
Pair<PositionImpl, Long> lastPositionAndCounter;
3518+
3519+
do {
3520+
pos = getFirstPosition();
3521+
lastPositionAndCounter = getLastPositionAndCounter();
3522+
count = lastPositionAndCounter.getRight()
3523+
- getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
3524+
} while (pos.compareTo(getFirstPosition()) != 0
3525+
|| lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0);
3526+
return Pair.of(pos, count);
3527+
}
3528+
34893529
public void activateCursor(ManagedCursor cursor) {
34903530
if (activeCursors.get(cursor.getName()) == null) {
34913531
activeCursors.add(cursor);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import com.google.common.base.MoreObjects;
22-
import com.google.common.collect.Range;
2322
import java.util.Map;
2423
import org.apache.bookkeeper.client.BookKeeper;
2524
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
2625
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
2726
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
2827
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
29-
import org.apache.commons.lang3.tuple.Pair;
3028
import org.apache.pulsar.common.api.proto.CommandSubscribe;
3129
import org.slf4j.Logger;
3230
import org.slf4j.LoggerFactory;
@@ -37,7 +35,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
3735

3836
NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
3937
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition,
40-
boolean isReadCompacted) {
38+
boolean isReadCompacted, boolean inclusive) {
4139
super(bookkeeper, config, ledger, cursorName);
4240
this.readCompacted = isReadCompacted;
4341

@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
4846
// Start from last entry
4947
switch (initialPosition) {
5048
case Latest:
51-
initializeCursorPosition(ledger.getLastPositionAndCounter());
49+
initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);
5250
break;
5351
case Earliest:
54-
initializeCursorPosition(ledger.getFirstPositionAndCounter());
52+
initializeCursorPosition(ledger.getFirstPositionAndCounter(), inclusive);
5553
break;
5654
}
5755
} else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
5856
// Start from invalid ledger to read from first available entry
59-
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
57+
recoverCursor(ledger.getFirstPosition(), inclusive);
6058
} else {
6159
// Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
6260
// read-position
63-
recoverCursor(startCursorPosition);
61+
recoverCursor(startCursorPosition, inclusive);
6462
}
6563
STATE_UPDATER.set(this, State.Open);
6664
log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(),
6765
readPosition, markDeletePosition);
6866
}
6967

70-
private void recoverCursor(PositionImpl mdPosition) {
71-
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
72-
this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
73-
markDeletePosition = mdPosition;
68+
private void recoverCursor(PositionImpl mdPosition, boolean inclusive) {
69+
if (inclusive) {
70+
// when entry id is invalid, we need to get a valid position.
71+
if (mdPosition.getEntryId() < 0) {
72+
readPosition = ledger.getNextValidPosition(mdPosition);
73+
} else {
74+
readPosition = mdPosition;
75+
}
76+
markDeletePosition = PositionImpl.get(readPosition.getLedgerId(), readPosition.getEntryId() - 1);
77+
} else {
78+
readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
79+
markDeletePosition = mdPosition;
80+
}
7481

7582
// Initialize the counter such that the difference between the messages written on the ML and the
7683
// messagesConsumed is equal to the current backlog (negated).
7784
if (null != this.readPosition) {
78-
long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
79-
? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
80-
messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
85+
messagesConsumedCounter = countMessagesConsumed(readPosition);
8186
} else {
8287
log.warn("Recovered a non-durable cursor from position {} but didn't find a valid read position {}",
83-
mdPosition, readPosition);
88+
mdPosition, readPosition);
8489
}
8590
}
8691

0 commit comments

Comments
 (0)