send_job.php
7.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
<?php
include_once "../vendor/autoload.php";
// Experiment: run the worker as many coroutines in one process instead of
// using multiple processes.
// Both pools get clientNumber = 60 before any job starts.
// NOTE(review): clientNumber is presumably the number of pooled connections —
// confirm in \Lib\DbPool / \Lib\RedisPool.
\Lib\DbPool::$clientNumber = 60;
\Lib\RedisPool::$clientNumber = 60;
/**
 * Worker that polls the send-job table and delivers each job's e-mail inside
 * its own coroutine.
 *
 * Job status values as used by this class: 1 = running, 2 = finished,
 * 3 = paused, 0 = written when a mass job stops before every recipient has a
 * result row, so that it gets picked up again.
 */
class SendJob {
    /** Lifetime, in seconds, of the Redis key that marks a job as taken. */
    const LOCK_TTL = 600;

    /** Maximum number of job coroutines allowed to run at the same time. */
    const MAX_COROUTINES = 50;

    /** Longest single sleep, in seconds, while waiting between recipients. */
    const WAIT_STEP = 5;

    /**
     * Poll for jobs forever: close running jobs whose result rows already
     * cover every recipient, dispatch everything else, then sleep 30 seconds.
     *
     * This method never returns.
     */
    public function start(){
        _echo('启动邮件群发任务 '.getmypid());
        while (1){
            $lists = db()->all(\Model\sendJobsSql::sendList(500));
            foreach ($lists ?: [] as $list){
                // Cast before comparing: the driver may hand back the column
                // as a string, in which case a bare `=== 1` never matches.
                if ((int)$list['status'] === 1) {
                    $total = db()->first(\Model\sendJobStatusSql::countSum($list['id']));
                    if ($total && $total['t'] == $list['total']) {
                        // Every recipient has a result row: mark the job finished.
                        db()->update(\Model\sendJobsSql::$table, [
                            'status' => 2,
                            'success' => $total['s'],
                            'error' => $total['e'],
                        ], dbWhere(['id' => $list['id']]));
                        continue;
                    }
                }
                $this->go_($list);
            }
            // Sleep 30 seconds before polling again.
            co::sleep(30);
        }
    }

    /**
     * Run one job in a coroutine, unless another runner already holds its lock.
     *
     * Blocks (in 1-second sleeps) while MAX_COROUTINES jobs are running.
     *
     * @param $list job row; this class reads id, email_id, maildata (JSON), status and total
     * @throws \Lib\Err
     * @throws \PHPMailer\PHPMailer\Exception
     * @author:dc
     * @time 2024/4/10 9:25
     */
    public function go_($list){
        // Keep the number of running coroutines under the limit.
        while (\Lib\SwGo::$runNumber >= self::MAX_COROUTINES){
            co::sleep(1);
        }
        // Claim the job id; a falsy result means the job is already taken.
        if (!redis()->add(self::lockKey($list['id']), $list['id'], self::LOCK_TTL)){
            return;
        }
        \Lib\SwGo::start(function ($data) {
            $this->runJob($data);
        }, function ($data) {
            $this->finishJob($data);
        }, $list);
    }

    /**
     * Coroutine body: mark the job as running, store its recipient total and
     * send either one mail per recipient or a single mail to everyone.
     *
     * @param $data job row as passed to go_()
     */
    private function runJob($data){
        _echo('正在执行任务 '.$data['id']);
        // Form data submitted with the job.
        $mailData = self::decodeMailData($data['maildata']);
        // Sending mailbox.
        $email = db()->first(\Model\emailSql::first($data['email_id']));
        $total = self::uniqueRecipientCount($mailData);
        db()->update(\Model\sendJobsSql::$table, [
            'status' => 1,
            'total' => $total,
        ], dbWhere(['id' => $data['id']]));
        _echo('更新任务状态 '.$data['id']);

        if ($mailData['massSuit'] ?? 0){
            $this->sendToEachRecipient($data['id'], $mailData, $email);
        } else {
            $this->sendToAllAtOnce($data['id'], $mailData, $email, $total);
        }
    }

