Skip to content

Commit

Permalink
fix: [2.4] replicate message exception when the ttMsgEnable config is…
Browse files Browse the repository at this point in the history
… changed dynamically (#38440)

- issue: #38177
- pr: #38178

Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG authored Dec 14, 2024
1 parent 1da4ac4 commit 4c896c6
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 38 deletions.
2 changes: 1 addition & 1 deletion internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(config.serverID)).Inc()
log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue()))

m.EnableProduce(true)
m.ForceEnableProduce(true)

updater = newMqStatsUpdater(config, m)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/flow_graph_dmstream_input_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
return nil
}

func (mtm *mockTtMsgStream) EnableProduce(can bool) {
func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) {
}

func TestNewDmInputNode(t *testing.T) {
Expand Down
14 changes: 7 additions & 7 deletions internal/proxy/mock_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

type mockMsgStream struct {
msgstream.MsgStream
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
enableProduce func(bool)
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
forceEnableProduce func(bool)
}

func (m *mockMsgStream) AsProducer(producers []string) {
Expand All @@ -34,9 +34,9 @@ func (m *mockMsgStream) Close() {
}
}

func (m *mockMsgStream) EnableProduce(enabled bool) {
if m.enableProduce != nil {
m.enableProduce(enabled)
func (m *mockMsgStream) ForceEnableProduce(enabled bool) {
if m.forceEnableProduce != nil {
m.forceEnableProduce(enabled)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error {
return nil
}

func (ms *simpleMockMsgStream) EnableProduce(enabled bool) {
func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) {
}

func newSimpleMockMsgStream() *simpleMockMsgStream {
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (node *Proxy) Init() error {
zap.Error(err))
return err
}
node.replicateMsgStream.EnableProduce(true)
node.replicateMsgStream.ForceEnableProduce(true)
node.replicateMsgStream.AsProducer([]string{replicateMsgChannel})

node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/replicate_stream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *ReplicateStreamManager) newMsgStreamResource(channel string) resource.N
}
msgStream.SetRepackFunc(replicatePackFunc)
msgStream.AsProducer([]string{channel})
msgStream.EnableProduce(true)
msgStream.ForceEnableProduce(true)

res := resource.NewSimpleResource(msgStream, ReplicateMsgStreamTyp, channel, ReplicateMsgStreamExpireTime, func() {
msgStream.Close()
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/replicate_stream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestReplicateManager(t *testing.T) {
mockMsgStream.asProducer = func(producers []string) {
i++
}
mockMsgStream.enableProduce = func(b bool) {
mockMsgStream.forceEnableProduce = func(b bool) {
i++
}
mockMsgStream.close = func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mock_msgstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 19 additions & 17 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ type mqMsgStream struct {
consumers map[string]mqwrapper.Consumer
consumerChannels []string

repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
enableProduce atomic.Value
configEvent config.EventHandler
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
forceEnableProduce atomic.Value
configEvent config.EventHandler
}

// NewMqMsgStream is used to generate a new mqMsgStream object
Expand Down Expand Up @@ -104,14 +105,15 @@ func NewMqMsgStream(ctx context.Context,
closed: 0,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.forceEnableProduce.Store(false)
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.enableProduce.Store(value)
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
Expand Down Expand Up @@ -265,12 +267,12 @@ func (ms *mqMsgStream) GetProduceChannels() []string {
return ms.producerChannels
}

func (ms *mqMsgStream) EnableProduce(can bool) {
ms.enableProduce.Store(can)
func (ms *mqMsgStream) ForceEnableProduce(can bool) {
ms.forceEnableProduce.Store(can)
}

func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}

func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/mq/msgstream/mq_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) {
}

func TestStream_PulsarMsgStream_Insert(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand All @@ -129,12 +131,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
err := inputStream.Produce(&msgPack)
require.Error(t, err)
}

inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
err := inputStream.Produce(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))

Expand Down Expand Up @@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}

func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand All @@ -202,12 +206,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
_, err := inputStream.Broadcast(&msgPack)
require.Error(t, err)
}

inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
_, err := inputStream.Broadcast(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))

Expand Down
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type MsgStream interface {
GetLatestMsgID(channel string) (MessageID, error)
CheckTopicValid(channel string) error

EnableProduce(can bool)
ForceEnableProduce(can bool)
}

type Factory interface {
Expand Down

0 comments on commit 4c896c6

Please sign in to comment.