[Bug] go pull consumer consume fail when storeType=defaultRocksDB #9146

Open
3 tasks done
yuz10 opened this issue Jan 18, 2025 · 1 comment

Comments

@yuz10
Member

yuz10 commented Jan 18, 2025

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

ubuntu 24.04

RocketMQ version

5.3.1

JDK Version

1.8.0

Describe the Bug

The Go pull consumer fails to consume messages when the broker runs with storeType=defaultRocksDB.

Steps to Reproduce

1. Set storeType=defaultRocksDB in broker.conf.
2. Consume messages with the Go pull consumer:

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/rlog"
)

const (
	nameSrvAddr       = "127.0.0.1:8200"
	topic             = "test-topic"
	consumerGroupName = "testPullGroup"
	tag               = "*"
)

var pullConsumer rocketmq.PullConsumer
var sleepTime = 1 * time.Second

func main() {
	rlog.SetLogLevel("info")
	var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
	if err != nil {
		log.Fatalf("NewNamesrvAddr err: %v", err)
	}
	pullConsumer, err = rocketmq.NewPullConsumer(
		consumer.WithGroupName(consumerGroupName),
		consumer.WithNameServer(nameSrv),
		consumer.WithMaxReconsumeTimes(2),
		//consumer.WithPullBatchSize(32),
	)
	if err != nil {
		log.Fatalf("fail to new pullConsumer: %v", err)
	}
	selector := consumer.MessageSelector{
		Type:       consumer.TAG,
		Expression: tag,
	}
	err = pullConsumer.Subscribe(topic, selector)
	if err != nil {
		log.Fatalf("fail to Subscribe: %v", err)
	}
	err = pullConsumer.Start()
	if err != nil {
		log.Fatalf("fail to Start: %v", err)
	}

	for {
		poll()
	}
}

func poll() {
	cr, err := pullConsumer.Poll(context.TODO(), time.Second*5)
	if consumer.IsNoNewMsgError(err) {
		return
	}
	if err != nil {
		log.Printf("[poll error] err=%v", err)
		time.Sleep(sleepTime)
		return
	}
	log.Println("msgList: ", cr.GetMsgList())
	log.Println("messageQueue: ", cr.GetMQ())
	log.Println("processQueue: ", cr.GetPQ())
	pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
}

What Did You Expect to See?

Messages are consumed successfully.

What Did You See Instead?

Consumption fails with this message:
consumer request topic: test-topic, offset: 0, minOffset: 0, maxOffset: 1, but access logic queue failed. Correct nextBeginOffset to 0

I found that maxMsgNums is 0 in the pull request:

request:
{"code":11,"language":"GO","version":317,"opaque":976,"flag":0,"remark":"","extFields":{"consumerGroup":"testPullGroup","queueId":"1","maxMsgNums":"0","sysFlag":"6","subscription":"*","expressionType":"TAG","bname":"broker-0","topic":"test-topic","queueOffset":"0","commitOffset":"0","suspendTimeoutMillis":"20000","subVersion":"0"}}
response:
{"code":19,"extFields":{"suggestWhichBrokerId":"0","groupSysFlag":"0","msgCount4Commercial":"0","TRACE_ON":"true","nextBeginOffset":"0","MSG_REGION":"","maxOffset":"26","minOffset":"0","topicSysFlag":"0"},"flag":1,"language":"JAVA","opaque":976,"remark":"OFFSET_FOUND_NULL","serializeTypeCurrentRPC":"JSON","version":475}
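To check a captured request for this condition, a minimal sketch like the following could be used. It assumes the request is logged as JSON in the shape shown above; the `remotingCommand` type and the `maxMsgNumsOf` helper are hypothetical names for illustration and are not part of the RocketMQ Go client.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// remotingCommand mirrors only the fields of the logged request that matter
// here. It is an illustrative type, not the client's own struct.
type remotingCommand struct {
	Code      int               `json:"code"`
	ExtFields map[string]string `json:"extFields"`
}

// maxMsgNumsOf extracts the maxMsgNums value from a logged pull request.
func maxMsgNumsOf(raw string) (int, error) {
	var cmd remotingCommand
	if err := json.Unmarshal([]byte(raw), &cmd); err != nil {
		return 0, err
	}
	v, ok := cmd.ExtFields["maxMsgNums"]
	if !ok {
		return 0, fmt.Errorf("maxMsgNums missing from extFields")
	}
	return strconv.Atoi(v)
}

func main() {
	// Trimmed copy of the request logged in this issue.
	raw := `{"code":11,"language":"GO","extFields":{"consumerGroup":"testPullGroup","queueId":"1","maxMsgNums":"0","topic":"test-topic","queueOffset":"0"}}`

	n, err := maxMsgNumsOf(raw)
	if err != nil {
		panic(err)
	}
	// A value of 0 or less means the request asks for no messages.
	fmt.Println("maxMsgNums =", n, "suspicious:", n <= 0)
}
```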

Additional Context

The default pullBatchSize of the Go pull consumer is 0. The same code works fine with the default consume queue store.

@zqyhimself

I have analyzed the root cause of the issue and its solutions.

Root Cause of the Issue

The issue arises because the default value of pullBatchSize in the Go Pull Consumer is set to 0. In RocketMQ’s implementation, maxMsgNums (which corresponds to pullBatchSize) cannot be 0, as this prevents correct access to the logical queue.

Specifically:
1. The reproduction code does not call consumer.WithPullBatchSize(), so the default value of 0 is used.
2. When this value is sent to the broker, stricter parameter checks in the RocksDB storage mode cause the pull to fail.
3. In the default ConsumeQueue storage mode, the problem may stay hidden because of fault-tolerance mechanisms, whereas in RocksDB mode the pull fails directly.

Solutions

Short-term Solution:
Explicitly set pullBatchSize when creating the Pull Consumer:

pullConsumer, err = rocketmq.NewPullConsumer(
	consumer.WithGroupName(consumerGroupName),
	consumer.WithNameServer(nameSrv),
	consumer.WithMaxReconsumeTimes(2),
	consumer.WithPullBatchSize(32), // Add this line and set a reasonable value (1-1024)
)

Long-term Solutions:
1. Set a reasonable default value (e.g., 32) for pullBatchSize in the Go client.
2. Add parameter validation to ensure that pullBatchSize falls within a valid range (1-1024).
3. Improve the documentation to clearly explain the requirements and limitations of this parameter.
