This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-14926: C++ client batch consumption submission offset #3992

Open
sijie opened this issue Mar 29, 2022 · 0 comments


sijie commented Mar 29, 2022

Original Issue: apache#14926


Describe the bug
Hello, I send messages in batches with `batchenabled` set to true. During testing I found that, in the C++ client, when the consumer receives messages one at a time and the whole batch has not yet been consumed, the offset cannot be submitted. You must consume the entire batch and then call `acknowledgeCumulative` for the offset to be submitted effectively.

To Reproduce
Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Expected behavior
When consuming a batch one message at a time, I expect to be able to submit the offset through `acknowledgeCumulative` even if the whole batch has not yet been consumed.

Screenshots
If applicable, add screenshots to help explain your problem.

Desktop (please complete the following information):

  • OS: Linux x86

Additional context
Add any other context about the problem here.

int PulsarOperateConsumer::readSingleMsg(char *buffer, const int size)
{
    PULSAR_DEBUG("readSingleMsg %d,batch:%d", _readMsgCount, _readBatchMsgSize)
    if (_readMsgCount >= _readBatchMsgSize)
    {
        PULSAR_DEBUG("need ack msg")
        return PULSAR_INT_ZERO;
    }
    pulsar::Result result = _consumerPtr->receive(_msg, _recvTimeOutMs);
    if (result == pulsar::ResultTimeout)
    {
        PULSAR_DEBUG("no msg")
        return PULSAR_INT_ZERO;
    }
    if (pulsar::ResultOk != result)
    {
        PULSAR_SET_ERRMSG("recv msg error %d", result)
        PULSAR_DEBUG("recv msg error %d", result)
        return PULSAR_INT_NEGATIVE_ONE;
    }
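    const void *ptrData = _msg.getData();
    int msgLen = static_cast<int>(_msg.getLength());
    // The buffer must hold at least the terminating '\0'.
    if (size <= 0)
    {
        PULSAR_DEBUG("invalid buffer size %d", size)
        return PULSAR_INT_NEGATIVE_ONE;
    }
    // Reserve one byte for the terminating '\0' so the write below stays in bounds.
    if (msgLen > size - 1)
    {
        msgLen = size - 1;
    }
    memcpy(buffer, ptrData, msgLen);
    buffer[msgLen] = '\0';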
    _readMsgCount++;
    PULSAR_DEBUG("read msgcount %d,id:%s", _readMsgCount, messageIdToString(_msg.getMessageId()).c_str())
    return msgLen;
}
int PulsarOperateConsumer::ackMsg()
{
    if (_readMsgCount <= PULSAR_INT_ZERO)
    {
        PULSAR_DEBUG("_readMsgCount is 0,not need ackmsg")
        return PULSAR_INT_ZERO;
    }
    pulsar::Result result = _consumerPtr->acknowledgeCumulative(_msg.getMessageId());
    if (pulsar::ResultOk != result)
    {
        PULSAR_SET_ERRMSG("acknowledgeCumulative error result:%d,msgid=%s", result, messageIdToString(_msg.getMessageId()).c_str())
        PULSAR_DEBUG("acknowledgeCumulative error result:%d,msgid=%s", result, messageIdToString(_msg.getMessageId()).c_str())
        return PULSAR_INT_NEGATIVE_ONE;
    }
    PULSAR_DEBUG("ackMsg success %s", messageIdToString(_msg.getMessageId()).c_str())
    _readMsgCount = PULSAR_INT_ZERO;
    return PULSAR_INT_ZERO;
}