diff --git a/Test/Event.php b/Test/Event.php index 77d14ea7..ed922211 100644 --- a/Test/Event.php +++ b/Test/Event.php @@ -25,7 +25,7 @@ public function onInit() { var_dump("slow sql 耗时:$runTime, sql:$realSql"); }); - $wairGroup = new GoWaitGroup(); + $waitGroup = new GoWaitGroup(); if(!$this->isWorkerService()) { // 创建一个测试自定义进程 diff --git a/Test/Protocol/conf.php b/Test/Protocol/conf.php index 6a26a8ef..73414ffb 100644 --- a/Test/Protocol/conf.php +++ b/Test/Protocol/conf.php @@ -64,6 +64,10 @@ // 是否内存化线上实时任务 'enable_table_tick_task' => true, + // 是否开启内存回收 + 'enable_gc_mem_cache' => false, + 'gc_mem_cache_tick_time' => 10, + // 内存表定义 'table' => [ 'table_process' => [ diff --git a/Test/WorkerCron/LocalOrder/LocalOrderHandle.php b/Test/WorkerCron/LocalOrder/LocalOrderHandle.php index 41993403..83e06a87 100644 --- a/Test/WorkerCron/LocalOrder/LocalOrderHandle.php +++ b/Test/WorkerCron/LocalOrder/LocalOrderHandle.php @@ -5,6 +5,8 @@ use Swoolefy\Core\Crontab\AbstractCronController; use Swoolefy\Core\Log\LogManager; use Swoolefy\Core\Swfy; +use Swoolefy\Core\SystemEnv; +use Swoolefy\Worker\AbstractBaseWorker; use Test\Factory; use Test\Logger\RunLog; @@ -27,10 +29,15 @@ public function doCronTask($cron, string $cronName) // var_dump(env("HOST_NAME")); // var_dump(env('HOST_PASSWORD')); // var_dump(Swfy::getConf()['bjg']); - + SystemEnv::clearEnvRepository(); RunLog::info("this is a cron test log"); var_dump("cron start"); sleep(3); + var_dump(SystemEnv::get('WEB_SITE_HOST')); + + + sleep(60); + //AbstractBaseWorker::getProcessInstance()->reboot(3); var_dump("cron end"); // goApp(function() { diff --git a/Test/WorkerCron/conf/product_conf.php b/Test/WorkerCron/conf/product_conf.php index 2e2dfe06..ee3617ee 100644 --- a/Test/WorkerCron/conf/product_conf.php +++ b/Test/WorkerCron/conf/product_conf.php @@ -7,14 +7,14 @@ 'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class, 'worker_num' => 1, // 默认动态进程数量 'max_handle' => 100, //消费达到10000后reboot进程 - 'life_time' => 3600, // 每隔3600s重启进程 + 'life_time' => 60, // 每隔3600s重启进程 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 'extend_data' => [], 'args' => [ 'cron_name' => 'cancel-order', // 定时任务名称 'handler_class' => \Test\WorkerCron\LocalOrder\LocalOrderHandle::class, //处理类 //'cron_expression' => '*/1 * * * *', // 每分钟执行一次 - 'with_block_lapping' => 0, // with_block_lapping = 1,表示每轮任务只能阻塞执行,必须等上一轮任务执行完毕,下一轮才能执行; with_block_lapping = 0, 表示每轮任务时间到了,都可执行,是并发非租塞的 + 'with_block_lapping' => 1, // with_block_lapping = 1,表示每轮任务只能阻塞执行,必须等上一轮任务执行完毕,下一轮才能执行; with_block_lapping = 0, 表示每轮任务时间到了,都可执行,是并发非租塞的 'run_in_background' => 1, // 后台运行,不受主进程的退出影响 'cron_expression' => 5, // 10s执行一次 ], diff --git a/Test/WorkerCron/conf/schedule_conf.php b/Test/WorkerCron/conf/schedule_conf.php index 69784c76..07a14df3 100644 --- a/Test/WorkerCron/conf/schedule_conf.php +++ b/Test/WorkerCron/conf/schedule_conf.php @@ -2,18 +2,18 @@ return // 定时fork进程处理任务 [ - [ - 'process_name' => 'system-schedule-task', // 进程名称 - 'handler' => \Swoolefy\Worker\Cron\CronForkProcess::class, - 'description' => '系统fork模式任务调度', - 'worker_num' => 1, // 默认动态进程数量 - 'max_handle' => 100, //消费达到10000后reboot进程 - 'life_time' => 3600, // 每隔3600s重启进程 - 'limit_run_coroutine_num' => 10, // 当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 - 'extend_data' => [], - 'args' => [ - // 定时任务列表 - 'task_list' => \Test\Scripts\Kernel::buildScheduleTaskList() - ], - ], +// [ +// 'process_name' => 'system-schedule-task', // 进程名称 +// 'handler' => \Swoolefy\Worker\Cron\CronForkProcess::class, +// 'description' => '系统fork模式任务调度', +// 'worker_num' => 1, // 默认动态进程数量 +// 'max_handle' => 100, //消费达到10000后reboot进程 +// 'life_time' => 3600, // 每隔3600s重启进程 +// 'limit_run_coroutine_num' => 10, // 当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 +// 'extend_data' => [], +// 'args' => [ +// // 定时任务列表 +// 'task_list' => \Test\Scripts\Kernel::buildScheduleTaskList() +// ], +// ], ]; \ No newline at end of file diff --git a/Test/WorkerDaemon/PipeWorkerProcess.php b/Test/WorkerDaemon/PipeWorkerProcess.php index a7dce479..e21e241a 100644 --- a/Test/WorkerDaemon/PipeWorkerProcess.php +++ b/Test/WorkerDaemon/PipeWorkerProcess.php @@ -8,22 +8,13 @@ class PipeWorkerProcess extends \Swoolefy\Worker\AbstractWorkerProcess { - public function run() - { - //Application::getApp()->get('log')->addInfo('pllllllllllll'); - while (1) { - if ($this->isExiting()) { - sleep(1); - continue; - } -// LogManager::getInstance()->getLogger('log')->info('kkkkkkkkkkkkkkkk'); -// var_dump('CID='.\Swoole\Coroutine::getCid()); -// var_dump('PipeWorker'); - $a = 1; - $b = 2; - $c = 3; - goApp(function ($a, $b) use($c) { + public function loopHandle() + { + $a = 1; + $b = 2; + $c = 3; + goApp(function ($a, $b) use($c) { goApp(function () use($a, $b) { goApp(function () use($a, $b) { var_dump($a, $b); @@ -31,35 +22,64 @@ public function run() }); }, $a, $b); - sleep(10); - var_dump("gggggggggggggggggggggggggg"); - } - - - -// $db = Application::getApp()->get('db'); -// $result = $db->createCommand('select * from tbl_users limit 1')->queryAll(); -// dump($result); - -// \Swoole\Coroutine::create(function () { -// (new \Swoolefy\Core\EventApp)->registerApp(function (EventController $eventApp) { -// var_dump('mmmmmmmmmmmmmmmmmmmmmmmmm'); -// }); -// }); - - - -// if($this->isWorker0()) { -// $this->notifyMasterCreateDynamicProcess($this->getProcessName(), 1); -// } - - var_dump($this->limitCurrentRunCoroutineNum); -// if($this->isWorker0()) { -// sleep(5); -// $this->reboot(); -// } + var_dump('start start'); + sleep(15); + var_dump("end end end "); } +// public function run() +// { +// //Application::getApp()->get('log')->addInfo('pllllllllllll'); +//// while (1) { +//// if ($this->isExiting()) { +//// sleep(1); +//// continue; +//// } +//// +////// LogManager::getInstance()->getLogger('log')->info('kkkkkkkkkkkkkkkk'); +////// var_dump('CID='.\Swoole\Coroutine::getCid()); +////// var_dump('PipeWorker'); +//// $a = 1; +//// $b = 2; +//// $c = 3; +//// goApp(function ($a, $b) use($c) { +//// goApp(function () use($a, $b) { +//// goApp(function () use($a, $b) { +//// var_dump($a, $b); +//// }); +//// }); +//// }, $a, $b); +//// +//// var_dump('start start'); +//// sleep(120); +//// var_dump("gggggggggggggggggggggggggg"); +//// } +// +// +// +//// $db = Application::getApp()->get('db'); +//// $result = $db->createCommand('select * from tbl_users limit 1')->queryAll(); +//// dump($result); +// +//// \Swoole\Coroutine::create(function () { +//// (new \Swoolefy\Core\EventApp)->registerApp(function (EventController $eventApp) { +//// var_dump('mmmmmmmmmmmmmmmmmmmmmmmmm'); +//// }); +//// }); +// +// +// +//// if($this->isWorker0()) { +//// $this->notifyMasterCreateDynamicProcess($this->getProcessName(), 1); +//// } +// +// var_dump($this->limitCurrentRunCoroutineNum); +//// if($this->isWorker0()) { +//// sleep(5); +//// $this->reboot(); +//// } +// } + public function onHandleException(\Throwable $throwable, array $context = []) { parent::onHandleException($throwable, $context); diff --git a/Test/WorkerDaemon/conf/monitor_conf.php b/Test/WorkerDaemon/conf/monitor_conf.php index 7a706e65..15233643 100644 --- a/Test/WorkerDaemon/conf/monitor_conf.php +++ b/Test/WorkerDaemon/conf/monitor_conf.php @@ -1,15 +1,15 @@ 'test-monitor-worker', - 'handler' => \Test\WorkerDaemon\MonitorWorkerProcess::class, - 'worker_num' => 1, // 默认动态进程数量 - 'max_handle' => 100, //消费达到10000后reboot进程 - 'life_time' => 60, // 每隔3600s重启进程 - 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 - 'extend_data' => [], - 'args' => [] - ], +// [ +// // 监控进程 +// 'process_name' => 'test-monitor-worker', +// 'handler' => \Test\WorkerDaemon\MonitorWorkerProcess::class, +// 'worker_num' => 1, // 默认动态进程数量 +// 'max_handle' => 100, //消费达到10000后reboot进程 +// 'life_time' => 60, // 每隔3600s重启进程 +// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 +// 'extend_data' => [], +// 'args' => [] +// ], ]; \ No newline at end of file diff --git a/Test/WorkerDaemon/conf/pipe_conf.php b/Test/WorkerDaemon/conf/pipe_conf.php index dba0efff..59f4fbc7 100644 --- a/Test/WorkerDaemon/conf/pipe_conf.php +++ b/Test/WorkerDaemon/conf/pipe_conf.php @@ -4,21 +4,21 @@ [ 'process_name' => 'test-pipe-worker', 'handler' => \Test\WorkerDaemon\PipeWorkerProcess::class, - 'worker_num' => 3, // 默认动态进程数量 + 'worker_num' => 1, // 默认动态进程数量 'max_handle' => 100, //消费达到10000后reboot进程 - 'life_time' => 3600, // 每隔3600s重启进程 - 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 - 'extend_data' => [], - 'args' => [] - ], - [ - 'process_name' => 'tick-pipe-worker-test', - 'handler' => \Test\WorkerDaemon\PipeTestWorkerProcess::class, - 'worker_num' => 3, // 默认动态进程数量 - 'max_handle' => 100, //消费达到10000后reboot进程 - 'life_time' => 3600, // 每隔3600s重启进程 + 'life_time' => '* * * * *', // 每隔3600s重启进程 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 'extend_data' => [], 'args' => [] ], +// [ +// 'process_name' => 'tick-pipe-worker-test', +// 'handler' => \Test\WorkerDaemon\PipeTestWorkerProcess::class, +// 'worker_num' => 3, // 默认动态进程数量 +// 'max_handle' => 100, //消费达到10000后reboot进程 +// 'life_time' => 3600, // 每隔3600s重启进程 +// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 +// 'extend_data' => [], +// 'args' => [] +// ], ]; \ No newline at end of file diff --git a/src/Cmd/BaseCmd.php b/src/Cmd/BaseCmd.php index a7490748..903d2298 100644 --- a/src/Cmd/BaseCmd.php +++ b/src/Cmd/BaseCmd.php @@ -154,7 +154,7 @@ protected function checkRunning(array &$config) fmtPrintError('[' . APP_NAME . ']' . " Server is running, pid={$pid}, pidFile={$pidFile}"); exit(0); } else { - fmtPrintError('[' . WORKER_SERVICE_NAME . ']' . " is running, pid={$pid}, pidFile={$pidFile}"); + fmtPrintError('[' . WORKER_SERVICE_NAME . '-server]' . " is running, pid={$pid}, pidFile={$pidFile}"); exit(0); } } diff --git a/src/Core/EventCtrl.php b/src/Core/EventCtrl.php index 5b658128..95d23bbe 100644 --- a/src/Core/EventCtrl.php +++ b/src/Core/EventCtrl.php @@ -195,6 +195,7 @@ public function workerStart($server, $worker_id) if(!SystemEnv::isWorkerService()) { $this->registerComponentPools(); } + $this->registerGcMemCaches(); static::onWorkerStart($server, $worker_id); } @@ -308,6 +309,20 @@ protected function clearComponentPools() } } + /** + * @return void + */ + protected function registerGcMemCaches() + { + $conf =BaseServer::getConf(); + if (isset($conf['enable_gc_mem_cache']) && !empty($conf['enable_gc_mem_cache'])) { + $time = $conf['gc_mem_cache_tick_time'] ?? 30; + \Swoole\Timer::tick($time * 1000, function () { + gc_mem_caches(); + }); + } + } + /** * eachStartInfo */ diff --git a/src/Core/Process/AbstractProcess.php b/src/Core/Process/AbstractProcess.php index b0fcc83f..4eaa2acd 100644 --- a/src/Core/Process/AbstractProcess.php +++ b/src/Core/Process/AbstractProcess.php @@ -142,7 +142,7 @@ public function __start(Process $process) \Swoole\Event::del($process->pipe); \Swoole\Event::exit(); - // 脚本模式下.任务进程退出时,父进程也得退出 + // script 模式下.任务进程退出时,父进程也得退出 if (SystemEnv::isScriptService()) { $swooleMasterPid = Swfy::getMasterPid(); \Swoole\Process::kill($swooleMasterPid, SIGTERM); diff --git a/src/Core/SystemEnv.php b/src/Core/SystemEnv.php index b9adbdb0..b83ea516 100644 --- a/src/Core/SystemEnv.php +++ b/src/Core/SystemEnv.php @@ -159,7 +159,7 @@ public static function getOption(string $name, bool $force = false) $options = self::inputOptions(); } } - $value = trim($options[$name],'\'') ?? ''; + $value = trim($options[$name],'\'') ?? ''; $value = trim($value,' '); return $value; } @@ -308,6 +308,14 @@ public static function disablePutenv() static::$repository = null; } + /** + * @return void + */ + public static function clearEnvRepository() + { + static::$repository = null; + } + /** * Get the environment repository instance. * diff --git a/src/Http/config.php b/src/Http/config.php index 88c9bb40..a7612406 100644 --- a/src/Http/config.php +++ b/src/Http/config.php @@ -64,6 +64,9 @@ // 是否内存化线上实时任务 'enable_table_tick_task' => true, + // 是否开启内存回收 + 'enable_gc_mem_cache' => true, + 'gc_mem_cache_tick_time' => 10, // 内存表定义 'table' => [ diff --git a/src/Mqtt/config.php b/src/Mqtt/config.php index e32d56db..3735e599 100644 --- a/src/Mqtt/config.php +++ b/src/Mqtt/config.php @@ -50,6 +50,10 @@ 'enable_table_tick_task' => true, + // 是否开启内存回收 + 'enable_gc_mem_cache' => true, + 'gc_mem_cache_tick_time' => 10, + 'mqtt' => [ 'username' => '', 'password' => '', diff --git a/src/Udp/config.php b/src/Udp/config.php index c64cc780..9a565cca 100644 --- a/src/Udp/config.php +++ b/src/Udp/config.php @@ -51,6 +51,10 @@ 'enable_table_tick_task' => true, + // 是否开启内存回收 + 'enable_gc_mem_cache' => true, + 'gc_mem_cache_tick_time' => 10, + // 依赖于EnableSysCollector = true,否则设置没有意义,不生效 'enable_pv_collector' => true, 'enable_sys_collector' => true, diff --git a/src/Websocket/config.php b/src/Websocket/config.php index 79ad1a80..cc98c31a 100644 --- a/src/Websocket/config.php +++ b/src/Websocket/config.php @@ -53,6 +53,10 @@ 'enable_table_tick_task' => true, + // 是否开启内存回收 + 'enable_gc_mem_cache' => true, + 'gc_mem_cache_tick_time' => 10, + // 依赖于EnableSysCollector = true,否则设置没有意义,不生效 'enable_pv_collector' => true, 'enable_sys_collector' => true, diff --git a/src/Worker/AbstractBaseWorker.php b/src/Worker/AbstractBaseWorker.php index c69b3db9..1f962c27 100644 --- a/src/Worker/AbstractBaseWorker.php +++ b/src/Worker/AbstractBaseWorker.php @@ -479,17 +479,17 @@ public function __start(Process $swooleProcess) if (!$this->handing) { $exitFunction($timerId, $masterPid); }else { - $this->fmtWriteInfo("Cron Process={$this->getProcessName()} is handing, pid={$this->getPid()}"); + $this->fmtWriteInfo("【cron-task-handing】Cron Process={$this->getProcessName()} is handing, pid={$this->getPid()}"); } }else if (SystemEnv::isDaemonService()) { // daemon防止任务还在进行中,强制退出 - // 定时检查到主进程 $$tickCheckMasterOffCount 次已经kill掉了,但子进程也不能一直不退出,否则成了僵尸进程了,这里做一个兜底退出,1800秒后强制退出 + // 定时检查到主进程 $tickCheckMasterOffCount 次已经kill掉了,但子进程也不能一直不退出,否则成了僵尸进程了,这里做一个兜底退出,1800秒后强制退出 $lastTime = $this->args['check_master_live_tick_time'] * $tickCheckMasterOffCount; $this->fmtWriteInfo("Daemon Process={$this->getProcessName()} last master off time={$lastTime}, tickCheckMasterOffCount={$tickCheckMasterOffCount}, pid={$this->getPid()}"); if (!$this->useLoopHandle || ($lastTime > 1800) ) { $exitFunction($timerId, $masterPid); }else { - $this->fmtWriteInfo("Daemon Process={$this->getProcessName()} is handing, pid={$this->getPid()}"); + $this->fmtWriteInfo("【daemon-task-handing】Daemon Process={$this->getProcessName()} is handing, pid={$this->getPid()}"); } }else { $exitFunction($timerId, $masterPid); @@ -512,7 +512,7 @@ public function __start(Process $swooleProcess) } } - if ($this->isMasterLive() && $this->getProcessWorkerId() == 0 && $this->masterPid) { + if ($this->isMasterLive() && $this->isWorker0() && $this->masterPid && $this->isDue()) { $this->saveMasterId($this->masterPid); } @@ -1022,7 +1022,7 @@ public function isDue(): bool { if($this->isRebooting() || $this->isForceExit() || $this->isExiting() || $this->waitToExit) { sleep(1); - $this->fmtWriteInfo("Process Wait to Exit or Reboot"); + $this->fmtWriteInfo("【{$this->getProcessName()}】Process Wait to Exit or Reboot,Do not something"); return false; } return true; @@ -1200,11 +1200,11 @@ public function getStartTime() /** * reboot * - * @param float $wait_time + * @param float $afterWaitTime * @param bool $includeDynamicProcess * @return bool */ - public function reboot(float $waitTime = 10, bool $includeDynamicProcess = true) + public function reboot(float $afterWaitTime = 10, bool $includeDynamicProcess = true) { if(!$includeDynamicProcess) { if (!$this->isStaticProcess()) { @@ -1218,12 +1218,12 @@ public function reboot(float $waitTime = 10, bool $includeDynamicProcess = true) return false; } - if ($waitTime < 0) { - $waitTime = $this->getWaitTime(); + if ($afterWaitTime < 0) { + $afterWaitTime = $this->getWaitTime(); } - if ($waitTime <= 5) { - $waitTime = 5; + if ($afterWaitTime <= 5) { + $afterWaitTime = 5; } $pid = $this->getPid(); @@ -1231,21 +1231,11 @@ public function reboot(float $waitTime = 10, bool $includeDynamicProcess = true) // 优先通知master进程先拉起子进程 $this->notifyMasterRebootNewProcess($this->getProcessName()); $this->isReboot = true; - $this->readyRebootTime = time() + $waitTime; + $this->readyRebootTime = time() + $afterWaitTime; $channel = new Channel(1); - $timerId = \Swoole\Timer::after($waitTime * 1000, function () use ($pid) { - try { - $this->runtimeCoroutineWait($this->maxWaitTimeOfExit); - (new \Swoolefy\Core\EventApp)->registerApp(function () { - $this->onShutDown(); - }); - } catch (\Throwable $throwable) { - $this->onHandleException($throwable); - } finally { - // 自身进程退出 - $this->kill($pid, SIGTERM); - } + $timerId = \Swoole\Timer::after($afterWaitTime * 1000, function () use ($pid) { + $this->exitNow($pid, 20); }); $this->rebootTimerId = $timerId; @@ -1290,7 +1280,6 @@ public function exit(bool $isForce = false, ?float $waitTime = 10) $channel = new Channel(1); $this->exitTimerId = \Swoole\Timer::after($waitTime * 1000, function () use ($pid) { - $this->waitToExit = true; $this->exitNow($pid, $this->maxWaitTimeOfExit); }); // block wait to exit @@ -1321,6 +1310,7 @@ protected function exitNow(int $pid, int $maxWaitTimeOfExit) } catch (\Throwable $throwable) { $this->onHandleException($throwable); } finally { + $this->writeLog("【{$this->getProcessName()}】进程退出,pid={$this->getPid()}"); if ($this->isForceExit) { $this->kill($pid, SIGKILL); } else { @@ -1332,32 +1322,41 @@ protected function exitNow(int $pid, int $maxWaitTimeOfExit) /** * registerTickReboot register time reboot, will be called in init() function * - * @param $cron_expression * @return void */ - protected function registerTickReboot($cron_expression) + protected function registerTickReboot() { /** * local模式下的定时任务模式下不能设置定时重启,否则长时间执行的任务会被kill掉,而是在回调函数注册callback闭包来判断是否达到重启时间 * @see \Swoolefy\Worker\Cron\CronLocalProcess */ if (SystemEnv::isCronService() && $this instanceof \Swoolefy\Worker\Cron\CronLocalProcess) { + if (!is_numeric($this->lifeTime)) { + $this->lifeTime = 3600; + }else { + if ($this->lifeTime < 60) { + $this->lifeTime = 60; + } + } + return; + } + + $lifeTime = $this->lifeTime; + // daemon下使用loopHandle模式,则不注册定时重启,会在业务处理完后重启 + if ($this->useLoopHandle && is_numeric($lifeTime)) { return; } - if (is_numeric($cron_expression)) { - $randNum = rand(30, 60); + if (is_numeric($lifeTime)) { + $randTickTime = rand(10, 15); + $tickTime = $randTickTime * 1000; // for Example reboot/600s after 600s reboot this process - if ($cron_expression < 120) { - $sleepTime = 60; - $tickTime = (30 + $randNum) * 1000; - } else { - $sleepTime = $cron_expression; - $tickTime = (60 + $randNum) * 1000; + if ($lifeTime < 60) { + $lifeTime = 60; } - \Swoole\Timer::tick($tickTime, function ($timerId) use ($sleepTime) { - if (time() - $this->getStartTime() >= $sleepTime) { + \Swoole\Timer::tick($tickTime, function ($timerId) use ($lifeTime) { + if (time() >= $this->getStartTime() + $lifeTime) { $this->reboot($this->waitTime); \Swoole\Timer::clear($timerId); } @@ -1368,10 +1367,10 @@ protected function registerTickReboot($cron_expression) // cron expression of timer to reboot this process CrontabManager::getInstance()->addRule( 'system-register-tick-reboot', - $cron_expression, + $lifeTime, function () use ($randSleep, $isWorkerId0) { if(!$isWorkerId0) { - $this->reboot($this->waitTime + $randSleep); + $this->reboot($randSleep); } $this->reboot($this->waitTime); }); diff --git a/src/Worker/AbstractWorkerProcess.php b/src/Worker/AbstractWorkerProcess.php index e6748b8f..9af89e9e 100644 --- a/src/Worker/AbstractWorkerProcess.php +++ b/src/Worker/AbstractWorkerProcess.php @@ -26,7 +26,7 @@ abstract class AbstractWorkerProcess extends AbstractBaseWorker protected $maxHandle = 10000; /** - * @var int + * @var int|string */ protected $lifeTime = 3600; @@ -55,7 +55,7 @@ protected function init() $this->lifeTime = $this->getArgs()['life_time'] ?? $this->lifeTime; $this->currentRunCoroutineLastCid = $this->getArgs()['current_run_coroutine_last_cid'] ?? $this->maxHandle * 10; $this->limitCurrentRunCoroutineNum = $this->getArgs()['limit_run_coroutine_num'] ?? null; - $this->registerTickReboot($this->lifeTime); + $this->registerTickReboot(); $this->onInit(); } @@ -98,6 +98,7 @@ public function run() $this->useLoopHandle = true; while (true) { if (!$this->isDue()) { + $this->fmtWriteInfo("【{$this->getProcessName()}】守护进程退出|重启中,不再处理任务"); continue; } @@ -113,6 +114,13 @@ public function run() $pid = $this->getPid(); $this->exitNow($pid, 5); } + + // 定时任务处理完之后,判断达到一定时间,然后重启进程 + if (is_numeric($this->lifeTime)) { + if ( (time() > $this->getStartTime() + $this->lifeTime) && $this->isDue()) { + $this->reboot(5); + } + } } } } diff --git a/src/Worker/Cron/CronLocalProcess.php b/src/Worker/Cron/CronLocalProcess.php index 51802456..a918cfa7 100644 --- a/src/Worker/Cron/CronLocalProcess.php +++ b/src/Worker/Cron/CronLocalProcess.php @@ -63,8 +63,15 @@ public function run() function (): bool { // 上一个任务未执行完,下一个任务到来时不执行,返回false结束 if ($this->withBlockLapping && $this->handing) { + $this->fmtWriteInfo("【{$this->getProcessName()}】进程定时任务还在处理中,暂时不再处理下一个任务"); return false; } + + if (!$this->isDue()) { + $this->fmtWriteInfo("【{$this->getProcessName()}】定时任务进程退出|重启中,暂时不再处理任务"); + return false; + } + $this->handing = true; return true; }, @@ -72,13 +79,15 @@ function () { $this->handing = false; // 任务业务处理完,接收waitToExit=true的指令,进程退出 if ($this->waitToExit) { - $this->exit(true, 10); + $this->exitNow($this->getPid(), 5); return false; } - // 定时任务处理完之后,达到一定时间,判断然后重启进程 - if ( (time() > $this->getStartTime() + 3600) && $this->isDue()) { - $this->reboot(5); + // 定时任务处理完之后,判断达到一定时间,然后重启进程 + if (is_numeric($this->lifeTime)) { + if ( (time() > $this->getStartTime() + $this->lifeTime) && $this->isDue()) { + $this->reboot(5); + } } }); }catch (\Throwable $exception) { diff --git a/src/Worker/Traits/SystemTrait.php b/src/Worker/Traits/SystemTrait.php index eec9f763..5262bfa1 100644 --- a/src/Worker/Traits/SystemTrait.php +++ b/src/Worker/Traits/SystemTrait.php @@ -87,7 +87,7 @@ protected function inChildrenProcessEnv(): bool * @param $msg * @return void */ - private function fmtWriteInfo($msg) + protected function fmtWriteInfo($msg) { initConsoleStyleIo()->write("$msg", true); $this->writeLog($msg); @@ -99,7 +99,7 @@ private function fmtWriteInfo($msg) * @param $msg * @return void */ - private function fmtWriteError($msg) + protected function fmtWriteError($msg) { initConsoleStyleIo()->write("$msg", true); $this->writeLog($msg); @@ -109,7 +109,7 @@ private function fmtWriteError($msg) * @param string $msg * @return void */ - private function writeLog(string $msg) + protected function writeLog(string $msg) { if (defined('WORKER_CTL_LOG_FILE')) { if (defined('MAX_LOG_FILE_SIZE')) {