send_job.php 5.4 KB
<?php

include_once "../vendor/autoload.php";

// 这里试试不用多进程模式,用多协程模式
function start(){
    // 删除key
    redis()->delete('send_job_is_stop');
    // 开启协程
    \Co\run(function (){

        $cNum = 0;//协程运行的数量
        while (true){
            try {
                // 是否要停止
                if(redis()->get('send_job_is_stop')=='stop'){
                    break;
                }

                $lists  =   db()->all(\Model\sendJobsSql::sendList());
                // 循环
                foreach ($lists as $list){
                    go(function ($data) use (&$cNum){
                        $cNum++; // 协程数+1
                        // 表单数据
                        $data['maildata'] = json_decode($data['maildata'],true);
                        // 查询邮箱
                        $email = db()->first(\Model\emailSql::first($data['email_id']));
                        // 是否是单发送
                        if($data['maildata']['massSuit']??0){
                            $tos    =   $data['maildata']['tos'];
                            foreach ($tos as $to){

                                // 是否已发送过了
                                if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){
                                    continue;
                                }

                                // 每个收件人单独发送
                                $data['maildata']['tos'] = [$to];
                                $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);

                                // 插入紫薯精
                                db()->insert(\Model\sendJobStatusSql::$table,[
                                    'job_id'  =>  $data['id'],
                                    'to_email'  =>  $to['email'],
                                    'status'    =>  $result[0] ? 1 : 0,
                                    'error'    =>  $result[0] ? $result[1] : ''
                                ]);


                                // 时间距离下次的时间
                                if($data['maildata']['masssuit_interval_send']??[]){
                                    $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']);
                                    if($time){
                                        while (true){
                                            // 没5秒循环一次
                                            if(redis()->get('send_job_is_stop')=='stop'){
                                                break;
                                            }
                                            $time-=5;
                                            co::sleep(5);
                                            // 执行下一次了
                                            if (!$time){
                                                break;
                                            }
                                        }
                                    }
                                }

                            }

                            // 更新状态
                            \Model\sendJobsSql::upStatus($data['id'],1,db());

                        }else{
                            $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
                            // 更新状态
                            \Model\sendJobsSql::upStatus($data['id'],1,db());
                            // 插入紫薯精
                            db()->insert(\Model\sendJobStatusSql::$table,[
                                'job_id'  =>  $data['id'],
                                'to_email'  =>  'all',
                                'status'    =>  $result[0] ? 1 : 0,
                                'error'    =>  $result[0] ? $result[1] : ''
                            ]);
                        }

                        // 协程结束后
                        co::defer(function () use(&$cNum){
                            $cNum--;
                            // 结束后要关闭数据库链接,不然链接一直暂用
                            db()->close();
                            redis()->close();
                        });

                    },$list);
                }
            }catch (Throwable $e){
                logs($e->getMessage().$e->getTraceAsString());
            }

            // 暂停5秒
            co::sleep(5);
        }

        // 这个是等待所有协程退出
        while (true){
            if(!$cNum){
                break;
            }
            co::sleep(1);
        }


    });
}





$file = explode('/',__FILE__);
$file = end($file);
$ps = "ps -ef | grep \"{$file} start\" | grep -v grep | wc -l";

switch ($argv[1]??0){
    case 'start':{
        $num = exec($ps);
        if($num){
            echo '正则运行,请勿重复运行';
        }else{
            start();
        }
        break;
    }
    case 'stop':{
        echo "正在退出程序...\n非必要请不要强制kill掉进程\n";

        redis()->set('send_job_is_stop','stop',86400*5);

        while (true){

            $num = exec($ps);
            if(!$num){
                break;
            }
            sleep(1);
        }
        echo "已退出程序\n";
        break;
    }
    default:{
        break;
    }
}