作者 邓超

go

@@ -5,66 +5,14 @@ include_once "../vendor/autoload.php"; @@ -5,66 +5,14 @@ include_once "../vendor/autoload.php";
5 // 这里试试不用多进程模式,用多协程模式 5 // 这里试试不用多进程模式,用多协程模式
6 6
7 \Lib\DbPool::$clientNumber = 60; 7 \Lib\DbPool::$clientNumber = 60;
  8 +\Lib\RedisPool::$clientNumber = 60;
8 9
9 class SendJob { 10 class SendJob {
10 11
11 - public $cnum = 0;  
12 -  
13 - private $run_timer = 0;  
14 -  
15 - public function __construct()  
16 - {  
17 - $this->run_timer = time();  
18 - }  
19 -  
20 - /**  
21 - * 是否停止  
22 - * @return bool  
23 - * @author:dc  
24 - * @time 2024/4/10 9:12  
25 - */  
26 - private function isStop(){  
27 - // 运行超过1天的 停止  
28 - if($this->run_timer < (time()-43200)){  
29 -// @posix_kill(getmypid(), SIGTERM);  
30 - return true;  
31 - }  
32 - return redis()->get('send_job_is_stop') == 'stop';  
33 - }  
34 -  
35 - /**  
36 - * 休眠  
37 - * @param float $sleep  
38 - * @return bool  
39 - * @author:dc  
40 - * @time 2024/4/10 9:12  
41 - */  
42 - private function s_sleep(float $sleep):bool {  
43 - if($sleep > 0){  
44 - $t = microtime(1);  
45 -  
46 - while (!$this->isStop()){  
47 - co::sleep(0.1);  
48 - if($sleep - (microtime(1)-$t) <= 0){  
49 - break;  
50 - }  
51 - }  
52 - }  
53 - return true;  
54 - }  
55 -  
56 -  
57 public function start(){ 12 public function start(){
58 _echo('启动邮件群发任务 '.getmypid()); 13 _echo('启动邮件群发任务 '.getmypid());
59 - // 删除key  
60 - redis()->delete('send_job_is_stop');  
61 -  
62 14
63 while (1){ 15 while (1){
64 - // 是否要停止  
65 - if($this->isStop()){  
66 - break;  
67 - }  
68 16
69 $lists = db()->all(\Model\sendJobsSql::sendList(500)); 17 $lists = db()->all(\Model\sendJobsSql::sendList(500));
70 $lists = $lists?$lists:[]; 18 $lists = $lists?$lists:[];
@@ -92,20 +40,10 @@ class SendJob { @@ -92,20 +40,10 @@ class SendJob {
92 } 40 }
93 } 41 }
94 // 休眠30秒 42 // 休眠30秒
95 - $this->s_sleep(30); 43 + co::sleep(30);
96 44
97 } 45 }
98 46
99 -  
100 - // 这个是等待所有协程退出  
101 - while (true){  
102 - _echo('等待协程退出...');  
103 - if(!$this->cnum){  
104 - break;  
105 - }  
106 - co::sleep(1);  
107 - }  
108 -  
109 } 47 }
110 48
111 49
@@ -118,16 +56,16 @@ class SendJob { @@ -118,16 +56,16 @@ class SendJob {
118 */ 56 */
119 public function go_($list){ 57 public function go_($list){
120 // 控制50个协程内 58 // 控制50个协程内
121 - while ($this->cnum>=50){  
122 - co::sleep(0.5); 59 + while (\Lib\SwGo::$runNumber >= 50){
  60 + co::sleep(1);
123 } 61 }
124 62
125 63
126 // 占用 id 64 // 占用 id
127 if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){ 65 if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){
128 - go(function ($data) { 66 + \Lib\SwGo::start(function ($data) {
129 _echo('正在执行任务 '.$data['id']); 67 _echo('正在执行任务 '.$data['id']);
130 - $this->cnum++; // 协程数+1 68 +
131 // 表单数据 69 // 表单数据
132 $data['maildata'] = json_decode($data['maildata'],true); 70 $data['maildata'] = json_decode($data['maildata'],true);
133 // 查询邮箱 71 // 查询邮箱
@@ -184,13 +122,8 @@ class SendJob { @@ -184,13 +122,8 @@ class SendJob {
184 _echo('进入时间等待区 '.$to['email'].' 等待:'.$time); 122 _echo('进入时间等待区 '.$to['email'].' 等待:'.$time);
185 $block = false; 123 $block = false;
186 while (true){ 124 while (true){
187 - // 没5秒循环一次  
188 - if($this->isStop()){  
189 - $block = true;  
190 - break;  
191 - }  
192 $time-=5; 125 $time-=5;
193 - $this->s_sleep(5); 126 + co::sleep(5);
194 // 执行下一次了 127 // 执行下一次了
195 if (!$time){ 128 if (!$time){
196 $block = true; 129 $block = true;
@@ -231,9 +164,7 @@ class SendJob { @@ -231,9 +164,7 @@ class SendJob {
231 164
232 } 165 }
233 166
234 - // 协程结束后  
235 - co::defer(function ($id) use($data){  
236 - $this->cnum--; 167 + },function ($data){
237 // 验证是否完成 168 // 验证是否完成
238 if($data['maildata']['massSuit']??0){ 169 if($data['maildata']['massSuit']??0){
239 $dst = db()->first(\Model\sendJobsSql::isStatus($data['id'])); 170 $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
@@ -249,74 +180,22 @@ class SendJob { @@ -249,74 +180,22 @@ class SendJob {
249 } 180 }
250 } 181 }
251 } 182 }
252 -  
253 - // 写入日志  
254 - \Lib\Log::getInstance()->write();  
255 -  
256 // 删除占用 183 // 删除占用
257 redis()->delete('send_job_run_id_'.$data['id']); 184 redis()->delete('send_job_run_id_'.$data['id']);
258 185
259 -  
260 _echo('执行任务完成'.$data['id']); 186 _echo('执行任务完成'.$data['id']);
261 187
262 - db()->close();  
263 - });  
264 -  
265 },$list); 188 },$list);
266 } 189 }
267 190
268 } 191 }
269 192
270 } 193 }
271 -  
272 -  
273 -$ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l";  
274 -  
275 -switch ($argv[1]??0){  
276 - case 'start':{  
277 -  
278 - // 开启协程  
279 - \Co\run(function (){  
280 -  
281 - $handler = function ($signal){  
282 - // 可以处理其他程序  
283 - redis()->set('send_job_is_stop','stop');  
284 -  
285 - _echo('收到退出信号 '.$signal);  
286 - };  
287 -  
288 - \Swoole\Process::signal(SIGTERM,$handler);  
289 - \Swoole\Process::signal(SIGINT,$handler); 194 +// 开启协程
  195 +\Co\run(function (){
290 196
291 (new SendJob)->start(); 197 (new SendJob)->start();
292 198
293 _echo('进程已退出'); 199 _echo('进程已退出');
294 200
295 - db()->close();  
296 - });  
297 -  
298 - break;  
299 - }  
300 - case 'stop':{  
301 - \Co\run(function ($ps){  
302 - echo "正在退出程序...\n非必要请不要强制kill掉进程\n";  
303 -  
304 - redis()->set('send_job_is_stop','stop');  
305 -  
306 - while (true){  
307 -  
308 - $num = exec($ps);  
309 - if(!$num){  
310 - break;  
311 - }  
312 - co::sleep(0.2);  
313 - }  
314 - echo "已退出程序\n";  
315 - },$ps);  
316 -  
317 - break;  
318 - }  
319 - default:{  
320 - break;  
321 - }  
322 -} 201 +});
@@ -37,9 +37,17 @@ class SwGo { @@ -37,9 +37,17 @@ class SwGo {
37 37
38 go(function (\Closure $run,...$param){ 38 go(function (\Closure $run,...$param){
39 self::$runNumber++; 39 self::$runNumber++;
  40 + $end = null;
  41 + if(empty($param[0]) && $param[0] instanceof \Closure){
  42 + $end = $param[0];
  43 + unset($param[0]);
  44 + $param = array_values($param);
  45 + }
40 46
41 $run(...$param); 47 $run(...$param);
42 48
  49 + if($end) $end(...$param);
  50 +
43 // 写入日志 51 // 写入日志
44 \Lib\Log::getInstance()->write(); 52 \Lib\Log::getInstance()->write();
45 // 释放 mysql 53 // 释放 mysql