send_job.php 7.7 KB
<?php

use Swoole\Process;

include_once "../vendor/autoload.php";

// Experiment: run the workload with many coroutines instead of many processes.

// Set the client count of the DB and Redis pools to 60 each.
// NOTE(review): presumably this is the number of pooled connections shared by
// the coroutines — confirm against \Lib\DbPool and \Lib\RedisPool.
\Lib\DbPool::$clientNumber = 60;
\Lib\RedisPool::$clientNumber = 60;

/**
 * Bulk-mail job runner.
 *
 * start() polls the job table forever; go_() claims one job through a Redis
 * key and hands it to \Lib\SwGo, which runs it in a coroutine and then calls
 * the completion callback.
 */
class SendJob {

    /** Prefix of the Redis key that marks a job as "currently running". */
    private const LOCK_PREFIX = 'send_job_run_id_';

    /** Lifetime (seconds) given to the running-marker each time it is set. */
    private const LOCK_TTL = 600;

    /** go_() blocks while \Lib\SwGo::$runNumber is at or above this value. */
    private const MAX_COROUTINES = 50;

    /** Longest single sleep (seconds) while waiting between two recipients. */
    private const WAIT_STEP = 5;

    /**
     * Poll for jobs forever: finalise jobs whose recipients are all processed,
     * dispatch the others, then sleep 30 seconds before the next pass.
     */
    public function start(){
        _echo('启动邮件群发任务 '.getmypid());

        while (1){

            $lists = db()->all(\Model\sendJobsSql::sendList(500));
            $lists = $lists ? $lists : [];

            if($lists){
                foreach ($lists as $list){
                    // Cast before comparing: a driver that returns numeric strings
                    // would otherwise never satisfy a strict "=== 1".
                    if((int)$list['status'] === 1) {
                        $total = db()->first(\Model\sendJobStatusSql::countSum($list['id']));
                        if ($total && $total['t'] == $list['total']) {
                            // Every recipient has a status row: mark the job as finished.
                            db()->update(\Model\sendJobsSql::$table, [
                                'status' => 2,
                                'success' => $total['s'],
                                'error' => $total['e'],
                            ], dbWhere(['id' => $list['id']]));

                            continue;
                        }
                    }

                    $this->go_($list);
                }
            }else{
                _echo('没有找到任务');
            }

            // Sleep 30 seconds between polling passes.
            co::sleep(30);
        }
    }


    /**
     * Claim a job and run it through \Lib\SwGo.
     *
     * Blocks (in one-second sleeps) while the running-coroutine counter is at
     * the limit. Does nothing when the Redis marker for this job cannot be added.
     *
     * @param $list  job row; reads `id`, `email_id` and `maildata`
     * @throws \Lib\Err
     * @throws \PHPMailer\PHPMailer\Exception
     * @author:dc
     * @time 2024/4/10 9:25
     */
    public function go_($list){
        // Keep the number of concurrently running jobs under the limit.
        while (\Lib\SwGo::$runNumber >= self::MAX_COROUTINES){
            co::sleep(1);
        }

        // Claim the job id; bail out when the marker already exists.
        if(!redis()->add(self::LOCK_PREFIX.$list['id'],$list['id'],self::LOCK_TTL)){
            return;
        }

        \Lib\SwGo::start(function ($data) {
            $this->runJob($data);
        },function ($data){
            $this->finishJob($data);
        },$list);
    }

    /**
     * Job body: refresh status/total on the job row, then send either one mail
     * per recipient (massSuit) or a single mail to everybody.
     *
     * @param array $data job row as handed over by \Lib\SwGo
     */
    private function runJob($data){
        _echo('正在执行任务 '.$data['id']);

        // Form data of the mail.
        $data['maildata'] = $this->decodeMailData($data['maildata']);
        // Sender mailbox.
        $email = db()->first(\Model\emailSql::first($data['email_id']));

        $total = $this->recipientTotal($data['maildata']);
        db()->update(\Model\sendJobsSql::$table,[
            'status' => 1,
            'total'  => $total,
        ],dbWhere([
            'id' => $data['id']
        ]));

        _echo('更新任务状态 '.$data['id']);

        if($data['maildata']['massSuit']??0){
            $this->sendToEachRecipient($data,$email);
        }else{
            $this->sendToAllAtOnce($data,$email,$total);
        }
    }

