作者 邓超

1

  1 +<?php
  2 +
  3 +include_once "../vendor/autoload.php";
  4 +
  5 +// 这里试试不用多进程模式,用多协程模式
  6 +function start(){
  7 + // 删除key
  8 + redis()->delete('send_job_is_stop');
  9 + // 开启协程
  10 + \Co\run(function (){
  11 +
  12 + $cNum = 0;//协程运行的数量
  13 + while (true){
  14 + try {
  15 + // 是否要停止
  16 + if(redis()->get('send_job_is_stop')=='stop'){
  17 + break;
  18 + }
  19 +
  20 + $lists = db()->all(\Model\sendJobsSql::sendList());
  21 + // 循环
  22 + foreach ($lists as $list){
  23 + go(function ($data) use (&$cNum){
  24 + $cNum++; // 协程数+1
  25 + // 表单数据
  26 + $data['maildata'] = json_decode($data['maildata'],true);
  27 + // 查询邮箱
  28 + $email = db()->first(\Model\emailSql::first($data['email_id']));
  29 + // 是否是单发送
  30 + if($data['maildata']['massSuit']??0){
  31 + $tos = $data['maildata']['tos'];
  32 + foreach ($tos as $to){
  33 +
  34 + // 是否已发送过了
  35 + if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){
  36 + continue;
  37 + }
  38 +
  39 + // 每个收件人单独发送
  40 + $data['maildata']['tos'] = [$to];
  41 + $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
  42 +
  43 + // 插入紫薯精
  44 + db()->insert(\Model\sendJobStatusSql::$table,[
  45 + 'job_id' => $data['id'],
  46 + 'to_email' => $to['email'],
  47 + 'status' => $result[0] ? 1 : 0,
  48 + 'error' => $result[0] ? $result[1] : ''
  49 + ]);
  50 +
  51 +
  52 + // 时间距离下次的时间
  53 + if($data['maildata']['masssuit_interval_send']??[]){
  54 + $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']);
  55 + if($time){
  56 + while (true){
  57 + // 没5秒循环一次
  58 + if(redis()->get('send_job_is_stop')=='stop'){
  59 + break;
  60 + }
  61 + $time-=5;
  62 + co::sleep(5);
  63 + // 执行下一次了
  64 + if (!$time){
  65 + break;
  66 + }
  67 + }
  68 + }
  69 + }
  70 +
  71 + }
  72 +
  73 + // 更新状态
  74 + \Model\sendJobsSql::upStatus($data['id'],1,db());
  75 +
  76 + }else{
  77 + $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
  78 + // 更新状态
  79 + \Model\sendJobsSql::upStatus($data['id'],1,db());
  80 + // 插入紫薯精
  81 + db()->insert(\Model\sendJobStatusSql::$table,[
  82 + 'job_id' => $data['id'],
  83 + 'to_email' => 'all',
  84 + 'status' => $result[0] ? 1 : 0,
  85 + 'error' => $result[0] ? $result[1] : ''
  86 + ]);
  87 + }
  88 +
  89 + // 协程结束后
  90 + co::defer(function () use(&$cNum){
  91 + $cNum--;
  92 + // 结束后要关闭数据库链接,不然链接一直暂用
  93 + db()->close();
  94 + redis()->close();
  95 + });
  96 +
  97 + },$list);
  98 + }
  99 + }catch (Throwable $e){
  100 + logs($e->getMessage().$e->getTraceAsString());
  101 + }
  102 +
  103 + // 暂停5秒
  104 + co::sleep(5);
  105 + }
  106 +
  107 + // 这个是等待所有协程退出
  108 + while (true){
  109 + if(!$cNum){
  110 + break;
  111 + }
  112 + co::sleep(1);
  113 + }
  114 +
  115 +
  116 + });
  117 +}
  118 +
  119 +
  120 +
  121 +
  122 +
  123 +$file = explode('/',__FILE__);
  124 +$file = end($file);
  125 +$ps = "ps -ef | grep \"{$file} start\" | grep -v grep | wc -l";
  126 +
  127 +switch ($argv[1]??0){
  128 + case 'start':{
  129 + $num = exec($ps);
  130 + if($num){
  131 + echo '正则运行,请勿重复运行';
  132 + }else{
  133 + start();
  134 + }
  135 + break;
  136 + }
  137 + case 'stop':{
  138 + echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
  139 +
  140 + redis()->set('send_job_is_stop','stop',86400*5);
  141 +
  142 + while (true){
  143 +
  144 + $num = exec($ps);
  145 + if(!$num){
  146 + break;
  147 + }
  148 + sleep(1);
  149 + }
  150 + echo "已退出程序\n";
  151 + break;
  152 + }
  153 + default:{
  154 + break;
  155 + }
  156 +}
@@ -18,6 +18,17 @@ class sendJobStatusSql { @@ -18,6 +18,17 @@ class sendJobStatusSql {
18 public static $table = 'send_job_status'; 18 public static $table = 'send_job_status';
19 19
20 20
  21 + /**
  22 + * 统计
  23 + * @param int $job_id
  24 + * @param string $to
  25 + * @return string
  26 + * @author:dc
  27 + * @time 2023/4/11 16:13
  28 + */
  29 + public static function count(int $job_id, string $to):string {
  30 + return "select count(*) from `".self::$table."` where `job_id` = {$job_id} and `to_email` = '{$to}'";
  31 + }
21 32
22 33
23 34
@@ -30,5 +30,23 @@ class sendJobsSql { @@ -30,5 +30,23 @@ class sendJobsSql {
30 } 30 }
31 31
32 32
  33 + /**
  34 + * 更新状态
  35 + * @param int $id
  36 + * @param int $status
  37 + * @param null $db
  38 + * @author:dc
  39 + * @time 2023/4/11 16:06
  40 + */
  41 + public static function upStatus(int $id, int $status, $db=null){
  42 + $db = $db?:db();
  43 + // 更新状态
  44 + $db->update(\Model\sendJobsSql::$table,[
  45 + 'status' => $status
  46 + ],dbWhere([
  47 + 'id' => $id
  48 + ]));
  49 + }
  50 +
33 51
34 } 52 }