From e8529e7fa8eb721d96f2839899ddf612f0257f94 Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 13 Dec 2024 15:27:27 +0800 Subject: [PATCH] fix: [2.4] replicate message exception when the ttMsgEnable config is changed dynamically Signed-off-by: SimFG --- internal/datanode/data_sync_service.go | 2 +- .../flow_graph_dmstream_input_node_test.go | 2 +- internal/proxy/mock_msgstream_test.go | 14 ++++---- internal/proxy/mock_test.go | 2 +- internal/proxy/proxy.go | 2 +- internal/proxy/replicate_stream_manager.go | 2 +- .../proxy/replicate_stream_manager_test.go | 2 +- pkg/mq/msgstream/mock_msgstream.go | 6 ++-- pkg/mq/msgstream/mq_msgstream.go | 36 ++++++++++--------- pkg/mq/msgstream/mq_msgstream_test.go | 12 ++++--- pkg/mq/msgstream/msgstream.go | 2 +- 11 files changed, 44 insertions(+), 38 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index f399e45508c90..f7628101265a1 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -283,7 +283,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(config.serverID)).Inc() log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue())) - m.EnableProduce(true) + m.ForceEnableProduce(true) updater = newMqStatsUpdater(config, m) } diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index 14315aae988bd..d4edc3cb93fd6 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -103,7 +103,7 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error { return nil } -func (mtm *mockTtMsgStream) EnableProduce(can bool) { +func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) { } func TestNewDmInputNode(t *testing.T) { diff --git a/internal/proxy/mock_msgstream_test.go b/internal/proxy/mock_msgstream_test.go index 613dd97b94057..4e5a09a6cf5ce 100644 --- a/internal/proxy/mock_msgstream_test.go +++ b/internal/proxy/mock_msgstream_test.go @@ -10,10 +10,10 @@ import ( type mockMsgStream struct { msgstream.MsgStream - asProducer func([]string) - setRepack func(repackFunc msgstream.RepackFunc) - close func() - enableProduce func(bool) + asProducer func([]string) + setRepack func(repackFunc msgstream.RepackFunc) + close func() + forceEnableProduce func(bool) } func (m *mockMsgStream) AsProducer(producers []string) { @@ -34,9 +34,9 @@ func (m *mockMsgStream) Close() { } } -func (m *mockMsgStream) EnableProduce(enabled bool) { - if m.enableProduce != nil { - m.enableProduce(enabled) +func (m *mockMsgStream) ForceEnableProduce(enabled bool) { + if m.forceEnableProduce != nil { + m.forceEnableProduce(enabled) } } diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 8132e5bb3a06d..0d4f01ce95ad3 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -311,7 +311,7 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error { return nil } -func (ms *simpleMockMsgStream) EnableProduce(enabled bool) { +func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) { } func newSimpleMockMsgStream() *simpleMockMsgStream { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index ec79bfaa18b1d..2e4b8d2bbab6e 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -272,7 +272,7 @@ func (node *Proxy) Init() error { zap.Error(err)) return err } - node.replicateMsgStream.EnableProduce(true) + node.replicateMsgStream.ForceEnableProduce(true) node.replicateMsgStream.AsProducer([]string{replicateMsgChannel}) node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory) diff --git a/internal/proxy/replicate_stream_manager.go b/internal/proxy/replicate_stream_manager.go index 5bf01d1f6e244..51a26d639bec1 100644 --- a/internal/proxy/replicate_stream_manager.go +++ b/internal/proxy/replicate_stream_manager.go @@ -43,7 +43,7 @@ func (m *ReplicateStreamManager) newMsgStreamResource(channel string) resource.N } msgStream.SetRepackFunc(replicatePackFunc) msgStream.AsProducer([]string{channel}) - msgStream.EnableProduce(true) + msgStream.ForceEnableProduce(true) res := resource.NewSimpleResource(msgStream, ReplicateMsgStreamTyp, channel, ReplicateMsgStreamExpireTime, func() { msgStream.Close() diff --git a/internal/proxy/replicate_stream_manager_test.go b/internal/proxy/replicate_stream_manager_test.go index 962a62ceb3673..a1064ae156733 100644 --- a/internal/proxy/replicate_stream_manager_test.go +++ b/internal/proxy/replicate_stream_manager_test.go @@ -34,7 +34,7 @@ func TestReplicateManager(t *testing.T) { mockMsgStream.asProducer = func(producers []string) { i++ } - mockMsgStream.enableProduce = func(b bool) { + mockMsgStream.forceEnableProduce = func(b bool) { i++ } mockMsgStream.close = func() { diff --git a/pkg/mq/msgstream/mock_msgstream.go b/pkg/mq/msgstream/mock_msgstream.go index 8d1d2dbfad1b2..da6afc22216e3 100644 --- a/pkg/mq/msgstream/mock_msgstream.go +++ b/pkg/mq/msgstream/mock_msgstream.go @@ -274,11 +274,11 @@ func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Clos } // EnableProduce provides a mock function with given fields: can -func (_m *MockMsgStream) EnableProduce(can bool) { +func (_m *MockMsgStream) ForceEnableProduce(can bool) { _m.Called(can) } -// MockMsgStream_EnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnableProduce' +// MockMsgStream_EnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceEnableProduce' type MockMsgStream_EnableProduce_Call struct { *mock.Call } @@ -286,7 +286,7 @@ type MockMsgStream_EnableProduce_Call struct { // EnableProduce is a helper method to define mock.On call // - can bool func (_e *MockMsgStream_Expecter) EnableProduce(can interface{}) *MockMsgStream_EnableProduce_Call { - return &MockMsgStream_EnableProduce_Call{Call: _e.mock.On("EnableProduce", can)} + return &MockMsgStream_EnableProduce_Call{Call: _e.mock.On("ForceEnableProduce", can)} } func (_c *MockMsgStream_EnableProduce_Call) Run(run func(can bool)) *MockMsgStream_EnableProduce_Call { diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index ab67c6733a634..4327a2d1e6346 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -58,18 +58,19 @@ type mqMsgStream struct { consumers map[string]mqwrapper.Consumer consumerChannels []string - repackFunc RepackFunc - unmarshal UnmarshalDispatcher - receiveBuf chan *MsgPack - closeRWMutex *sync.RWMutex - streamCancel func() - bufSize int64 - producerLock *sync.RWMutex - consumerLock *sync.Mutex - closed int32 - onceChan sync.Once - enableProduce atomic.Value - configEvent config.EventHandler + repackFunc RepackFunc + unmarshal UnmarshalDispatcher + receiveBuf chan *MsgPack + closeRWMutex *sync.RWMutex + streamCancel func() + bufSize int64 + producerLock *sync.RWMutex + consumerLock *sync.Mutex + closed int32 + onceChan sync.Once + ttMsgEnable atomic.Value + forceEnableProduce atomic.Value + configEvent config.EventHandler } // NewMqMsgStream is used to generate a new mqMsgStream object @@ -104,14 +105,15 @@ func NewMqMsgStream(ctx context.Context, closed: 0, } ctxLog := log.Ctx(ctx) - stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) + stream.forceEnableProduce.Store(false) + stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) { value, err := strconv.ParseBool(event.Value) if err != nil { ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err)) return } - stream.enableProduce.Store(value) + stream.ttMsgEnable.Store(value) ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce())) }) paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent) @@ -265,12 +267,12 @@ func (ms *mqMsgStream) GetProduceChannels() []string { return ms.producerChannels } -func (ms *mqMsgStream) EnableProduce(can bool) { - ms.enableProduce.Store(can) +func (ms *mqMsgStream) ForceEnableProduce(can bool) { + ms.forceEnableProduce.Store(can) } func (ms *mqMsgStream) isEnabledProduce() bool { - return ms.enableProduce.Load().(bool) + return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool) } func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index 9870a14422a92..c1bad0c5e4684 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) { } func TestStream_PulsarMsgStream_Insert(t *testing.T) { + Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false") + defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key) pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -129,12 +131,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) { - inputStream.EnableProduce(false) + inputStream.ForceEnableProduce(false) err := inputStream.Produce(&msgPack) require.Error(t, err) } - inputStream.EnableProduce(true) + inputStream.ForceEnableProduce(true) err := inputStream.Produce(&msgPack) require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) @@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { } func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { + Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false") + defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key) pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -202,12 +206,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) { - inputStream.EnableProduce(false) + inputStream.ForceEnableProduce(false) _, err := inputStream.Broadcast(&msgPack) require.Error(t, err) } - inputStream.EnableProduce(true) + inputStream.ForceEnableProduce(true) _, err := inputStream.Broadcast(&msgPack) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) diff --git a/pkg/mq/msgstream/msgstream.go b/pkg/mq/msgstream/msgstream.go index 4d6e3b0a9c883..91d533783d545 100644 --- a/pkg/mq/msgstream/msgstream.go +++ b/pkg/mq/msgstream/msgstream.go @@ -70,7 +70,7 @@ type MsgStream interface { GetLatestMsgID(channel string) (MessageID, error) CheckTopicValid(channel string) error - EnableProduce(can bool) + ForceEnableProduce(can bool) } type Factory interface {