作者 邓超

1

@@ -17,85 +17,93 @@ function start(){ @@ -17,85 +17,93 @@ function start(){
17 break; 17 break;
18 } 18 }
19 19
20 - $lists = db()->all(\Model\sendJobsSql::sendList()); 20 + $lists = db()->all(\Model\sendJobsSql::sendList(500-$cNum));
21 // 循环 21 // 循环
22 foreach ($lists as $list){ 22 foreach ($lists as $list){
23 - go(function ($data) use (&$cNum){  
24 - $cNum++; // 协程数+1  
25 - // 表单数据  
26 - $data['maildata'] = json_decode($data['maildata'],true);  
27 - // 查询邮箱  
28 - $email = db()->first(\Model\emailSql::first($data['email_id']));  
29 - // 是否是单发送  
30 - if($data['maildata']['massSuit']??0){  
31 - $tos = $data['maildata']['tos'];  
32 - foreach ($tos as $to){  
33 -  
34 - // 是否已发送过了  
35 - if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){  
36 - continue; 23 +
  24 + // 占用 id
  25 + if(redis()->add('send_job_run_id_'.$list['id'],$list['id'])){
  26 + go(function ($data) use (&$cNum){
  27 + $cNum++; // 协程数+1
  28 + // 表单数据
  29 + $data['maildata'] = json_decode($data['maildata'],true);
  30 + // 查询邮箱
  31 + $email = db()->first(\Model\emailSql::first($data['email_id']));
  32 + // 是否是单发送
  33 + if($data['maildata']['massSuit']??0){
  34 + $tos = $data['maildata']['tos'];
  35 + foreach ($tos as $to){
  36 +
  37 + // 是否已发送过了
  38 + if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){
  39 + continue;
  40 + }
  41 +
  42 + // 每个收件人单独发送
  43 + $data['maildata']['tos'] = [$to];
  44 + $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
  45 +
  46 + // 插入紫薯精
  47 + db()->insert(\Model\sendJobStatusSql::$table,[
  48 + 'job_id' => $data['id'],
  49 + 'to_email' => $to['email'],
  50 + 'status' => $result[0] ? 1 : 0,
  51 + 'error' => $result[0] ? $result[1] : ''
  52 + ]);
  53 +
  54 +
  55 + // 时间距离下次的时间
  56 + if($data['maildata']['masssuit_interval_send']??[]){
  57 + $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']);
  58 + if($time){
  59 + while (true){
  60 + // 没5秒循环一次
  61 + if(redis()->get('send_job_is_stop')=='stop'){
  62 + break;
  63 + }
  64 + $time-=5;
  65 + co::sleep(5);
  66 + // 执行下一次了
  67 + if (!$time){
  68 + break;
  69 + }
  70 + }
  71 + }
  72 + }
  73 +
37 } 74 }
38 75
39 - // 每个收件人单独发送  
40 - $data['maildata']['tos'] = [$to];  
41 - $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email); 76 + // 更新状态
  77 + \Model\sendJobsSql::upStatus($data['id'],1,db());
42 78
  79 + }else{
  80 + $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
  81 + // 更新状态
  82 + \Model\sendJobsSql::upStatus($data['id'],1,db());
43 // 插入紫薯精 83 // 插入紫薯精
44 db()->insert(\Model\sendJobStatusSql::$table,[ 84 db()->insert(\Model\sendJobStatusSql::$table,[
45 'job_id' => $data['id'], 85 'job_id' => $data['id'],
46 - 'to_email' => $to['email'], 86 + 'to_email' => 'all',
47 'status' => $result[0] ? 1 : 0, 87 'status' => $result[0] ? 1 : 0,
48 'error' => $result[0] ? $result[1] : '' 88 'error' => $result[0] ? $result[1] : ''
49 ]); 89 ]);
  90 + }
50 91
  92 + // 协程结束后
  93 + co::defer(function ($id) use(&$cNum,$data){
  94 + $cNum--;
  95 + // 结束后要关闭数据库链接,不然链接一直暂用
  96 + db()->close();
  97 + // 删除占用
  98 + redis()->delete('send_job_run_id_'.$data['id']);
  99 + redis()->close();
  100 + });
51 101
52 - // 时间距离下次的时间  
53 - if($data['maildata']['masssuit_interval_send']??[]){  
54 - $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']);  
55 - if($time){  
56 - while (true){  
57 - // 没5秒循环一次  
58 - if(redis()->get('send_job_is_stop')=='stop'){  
59 - break;  
60 - }  
61 - $time-=5;  
62 - co::sleep(5);  
63 - // 执行下一次了  
64 - if (!$time){  
65 - break;  
66 - }  
67 - }  
68 - }  
69 - }  
70 -  
71 - } 102 + },$list);
  103 + }
72 104
73 - // 更新状态  
74 - \Model\sendJobsSql::upStatus($data['id'],1,db());  
75 -  
76 - }else{  
77 - $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);  
78 - // 更新状态  
79 - \Model\sendJobsSql::upStatus($data['id'],1,db());  
80 - // 插入紫薯精  
81 - db()->insert(\Model\sendJobStatusSql::$table,[  
82 - 'job_id' => $data['id'],  
83 - 'to_email' => 'all',  
84 - 'status' => $result[0] ? 1 : 0,  
85 - 'error' => $result[0] ? $result[1] : ''  
86 - ]);  
87 - }  
88 -  
89 - // 协程结束后  
90 - co::defer(function () use(&$cNum){  
91 - $cNum--;  
92 - // 结束后要关闭数据库链接,不然链接一直暂用  
93 - db()->close();  
94 - redis()->close();  
95 - });  
96 -  
97 - },$list);  
98 } 105 }
  106 +
99 }catch (Throwable $e){ 107 }catch (Throwable $e){
100 logs($e->getMessage().$e->getTraceAsString()); 108 logs($e->getMessage().$e->getTraceAsString());
101 } 109 }
@@ -24,9 +24,9 @@ class sendJobsSql { @@ -24,9 +24,9 @@ class sendJobsSql {
24 * @time 2023/4/11 14:56 24 * @time 2023/4/11 14:56
25 * @return string 25 * @return string
26 */ 26 */
27 - public static function sendList():string { 27 + public static function sendList($limit):string {
28 // 控制在500数量,协程数量就控制 28 // 控制在500数量,协程数量就控制
29 - return "select * from `".self::$table."` where `status` = 0 and `send_time` <= ".time()." limit 500"; 29 + return "select * from `".self::$table."` where `status` = 0 and `send_time` <= ".time()." limit {$limit}";
30 } 30 }
31 31
32 32