diff --git a/api.go b/api.go index b0a203ef..c4b5c08a 100644 --- a/api.go +++ b/api.go @@ -82,6 +82,8 @@ func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) { type PullConsumer interface { // Start the PullConsumer for consuming message Start() error + // GetTopicRouteInfo get topic route info + GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) // Subscribe a topic for consuming Subscribe(topic string, selector consumer.MessageSelector) error @@ -89,6 +91,9 @@ type PullConsumer interface { // Unsubscribe a topic Unsubscribe(topic string) error + // Assign assign message queue to consumer + Assign(topic string, mqs []*primitive.MessageQueue) error + // Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit Shutdown() error @@ -104,6 +109,12 @@ type PullConsumer interface { // PullFrom pull messages of queue from the offset to offset + numbers PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) + // SeekOffset seek offset for specific queue + SeekOffset(queue *primitive.MessageQueue, offset int64) + + // OffsetForTimestamp get offset of specific queue with timestamp + OffsetForTimestamp(queue *primitive.MessageQueue, timestamp int64) (int64, error) + // UpdateOffset updateOffset update offset of queue in mem UpdateOffset(queue *primitive.MessageQueue, offset int64) error diff --git a/consumer/consumer.go b/consumer/consumer.go index 98eb17b1..9e9bedb2 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -316,6 +316,14 @@ func (dc *defaultConsumer) shutdown() error { return nil } +func (dc *defaultConsumer) isRunning() bool { + return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning) +} + +func (dc *defaultConsumer) isStopped() bool { + return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown) +} + func (dc *defaultConsumer) persistConsumerOffset() error { err := dc.makeSureStateOK() if err != nil { diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go index 1d7c3bda..c1229278 100644 --- a/consumer/mock_offset_store.go +++ b/consumer/mock_offset_store.go @@ -117,7 +117,7 @@ func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface } // getMQOffsetMap mocks base method -func (m *MockOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 { +func (m *MockOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "getMQOffsetMap", topic) ret0, _ := ret[0].(map[primitive.MessageQueue]int64) @@ -125,7 +125,7 @@ func (m *MockOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueu } // getMQOffsetMap indicates an expected call of getMQOffsetMap -func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string) *gomock.Call{ +func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getMQOffsetMap", reflect.TypeOf((*MockOffsetStore)(nil).getMQOffsetMap), topic) } diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index c66ffb7a..32587790 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -64,6 +64,14 @@ func (cr *ConsumeRequest) GetPQ() *processQueue { return cr.processQueue } +type SubscriptionType int + +const ( + None SubscriptionType = iota + Subscribe + Assign +) + type defaultPullConsumer struct { *defaultConsumer @@ -71,9 +79,11 @@ type defaultPullConsumer struct { selector MessageSelector GroupName string Model MessageModel + SubType SubscriptionType UnitMode bool nextQueueSequence int64 allocateQueues []*primitive.MessageQueue + mq2seekOffset sync.Map // key:primitive.MessageQueue,value:seekOffset done chan struct{} closeOnce sync.Once @@ -116,6 +126,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { defaultConsumer: dc, done: make(chan struct{}, 1), consumeRequestCache: make(chan *ConsumeRequest, 4), + GroupName: dc.option.GroupName, } dc.mqChanged = c.messageQueueChanged c.submitToConsume = c.consumeMessageConcurrently @@ -123,11 +134,32 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { return c, nil } +func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) { + topicWithNs := utils.WrapNamespace(pc.option.Namespace, topic) + value, exist := pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs) + if exist { + return value.([]*primitive.MessageQueue), nil + } + pc.client.UpdateTopicRouteInfo() + value, exist = pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs) + if !exist { + return nil, errors2.ErrRouteNotFound + } + return value.([]*primitive.MessageQueue), nil +} + func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error { if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) || atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) { return errors2.ErrStartTopic } + if pc.SubType == Assign { + return errors2.ErrSubscriptionType + } + + if pc.SubType == None { + pc.SubType = Subscribe + } topic = utils.WrapNamespace(pc.option.Namespace, topic) data := buildSubscriptionData(topic, selector) @@ -139,11 +171,53 @@ func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) } func (pc *defaultPullConsumer) Unsubscribe(topic string) error { + if pc.SubType == Assign { + return errors2.ErrSubscriptionType + } topic = utils.WrapNamespace(pc.option.Namespace, topic) pc.subscriptionDataTable.Delete(topic) return nil } +func (pc *defaultPullConsumer) Assign(topic string, mqs []*primitive.MessageQueue) error { + if pc.SubType == Subscribe { + return errors2.ErrSubscriptionType + } + if pc.SubType == None { + pc.SubType = Assign + } + topic = utils.WrapNamespace(pc.option.Namespace, topic) + data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll}) + pc.topic = topic + pc.subscriptionDataTable.Store(topic, data) + oldQueues := pc.allocateQueues + pc.allocateQueues = mqs + rlog.Info("pull consumer assign new mqs", map[string]interface{}{ + "topic": topic, + "group": pc.GroupName, + "oldMqs": oldQueues, + "newMqs": mqs, + }) + if pc.isRunning() { + pc.Rebalance() + } + return nil +} + +func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, originOffset int64) int64 { + if pc.SubType != Assign { + return originOffset + } + value, exist := pc.mq2seekOffset.LoadAndDelete(mq) + if !exist { + return originOffset + } else { + nextOffset := value.(int64) + _ = pc.updateOffset(mq, nextOffset) + return nextOffset + } +} + func (pc *defaultPullConsumer) Start() error { var err error pc.once.Do(func() { @@ -546,11 +620,34 @@ func (pc *defaultPullConsumer) GetWhere() string { } func (pc *defaultPullConsumer) Rebalance() { - pc.defaultConsumer.doBalance() + switch pc.SubType { + case Assign: + pc.RebalanceViaTopic() + break + case Subscribe: + pc.defaultConsumer.doBalance() + break + } } func (pc *defaultPullConsumer) RebalanceIfNotPaused() { - pc.defaultConsumer.doBalanceIfNotPaused() + switch pc.SubType { + case Assign: + pc.RebalanceViaTopic() + break + case Subscribe: + pc.defaultConsumer.doBalanceIfNotPaused() + break + } +} + +func (pc *defaultPullConsumer) RebalanceViaTopic() { + changed := pc.defaultConsumer.updateProcessQueueTable(pc.topic, pc.allocateQueues) + if changed { + rlog.Info("PullConsumer rebalance result changed ", map[string]interface{}{ + rlog.LogKeyAllocateMessageQueue: pc.allocateQueues, + }) + } } func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo { @@ -613,7 +710,23 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes } +func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) { + pc.mq2seekOffset.Store(mq, offset) + rlog.Info("pull consumer seek offset", map[string]interface{}{ + "mq": mq, + "offset": offset, + }) +} + +func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) { + return pc.searchOffsetByTimestamp(mq, timestamp) +} + func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) { + if pc.SubType == Assign { + return + } + var allocateQueues []*primitive.MessageQueue pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) @@ -734,6 +847,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) { sleepTime = _PullDelayTimeWhenError goto NEXT } + + nextOffset := pc.nextPullOffset(request.mq, request.nextOffset) beginTime := time.Now() sd := v.(*internal.SubscriptionData) @@ -743,7 +858,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) { ConsumerGroup: pc.consumerGroup, Topic: request.mq.Topic, QueueId: int32(request.mq.QueueId), - QueueOffset: request.nextOffset, + QueueOffset: nextOffset, MaxMsgNums: pc.option.PullBatchSize.Load(), SysFlag: sysFlag, CommitOffset: 0, @@ -880,5 +995,9 @@ func (pc *defaultPullConsumer) validate() error { return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup) } + if pc.SubType == None { + return errors2.ErrBlankSubType + } + return nil } diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go index 930f0a30..7f29dd33 100644 --- a/consumer/statistics_test.go +++ b/consumer/statistics_test.go @@ -217,9 +217,9 @@ func TestNewStatsManager(t *testing.T) { stats := NewStatsManager() st := time.Now() - for { + for { stats.increasePullTPS("rocketmq", "default", 1) - time.Sleep(500*time.Millisecond) + time.Sleep(500 * time.Millisecond) if time.Now().Sub(st) > 5*time.Minute { break } diff --git a/errors/errors.go b/errors/errors.go index 28995068..03d0fb59 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -34,11 +34,14 @@ var ( ErrCreated = errors.New("consumer group has been created") ErrBrokerNotFound = errors.New("broker can not found") ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.") + ErrSubscriptionType = errors.New("subscribe type is not matched") + ErrBlankSubType = errors.New("subscribe type should not be blank") ErrResponse = errors.New("response error") ErrCompressLevel = errors.New("unsupported compress level") ErrUnknownIP = errors.New("unknown IP address") ErrService = errors.New("service close is not running, please check") ErrTopicNotExist = errors.New("topic not exist") + ErrRouteNotFound = errors.New("topic route not found") ErrNotExisted = errors.New("not existed") ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") ErrMultiIP = errors.New("multiple IP addr does not support") diff --git a/examples/consumer/pull/poll_assign/main.go b/examples/consumer/pull/poll_assign/main.go new file mode 100644 index 00000000..838400ad --- /dev/null +++ b/examples/consumer/pull/poll_assign/main.go @@ -0,0 +1,115 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "log" + _ "net/http/pprof" + "time" + + "github.com/apache/rocketmq-client-go/v2" + + "github.com/apache/rocketmq-client-go/v2/rlog" + + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +const ( + nameSrvAddr = "http://127.0.0.1:9876" + accessKey = "rocketmq" + secretKey = "12345678" + topic = "test-topic" + consumerGroupName = "testPullGroup" + tag = "testPull" + namespace = "ns" +) + +var pullConsumer rocketmq.PullConsumer +var sleepTime = 1 * time.Second + +func main() { + rlog.SetLogLevel("info") + var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr) + if err != nil { + log.Fatalf("NewNamesrvAddr err: %v", err) + } + pullConsumer, err = rocketmq.NewPullConsumer( + consumer.WithGroupName(consumerGroupName), + consumer.WithNameServer(nameSrv), + consumer.WithNamespace(namespace), + consumer.WithMaxReconsumeTimes(2), + ) + if err != nil { + log.Fatalf("fail to new pullConsumer: %v", err) + } + + // assign nil firstly to help consumer start up + err = pullConsumer.Assign(topic, nil) + if err != nil { + log.Fatalf("fail to Assign: %v", err) + } + err = pullConsumer.Start() + if err != nil { + log.Fatalf("fail to Start: %v", err) + } + + mqs, err := pullConsumer.GetTopicRouteInfo(topic) + if err != nil { + log.Fatalf("fail to GetTopicRouteInfo: %v", err) + } + + for _, mq := range mqs { + offset, err := pullConsumer.OffsetForTimestamp(mq, time.Now().UnixMilli()-60*10) + if err != nil { + log.Fatalf("fail to get offset for timestamp: %v", err) + } else { + pullConsumer.SeekOffset(mq, offset) + } + } + + err = pullConsumer.Assign(topic, mqs) + if err != nil { + log.Fatalf("fail to Assign: %v", err) + } + + for { + poll() + } +} + +func poll() { + cr, err := pullConsumer.Poll(context.TODO(), time.Second*5) + if consumer.IsNoNewMsgError(err) { + log.Println("no new msg") + return + } + if err != nil { + log.Printf("[poll error] err=%v", err) + time.Sleep(sleepTime) + return + } + + // todo LOGIC CODE HERE + log.Println("msgList: ", cr.GetMsgList()) + log.Println("messageQueue: ", cr.GetMQ()) + log.Println("processQueue: ", cr.GetPQ()) + // pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater) + pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess) +} diff --git a/examples/consumer/tls/main.go b/examples/consumer/tls/main.go index 248c8371..fab61aba 100644 --- a/examples/consumer/tls/main.go +++ b/examples/consumer/tls/main.go @@ -1,59 +1,59 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/apache/rocketmq-client-go/v2" - "github.com/apache/rocketmq-client-go/v2/consumer" - "github.com/apache/rocketmq-client-go/v2/primitive" -) - -func main() { - c, _ := rocketmq.NewPushConsumer( - consumer.WithGroupName("testGroup"), - consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), - consumer.WithTls(true), - ) - err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, - msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { - for i := range msgs { - fmt.Printf("subscribe callback: %v \n", msgs[i]) - } - - return consumer.ConsumeSuccess, nil - }) - if err != nil { - fmt.Println(err.Error()) - } - // Note: start after subscribe - err = c.Start() - if err != nil { - fmt.Println(err.Error()) - os.Exit(-1) - } - time.Sleep(time.Hour) - err = c.Shutdown() - if err != nil { - fmt.Printf("shutdown Consumer error: %s", err.Error()) - } -} +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +func main() { + c, _ := rocketmq.NewPushConsumer( + consumer.WithGroupName("testGroup"), + consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), + consumer.WithTls(true), + ) + err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, + msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + for i := range msgs { + fmt.Printf("subscribe callback: %v \n", msgs[i]) + } + + return consumer.ConsumeSuccess, nil + }) + if err != nil { + fmt.Println(err.Error()) + } + // Note: start after subscribe + err = c.Start() + if err != nil { + fmt.Println(err.Error()) + os.Exit(-1) + } + time.Sleep(time.Hour) + err = c.Shutdown() + if err != nil { + fmt.Printf("shutdown Consumer error: %s", err.Error()) + } +} diff --git a/examples/producer/rpc/async/main.go b/examples/producer/rpc/async/main.go index c6676d3f..c0ce57b9 100644 --- a/examples/producer/rpc/async/main.go +++ b/examples/producer/rpc/async/main.go @@ -15,7 +15,6 @@ See the License for the specific language governing permissions and limitations under the License. */ - package main import ( diff --git a/examples/producer/tls/main.go b/examples/producer/tls/main.go index c926c054..ddaf1651 100644 --- a/examples/producer/tls/main.go +++ b/examples/producer/tls/main.go @@ -1,62 +1,62 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "context" - "fmt" - "os" - "strconv" - - "github.com/apache/rocketmq-client-go/v2" - "github.com/apache/rocketmq-client-go/v2/primitive" - "github.com/apache/rocketmq-client-go/v2/producer" -) - -// Package main implements a simple producer to send message. -func main() { - p, _ := rocketmq.NewProducer( - producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), - producer.WithRetry(2), - producer.WithTls(true), - ) - err := p.Start() - if err != nil { - fmt.Printf("start producer error: %s", err.Error()) - os.Exit(1) - } - topic := "test" - - for i := 0; i < 10; i++ { - msg := &primitive.Message{ - Topic: topic, - Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), - } - res, err := p.SendSync(context.Background(), msg) - - if err != nil { - fmt.Printf("send message error: %s\n", err) - } else { - fmt.Printf("send message success: result=%s\n", res.String()) - } - } - err = p.Shutdown() - if err != nil { - fmt.Printf("shutdown producer error: %s", err.Error()) - } -} +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "os" + "strconv" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" +) + +// Package main implements a simple producer to send message. +func main() { + p, _ := rocketmq.NewProducer( + producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), + producer.WithRetry(2), + producer.WithTls(true), + ) + err := p.Start() + if err != nil { + fmt.Printf("start producer error: %s", err.Error()) + os.Exit(1) + } + topic := "test" + + for i := 0; i < 10; i++ { + msg := &primitive.Message{ + Topic: topic, + Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), + } + res, err := p.SendSync(context.Background(), msg) + + if err != nil { + fmt.Printf("send message error: %s\n", err) + } else { + fmt.Printf("send message success: result=%s\n", res.String()) + } + } + err = p.Shutdown() + if err != nil { + fmt.Printf("shutdown producer error: %s", err.Error()) + } +}