Skip to content

Commit

Permalink
mrjob: restore vmem limit after launching job
Browse files Browse the repository at this point in the history
When --monitor is enabled, we have been seeing failures with errors such as:
    runtime/cgo: out of memory in thread_start

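This appears to be caused by the vmem limit being set too low in some cases.
The limit only needs to be in place at the moment the subprocess is launched,
because the child inherits it at that point. Once we've launched the
subprocess, restore the vmem limit to what it was originally, to avoid this
sort of problem; --monitor is supposed to constrain the stage code, not mrjob
itself.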
  • Loading branch information
adam-azarchs committed Dec 12, 2023
1 parent 2981344 commit 68756c6
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
49 changes: 32 additions & 17 deletions cmd/mrjob/mrjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,25 +318,40 @@ func (self *runner) StartJob(args []string) error {
self.metadata.MetadataFilePath(core.PerfData),
self.metadata.MetadataFilePath(core.ProfileOut))
}
if self.monitoring && self.jobInfo.VMemGB > 0 {
// Exclude mrjob's vmem usage from the rlimit.
mem, _ := core.GetProcessTreeMemory(self.jobInfo.Pid, true, nil)
amount := int64(self.jobInfo.VMemGB)*1024*1024*1024 - mem.Vmem
if amount < mem.Vmem+1024*1024 {
amount = mem.Vmem + 1024*1024
if err := func(cmd *exec.Cmd) error {
if self.monitoring && self.jobInfo.VMemGB > 0 {
// Exclude mrjob's vmem usage from the rlimit.
mem, _ := core.GetProcessTreeMemory(self.jobInfo.Pid, true, nil)
amount := int64(self.jobInfo.VMemGB)*1024*1024*1024 - mem.Vmem
if amount < mem.Vmem+1024*1024 {
amount = mem.Vmem + 1024*1024
}
if oldAmount, err := core.SetVMemRLimit(uint64(amount)); err != nil {
util.LogError(err, "monitor",
"Could not set VM rlimit.")
} else {
// After launching the subprocess, restore the vmem
// limit for this process. Otherwise the go runtime can run
// into various kinds of trouble.
defer func(amt uint64) {
if _, err := core.SetVMemRLimit(amt); err != nil {
util.LogError(err, "monitor",
"Could not restore VM rlimit.")
}
}(oldAmount)
}
}
if err := core.SetVMemRLimit(uint64(amount)); err != nil {
util.LogError(err, "monitor",
"Could not set VM rlimit.")
if err := func() error {
util.EnterCriticalSection()
defer util.ExitCriticalSection()
self.job = cmd
return self.job.Start()
}(); err != nil {
self.errorReader.Close()
return err
}
}
if err := func() error {
util.EnterCriticalSection()
defer util.ExitCriticalSection()
self.job = cmd
return self.job.Start()
}(); err != nil {
self.errorReader.Close()
return nil
}(cmd); err != nil {
return err
}
if err := self.startProfile(); err != nil {
Expand Down
12 changes: 8 additions & 4 deletions martian/core/rlimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,18 @@ func CheckMaxVmem(amount uint64) uint64 {
return min
}

func SetVMemRLimit(amount uint64) error {
func SetVMemRLimit(amount uint64) (uint64, error) {
var rlim unix.Rlimit
if err := unix.Getrlimit(unix.RLIMIT_AS, &rlim); err != nil {
return err
return rlim.Cur, err
} else if rlim.Max != unix.RLIM_INFINITY && rlim.Max < amount {
return fmt.Errorf("could not set RLIMIT_AS %d > %d",
return rlim.Cur, fmt.Errorf("could not set RLIMIT_AS %d > %d",
amount, rlim.Max)
} else if rlim.Cur == amount {
// Nothing to do.
return amount, nil
}
oldAmount := rlim.Cur
rlim.Cur = amount
return unix.Setrlimit(unix.RLIMIT_AS, &rlim)
return oldAmount, unix.Setrlimit(unix.RLIMIT_AS, &rlim)
}

0 comments on commit 68756c6

Please sign in to comment.