Skip to content

Commit

Permalink
fix: the issue of replicate message exception when the ttMsgEnable co…
Browse files Browse the repository at this point in the history
…nfig is changed dynamically

Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Dec 13, 2024
1 parent efbfa1c commit 72eb39c
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 51 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package datacoord

import (
"context"
"github.com/milvus-io/milvus/pkg/kv"
"testing"
"time"

Expand All @@ -38,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
return nil
}

func (mtm *mockTtMsgStream) EnableProduce(can bool) {
func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) {
}

func TestNewDmInputNode(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion internal/mocks/mock_grpc_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ func TestProxy_ReplicateMessage(t *testing.T) {
msgStreamObj := msgstream.NewMockMsgStream(t)
msgStreamObj.EXPECT().SetRepackFunc(mock.Anything).Return()
msgStreamObj.EXPECT().AsProducer(mock.Anything, mock.Anything).Return()
msgStreamObj.EXPECT().EnableProduce(mock.Anything).Return()
msgStreamObj.EXPECT().ForceEnableProduce(mock.Anything).Return()
msgStreamObj.EXPECT().Close().Return()
mockMsgID1 := mqcommon.NewMockMessageID(t)
mockMsgID2 := mqcommon.NewMockMessageID(t)
Expand Down
14 changes: 7 additions & 7 deletions internal/proxy/mock_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

type mockMsgStream struct {
msgstream.MsgStream
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
enableProduce func(bool)
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
forceEnableProduce func(bool)
}

func (m *mockMsgStream) AsProducer(ctx context.Context, producers []string) {
Expand All @@ -34,9 +34,9 @@ func (m *mockMsgStream) Close() {
}
}

func (m *mockMsgStream) EnableProduce(enabled bool) {
if m.enableProduce != nil {
m.enableProduce(enabled)
func (m *mockMsgStream) ForceEnableProduce(enabled bool) {
if m.forceEnableProduce != nil {
m.forceEnableProduce(enabled)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error {
return nil
}

func (ms *simpleMockMsgStream) EnableProduce(enabled bool) {
func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) {
}

func newSimpleMockMsgStream() *simpleMockMsgStream {
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (node *Proxy) Init() error {
zap.Error(err))
return err
}
node.replicateMsgStream.EnableProduce(true)
node.replicateMsgStream.ForceEnableProduce(true)
node.replicateMsgStream.AsProducer(node.ctx, []string{replicateMsgChannel})

node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/replicate_stream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *ReplicateStreamManager) newMsgStreamResource(ctx context.Context, chann
}
msgStream.SetRepackFunc(replicatePackFunc)
msgStream.AsProducer(ctx, []string{channel})
msgStream.EnableProduce(true)
msgStream.ForceEnableProduce(true)

res := resource.NewSimpleResource(msgStream, ReplicateMsgStreamTyp, channel, ReplicateMsgStreamExpireTime, func() {
msgStream.Close()
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/replicate_stream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestReplicateManager(t *testing.T) {
mockMsgStream.asProducer = func(producers []string) {
i++
}
mockMsgStream.enableProduce = func(b bool) {
mockMsgStream.forceEnableProduce = func(b bool) {
i++
}
mockMsgStream.close = func() {
Expand Down
18 changes: 16 additions & 2 deletions internal/rootcoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,24 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl
})
}
}
findMaxLagChannel(queryNodeTTDelay, dataNodeTTDelay)

var errStr string
findMaxLagChannel(queryNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
return fmt.Errorf("max timetick lag execced threhold, max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
errStr = fmt.Sprintf("query max timetick lag:%s on channel:%s", maxLag, maxLagChannel)

Check warning on line 364 in internal/rootcoord/util.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/util.go#L364

Added line #L364 was not covered by tests
}
maxLagChannel = ""
maxLag = 0
findMaxLagChannel(dataNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
if errStr != "" {
errStr += ", "
}

Check warning on line 372 in internal/rootcoord/util.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/util.go#L371-L372

Added lines #L371 - L372 were not covered by tests
errStr += fmt.Sprintf("data max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
}
if errStr != "" {
return fmt.Errorf("max timetick lag execced threhold: %s", errStr)
}

return nil
}
2 changes: 0 additions & 2 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277 h1:5/35+F32fs6ifVzI1e+VkUNpK0gWyXQSdZVnmNUFrrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241204065646-180ce3a8d277/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
Expand Down
20 changes: 10 additions & 10 deletions pkg/mq/msgstream/mock_msgstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 19 additions & 17 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ type mqMsgStream struct {
consumers map[string]mqwrapper.Consumer
consumerChannels []string

repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
enableProduce atomic.Value
configEvent config.EventHandler
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
forceEnableProduce atomic.Value
configEvent config.EventHandler
}

// NewMqMsgStream is used to generate a new mqMsgStream object
Expand Down Expand Up @@ -105,14 +106,15 @@ func NewMqMsgStream(ctx context.Context,
closed: 0,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.forceEnableProduce.Store(false)
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.enableProduce.Store(value)
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
Expand Down Expand Up @@ -266,12 +268,12 @@ func (ms *mqMsgStream) GetProduceChannels() []string {
return ms.producerChannels
}

func (ms *mqMsgStream) EnableProduce(can bool) {
ms.enableProduce.Store(can)
func (ms *mqMsgStream) ForceEnableProduce(can bool) {
ms.forceEnableProduce.Store(can)
}

func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}

func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/mq/msgstream/mq_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) {
}

func TestStream_PulsarMsgStream_Insert(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand All @@ -129,12 +131,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
err := inputStream.Produce(ctx, &msgPack)
require.Error(t, err)
}

inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
err := inputStream.Produce(ctx, &msgPack)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))

Expand Down Expand Up @@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}

func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand All @@ -202,12 +206,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
_, err := inputStream.Broadcast(ctx, &msgPack)
require.Error(t, err)
}

inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
_, err := inputStream.Broadcast(ctx, &msgPack)
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))

Expand Down
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type MsgStream interface {
GetLatestMsgID(channel string) (MessageID, error)
CheckTopicValid(channel string) error

EnableProduce(can bool)
ForceEnableProduce(can bool)
}

type Factory interface {
Expand Down

0 comments on commit 72eb39c

Please sign in to comment.