diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index a5b17f66..27998415 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -502,7 +502,9 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr rlog.LogKeyValueChangedFrom: data.SubVersion, rlog.LogKeyValueChangedTo: newVersion, }) + data.Lock() data.SubVersion = newVersion + data.Unlock() // TODO: optimize count := 0 diff --git a/internal/model.go b/internal/model.go index 36e06a10..3cc59d0a 100644 --- a/internal/model.go +++ b/internal/model.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/apache/rocketmq-client-go/v2/primitive" @@ -60,9 +61,20 @@ type SubscriptionData struct { Codes utils.Set `json:"codeSet"` SubVersion int64 `json:"subVersion"` ExpType string `json:"expressionType"` + mux sync.RWMutex +} + +func (sd *SubscriptionData) Lock() { + sd.mux.Lock() +} + +func (sd *SubscriptionData) Unlock() { + sd.mux.Unlock() } func (sd *SubscriptionData) Clone() *SubscriptionData { + sd.mux.RLock() + defer sd.mux.RUnlock() cloned := &SubscriptionData{ ClassFilterMode: sd.ClassFilterMode, Topic: sd.Topic,