作者 邓超

优化 发送邮件脚本

正在显示 1 个修改的文件 包含 104 行增加47 行删除
@@ -3,35 +3,96 @@ @@ -3,35 +3,96 @@
3 include_once "../vendor/autoload.php"; 3 include_once "../vendor/autoload.php";
4 4
5 // 这里试试不用多进程模式,用多协程模式 5 // 这里试试不用多进程模式,用多协程模式
6 -function start(){  
7 - _echo('启动邮件群发任务');  
8 - // 删除key  
9 - redis()->delete('send_job_is_stop');  
10 - // 开启协程  
11 - \Co\run(function (){  
12 6
13 - $cNum = 0;//协程运行的数量  
14 - $maxRunNum = 500;  
15 - while ($maxRunNum){  
16 - $maxRunNum--;  
17 - if(!$maxRunNum){ 7 +
  8 +
  9 +class SendJob {
  10 +
  11 + public $cnum = 0;
  12 +
  13 + /**
  14 + * 是否停止
  15 + * @return bool
  16 + * @author:dc
  17 + * @time 2024/4/10 9:12
  18 + */
  19 + private function isStop(){
  20 + return redis()->get('send_job_is_stop') == 'stop';
  21 + }
  22 +
  23 + /**
  24 + * 休眠
  25 + * @param float $sleep
  26 + * @return bool
  27 + * @author:dc
  28 + * @time 2024/4/10 9:12
  29 + */
  30 + private function s_sleep(float $sleep):bool {
  31 + if($sleep > 0){
  32 + $t = microtime(1);
  33 +
  34 + while (!$this->isStop()){
  35 + co::sleep(0.1);
  36 + if($sleep - (microtime(1)-$t) <= 0){
18 break; 37 break;
19 } 38 }
20 - try { 39 + }
  40 + }
  41 + return true;
  42 + }
  43 +
  44 +
  45 + public function start(){
  46 + _echo('启动邮件群发任务 '.getmypid());
  47 + // 删除key
  48 + redis()->delete('send_job_is_stop');
  49 +
  50 +
  51 + while (1){
21 // 是否要停止 52 // 是否要停止
22 - if(redis()->get('send_job_is_stop')=='stop'){ 53 + if($this->isStop()){
23 break; 54 break;
24 } 55 }
25 56
26 $lists = db()->all(\Model\sendJobsSql::sendList(500)); 57 $lists = db()->all(\Model\sendJobsSql::sendList(500));
27 $lists = $lists?$lists:[]; 58 $lists = $lists?$lists:[];
28 - // 循环 59 +
  60 + if($lists){
29 foreach ($lists as $list){ 61 foreach ($lists as $list){
  62 + $this->go_($list);
  63 + }
  64 + }else{
  65 + // 休眠30秒
  66 + $this->s_sleep(30);
  67 + }
  68 + }
  69 +
  70 +
  71 + // 这个是等待所有协程退出
  72 + while (true){
  73 + _echo('等待协程退出...');
  74 + if(!$this->cnum){
  75 + break;
  76 + }
  77 + co::sleep(1);
  78 + }
  79 +
  80 + }
30 81
  82 +
  83 + /**
  84 + * @param $list
  85 + * @throws \Lib\Err
  86 + * @throws \PHPMailer\PHPMailer\Exception
  87 + * @author:dc
  88 + * @time 2024/4/10 9:25
  89 + */
  90 + public function go_($list){
31 // 占用 id 91 // 占用 id
32 - if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],3600)){  
33 - go(function ($data) use (&$cNum){  
34 - $cNum++; // 协程数+1 92 + if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){
  93 + go(function ($data) {
  94 + _echo('正在执行任务 '.$data['id']);
  95 + $this->cnum++; // 协程数+1
35 // 表单数据 96 // 表单数据
36 $data['maildata'] = json_decode($data['maildata'],true); 97 $data['maildata'] = json_decode($data['maildata'],true);
37 // 查询邮箱 98 // 查询邮箱
@@ -42,6 +103,9 @@ function start(){ @@ -42,6 +103,9 @@ function start(){
42 if($data['maildata']['massSuit']??0){ 103 if($data['maildata']['massSuit']??0){
43 $tos = $data['maildata']['tos']; 104 $tos = $data['maildata']['tos'];
44 foreach ($tos as $to){ 105 foreach ($tos as $to){
  106 + _echo('正在执行任务 发送邮件 '.$to['email']);
  107 + // 续时间
  108 + redis()->set('send_job_run_id_'.$data['id'],$data['id'],600);
45 109
46 // 是否暂停 110 // 是否暂停
47 $dst = db()->first(\Model\sendJobsSql::isStatus($data['id'])); 111 $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
@@ -76,12 +140,12 @@ function start(){ @@ -76,12 +140,12 @@ function start(){
76 $block = false; 140 $block = false;
77 while (true){ 141 while (true){
78 // 没5秒循环一次 142 // 没5秒循环一次
79 - if(redis()->get('send_job_is_stop')=='stop'){ 143 + if($this->isStop()){
80 $block = true; 144 $block = true;
81 break; 145 break;
82 } 146 }
83 $time-=5; 147 $time-=5;
84 - co::sleep(5); 148 + $this->s_sleep(5);
85 // 执行下一次了 149 // 执行下一次了
86 if (!$time){ 150 if (!$time){
87 $block = true; 151 $block = true;
@@ -97,7 +161,8 @@ function start(){ @@ -97,7 +161,8 @@ function start(){
97 161
98 } 162 }
99 163
100 - }else{ 164 + }
  165 + else{
101 $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email); 166 $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
102 // 更新状态 167 // 更新状态
103 db()->update(\Model\sendJobsSql::$table,[ 168 db()->update(\Model\sendJobsSql::$table,[
@@ -115,8 +180,8 @@ function start(){ @@ -115,8 +180,8 @@ function start(){
115 } 180 }
116 181
117 // 协程结束后 182 // 协程结束后
118 - co::defer(function ($id) use(&$cNum,$data){  
119 - $cNum--; 183 + co::defer(function ($id) use($data){
  184 + $this->cnum--;
120 // 验证是否完成 185 // 验证是否完成
121 if($data['maildata']['massSuit']??0){ 186 if($data['maildata']['massSuit']??0){
122 $total = db()->first(\Model\sendJobStatusSql::countSum($data['id'])); 187 $total = db()->first(\Model\sendJobStatusSql::countSum($data['id']));
@@ -147,41 +212,33 @@ function start(){ @@ -147,41 +212,33 @@ function start(){
147 212
148 } 213 }
149 214
150 - }catch (Throwable $e){  
151 - logs($e->getMessage().$e->getTraceAsString());  
152 - } 215 +}
153 216
154 - \Lib\Log::getInstance()->write();  
155 - // 暂停5秒  
156 - co::sleep(5);  
157 - }  
158 217
159 - // 这个是等待所有协程退出  
160 - while (true){  
161 - if(!$cNum){  
162 - break;  
163 - }  
164 - co::sleep(0.5);  
165 - } 218 +$ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l";
166 219
  220 +switch ($argv[1]??0){
  221 + case 'start':{
167 222
168 - });  
169 -} 223 + // 开启协程
  224 + \Co\run(function (){
170 225
  226 + $handler = function ($signal){
  227 + // 可以处理其他程序
  228 + redis()->set('send_job_is_stop','stop');
171 229
  230 + _echo('收到退出信号 '.$signal);
  231 + };
172 232
  233 + \Swoole\Process::signal(SIGTERM,$handler);
  234 + \Swoole\Process::signal(SIGINT,$handler);
173 235
  236 + (new SendJob)->start();
174 237
175 -$ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l"; 238 + _echo('进程已退出');
  239 +
  240 + });
176 241
177 -switch ($argv[1]??0){  
178 - case 'start':{  
179 -// $num = exec($ps);  
180 -// if($num){  
181 -// echo '正则运行,请勿重复运行';  
182 -// }else{  
183 - start();  
184 -// }  
185 break; 242 break;
186 } 243 }
187 case 'stop':{ 244 case 'stop':{