Optimise seeking by timestamp #22129

Open

Samreay opened this issue Feb 27, 2024 · 3 comments · May be fixed by #22792
Comments

Samreay commented Feb 27, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Right now it seems that seeking a reader or a consumer to a specific timestamp is an unoptimised process that can take many seconds, or over a minute, for larger topics (single-GB data size, tens of messages per second). From a Slack comment by @lhotari, it appears that seeking via a timestamp is not optimised, so I'm proposing optimising it as a valuable feature.

Solution

Seeking currently works by message ID or by timestamp. I assume (though I could be wrong) that seeking by message ID is optimised. Without going into the implementation details and just spitballing ideas: something like binary searching on the time, or creating a treemap from timestamp to message ID (at any level of sparsity), might make seeking far faster. A rough sketch of the treemap idea follows below.
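
To illustrate the treemap idea (purely a sketch, not based on Pulsar internals; the SparseTimeIndex name and the granularity knob are made up), the broker could keep a sparse map from publish time to position and use a floor lookup to jump close to the requested timestamp:

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sparse index from publish timestamp to entry id, maintained as entries are written.
// floorEntry() returns the closest indexed position at or before the requested timestamp,
// so a seek only needs to scan forward from there instead of searching the whole backlog.
class SparseTimeIndex {
    private final NavigableMap<Long, Long> timestampToEntryId = new ConcurrentSkipListMap<>();
    private final long granularityMs;

    SparseTimeIndex(long granularityMs) {
        this.granularityMs = granularityMs;
    }

    // Record at most one (timestamp -> entryId) sample per time bucket to keep the index sparse.
    void maybeIndex(long publishTime, long entryId) {
        long bucket = publishTime - (publishTime % granularityMs);
        timestampToEntryId.putIfAbsent(bucket, entryId);
    }

    // Entry id to start scanning from for a seek to `timestamp`, or 0 if nothing is indexed yet.
    long seekHint(long timestamp) {
        Map.Entry<Long, Long> floor = timestampToEntryId.floorEntry(timestamp);
        return floor != null ? floor.getValue() : 0L;
    }
}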

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

lhotari commented Feb 27, 2024

The current implementation for seeking by timestamp is here:

    public CompletableFuture<Void> resetCursor(long timestamp) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor);

        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp);
        }

        persistentMessageFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {
            @Override
            public void findEntryComplete(Position position, Object ctx) {
                final Position finalPosition;
                if (position == null) {
                    // this should not happen ideally unless a reset is requested for a time
                    // that spans beyond the retention limits (time/size)
                    finalPosition = cursor.getFirstPosition();
                    if (finalPosition == null) {
                        log.warn("[{}][{}] Unable to find position for timestamp {}."
                                        + " Unable to reset cursor to first position",
                                topicName, subName, timestamp);
                        future.completeExceptionally(
                                new SubscriptionInvalidCursorPosition(
                                        "Unable to find position for specified timestamp"));
                        return;
                    }
                    log.info(
                            "[{}][{}] Unable to find position for timestamp {}."
                                    + " Resetting cursor to first position {} in ledger",
                            topicName, subName, timestamp, finalPosition);
                } else {
                    finalPosition = position.getNext();
                }
                resetCursor(finalPosition, future);
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception,
                                        Optional<Position> failedReadPosition, Object ctx) {
                // todo - what can go wrong here that needs to be retried?
                if (exception instanceof ConcurrentFindCursorPositionException) {
                    future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
                } else {
                    future.completeExceptionally(new BrokerServiceException(exception));
                }
            }
        });

        return future;
    }
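
For context, PersistentMessageFinder essentially turns the timestamp into a predicate over entries and hands it to the cursor's find operation, which is what runs the binary search shown further down. Very roughly (a simplified sketch, not the verbatim source; extractPublishTime is a hypothetical stand-in for the metadata parsing the real finder does):

// Simplified sketch of what PersistentMessageFinder.findMessages(timestamp, callback) boils down to:
// ask the cursor for the newest entry whose publish time is still <= the requested timestamp.
void findMessages(ManagedCursor cursor, long timestamp, AsyncCallbacks.FindEntryCallback callback) {
    cursor.asyncFindNewestMatching(
            ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
            entry -> extractPublishTime(entry) <= timestamp,   // extractPublishTime: hypothetical helper
            callback, null);
}

Each probe of that predicate requires reading an entry from storage before the condition can be evaluated.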

called from here, in the broker's handling of the client seek command:
        if (consumerCreated && seek.hasMessageId()) {
            Consumer consumer = consumerFuture.getNow(null);
            Subscription subscription = consumer.getSubscription();

            MessageIdData msgIdData = seek.getMessageId();

            long[] ackSet = null;
            if (msgIdData.getAckSetsCount() > 0) {
                ackSet = new long[msgIdData.getAckSetsCount()];
                for (int i = 0; i < ackSet.length; i++) {
                    ackSet[i] = msgIdData.getAckSetAt(i);
                }
            }

            Position position = new PositionImpl(msgIdData.getLedgerId(),
                    msgIdData.getEntryId(), ackSet);

            subscription.resetCursor(position).thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
                        subscription.getTopic().getName(), subscription.getName(), position);
                commandSender.sendSuccessResponse(requestId);
            }).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}",
                        remoteAddress, subscription, ex.getMessage(), ex);
                commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
                        "Error when resetting subscription: " + ex.getCause().getMessage());
                return null;
            });
        } else if (consumerCreated && seek.hasMessagePublishTime()) {
            Consumer consumer = consumerFuture.getNow(null);
            Subscription subscription = consumer.getSubscription();

            long timestamp = seek.getMessagePublishTime();

            subscription.resetCursor(timestamp).thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to publish time {}", remoteAddress,
                        subscription.getTopic().getName(), subscription.getName(), timestamp);
                commandSender.sendSuccessResponse(requestId);
            }).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress,
                        subscription, ex.getMessage(), ex);
                commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
                        "Reset subscription to publish time error: " + ex.getCause().getMessage());
                return null;
            });
        } else {
            commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
        }

I guess the missed optimization is to use the ledger metadata as a first-level filter.
There's a binary search (in OpFindNewest, below), but it doesn't use the ledger metadata:

    public void readEntryComplete(Entry entry, Object ctx) {
        final Position position = entry.getPosition();
        switch (state) {
        case checkFirst:
            if (!condition.test(entry)) {
                // If no entry is found that matches the condition, it is expected to pass null to the callback.
                // Otherwise, a message before the expiration date will be deleted due to message TTL.
                // cf. https://github.com/apache/pulsar/issues/5579
                callback.findEntryComplete(null, OpFindNewest.this.ctx);
                return;
            } else {
                lastMatchedPosition = position;
                // check last entry
                state = State.checkLast;
                PositionImpl lastPosition = ledger.getLastPosition();
                searchPosition = ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded);
                if (lastPosition.compareTo(searchPosition) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("first position {} matches, last should be {}, but moving to lastPos {}",
                                position, searchPosition, lastPosition);
                    }
                    searchPosition = lastPosition;
                }
                find();
            }
            break;
        case checkLast:
            if (condition.test(entry)) {
                callback.findEntryComplete(position, OpFindNewest.this.ctx);
                return;
            } else {
                // start binary search
                state = State.searching;
                searchPosition = ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded);
                find();
            }
            break;
        case searching:
            if (condition.test(entry)) {
                // mid - last
                lastMatchedPosition = position;
                min = mid();
            } else {
                // start - mid
                max = mid() - 1;
            }

            if (max <= min) {
                callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx);
                return;
            }

            searchPosition = ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded);
            find();
        }
    }


lhotari commented Feb 27, 2024

LedgerInfo contains the timestamp when the ledger was sealed (it got closed or was rolled over):

message LedgerInfo {
    required int64 ledgerId = 1;
    optional int64 entries = 2;
    optional int64 size = 3;
    optional int64 timestamp = 4;
    optional OffloadContext offloadContext = 5;
}

There could be an initial binary search that uses this information, which is available in ManagedLedgerImpl via:

public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
    return ledgers;
}

I guess there is a gotcha since the ledger's timestamp comes from the broker's clock, while the seek uses the message publish time, which comes from the client's (publisher's) clock. There might be corner cases because of this. A rough sketch of the first-level filter, with an allowance for that skew, is below.
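
A minimal sketch of that filter, assuming the LedgerInfo type from MLDataFormats and making up the clockSkewAllowanceMs knob and the helper name (a real implementation would then restrict the existing OpFindNewest search to entries from the returned ledger onwards):

import java.util.Map;
import java.util.NavigableMap;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

// Hypothetical first-level filter: pick the first ledger whose seal timestamp is at or after the
// target publish time (minus a clock-skew allowance), so the entry-level binary search only has
// to cover entries from that ledger onwards instead of the whole backlog.
final class LedgerTimestampFilter {

    static Long findStartLedgerId(NavigableMap<Long, LedgerInfo> ledgers,
                                  long targetPublishTime,
                                  long clockSkewAllowanceMs) {
        long target = targetPublishTime - clockSkewAllowanceMs;
        for (Map.Entry<Long, LedgerInfo> e : ledgers.entrySet()) {
            LedgerInfo info = e.getValue();
            // LedgerInfo.timestamp is only set once the ledger is sealed; treat an unset/0 value
            // (the currently open ledger) as "still accumulating entries up to now".
            long sealTime = info.getTimestamp() == 0 ? Long.MAX_VALUE : info.getTimestamp();
            if (sealTime >= target) {
                return e.getKey(); // first ledger that could still contain the target time
            }
        }
        return null; // target is newer than everything retained: the caller can seek to the end
    }
}

Usage would be something like findStartLedgerId(managedLedger.getLedgersInfo(), timestamp, 60_000); the allowance only papers over modest clock drift, which is the corner case mentioned above.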


lhotari commented Feb 27, 2024

There's also a related issue #10488.

dao-jun self-assigned this Feb 28, 2024
dao-jun linked a pull request May 30, 2024 that will close this issue