This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-12616: [BUG]KOP failed to get partition offset #3240

Open
sijie opened this issue Nov 9, 2021 · 1 comment

Comments

@sijie
Member

sijie commented Nov 9, 2021

Original Issue: apache#12616


@hangc0276 @BewareMyPower @codelipenghui Urgent help needed.

apache#11912

This PR added a check in asyncReadEntry that the requested ledger ID is present in the ledger map:

    public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
        LedgerHandle currentLedger = this.currentLedger;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
        }
        if (!ledgers.containsKey(position.getLedgerId())) {
            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
                + "the ledgerId does not belong to this topic or has been deleted"), ctx);
            return;
        }

However, the existing method is unchanged:

    public PositionImpl getFirstPosition() {
        Long ledgerId = ledgers.firstKey();
        if (ledgerId == null) {
            return null;
        }
        if (ledgerId > lastConfirmedEntry.getLedgerId()) {
            checkState(ledgers.get(ledgerId).getEntries() == 0);
            ledgerId = lastConfirmedEntry.getLedgerId();
        }
        return new PositionImpl(ledgerId, -1);
    }

Relevant logs:

2021-11-03 20:10:04.378 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [XXX/XXX/persistent/__consumer_offsets-partition-39] Start checking if current ledger is full
2021-11-03 20:10:04.381 [main-EventThread] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [XXXX/XXX/persistent/__consumer_offsets-partition-39] Creating a new ledger
2021-11-03 20:10:04.381 [main-EventThread] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [XXX/XXX/persistent/__consumer_offsets-partition-39] Creating ledger, metadata: {component=[B@685c61b2, pulsar/managed-ledger=[B@764fe8e9, application=[B@6d075d7a} - metadata ops timeout : 60 seconds
2021-11-03 20:10:04.382 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - : [XXX/XXX/persistent/__consumer_offsets-partition-39] Start TrimConsumedLedgers. ledgers=[33] totalSize=65
2021-11-03 20:10:04.382 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl -: [XXX/XXX/persistent/__consumer_offsets-partition-39] Slowest consumer ledger id: 34
2021-11-03 20:10:04.386 [main-EventThread] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - : [XXX/XXX/persistent/__consumer_offsets-partition-39] Created new ledger 205

2021-11-03 21:11:15.991 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [XXXX/XXXX/persistent/__consumer_offsets-partition-39] Checking ledger 33 -- time-clock: 1635945075991 ms -- time-ledger: 1635941404381 ms -- expired: true -- over-quota: false -- current-ledger: 205 -- current-retention: 3600000

2021-11-04 10:51:02.032 [pulsar-io-4-8] ERROR i.s.pulsar.handlers.kop.KafkaRequestHandler - [PersistentTopic{topic=persistent://XXXX/XXXX/__consumer_offsets-partition-39}] Failed to get offset for position 33:0
org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: Message not found, the ledgerId does not belong to this topic or has been deleted

2021-11-04 03:08:14.994 [pulsar-io-4-8] WARN o.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - : getFirstPosition ledgerId 205 and lastConfirmedEntryLedgerId is 33 , entryid 0

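The scenario we are seeing: a partition creates a new ledger, but no new messages are sent afterwards, so lastConfirmedEntry still points to the partition's previous ledger. Once that previous ledger expires, KOP looks up the first offset when it computes the number of messages in the partition, and that lookup fails every time.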
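To make the state concrete, here is a minimal toy model of it. It uses no Pulsar classes: `FirstPositionDemo`, `firstPositionLedgerId` and `readable` are hypothetical names, the ledger map is a plain `TreeMap`, and the entry count for ledger 33 is made up. It only mirrors the ledger-selection branch of getFirstPosition and the containsKey check quoted above, assuming ledger 33 has already been trimmed from the map.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model only; the names below are hypothetical and not part of Pulsar or KOP.
class FirstPositionDemo {

    // Mirrors the ledger-selection branch of the getFirstPosition() snippet quoted above.
    static long firstPositionLedgerId(NavigableMap<Long, Long> ledgers, long lastConfirmedLedgerId) {
        long ledgerId = ledgers.firstKey();
        if (ledgerId > lastConfirmedLedgerId) {
            // The first remaining ledger is newer than the last confirmed entry,
            // so the method falls back to the last confirmed ledger id.
            ledgerId = lastConfirmedLedgerId;
        }
        return ledgerId;
    }

    // Mirrors the containsKey check quoted from asyncReadEntry.
    static boolean readable(NavigableMap<Long, Long> ledgers, long ledgerId) {
        return ledgers.containsKey(ledgerId);
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> ledgers = new TreeMap<>(); // ledgerId -> entry count
        ledgers.put(33L, 1L);   // old ledger holding the last confirmed entry (count is illustrative)
        ledgers.put(205L, 0L);  // new ledger, nothing written to it yet
        long lastConfirmedLedgerId = 33L;

        ledgers.remove(33L);    // retention expires and ledger 33 is trimmed

        long first = firstPositionLedgerId(ledgers, lastConfirmedLedgerId);
        System.out.println("first position ledger = " + first);            // 33
        System.out.println("readable = " + readable(ledgers, first));      // false
    }
}
```

In this model the first position still names ledger 33, which is no longer in the map, which matches the "Failed to get offset for position 33:0" error in the logs.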

io.streamnative.pulsar.handlers.kop.utils.OffsetFinder

    public static PositionImpl getFirstValidPosition(ManagedLedgerImpl managedLedger) {
        PositionImpl firstPosition = managedLedger.getFirstPosition();
        if (firstPosition == null) {
            return null;
        } else {
            return managedLedger.getNextValidPosition(firstPosition);
        }
    }

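If we change KOP's OffsetFinder, should it check whether the ledger has expired after obtaining firstPosition? Or should we change the getFirstPosition logic in ManagedLedgerImpl directly? I am not sure whether either change would introduce other problems. Could the experts please take a look? Thanks.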
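As a rough illustration of the first option (a guard on the caller side), the sketch below works on a plain `TreeMap` rather than on ManagedLedgerImpl. `FirstLedgerGuard` and `resolveFirstLedger` are hypothetical names, and whether an equivalent check is safe inside OffsetFinder or getFirstPosition is exactly the open question here. The idea is to accept the candidate ledger only if it is still tracked, and otherwise move forward to the next tracked ledger.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch only; not the actual KOP or Pulsar code.
class FirstLedgerGuard {

    // Returns the candidate ledger id if it is still tracked, otherwise the next
    // tracked ledger id after it, or null when no such ledger remains.
    static Long resolveFirstLedger(NavigableMap<Long, Long> ledgers, long candidateLedgerId) {
        // ceilingKey returns the least key >= candidateLedgerId, or null if there is none.
        return ledgers.ceilingKey(candidateLedgerId);
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> ledgers = new TreeMap<>(); // ledgerId -> entry count
        ledgers.put(205L, 0L); // only the new, empty ledger remains after trimming

        // Candidate 33 comes from the stale lastConfirmedEntry; it resolves to 205.
        System.out.println(resolveFirstLedger(ledgers, 33L));
    }
}
```

A caller would still need to decide what offset to report when the resolved ledger is empty, which this sketch does not address.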

@sijie sijie added the type/bug label Nov 9, 2021
@github-actions

The issue had no activity for 30 days, mark with Stale label.
