From c4c394d6ef6174f2afe574f0fc3aa5e95676d887 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 10 Jun 2021 12:16:09 +0800 Subject: [PATCH] fix panic when getting memory pressure fails --- cdc/puller/sorter/backend_pool.go | 16 ++++++++++--- cdc/puller/sorter/backend_pool_test.go | 33 +++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index 61997752119..76509495be0 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -112,17 +112,27 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { // update memPressure m, err := memory.Get() + + failpoint.Inject("getMemoryPressureFails", func() { + m = nil + err = errors.New("injected get memory pressure failure") + }) + if err != nil { failpoint.Inject("sorterDebug", func() { log.Panic("unified sorter: getting system memory usage failed", zap.Error(err)) }) log.Warn("unified sorter: getting system memory usage failed", zap.Error(err)) + // Reports a 100% memory pressure, so that the backEndPool will allocate fileBackEnds. + // We default to fileBackEnds because they are unlikely to cause OOMs. If IO errors are + // encountered, we can fail gracefully. 
+ atomic.StoreInt32(&ret.memPressure, 100) + } else { + memPressure := m.Used * 100 / m.Total + atomic.StoreInt32(&ret.memPressure, int32(memPressure)) } - memPressure := m.Used * 100 / m.Total - atomic.StoreInt32(&ret.memPressure, int32(memPressure)) - if memPressure := ret.memoryPressure(); memPressure > 50 { log.Debug("unified sorter: high memory pressure", zap.Int32("memPressure", memPressure), zap.Int64("usedBySorter", ret.sorterMemoryUsage())) diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go index 8f4cec331bf..20852f2ad3e 100644 --- a/cdc/puller/sorter/backend_pool_test.go +++ b/cdc/puller/sorter/backend_pool_test.go @@ -107,7 +107,7 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { err := os.Chmod(dir, 0o311) // no permission to `ls` c.Assert(err, check.IsNil) - conf := config.GetDefaultServerConfig() + conf := config.GetGlobalServerConfig() conf.Sorter.MaxMemoryPressure = 0 // force using files backEndPool, err := newBackEndPool(dir, "") @@ -141,6 +141,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) { err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)") c.Assert(err, check.IsNil) + defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint") //nolint:errcheck backEndPool, err := newBackEndPool("/tmp/sorter", "") c.Assert(err, check.IsNil) @@ -296,3 +297,33 @@ func (s *backendPoolSuite) TestCleanUpStaleLockNoPermission(c *check.C) { mockP.assertFilesExist(c) } + +// TestGetMemoryPressureFailure verifies that the backEndPool can gracefully handle failures that occur when +// getting the current system memory pressure. Such a failure is usually caused by a lack of file descriptor quota +// set by the operating system. 
+func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	// Make the memory-usage lookup in the pool report an injected error for the duration of this test.
+	err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails", "return(true)")
+	c.Assert(err, check.IsNil)
+	defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails") //nolint:errcheck
+
+	dir := c.MkDir()
+	backEndPool, err := newBackEndPool(dir, "")
+	c.Assert(err, check.IsNil)
+	c.Assert(backEndPool, check.NotNil)
+	defer backEndPool.terminate()
+
+	// Use an explicit timer and ticker so that both are released when the test returns;
+	// the ticker behind time.Tick can never be stopped.
+	timeout := time.NewTimer(time.Second * 20)
+	defer timeout.Stop()
+	ticker := time.NewTicker(time.Second * 1)
+	defer ticker.Stop()
+
+	// NOTE(review): the pool presumably refreshes memPressure asynchronously, which is why the
+	// value is polled instead of asserted once - confirm against newBackEndPool.
+	for {
+		select {
+		case <-timeout.C:
+			c.Fatal("TestGetMemoryPressureFailure timed out")
+		case <-ticker.C:
+			// On lookup failure the pool is expected to report 100% memory pressure.
+			if backEndPool.memoryPressure() == 100 {
+				return
+			}
+		}
+	}
+}