Skip to content

Commit

Permalink
fix panic when getting memory pressure fails
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored and ti-chi-bot committed Jun 10, 2021
1 parent ffbdae3 commit c4c394d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
16 changes: 13 additions & 3 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,27 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {

// update memPressure
m, err := memory.Get()

failpoint.Inject("getMemoryPressureFails", func() {
m = nil
err = errors.New("injected get memory pressure failure")
})

if err != nil {
failpoint.Inject("sorterDebug", func() {
log.Panic("unified sorter: getting system memory usage failed", zap.Error(err))
})

log.Warn("unified sorter: getting system memory usage failed", zap.Error(err))
// Reports a 100% memory pressure, so that the backEndPool will allocate fileBackEnds.
// We default to fileBackEnds because they are unlikely to cause OOMs. If IO errors are
// encountered, we can fail gracefully.
atomic.StoreInt32(&ret.memPressure, 100)
} else {
memPressure := m.Used * 100 / m.Total
atomic.StoreInt32(&ret.memPressure, int32(memPressure))
}

memPressure := m.Used * 100 / m.Total
atomic.StoreInt32(&ret.memPressure, int32(memPressure))

if memPressure := ret.memoryPressure(); memPressure > 50 {
log.Debug("unified sorter: high memory pressure", zap.Int32("memPressure", memPressure),
zap.Int64("usedBySorter", ret.sorterMemoryUsage()))
Expand Down
33 changes: 32 additions & 1 deletion cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
err := os.Chmod(dir, 0o311) // no permission to `ls`
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf := config.GetGlobalServerConfig()
conf.Sorter.MaxMemoryPressure = 0 // force using files

backEndPool, err := newBackEndPool(dir, "")
Expand Down Expand Up @@ -141,6 +141,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)")
c.Assert(err, check.IsNil)
defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint") //nolint:errcheck

backEndPool, err := newBackEndPool("/tmp/sorter", "")
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -296,3 +297,33 @@ func (s *backendPoolSuite) TestCleanUpStaleLockNoPermission(c *check.C) {

mockP.assertFilesExist(c)
}

// TestGetMemoryPressureFailure verifies that the backendPool can handle gracefully failures that happen when
// getting the current system memory pressure. Such a failure is usually caused by a lack of file descriptor quota
// set by the operating system.
func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) {
defer testleak.AfterTest(c)()

err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails", "return(true)")
c.Assert(err, check.IsNil)
defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails") //nolint:errcheck

dir := c.MkDir()
backEndPool, err := newBackEndPool(dir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()

after := time.After(time.Second * 20)
tick := time.Tick(time.Second * 1)
for {
select {
case <-after:
c.Fatal("TestGetMemoryPressureFailure timed out")
case <-tick:
if backEndPool.memoryPressure() == 100 {
return
}
}
}
}

0 comments on commit c4c394d

Please sign in to comment.