Skip to content

Commit

Permalink
Implementation of kvcq message heap estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed Oct 9, 2024
1 parent daf3d1a commit 954295d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,6 @@ public void run() {
}
}

@Override
public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) {
// todo
return 0;
}

@Override
public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,27 @@ public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, Message

@Override
public long estimateMessageCount(long from, long to, MessageFilter filter) {
// todo
return 0;
if (from >= to) {
return 0;
}
if (to > getMaxOffsetInQueue()) {
to = getMaxOffsetInQueue();
}
int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan();
int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from);
int matchSize = 0;
for (int i = 0; i < sampleSize; i++) {
long index = from + i;
Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
if (pair == null) {
continue;
}
CqUnit cqUnit = pair.getObject1();
if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
matchSize++;
}
}
return (to - from) * (matchSize / sampleSize);
}

@Override
Expand Down

0 comments on commit 954295d

Please sign in to comment.