    /**
     * Mass mode: send an individual, personalised mail to every recipient that
     * has no result row yet.
     *
     * The loop stops when the job is paused (status 3) or after a configured
     * interval has been waited out. NOTE(review): stopping after the wait keeps
     * the original control flow; the job is presumably picked up again by
     * start() for the next recipient — confirm this is intended.
     *
     * @param $jobId
     * @param array $mailData decoded form data
     * @param $email sending mailbox row
     */
    private function sendToEachRecipient($jobId, array $mailData, $email){
        $recipients = $mailData['tos'] ?? [];
        if (!is_array($recipients)){
            $recipients = [];
        }
        foreach ($recipients as $to){
            // Extend the lock so the job is not picked up a second time.
            redis()->set(self::lockKey($jobId), $jobId, self::LOCK_TTL);
            // Stop when the job has been paused.
            $dst = db()->first(\Model\sendJobsSql::isStatus($jobId));
            if ($dst && (int)$dst['status'] === 3){
                break;
            }
            // Skip recipients that already have a result row.
            if (db()->count(\Model\sendJobStatusSql::count($jobId, $to['email']))){
                continue;
            }
            _echo('正在执行任务 发送邮件 '.$to['email']);
            // Build the mail from the untouched template so each recipient
            // gets their own name in place of the placeholder.
            $result = \Lib\Mail\MailFun::sendEmail(self::personalizeMail($mailData, $to), $email);
            _echo('邮件发送 '.json_encode($result,JSON_UNESCAPED_UNICODE));
            // Record the per-recipient result.
            db()->insert(\Model\sendJobStatusSql::$table, [
                'job_id' => $jobId,
                'to_email' => $to['email'],
                'status' => $result[0] ? 1 : 0,
                'error' => $result[1],
            ]);
            // Optional random pause before the next send.
            $wait = self::pickInterval($mailData['masssuit_interval_send'] ?? []);
            if ($wait > 0){
                _echo('进入时间等待区 '.$to['email'].' 等待:'.$wait);
                $this->waitHoldingLock($jobId, $wait);
                break;
            }
        }
    }

    /**
     * Single mode: send one mail to all recipients, unless an 'all' result row
     * already exists, then close the job and record the outcome.
     *
     * @param $jobId
     * @param array $mailData decoded form data
     * @param $email sending mailbox row
     * @param int $total recipient count just written to the job row
     */
    private function sendToAllAtOnce($jobId, array $mailData, $email, int $total){
        if (db()->count(\Model\sendJobStatusSql::count($jobId, 'all'))){
            _echo('发送过了 '.$jobId);
            return;
        }
        $result = \Lib\Mail\MailFun::sendEmail($mailData, $email);
        $sent = (bool)$result[0];
        // Use the freshly computed total; the value on the polled row predates
        // the update done at the start of this run.
        db()->update(\Model\sendJobsSql::$table, [
            'status' => 2,
            'success' => $sent ? $total : 0,
            'error' => $sent ? 0 : $total,
        ], dbWhere(['id' => $jobId]));
        // Record the result; keep the mailer's message on failure as well.
        db()->insert(\Model\sendJobStatusSql::$table, [
            'job_id' => $jobId,
            'to_email' => 'all',
            'status' => $sent ? 1 : 0,
            'error' => $result[1] ?? '',
        ]);
    }

