From 72eb39c3aac35cd69b96e03a905fb06e4e3c3a20 Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 3 Dec 2024 15:51:24 +0800 Subject: [PATCH] fix: the issue of replicate message exception when the ttMsgEnable config is changed dynamically Signed-off-by: SimFG --- internal/datacoord/mock_test.go | 2 +- .../flow_graph_dmstream_input_node_test.go | 2 +- internal/mocks/mock_grpc_client.go | 1 - internal/proxy/impl_test.go | 2 +- internal/proxy/mock_msgstream_test.go | 14 ++++---- internal/proxy/mock_test.go | 2 +- internal/proxy/proxy.go | 2 +- internal/proxy/replicate_stream_manager.go | 2 +- .../proxy/replicate_stream_manager_test.go | 2 +- internal/rootcoord/util.go | 18 ++++++++-- pkg/go.sum | 2 -- pkg/mq/msgstream/mock_msgstream.go | 20 +++++------ pkg/mq/msgstream/mq_msgstream.go | 36 ++++++++++--------- pkg/mq/msgstream/mq_msgstream_test.go | 12 ++++--- pkg/mq/msgstream/msgstream.go | 2 +- 15 files changed, 68 insertions(+), 51 deletions(-) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 9f64dbb7223af..2a1973d916497 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -18,7 +18,6 @@ package datacoord import ( "context" - "github.com/milvus-io/milvus/pkg/kv" "testing" "time" @@ -38,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/tsoutil" diff --git a/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go b/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go index 4a6fcda1a46db..2f6a298a206fd 100644 --- a/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go +++ b/internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go @@ -100,7 +100,7 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error { return nil } -func (mtm *mockTtMsgStream) EnableProduce(can bool) { +func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) { } func TestNewDmInputNode(t *testing.T) { diff --git a/internal/mocks/mock_grpc_client.go b/internal/mocks/mock_grpc_client.go index 4d722f96ef1fa..383f8ff3309fe 100644 --- a/internal/mocks/mock_grpc_client.go +++ b/internal/mocks/mock_grpc_client.go @@ -360,7 +360,6 @@ func (_c *MockGrpcClient_SetInternalTLSCertPool_Call[T]) RunAndReturn(run func(* return _c } - // SetInternalTLSServerName provides a mock function with given fields: cp func (_m *MockGrpcClient[T]) SetInternalTLSServerName(cp string) { _m.Called(cp) diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index 8e995d9537226..2e7287d93d8e7 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -1497,7 +1497,7 @@ func TestProxy_ReplicateMessage(t *testing.T) { msgStreamObj := msgstream.NewMockMsgStream(t) msgStreamObj.EXPECT().SetRepackFunc(mock.Anything).Return() msgStreamObj.EXPECT().AsProducer(mock.Anything, mock.Anything).Return() - msgStreamObj.EXPECT().EnableProduce(mock.Anything).Return() + msgStreamObj.EXPECT().ForceEnableProduce(mock.Anything).Return() msgStreamObj.EXPECT().Close().Return() mockMsgID1 := mqcommon.NewMockMessageID(t) mockMsgID2 := mqcommon.NewMockMessageID(t) diff --git a/internal/proxy/mock_msgstream_test.go b/internal/proxy/mock_msgstream_test.go index 9b99888d88f14..93cf069bf49a7 100644 --- a/internal/proxy/mock_msgstream_test.go +++ b/internal/proxy/mock_msgstream_test.go @@ -10,10 +10,10 @@ import ( type mockMsgStream struct { msgstream.MsgStream - asProducer func([]string) - setRepack func(repackFunc msgstream.RepackFunc) - close func() - enableProduce func(bool) + asProducer func([]string) + setRepack func(repackFunc msgstream.RepackFunc) + close func() + forceEnableProduce func(bool) } func (m *mockMsgStream) AsProducer(ctx context.Context, producers []string) { @@ -34,9 +34,9 @@ func (m *mockMsgStream) Close() { } } -func (m *mockMsgStream) EnableProduce(enabled bool) { - if m.enableProduce != nil { - m.enableProduce(enabled) +func (m *mockMsgStream) ForceEnableProduce(enabled bool) { + if m.forceEnableProduce != nil { + m.forceEnableProduce(enabled) } } diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 08b1a2723226a..2b89f4fbc3e5d 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -311,7 +311,7 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error { return nil } -func (ms *simpleMockMsgStream) EnableProduce(enabled bool) { +func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) { } func newSimpleMockMsgStream() *simpleMockMsgStream { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 1e0915f68feec..500be86a1a63b 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -277,7 +277,7 @@ func (node *Proxy) Init() error { zap.Error(err)) return err } - node.replicateMsgStream.EnableProduce(true) + node.replicateMsgStream.ForceEnableProduce(true) node.replicateMsgStream.AsProducer(node.ctx, []string{replicateMsgChannel}) node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory) diff --git a/internal/proxy/replicate_stream_manager.go b/internal/proxy/replicate_stream_manager.go index 651e09b51f57f..0dce25d311f29 100644 --- a/internal/proxy/replicate_stream_manager.go +++ b/internal/proxy/replicate_stream_manager.go @@ -43,7 +43,7 @@ func (m *ReplicateStreamManager) newMsgStreamResource(ctx context.Context, chann } msgStream.SetRepackFunc(replicatePackFunc) msgStream.AsProducer(ctx, []string{channel}) - msgStream.EnableProduce(true) + msgStream.ForceEnableProduce(true) res := resource.NewSimpleResource(msgStream, ReplicateMsgStreamTyp, channel, ReplicateMsgStreamExpireTime, func() { msgStream.Close() diff --git a/internal/proxy/replicate_stream_manager_test.go b/internal/proxy/replicate_stream_manager_test.go index 962a62ceb3673..a1064ae156733 100644 --- a/internal/proxy/replicate_stream_manager_test.go +++ b/internal/proxy/replicate_stream_manager_test.go @@ -34,7 +34,7 @@ func TestReplicateManager(t *testing.T) { mockMsgStream.asProducer = func(producers []string) { i++ } - mockMsgStream.enableProduce = func(b bool) { + mockMsgStream.forceEnableProduce = func(b bool) { i++ } mockMsgStream.close = func() { diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index 7912f6bd0ad00..8ee2b25463166 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -357,10 +357,24 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl }) } } - findMaxLagChannel(queryNodeTTDelay, dataNodeTTDelay) + var errStr string + findMaxLagChannel(queryNodeTTDelay) if maxLag > 0 && len(maxLagChannel) != 0 { - return fmt.Errorf("max timetick lag execced threhold, max timetick lag:%s on channel:%s", maxLag, maxLagChannel) + errStr = fmt.Sprintf("query max timetick lag:%s on channel:%s", maxLag, maxLagChannel) } + maxLagChannel = "" + maxLag = 0 + findMaxLagChannel(dataNodeTTDelay) + if maxLag > 0 && len(maxLagChannel) != 0 { + if errStr != "" { + errStr += ", " + } + errStr += fmt.Sprintf("data max timetick lag:%s on channel:%s", maxLag, maxLagChannel) + } + if errStr != "" { + return fmt.Errorf("max timetick lag execced threhold: %s", errStr) + } + return nil } diff --git a/pkg/go.sum b/pkg/go.sum index 96ab4e4fcb94a..007134b419a88 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -488,8 +488,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 h1:5/35+F32fs6ifVzI1e+VkUNpK0gWyXQSdZVnmNUFrrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= diff --git a/pkg/mq/msgstream/mock_msgstream.go b/pkg/mq/msgstream/mock_msgstream.go index 47a1b9cb6db93..169d03a756607 100644 --- a/pkg/mq/msgstream/mock_msgstream.go +++ b/pkg/mq/msgstream/mock_msgstream.go @@ -292,35 +292,35 @@ func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Clos return _c } -// EnableProduce provides a mock function with given fields: can -func (_m *MockMsgStream) EnableProduce(can bool) { +// ForceEnableProduce provides a mock function with given fields: can +func (_m *MockMsgStream) ForceEnableProduce(can bool) { _m.Called(can) } -// MockMsgStream_EnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnableProduce' -type MockMsgStream_EnableProduce_Call struct { +// MockMsgStream_ForceEnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceEnableProduce' +type MockMsgStream_ForceEnableProduce_Call struct { *mock.Call } -// EnableProduce is a helper method to define mock.On call +// ForceEnableProduce is a helper method to define mock.On call // - can bool -func (_e *MockMsgStream_Expecter) EnableProduce(can interface{}) *MockMsgStream_EnableProduce_Call { - return &MockMsgStream_EnableProduce_Call{Call: _e.mock.On("EnableProduce", can)} +func (_e *MockMsgStream_Expecter) ForceEnableProduce(can interface{}) *MockMsgStream_ForceEnableProduce_Call { + return &MockMsgStream_ForceEnableProduce_Call{Call: _e.mock.On("ForceEnableProduce", can)} } -func (_c *MockMsgStream_EnableProduce_Call) Run(run func(can bool)) *MockMsgStream_EnableProduce_Call { +func (_c *MockMsgStream_ForceEnableProduce_Call) Run(run func(can bool)) *MockMsgStream_ForceEnableProduce_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(bool)) }) return _c } -func (_c *MockMsgStream_EnableProduce_Call) Return() *MockMsgStream_EnableProduce_Call { +func (_c *MockMsgStream_ForceEnableProduce_Call) Return() *MockMsgStream_ForceEnableProduce_Call { _c.Call.Return() return _c } -func (_c *MockMsgStream_EnableProduce_Call) RunAndReturn(run func(bool)) *MockMsgStream_EnableProduce_Call { +func (_c *MockMsgStream_ForceEnableProduce_Call) RunAndReturn(run func(bool)) *MockMsgStream_ForceEnableProduce_Call { _c.Call.Return(run) return _c } diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 808b4da318865..b4032d48fc56e 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -59,18 +59,19 @@ type mqMsgStream struct { consumers map[string]mqwrapper.Consumer consumerChannels []string - repackFunc RepackFunc - unmarshal UnmarshalDispatcher - receiveBuf chan *MsgPack - closeRWMutex *sync.RWMutex - streamCancel func() - bufSize int64 - producerLock *sync.RWMutex - consumerLock *sync.Mutex - closed int32 - onceChan sync.Once - enableProduce atomic.Value - configEvent config.EventHandler + repackFunc RepackFunc + unmarshal UnmarshalDispatcher + receiveBuf chan *MsgPack + closeRWMutex *sync.RWMutex + streamCancel func() + bufSize int64 + producerLock *sync.RWMutex + consumerLock *sync.Mutex + closed int32 + onceChan sync.Once + ttMsgEnable atomic.Value + forceEnableProduce atomic.Value + configEvent config.EventHandler } // NewMqMsgStream is used to generate a new mqMsgStream object @@ -105,14 +106,15 @@ func NewMqMsgStream(ctx context.Context, closed: 0, } ctxLog := log.Ctx(ctx) - stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) + stream.forceEnableProduce.Store(false) + stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) { value, err := strconv.ParseBool(event.Value) if err != nil { ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err)) return } - stream.enableProduce.Store(value) + stream.ttMsgEnable.Store(value) ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce())) }) paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent) @@ -266,12 +268,12 @@ func (ms *mqMsgStream) GetProduceChannels() []string { return ms.producerChannels } -func (ms *mqMsgStream) EnableProduce(can bool) { - ms.enableProduce.Store(can) +func (ms *mqMsgStream) ForceEnableProduce(can bool) { + ms.forceEnableProduce.Store(can) } func (ms *mqMsgStream) isEnabledProduce() bool { - return ms.enableProduce.Load().(bool) + return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool) } func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error { diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index b46ca44533942..6c9973f3373cd 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) { } func TestStream_PulsarMsgStream_Insert(t *testing.T) { + Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false") + defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key) pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -129,12 +131,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) { - inputStream.EnableProduce(false) + inputStream.ForceEnableProduce(false) err := inputStream.Produce(ctx, &msgPack) require.Error(t, err) } - inputStream.EnableProduce(true) + inputStream.ForceEnableProduce(true) err := inputStream.Produce(ctx, &msgPack) require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) @@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { } func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { + Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false") + defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key) pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -202,12 +206,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) { - inputStream.EnableProduce(false) + inputStream.ForceEnableProduce(false) _, err := inputStream.Broadcast(ctx, &msgPack) require.Error(t, err) } - inputStream.EnableProduce(true) + inputStream.ForceEnableProduce(true) _, err := inputStream.Broadcast(ctx, &msgPack) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) diff --git a/pkg/mq/msgstream/msgstream.go b/pkg/mq/msgstream/msgstream.go index 6dcc8271e2617..24709de81cf90 100644 --- a/pkg/mq/msgstream/msgstream.go +++ b/pkg/mq/msgstream/msgstream.go @@ -70,7 +70,7 @@ type MsgStream interface { GetLatestMsgID(channel string) (MessageID, error) CheckTopicValid(channel string) error - EnableProduce(can bool) + ForceEnableProduce(can bool) } type Factory interface {