diff --git a/pluginmanager/config_update_test.go b/pluginmanager/config_update_test.go index a9f19b2b55..8c9346630a 100644 --- a/pluginmanager/config_update_test.go +++ b/pluginmanager/config_update_test.go @@ -23,10 +23,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/suite" + "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/plugins/flusher/checker" - - "github.com/stretchr/testify/suite" ) var updateConfigName = "update_mock_block" @@ -76,8 +76,7 @@ func (s *configUpdateTestSuite) TestConfigUpdate() { // unblock old config checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) - s.Equal(10000, checkFlusher.GetLogCount()) - // this magic number(10000) must exceed number of logs can be hold in processor channel(LogsChan) + aggregator buffer(defaultLogGroup) + flusher channel(LogGroupsChan) + s.Equal(0, checkFlusher.GetLogCount()) s.Equal(20000, GetConfigFlushers(LogtailConfig[noblockUpdateConfigName].PluginRunner)[0].(*checker.FlusherChecker).GetLogCount()) } @@ -99,7 +98,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() { s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) - s.Equal(checkFlusher.GetLogCount(), 10000) + s.Equal(checkFlusher.GetLogCount(), 0) // load normal config for i := 0; i < 5; i++ { @@ -168,7 +167,5 @@ func (s *configUpdateTestSuite) TestHoldOnExitTimeout() { s.Equal(0, checkFlusher.GetLogCount()) checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) - s.Equal(10000, checkFlusher.GetLogCount()) - time.Sleep(time.Second * 10) - s.NoError(Resume()) + s.Equal(0, checkFlusher.GetLogCount()) } diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index 22c6d8c70b..d6c2202227 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -380,6 +380,8 @@ func (p *pluginv1Runner) Stop(exit bool) error { for _, flusher := range p.FlusherPlugins { flusher.Flusher.SetUrgent(exit) } + p.LogstoreConfig.FlushOutFlag = true + for _, service := range p.ServicePlugins { _ = service.Stop() } @@ -392,7 +394,6 @@ func (p *pluginv1Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag = true p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 { diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index 12b6e989a1..5bf45e13fc 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -375,6 +375,8 @@ func (p *pluginv2Runner) Stop(exit bool) error { for _, flusher := range p.FlusherPlugins { flusher.SetUrgent(exit) } + p.LogstoreConfig.FlushOutFlag = true + for _, serviceInput := range p.ServicePlugins { _ = serviceInput.Stop() } @@ -387,7 +389,6 @@ func (p *pluginv2Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag = true p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 {