diff --git a/internal/datacoord/knapsack.go b/internal/datacoord/knapsack.go index c40a586ebb2a3..4c69e234df4a3 100644 --- a/internal/datacoord/knapsack.go +++ b/internal/datacoord/knapsack.go @@ -21,8 +21,9 @@ import ( "sort" "github.com/bits-and-blooms/bitset" - "github.com/milvus-io/milvus/pkg/log" "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" ) type Sizable interface { diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index 7912f6bd0ad00..88c3d91bd41ba 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -308,7 +308,7 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl delay := now.Sub(minTt) if delay.Milliseconds() >= maxDelay.Milliseconds() { - queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay) + queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel+"/query", delay) } } } @@ -331,7 +331,7 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl delay := now.Sub(minTt) if delay.Milliseconds() >= maxDelay.Milliseconds() { - dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay) + dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel+"/data", delay) } } } diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 09d7121985d25..7c2e3b866bd3e 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -68,6 +68,7 @@ type mqMsgStream struct { consumerLock *sync.Mutex closed int32 onceChan sync.Once + ttMsgEnable atomic.Value enableProduce atomic.Value configEvent config.EventHandler } @@ -104,14 +105,14 @@ func NewMqMsgStream(ctx context.Context, closed: 0, } ctxLog := log.Ctx(ctx) - stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) + stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()) stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) { value, err := strconv.ParseBool(event.Value) if err != nil { ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err)) return } - stream.enableProduce.Store(value) + stream.ttMsgEnable.Store(value) ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce())) }) paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent) @@ -270,7 +271,7 @@ func (ms *mqMsgStream) EnableProduce(can bool) { } func (ms *mqMsgStream) isEnabledProduce() bool { - return ms.enableProduce.Load().(bool) + return ms.enableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool) } func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {