migrateExpiredJobs: too many jobs #159

Open
yanqibin opened this issue Nov 25, 2021 · 2 comments

When too many jobs become eligible at the same time, migrateExpiredJobs loads all of them at once and runs out of memory.

yanqibin commented Nov 25, 2021

/**
 * Migrate delayed jobs.
 *
 * @param string $from
 * @param string $to
 * @param bool   $attempt
 */
public function migrateExpiredJobs($from, $to, $attempt = true)
{
    $this->redis->watch($from);

    $time  = time();
    $limit = 100; // batch size
    $jobs  = $this->getExpiredJobs($from, $time, ['limit' => [0, $limit]]);

    if (count($jobs) > 0) {
        $this->transaction(function () use ($from, $to, $time, $jobs, $attempt, $limit) {
            if (count($jobs) < $limit) {
                // Few jobs: everything fits in the first batch.
                $this->removeExpiredJobs($from, $time);
                $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
            } else {
                $page = 0;

                // Separate, non-persistent connection used for reads during the transaction.
                $new_self = new Redis(array_merge($this->options, ['persistent' => false]));

                while (true) {
                    $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
                    if (count($jobs) < $limit) {
                        break;
                    }
                    $page++;

                    // Fetch the data outside the transaction: inside the current
                    // transaction the call would return the client object itself.
                    $jobs = $new_self->getExpiredJobs($from, $time, ['limit' => [$page * $limit, $limit]]);
                    if (count($jobs) <= 0) {
                        break;
                    }
                }

                $this->removeExpiredJobs($from, $time);
            }
        });
    }

    $this->redis->unwatch();
}
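To make the paging logic above easier to check without a Redis server, here is a minimal sketch that runs the same loop against a plain PHP array. The array stands in for the sorted set (job payload => due timestamp). The function names `getExpiredBatch` and `migrateInBatches` are hypothetical and not part of the library. It only illustrates the offset/limit loop and does not model WATCH/MULTI/EXEC.

```php
<?php
// Hypothetical in-memory stand-in for ZRANGEBYSCORE with a LIMIT clause:
// returns up to $limit job payloads whose score is <= $now, starting at $offset.
function getExpiredBatch(array $delayed, int $now, int $offset, int $limit): array
{
    $expired = array_filter($delayed, fn ($score) => $score <= $now);
    asort($expired); // order by score, as a sorted set would

    return array_slice(array_keys($expired), $offset, $limit);
}

// Moves expired jobs to $ready one page at a time, then removes them from
// $delayed in a single step at the end, mirroring the method above.
// Returns the number of non-empty batches processed.
function migrateInBatches(array &$delayed, array &$ready, int $now, int $limit): int
{
    $batches = 0;
    $page = 0;

    do {
        $jobs = getExpiredBatch($delayed, $now, $page * $limit, $limit);
        if ($jobs === []) {
            break;
        }
        foreach ($jobs as $job) {
            $ready[] = $job;
        }
        $batches++;
        $page++;
    } while (count($jobs) === $limit); // a short page means it was the last one

    // Counterpart of removeExpiredJobs(): drop everything that was due at $now.
    $delayed = array_filter($delayed, fn ($score) => $score > $now);

    return $batches;
}

// Usage: 250 due jobs and 10 future jobs, batch size 100.
$delayed = [];
for ($i = 1; $i <= 250; $i++) {
    $delayed["job-$i"] = $i;          // due in the past
}
for ($i = 1; $i <= 10; $i++) {
    $delayed["future-$i"] = 2000 + $i; // not yet due
}
$ready = [];
$batches = migrateInBatches($delayed, $ready, 1000, 100);
echo $batches . ' batches, ' . count($ready) . ' ready, ' . count($delayed) . " still delayed\n";
// prints "3 batches, 250 ready, 10 still delayed"
```

Paging by offset only works here because nothing is removed from the source until the loop has finished, so the offsets stay stable. At most one page of jobs is held in memory at a time.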

/**
 * Get the expired jobs.
 *
 * @param  string $from
 * @param  int    $time
 * @param  array  $options options passed to zRangeByScore, e.g. ['limit' => [offset, count]]
 * @return array
 */
protected function getExpiredJobs($from, $time, $options = [])
{
    return $this->redis->zRangeByScore($from, '-inf', $time, $options);
}