    /**
     * Send one personalised mail per recipient.
     *
     * Stops early when the job status reads 3, and — as the original flow did —
     * stops after a non-zero inter-mail wait has elapsed, leaving the remaining
     * recipients to a later run (recipients that already have a status row are
     * skipped).
     *
     * @param array $data  job row with decoded `maildata`
     * @param mixed $email sender mailbox row passed through to MailFun::sendEmail
     */
    private function sendToEachRecipient(array $data,$email){
        $lockKey  = self::LOCK_PREFIX.$data['id'];
        $tos      = $data['maildata']['tos'] ?? [];
        // Keep the pristine template: substituting into the shared body would
        // leave the first recipient's name in every later mail.
        $template = $data['maildata']['body'] ?? '';
        $interval = $data['maildata']['masssuit_interval_send'] ?? [];

        foreach ($tos as $to){

            // Extend the running-marker.
            redis()->set($lockKey,$data['id'],self::LOCK_TTL);

            // Status 3 stops the run.
            $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
            if($dst && (int)$dst['status'] === 3){
                break;
            }

            // Skip recipients that already have a status row.
            if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){
                continue;
            }
            _echo('正在执行任务 发送邮件 '.$to['email']);

            // Build a per-recipient copy of the mail.
            $mail = $data['maildata'];
            $mail['tos'] = [$to];
            $mail['body'] = str_replace('{customer_name}', (string)($to['name'] ?? ''), $template);

            $result = \Lib\Mail\MailFun::sendEmail($mail,$email);
            _echo('邮件发送 '.json_encode($result,JSON_UNESCAPED_UNICODE));

            // Record the delivery result.
            db()->insert(\Model\sendJobStatusSql::$table,[
                'job_id'   => $data['id'],
                'to_email' => $to['email'],
                'status'   => $result[0] ? 1 : 0,
                'error'    => $result[1]
            ]);

            // Optional random pause before the next mail.
            if($interval){
                $time = rand((int)($interval['start'] ?? 0),(int)($interval['end'] ?? 0));
                if($time){
                    _echo('进入时间等待区 '.$to['email'].' 等待:'.$time);
                    $this->waitInterval($time,$lockKey,$data['id']);
                    // The run ends once the wait is over.
                    break;
                }
            }
        }
    }

    /**
     * Sleep for $seconds in slices of at most WAIT_STEP seconds, re-setting the
     * running-marker after each slice so it does not expire during a long wait.
     *
     * Terminates for any value, including ones that are not a multiple of the
     * slice length and non-positive ones.
     *
     * @param int    $seconds total time to wait
     * @param string $lockKey Redis key of the running-marker
     * @param mixed  $jobId   value stored under the marker
     */
    private function waitInterval($seconds,$lockKey,$jobId){
        $remaining = (int)$seconds;
        while ($remaining > 0){
            $step = min(self::WAIT_STEP,$remaining);
            co::sleep($step);
            $remaining -= $step;
            redis()->set($lockKey,$jobId,self::LOCK_TTL);
        }
    }

    /**
     * Send a single mail to all recipients, unless an 'all' status row already
     * exists for the job.
     *
     * @param array $data  job row with decoded `maildata`
     * @param mixed $email sender mailbox row passed through to MailFun::sendEmail
     * @param int   $total recipient count just written to the job row
     */
    private function sendToAllAtOnce(array $data,$email,$total){
        if(db()->count(\Model\sendJobStatusSql::count($data['id'],'all'))){
            _echo('发送过了 '.$data['id']);
            return;
        }

        $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);

        // Use the freshly computed total: the `total` field of the dispatched
        // row was read before this run recalculated it.
        db()->update(\Model\sendJobsSql::$table,[
            'status'  => 2,
            'success' => $result[0] ? $total : 0,
            'error'   => $result[0] ? 0 : $total,
        ],dbWhere(['id'=>$data['id']]));

        // Record the delivery result; the message is stored on failure as well,
        // matching the per-recipient branch.
        db()->insert(\Model\sendJobStatusSql::$table,[
            'job_id'   => $data['id'],
            'to_email' => 'all',
            'status'   => $result[0] ? 1 : 0,
            'error'    => $result[1]
        ]);
    }

    /**
     * Completion callback: for per-recipient jobs whose status is not 3, write
     * the success/error sums and set status 2 (all recipients processed) or 0;
     * then always remove the running-marker.
     *
     * NOTE(review): the callback may receive the row exactly as dispatched
     * (maildata still a JSON string, `total` not refreshed) — hence the decode
     * and the recomputed recipient count instead of trusting those fields.
     *
     * @param array $data job row as handed over by \Lib\SwGo
     */
    private function finishJob($data){
        $maildata = $this->decodeMailData($data['maildata'] ?? null);

        if($maildata['massSuit']??0){
            $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
            if($dst && (int)$dst['status'] !== 3){
                $total = db()->first(\Model\sendJobStatusSql::countSum($data['id']));
                if($total){
                    $expected = $this->recipientTotal($maildata);
                    db()->update(\Model\sendJobsSql::$table,[
                        'status'  => $total['t'] == $expected ? 2 : 0,
                        'success' => $total['s'],
                        'error'   => $total['e'],
                    ],dbWhere(['id'=>$data['id']]));
                }
            }
        }

        // Release the running-marker.
        redis()->delete(self::LOCK_PREFIX.$data['id']);

        _echo('执行任务完成'.$data['id']);
    }

    /**
     * Return the mail form data as an array, accepting either the stored JSON
     * string or an already decoded array; anything undecodable yields [].
     *
     * @param mixed $raw
     * @return array
     */
    private function decodeMailData($raw){
        if(is_array($raw)){
            return $raw;
        }
        $decoded = is_string($raw) ? json_decode($raw,true) : null;
        return is_array($decoded) ? $decoded : [];
    }

    /**
     * Count the distinct recipient addresses (case-insensitive) in `tos`.
     *
     * @param array $maildata decoded mail form data
     * @return int
     */
    private function recipientTotal(array $maildata){
        $emails = array_column($maildata['tos'] ?? [],'email');
        return count(array_unique(array_map('strtolower',$emails)));
    }

}


$pm = new Process\Manager();

// Register two coroutine-enabled workers: worker 0 is a watchdog that shuts the
// pool down once more than an hour has elapsed, every other worker runs the job loop.
$pm->addBatch(2, function (Process\Pool $pool, int $worker_id) {
    if ($worker_id !== 0) {
        (new SendJob)->start();
        return;
    }

    // Watchdog: check the elapsed time every 5 seconds.
    $bootedAt = time();
    while (time() - $bootedAt <= 3600) {
        co::sleep(5);
    }
    $pool->shutdown();

    return 0;
}, true);

$pm->start();