C++ client batch consumption submission offset #14926

Closed
liuqt25 opened this issue Mar 29, 2022 · 4 comments
Labels: lifecycle/stale, type/bug

@liuqt25
liuqt25 commented Mar 29, 2022

Describe the bug
Hello, I send messages in batches with batching enabled (batchenabled set to true). During testing I found that in the C++ client, when messages are received one at a time and the whole batch has not yet been consumed, the offset cannot be committed. You must consume the whole batch and then call acknowledgeCumulative for the offset to be committed.

Expected behavior
When consuming in batches, I expect to be able to commit the offset through acknowledgeCumulative even if the whole batch has not yet been consumed.

Desktop:

  • OS: Linux x86

Additional context

int PulsarOperateConsumer::readSingleMsg(char *buffer, const int size)
{
    PULSAR_DEBUG("readSingleMsg %d,batch:%d", _readMsgCount, _readBatchMsgSize)
    if (_readMsgCount >= _readBatchMsgSize)
    {
        PULSAR_DEBUG("need ack msg")
        return PULSAR_INT_ZERO;
    }
    pulsar::Result result = _consumerPtr->receive(_msg, _recvTimeOutMs);
    if (result == pulsar::ResultTimeout)
    {
        PULSAR_DEBUG("no msg")
        return PULSAR_INT_ZERO;
    }
    if (pulsar::ResultOk != result)
    {
        PULSAR_SET_ERRMSG("recv msg error %d", result)
        PULSAR_DEBUG("recv msg error %d", result)
        return PULSAR_INT_NEGATIVE_ONE;
    }
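    const void *ptrData = _msg.getData();
    int msgLen = static_cast<int>(_msg.getLength());
    // Leave room for the terminating '\0' so the write below stays inside
    // the caller's buffer (assumes size >= 1); longer payloads are truncated.
    if (msgLen > size - 1)
    {
        msgLen = size - 1;
    }
    memcpy(buffer, ptrData, msgLen);
    buffer[msgLen] = '\0';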
    _readMsgCount++;
    PULSAR_DEBUG("read msgcount %d,id:%s", _readMsgCount, messageIdToString(_msg.getMessageId()).c_str())
    return msgLen;
}
int PulsarOperateConsumer::ackMsg()
{
    if (_readMsgCount <= PULSAR_INT_ZERO)
    {
        PULSAR_DEBUG("_readMsgCount is 0,not need ackmsg")
        return PULSAR_INT_ZERO;
    }
    pulsar::Result result = _consumerPtr->acknowledgeCumulative(_msg.getMessageId());
    if (pulsar::ResultOk != result)
    {
        PULSAR_SET_ERRMSG("acknowledgeCumulative error result:%d,msgid=%s", result, messageIdToString(_msg.getMessageId()).c_str())
        PULSAR_DEBUG("acknowledgeCumulative error result:%d,msgid=%s", result, messageIdToString(_msg.getMessageId()).c_str())
        return PULSAR_INT_NEGATIVE_ONE;
    }
    PULSAR_DEBUG("ackMsg success %s", messageIdToString(_msg.getMessageId()).c_str())
    _readMsgCount = PULSAR_INT_ZERO;
    return PULSAR_INT_ZERO;
}
@BewareMyPower
Contributor

The C++ client might need to catch up on batch index ACK. See https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level.
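
To illustrate the behavior reported above, here is a minimal, self-contained sketch of entry-level acknowledgment for batched messages. It is a simplified model and not the Pulsar client's actual implementation: BatchMessageId, EntryLevelAckTracker and demoThreeMessageBatch are hypothetical names introduced only for this example. The assumption is that all messages of one batch share a single (ledgerId, entryId) pair, so without batch index ACK an acknowledgment can only take effect once every message of the batch has been acknowledged.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batched message ID: every message of one batch
// shares (ledgerId, entryId) and differs only in batchIndex.
struct BatchMessageId {
    int64_t ledgerId;
    int64_t entryId;
    int batchIndex;
    int batchSize;
};

// Hypothetical tracker modelling entry-level acknowledgment: an ack is only
// forwarded once every index of the batch has been acknowledged.
class EntryLevelAckTracker {
public:
    // Returns true when the ack for the whole entry would be sent.
    bool acknowledgeCumulative(const BatchMessageId& id) {
        auto key = std::make_pair(id.ledgerId, id.entryId);
        auto it = pending_.find(key);
        if (it == pending_.end()) {
            it = pending_.emplace(key, std::vector<bool>(id.batchSize, false)).first;
        }
        std::vector<bool>& acked = it->second;
        // Cumulative: mark every index up to and including batchIndex.
        for (int i = 0; i <= id.batchIndex && i < id.batchSize; ++i) {
            acked[i] = true;
        }
        for (bool a : acked) {
            if (!a) {
                return false;  // batch only partially consumed: nothing is sent
            }
        }
        pending_.erase(it);
        return true;  // whole batch acknowledged: the entry can be acked
    }

private:
    std::map<std::pair<int64_t, int64_t>, std::vector<bool>> pending_;
};

// Usage: acknowledge a three-message batch one message at a time and record
// whether each call would result in an ack being sent.
inline std::vector<bool> demoThreeMessageBatch() {
    EntryLevelAckTracker tracker;
    std::vector<bool> sent;
    for (int index = 0; index < 3; ++index) {
        sent.push_back(tracker.acknowledgeCumulative({1, 7, index, 3}));
    }
    return sent;
}
```

In this model the calls for indexes 0 and 1 are held back and only the call for index 2 produces an ack, which matches the observation that the whole batch has to be consumed first. Batch index ACK, as described in PIP-54, is the feature that would let a partial acknowledgment take effect.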

BewareMyPower self-assigned this Apr 3, 2022
@github-actions
github-actions bot commented May 4, 2022

The issue had no activity for 30 days, mark with Stale label.

@github-actions
github-actions bot commented Jun 4, 2022

The issue had no activity for 30 days, mark with Stale label.

@BewareMyPower
Contributor

This feature request is tracked by apache/pulsar-client-cpp#87
