From 6db3a8a33b8e66d434331cf7f6c924cf41204524 Mon Sep 17 00:00:00 2001 From: dengchao <200582249@qq.com> Date: Tue, 18 Mar 2025 15:43:47 +0800 Subject: [PATCH] go --- cmd/send_job.php | 179 +++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------------------------------ lib/SwGo.php | 8 ++++++++ 2 files changed, 37 insertions(+), 150 deletions(-) diff --git a/cmd/send_job.php b/cmd/send_job.php index 2a69d6e..80946ff 100644 --- a/cmd/send_job.php +++ b/cmd/send_job.php @@ -5,66 +5,14 @@ include_once "../vendor/autoload.php"; // 这里试试不用多进程模式,用多协程模式 \Lib\DbPool::$clientNumber = 60; +\Lib\RedisPool::$clientNumber = 60; class SendJob { - public $cnum = 0; - - private $run_timer = 0; - - public function __construct() - { - $this->run_timer = time(); - } - - /** - * 是否停止 - * @return bool - * @author:dc - * @time 2024/4/10 9:12 - */ - private function isStop(){ - // 运行超过1天的 停止 - if($this->run_timer < (time()-43200)){ -// @posix_kill(getmypid(), SIGTERM); - return true; - } - return redis()->get('send_job_is_stop') == 'stop'; - } - - /** - * 休眠 - * @param float $sleep - * @return bool - * @author:dc - * @time 2024/4/10 9:12 - */ - private function s_sleep(float $sleep):bool { - if($sleep > 0){ - $t = microtime(1); - - while (!$this->isStop()){ - co::sleep(0.1); - if($sleep - (microtime(1)-$t) <= 0){ - break; - } - } - } - return true; - } - - public function start(){ _echo('启动邮件群发任务 '.getmypid()); - // 删除key - redis()->delete('send_job_is_stop'); - while (1){ - // 是否要停止 - if($this->isStop()){ - break; - } $lists = db()->all(\Model\sendJobsSql::sendList(500)); $lists = $lists?$lists:[]; @@ -92,18 +40,8 @@ class SendJob { } } // 休眠30秒 - $this->s_sleep(30); - - } - + co::sleep(30); - // 这个是等待所有协程退出 - while (true){ - _echo('等待协程退出...'); - if(!$this->cnum){ - break; - } - co::sleep(1); } } @@ -118,16 +56,16 @@ class SendJob { */ public function go_($list){ // 控制50个协程内 - while ($this->cnum>=50){ - co::sleep(0.5); + while (\Lib\SwGo::$runNumber >= 50){ + co::sleep(1); } // 占用 id if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){ - go(function ($data) { + \Lib\SwGo::start(function ($data) { _echo('正在执行任务 '.$data['id']); - $this->cnum++; // 协程数+1 + // 表单数据 $data['maildata'] = json_decode($data['maildata'],true); // 查询邮箱 @@ -184,13 +122,8 @@ class SendJob { _echo('进入时间等待区 '.$to['email'].' 等待:'.$time); $block = false; while (true){ - // 没5秒循环一次 - if($this->isStop()){ - $block = true; - break; - } $time-=5; - $this->s_sleep(5); + co::sleep(5); // 执行下一次了 if (!$time){ $block = true; @@ -231,36 +164,26 @@ class SendJob { } - // 协程结束后 - co::defer(function ($id) use($data){ - $this->cnum--; - // 验证是否完成 - if($data['maildata']['massSuit']??0){ - $dst = db()->first(\Model\sendJobsSql::isStatus($data['id'])); - if($dst && $dst['status'] != 3){ - $total = db()->first(\Model\sendJobStatusSql::countSum($data['id'])); - if($total){ - // 更新状态 - db()->update(\Model\sendJobsSql::$table,[ - 'status' => $total['t'] == $data['total'] ? 2 : 0, - 'success' => $total['s'], - 'error' => $total['e'], - ],dbWhere(['id'=>$data['id']])); - } + },function ($data){ + // 验证是否完成 + if($data['maildata']['massSuit']??0){ + $dst = db()->first(\Model\sendJobsSql::isStatus($data['id'])); + if($dst && $dst['status'] != 3){ + $total = db()->first(\Model\sendJobStatusSql::countSum($data['id'])); + if($total){ + // 更新状态 + db()->update(\Model\sendJobsSql::$table,[ + 'status' => $total['t'] == $data['total'] ? 2 : 0, + 'success' => $total['s'], + 'error' => $total['e'], + ],dbWhere(['id'=>$data['id']])); } } + } + // 删除占用 + redis()->delete('send_job_run_id_'.$data['id']); - // 写入日志 - \Lib\Log::getInstance()->write(); - - // 删除占用 - redis()->delete('send_job_run_id_'.$data['id']); - - - _echo('执行任务完成'.$data['id']); - - db()->close(); - }); + _echo('执行任务完成'.$data['id']); },$list); } @@ -268,55 +191,11 @@ class SendJob { } } +// 开启协程 +\Co\run(function (){ + (new SendJob)->start(); -$ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l"; - -switch ($argv[1]??0){ - case 'start':{ - - // 开启协程 - \Co\run(function (){ - - $handler = function ($signal){ - // 可以处理其他程序 - redis()->set('send_job_is_stop','stop'); - - _echo('收到退出信号 '.$signal); - }; + _echo('进程已退出'); - \Swoole\Process::signal(SIGTERM,$handler); - \Swoole\Process::signal(SIGINT,$handler); - - (new SendJob)->start(); - - _echo('进程已退出'); - - db()->close(); - }); - - break; - } - case 'stop':{ - \Co\run(function ($ps){ - echo "正在退出程序...\n非必要请不要强制kill掉进程\n"; - - redis()->set('send_job_is_stop','stop'); - - while (true){ - - $num = exec($ps); - if(!$num){ - break; - } - co::sleep(0.2); - } - echo "已退出程序\n"; - },$ps); - - break; - } - default:{ - break; - } -} +}); diff --git a/lib/SwGo.php b/lib/SwGo.php index 59954ef..14e4e02 100644 --- a/lib/SwGo.php +++ b/lib/SwGo.php @@ -37,9 +37,17 @@ class SwGo { go(function (\Closure $run,...$param){ self::$runNumber++; + $end = null; + if(empty($param[0]) && $param[0] instanceof \Closure){ + $end = $param[0]; + unset($param[0]); + $param = array_values($param); + } $run(...$param); + if($end) $end(...$param); + // 写入日志 \Lib\Log::getInstance()->write(); // 释放 mysql -- libgit2 0.24.0