Skip to content

Commit

Permalink
fix: the issue of replicate message exception when the ttMsgEnable co…
Browse files Browse the repository at this point in the history
…nfig is changed dynamically

Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Dec 3, 2024
1 parent 767b7e6 commit fbfae41
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
3 changes: 2 additions & 1 deletion internal/datacoord/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"sort"

"github.com/bits-and-blooms/bitset"
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
)

type Sizable interface {
Expand Down
4 changes: 2 additions & 2 deletions internal/rootcoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl
delay := now.Sub(minTt)

if delay.Milliseconds() >= maxDelay.Milliseconds() {
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay)
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel+"/query", delay)
}
}
}
Expand All @@ -331,7 +331,7 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl
delay := now.Sub(minTt)

if delay.Milliseconds() >= maxDelay.Milliseconds() {
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay)
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel+"/data", delay)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type mqMsgStream struct {
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
enableProduce atomic.Value
configEvent config.EventHandler
}
Expand Down Expand Up @@ -104,14 +105,14 @@ func NewMqMsgStream(ctx context.Context,
closed: 0,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.enableProduce.Store(value)
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
Expand Down Expand Up @@ -270,7 +271,7 @@ func (ms *mqMsgStream) EnableProduce(can bool) {
}

func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
return ms.enableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}

func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
Expand Down

0 comments on commit fbfae41

Please sign in to comment.