Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [2.4] replicate message exception when the ttMsgEnable config is changed dynamically #38440

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(config.serverID)).Inc()
log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue()))

m.EnableProduce(true)
m.ForceEnableProduce(true)

Check warning on line 286 in internal/datanode/data_sync_service.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/data_sync_service.go#L286

Added line #L286 was not covered by tests

updater = newMqStatsUpdater(config, m)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/flow_graph_dmstream_input_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
return nil
}

func (mtm *mockTtMsgStream) EnableProduce(can bool) {
func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) {
}

func TestNewDmInputNode(t *testing.T) {
Expand Down
14 changes: 7 additions & 7 deletions internal/proxy/mock_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

type mockMsgStream struct {
msgstream.MsgStream
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
enableProduce func(bool)
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
forceEnableProduce func(bool)
}

func (m *mockMsgStream) AsProducer(producers []string) {
Expand All @@ -34,9 +34,9 @@ func (m *mockMsgStream) Close() {
}
}

func (m *mockMsgStream) EnableProduce(enabled bool) {
if m.enableProduce != nil {
m.enableProduce(enabled)
func (m *mockMsgStream) ForceEnableProduce(enabled bool) {
if m.forceEnableProduce != nil {
m.forceEnableProduce(enabled)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error {
return nil
}

func (ms *simpleMockMsgStream) EnableProduce(enabled bool) {
func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) {
}

func newSimpleMockMsgStream() *simpleMockMsgStream {
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (node *Proxy) Init() error {
zap.Error(err))
return err
}
node.replicateMsgStream.EnableProduce(true)
node.replicateMsgStream.ForceEnableProduce(true)
node.replicateMsgStream.AsProducer([]string{replicateMsgChannel})

node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/replicate_stream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *ReplicateStreamManager) newMsgStreamResource(channel string) resource.N
}
msgStream.SetRepackFunc(replicatePackFunc)
msgStream.AsProducer([]string{channel})
msgStream.EnableProduce(true)
msgStream.ForceEnableProduce(true)

res := resource.NewSimpleResource(msgStream, ReplicateMsgStreamTyp, channel, ReplicateMsgStreamExpireTime, func() {
msgStream.Close()
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/replicate_stream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestReplicateManager(t *testing.T) {
mockMsgStream.asProducer = func(producers []string) {
i++
}
mockMsgStream.enableProduce = func(b bool) {
mockMsgStream.forceEnableProduce = func(b bool) {
i++
}
mockMsgStream.close = func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mock_msgstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 19 additions & 17 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ type mqMsgStream struct {
consumers map[string]mqwrapper.Consumer
consumerChannels []string

repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
enableProduce atomic.Value
configEvent config.EventHandler
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
forceEnableProduce atomic.Value
configEvent config.EventHandler
}

// NewMqMsgStream is used to generate a new mqMsgStream object
Expand Down Expand Up @@ -104,14 +105,15 @@ func NewMqMsgStream(ctx context.Context,
closed: 0,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.forceEnableProduce.Store(false)
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.enableProduce.Store(value)
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
Expand Down Expand Up @@ -265,12 +267,12 @@ func (ms *mqMsgStream) GetProduceChannels() []string {
return ms.producerChannels
}

func (ms *mqMsgStream) EnableProduce(can bool) {
ms.enableProduce.Store(can)
func (ms *mqMsgStream) ForceEnableProduce(can bool) {
ms.forceEnableProduce.Store(can)
}

func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}

func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/mq/msgstream/mq_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) {
}

func TestStream_PulsarMsgStream_Insert(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand All @@ -129,12 +131,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
err := inputStream.Produce(&msgPack)
require.Error(t, err)
}

inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
err := inputStream.Produce(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))

Expand Down Expand Up @@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}

func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand All @@ -202,12 +206,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
_, err := inputStream.Broadcast(&msgPack)
require.Error(t, err)
}

inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
_, err := inputStream.Broadcast(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))

Expand Down
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type MsgStream interface {
GetLatestMsgID(channel string) (MessageID, error)
CheckTopicValid(channel string) error

EnableProduce(can bool)
ForceEnableProduce(can bool)
}

type Factory interface {
Expand Down
Loading