    /**
     * Completion callback: for a mass job that is not paused, store the
     * success/error counters and set status 2 when every recipient has a
     * result row, otherwise 0. Always releases the job lock.
     *
     * NOTE(review): the coroutine closure takes the row by value, so this
     * callback presumably still sees maildata as the raw JSON string; it is
     * decoded here and the total recomputed rather than read from the row.
     * Confirm against \Lib\SwGo::start.
     *
     * @param $data job row as passed to go_()
     */
    private function finishJob($data){
        $mailData = self::decodeMailData($data['maildata'] ?? null);
        if ($mailData['massSuit'] ?? 0){
            $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
            if ($dst && $dst['status'] != 3){
                $total = db()->first(\Model\sendJobStatusSql::countSum($data['id']));
                if ($total){
                    db()->update(\Model\sendJobsSql::$table, [
                        'status' => $total['t'] == self::uniqueRecipientCount($mailData) ? 2 : 0,
                        'success' => $total['s'],
                        'error' => $total['e'],
                    ], dbWhere(['id' => $data['id']]));
                }
            }
        }
        // Release the job lock.
        redis()->delete(self::lockKey($data['id']));
        _echo('执行任务完成'.$data['id']);
    }

    /**
     * Sleep for $seconds in steps of at most WAIT_STEP, renewing the job lock
     * after every step so a wait longer than LOCK_TTL does not release it.
     *
     * Counting down to "<= 0" ends the wait for any interval, including ones
     * that are not a multiple of the step.
     *
     * @param $jobId
     * @param int $seconds
     */
    private function waitHoldingLock($jobId, int $seconds){
        while ($seconds > 0){
            $step = min(self::WAIT_STEP, $seconds);
            co::sleep($step);
            $seconds -= $step;
            redis()->set(self::lockKey($jobId), $jobId, self::LOCK_TTL);
        }
    }

    /**
     * Redis key that marks a job as currently being processed.
     *
     * @param $jobId
     * @return string
     */
    private static function lockKey($jobId): string {
        return 'send_job_run_id_'.$jobId;
    }

    /**
     * Return the job's form data as an array.
     *
     * @param mixed $raw JSON string or an already decoded array
     * @return array decoded data, or [] when $raw is not valid JSON for an array
     */
    private static function decodeMailData($raw): array {
        if (is_array($raw)){
            return $raw;
        }
        $decoded = is_string($raw) ? json_decode($raw, true) : null;
        return is_array($decoded) ? $decoded : [];
    }

    /**
     * Count distinct recipient addresses in 'tos', ignoring letter case.
     *
     * @param array $mailData decoded form data
     * @return int
     */
    private static function uniqueRecipientCount(array $mailData): int {
        $recipients = $mailData['tos'] ?? [];
        if (!is_array($recipients)){
            return 0;
        }
        $addresses = array_map(function ($address) {
            return strtolower((string)$address);
        }, array_column($recipients, 'email'));
        return count(array_unique($addresses));
    }

    /**
     * Build the mail for a single recipient: 'tos' holds only that recipient
     * and {customer_name} in the body is replaced by the recipient's name.
     *
     * Works on a copy, so the caller's template keeps its placeholder.
     *
     * @param array $mailData decoded form data used as template
     * @param array $to recipient entry; 'name' is optional
     * @return array
     */
    private static function personalizeMail(array $mailData, $to): array {
        $mailData['tos'] = [$to];
        $mailData['body'] = str_replace('{customer_name}', $to['name'] ?? '', $mailData['body'] ?? '');
        return $mailData;
    }

    /**
     * Pick a random number of seconds from a ['start' => a, 'end' => b] range.
     *
     * @param mixed $range value of masssuit_interval_send
     * @return int seconds to wait; 0 when the range is empty, malformed or
     *             yields a non-positive value
     */
    private static function pickInterval($range): int {
        if (!is_array($range) || !$range){
            return 0;
        }
        $low = (int)($range['start'] ?? 0);
        $high = (int)($range['end'] ?? 0);
        if ($low > $high){
            list($low, $high) = [$high, $low];
        }
        return max(0, rand($low, $high));
    }
}
// Enter the coroutine runtime and hand control to the worker.
\Co\run(function (){
    $job = new SendJob();
    // start() contains an endless polling loop, so the message below is only
    // printed if that loop is ever left.
    $job->start();
    _echo('进程已退出');
});